実践Python:ネットワーク障害時の経路追跡と解析を自動化する
はじめに
システムやサービスの運用において、ネットワーク障害の切り分けは重要な作業の一つです。特に、特定のサーバーから外部リソースへの通信に問題が発生した場合、「どこまで通信できているのか」を迅速に把握する必要があります。この際、traceroute
(Windows環境ではtracert
)コマンドがよく利用されます。しかし、手動でのコマンド実行と結果の目視確認は、対象が多い場合や緊急時には大きな負担となります。
本記事では、Pythonを活用してこのtraceroute
による経路追跡とその結果解析を自動化する方法をご紹介します。これにより、ネットワーク機器の直接操作に慣れていない方でも、Pythonのスキルを活かして一次切り分けの効率を大幅に向上させることが可能です。
traceroute
コマンドによる経路追跡の基礎
traceroute
コマンドは、指定したホストまでのネットワーク経路上のルーター(ホップ)を順に表示し、各ホップまでの応答時間などを計測するツールです。これにより、通信が途中で途切れている箇所や、特定の区間での遅延などを特定する手がかりを得ることができます。
コマンドの基本的な使用法は以下の通りです。
# Linux/macOS
traceroute <宛先ホスト名またはIPアドレス>
# Windows
tracert <宛先ホスト名またはIPアドレス>
実行すると、以下のような出力が得られます(OSやバージョンによって出力形式は異なります)。
traceroute to example.com (93.184.216.34), 30 hops max, 60 byte packets
1 router.local (192.168.1.1) 1.234 ms 2.345 ms 3.456 ms
2 isp.router.jp (203.0.113.1) 10.123 ms 11.234 ms 12.345 ms
3 * * *
4 backbone.router.net (198.51.100.1) 20.456 ms 21.567 ms 22.678 ms
...
この出力から、各ホップのIPアドレス(またはホスト名)と、そのホップまでの往復時間(RTT)が読み取れます。* * *
と表示される箇所は、そのホップからの応答がなかったことを示し、通信経路上の問題やファイアウォールによるパケット破棄などの可能性を示唆します。
Pythonによるtraceroute
実行と出力取得
Pythonから外部コマンドを実行し、その出力を取得するには、標準ライブラリのsubprocess
モジュールが便利です。
以下のコードは、指定した宛先に対してtraceroute
(またはtracert
)を実行し、その標準出力を取得する基本的な例です。
import subprocess
import sys
import platform
def run_traceroute(destination):
"""
指定された宛先に対してtraceroute/tracertを実行し、出力を取得する。
"""
# OSによってコマンド名を切り替える
if platform.system() == "Windows":
command = ["tracert", "-d", destination] # -d: 名前解決をスキップしてIPアドレスを表示
else:
command = ["traceroute", "-n", destination] # -n: 名前解決をスキップしてIPアドレスを表示
try:
# コマンドを実行し、標準出力と標準エラーを取得
# text=True または encoding='utf-8' でテキストとして扱う
# capture_output=True でstdoutとstderrをキャプチャ
# timeout で実行時間を制限することも可能
result = subprocess.run(
command,
capture_output=True,
text=True,
encoding='utf-8',
check=True, # コマンドがゼロ以外の終了コードを返した場合にCalledProcessErrorを発生させる
timeout=60 # 例として60秒でタイムアウト
)
return result.stdout
except FileNotFoundError:
print(f"エラー: '{command[0]}' コマンドが見つかりません。traceroute/tracertがインストールされているか確認してください。", file=sys.stderr)
return None
except subprocess.CalledProcessError as e:
print(f"コマンド実行エラー: {e}", file=sys.stderr)
print(f"標準エラー出力:\n{e.stderr}", file=sys.stderr)
return None
except subprocess.TimeoutExpired:
print(f"エラー: コマンドがタイムアウトしました。", file=sys.stderr)
return None
except Exception as e:
print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
return None
if __name__ == "__main__":
target = "google.com"
# target = "8.8.8.8" # IPアドレスでも指定可能
print(f"{target} への経路を追跡します...")
traceroute_output = run_traceroute(target)
if traceroute_output:
print("\n--- traceroute/tracert 結果 ---")
print(traceroute_output)
print("-----------------------------")
else:
print(f"{target} への経路追跡に失敗しました。")
このスクリプトは、OSを判定して適切なコマンド(traceroute
またはtracert
)を実行します。-n
オプション(Linux/macOS)または-d
オプション(Windows)は、DNSルックアップをスキップしてIPアドレスを直接表示するためのもので、解析を容易にします。try...except
ブロックでコマンドが見つからない場合や実行エラー、タイムアウトなどを捕捉し、堅牢性を高めています。
traceroute
出力の解析と問題箇所の特定
取得したtraceroute
の出力は、そのままでは構造化されていません。問題箇所をプログラムで特定するためには、この文字列データを解析(パース)し、各ホップの情報(ホップ番号、IPアドレス、応答時間など)を抽出する必要があります。
出力形式はOSやバージョン、実行時のオプションによって異なりますが、一般的なパターンに基づいてパースする例を示します。ここでは、比較的シンプルな正規表現を使用する方法を見てみましょう。
import re
def parse_traceroute_output(output):
"""
traceroute/tracertの出力を解析し、ホップごとの情報をリストで返す。
"""
hops = []
# Windows tracert出力のパターン例:
# 1 10 ms <1 ms <1 ms 192.168.1.1
# 2 20 ms 25 ms 30 ms 203.0.113.1
# 3 * * * Request timed out.
# Linux/macOS traceroute出力のパターン例:
# 1 192.168.1.1 (192.168.1.1) 1.234 ms 2.345 ms 3.456 ms
# 2 203.0.113.1 (203.0.113.1) 10.123 ms 11.234 ms 12.345 ms
# 3 * * *
# OSごとのパターンに対応するためのシンプルな正規表現
# ホップ番号、IPアドレス(または'*')、応答時間(または'*'/'timed out')を抽出
# より厳密なパースには、OSごとの出力形式を詳細に考慮する必要があります。
# この例では、ホップ番号、ホスト情報(IPまたは'*')、応答時間部分を大まかに捉えます。
# パターンはあくまで一般的なものとし、実際の出力に合わせて調整が必要です。
# (\d+)\s+ : ホップ番号とスペース
# (?: ... ) : グループ化するがキャプチャしない (非キャプチャグループ)
# \S+ : スペース以外の1文字以上の連続 (IPアドレス、ホスト名、'*')
# (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\*) : IPアドレスまたは '*' をキャプチャ
# (?:.*?): 応答時間部分などを非貪欲にキャプチャ(ここでは詳細はパースしない簡略化)
# r'^\s*(\d+)\s+(.*?)\s*$' # 各行をホップ番号とそれ以降に分解する簡易パターン
# より詳細にホップ情報(番号、IP/ホスト名、RTTs)を抽出する試み
# この正規表現は非常に複雑になるため、ここでは簡易的なアプローチを採用します。
# 各行からホップ番号、ホスト情報、応答時間情報を抽出する基本的な考え方を示します。
# 各行を処理
for line in output.strip().split('\n'):
line = line.strip()
if not line or line.startswith(('traceroute to', 'tracert to', 'Tracing route to')):
continue # ヘッダー行はスキップ
parts = line.split()
if not parts:
continue
try:
hop_number = int(parts[0])
except ValueError:
continue # ホップ番号で始まらない行はスキップ(例: 注釈行)
# 残りの部分を解析してホスト情報と応答時間を抽出
# ここをOSの出力形式に合わせて詳細にパースする必要があります。
# 例: Windowsの場合、 hop_number, ms1, ms2, ms3, ip/hostname の順
# 例: Linuxの場合、 hop_number, hostname/ip, (ip), ms1, ms2, ms3 の順
hop_info = {"hop": hop_number, "host": "不明", "rtts": []}
if platform.system() == "Windows":
# Windows形式の簡易パース
if len(parts) >= 5:
# parts[1], parts[2], parts[3] が応答時間または '*'
hop_info["rtts"] = [p for p in parts[1:4] if p not in ('*', 'ms', '<1')] # '<1 ms' は '<1' で抽出される場合あり
hop_info["host"] = parts[4] if parts[4] != "Request" else "Timeout" # Request timed out. の場合
elif len(parts) == 4 and parts[3] == "out.": # Request timed out. の末尾
hop_info["rtts"] = ["*", "*", "*"]
hop_info["host"] = "Timeout"
elif len(parts) == 4 and parts[3] != "out.": # 例: 1 <1 ms <1 ms 192.168.1.1
hop_info["rtts"] = [parts[1], parts[2], '<1'] # 適宜修正
hop_info["host"] = parts[3]
else: # Linux/macOS形式の簡易パース
# Linux/macOSの出力はホスト名の後にIPアドレスが括弧で囲まれていることが多い
# 例: 1 router.local (192.168.1.1) 1.234 ms 2.345 ms 3.456 ms
# 例: 3 * * *
# 例: 4 backbone.router.net 20.456 ms 21.567 ms 22.678 ms (IPなしの場合)
# ホスト名/IP部分とRTTs部分を分ける
host_rtts_str = " ".join(parts[1:])
# IPアドレス部分 '(...)', RTTs 'N ms' などを抽出する正規表現の補助を使う
# より堅牢なパースのためには、特定のパターンを適用するか、
# ライブラリ (例: `scrapli.textfsm`) の利用も検討する
# ここでは、スペース区切りで単純に処理する例
if parts[1] == '*':
hop_info["host"] = "Timeout"
hop_info["rtts"] = ["*", "*", "*"]
else:
# ホスト名/IP部分の終わりとRTT部分の始まりを探す
# ' ms' または '<1 ms' の手前がRTTsの開始と推測
rtt_start_index = -1
for i in range(len(parts)):
if 'ms' in parts[i] or parts[i] == '*':
rtt_start_index = i
break
if rtt_start_index != -1:
hop_info["host"] = " ".join(parts[1:rtt_start_index])
hop_info["rtts"] = [p for p in parts[rtt_start_index:] if p != 'ms']
else:
# RTTsが見つからない場合 (最終ホップなど)
hop_info["host"] = " ".join(parts[1:])
hop_info["rtts"] = [] # または適切なデフォルト値
# 抽出した情報をリストに追加
hops.append(hop_info)
return hops
def analyze_hops(hops):
"""
解析したホップ情報から問題の可能性のある箇所を特定する。
"""
potential_issues = []
last_successful_hop = None
for hop in hops:
is_timeout = all(rtt in ('*', 'Timeout') for rtt in hop['rtts'])
avg_rtt = 0
valid_rtts = []
for rtt_str in hop['rtts']:
if rtt_str not in ('*', 'Timeout'):
try:
# ' ms', '<1 ms' 等を考慮して数値に変換
rtt_value = float(rtt_str.replace(' ms', '').replace('<1', '0.1')) # '<1' は 0.1ms とする例
valid_rtts.append(rtt_value)
except ValueError:
pass # 数値に変換できない場合は無視
if valid_rtts:
avg_rtt = sum(valid_rtts) / len(valid_rtts)
if is_timeout:
# タイムアウトが続くホップは問題の可能性が高い
potential_issues.append({
"type": "timeout",
"hop": hop['hop'],
"host": hop['host'],
"message": f"ホップ {hop['hop']} ({hop['host']}) でタイムアウトが発生しています。"
})
elif avg_rtt > 200: # 例: 200msを超える平均RTTは遅延が大きいと判断
potential_issues.append({
"type": "high_latency",
"hop": hop['hop'],
"host": hop['host'],
"message": f"ホップ {hop['hop']} ({hop['host']}) の遅延が大きいです (平均 {avg_rtt:.2f}ms)。"
})
# タイムアウトしていない最後のホップを記録
if not is_timeout:
last_successful_hop = hop
if potential_issues:
print("\n--- 解析結果:問題の可能性 ---")
for issue in potential_issues:
print(issue['message'])
if last_successful_hop:
print(f"\n最後に通信に応答したホップは {last_successful_hop['hop']} ({last_successful_hop['host']}) です。")
print("このホップ以降に問題が発生している可能性があります。")
else:
print("\n最初のホップから応答がありません。ローカルネットワークまたは最初のルーターに問題がある可能性があります。")
else:
print("\n--- 解析結果 ---")
print("経路追跡の結果、顕著な問題は検出されませんでした。(ただし、これは簡易的な解析です)")
return potential_issues
if __name__ == "__main__":
target = "google.com"
# target = "8.8.8.8"
print(f"{target} への経路を追跡します...")
traceroute_output = run_traceroute(target)
if traceroute_output:
print("\n--- traceroute/tracert 結果 ---")
print(traceroute_output)
print("-----------------------------")
print("\n結果を解析しています...")
traced_hops = parse_traceroute_output(traceroute_output)
if traced_hops:
print("\n--- 抽出されたホップ情報 ---")
for hop in traced_hops:
print(f" ホップ {hop['hop']}: ホスト={hop['host']}, RTTs={hop['rtts']}")
print("--------------------------")
analyze_hops(traced_hops)
else:
print("traceroute出力の解析に失敗しました。")
else:
print(f"{target} への経路追跡に失敗しました。")
上記のコードは、取得したテキスト出力をparse_traceroute_output
関数で解析し、analyze_hops
関数で問題箇所を特定する例です。
parse_traceroute_output
関数内のパース処理は、OSごとの出力形式の多様性に対応するため、非常に複雑になりがちです。上記のコードはあくまで基本的な考え方を示す簡易的な実装であり、実際の運用ではより頑強なパースロジック(例えば、正規表現をOSごとに分ける、特定のキーワードで判定するなど)や、より高機能なライブラリ(例えば、構造化テキストパーサーであるTextFSMや、ネットワークデバイスとの連携に特化したNAPALMやScrapliなどが出力パース機能を提供している場合があります)の利用を検討することをお勧めします。
analyze_hops
関数では、ホップごとの応答時間(RTT)に*
が多い場合や、平均RTTが一定値を超える場合に、そのホップに問題がある可能性があると判断しています。最後の応答があったホップを特定することで、問題が発生しているネットワーク区間を絞り込む手がかりとすることができます。
実践的な考慮点
このスクリプトを現場で利用する際には、いくつかの実践的な考慮点があります。
- OSごとの出力形式の違い: 前述の通り、
traceroute
やtracert
の出力形式はOSによって大きく異なります。クロスプラットフォームで利用する場合は、実行環境のOSを判定し、それぞれの形式に対応したパースロジックを実装する必要があります。 - コマンド実行権限:
traceroute
コマンドはRAWソケットを使用するため、多くのOSでは管理者権限(root権限)が必要となる場合があります。スクリプトを実行するユーザーの権限を確認してください。 - タイムアウトとエラーハンドリング: ネットワークの状態によっては、
traceroute
が完了するまでに時間がかかったり、途中でエラーになったりすることがあります。subprocess.run
のtimeout
引数で実行時間に制限を設けたり、check=True
とtry...except subprocess.CalledProcessError
でコマンド自体のエラーを捕捉したりすることが重要です。 - 詳細な経路情報の取得:
traceroute
だけでは、ルーターの内部的な状態(CPU負荷、インターフェースエラーなど)は分かりません。より詳細な情報を得るためには、ネットワーク機器のAPI(NETCONF/RESTCONF/SNMPなど)を利用して機器の状態を取得したり、ルーティングテーブル情報を取得したりする自動化と組み合わせる必要があります。 - 結果のレポートと連携: スクリプトの実行結果を単に表示するだけでなく、ログファイルに保存する、メールやチャットツールに通知する、監視システムやダッシュボードに連携するなど、後続の処理や情報共有を自動化することで、より実践的なツールとなります。IaCツール(Ansible, Terraformなど)のRunbookとして組み込むことも考えられます。
まとめ
本記事では、Pythonのsubprocess
モジュールを利用してtraceroute
/tracert
コマンドを実行し、その結果を解析してネットワーク障害時の問題箇所を特定する自動化スクリプトの基本的な実装方法をご紹介しました。
traceroute
の出力パースにはOSごとの形式の違いを考慮する必要があり、この部分が最も複雑になる可能性があります。しかし、一度堅牢なパース処理を実装すれば、得られた構造化データを用いて様々な自動解析やレポート作成が可能になります。
このスクリプトは、ネットワーク機器のCLI操作に不慣れな開発・インフラエンジニアの方が、Pythonのスキルを活かしてネットワークの一次切り分けを効率化するための一歩となるでしょう。さらに、API連携や他の自動化ツールとの組み合わせにより、より高度なネットワーク運用の自動化へと発展させることができます。ぜひ、皆様の現場の課題に合わせて本スクリプトを応用してみてください。