実践ネット自動化スクリプト集

現場で役立つ!Pythonによるネットワーク疎通確認スクリプト:Ping/Traceroute結果を自動解析

Tags: Python, ネットワーク自動化, Netmiko, Ping, Traceroute, スクリプト, CLI自動化

システムインフラの構築や運用において、ネットワークの疎通確認は基本中の基本です。手動でネットワーク機器にログインし、PingやTracerouteコマンドを実行するのは日常的な作業ですが、対象が増えたり、定期的な確認が必要になったりすると、大きな負担となります。

Pythonを使ったネットワーク自動化は、このような定型作業を効率化するための強力な手段となります。Pythonスキルをお持ちの皆様であれば、ネットワーク機器への直接的な操作経験が限定的であっても、既存のライブラリを活用することで、疎通確認の自動化を比較的容易に実現できます。

本記事では、Pythonを使ってネットワーク機器上でPingやTracerouteコマンドを実行し、その結果を自動的に解析するスクリプトの実装方法について解説します。

なぜネットワーク疎通確認を自動化するのか

Pythonによる疎通確認自動化のステップ

Pythonでネットワーク機器上のコマンドを実行し、結果を解析するには、以下のステップが一般的です。

  1. ネットワーク機器への接続: SSHなどのプロトコルを使用して機器に接続します。
  2. コマンドの実行: 接続したセッションを通じて、PingやTracerouteコマンドを実行します。
  3. 結果の取得: コマンドの標準出力を取得します。
  4. 結果の解析: 取得した文字列から、必要な情報(成功/失敗、応答時間、経路など)を抽出します。
  5. 結果の出力/処理: 解析した結果を整形して出力したり、他のシステムと連携させたりします。

これらのステップを実現するために、Pythonにはいくつかの便利なライブラリがあります。中でも、ネットワーク機器とのCLI操作に特化したnetmikoは、多くのベンダーやOSに対応しており、容易にSSH接続やコマンド実行が可能です。

netmikoを使ったPingコマンド実行と結果解析

ここでは、netmikoを使用してネットワーク機器に接続し、Pingコマンドを実行して結果を解析する基本的なスクリプト例を示します。

事前にnetmikoライブラリをインストールしておく必要があります。

pip install netmiko

以下のコードは、指定したIPアドレスの機器にSSH接続し、特定の宛先IPアドレスへのPingを実行、その結果から疎通の成否やパケットロス率を判定する例です。

import os
import re
from netmiko import ConnectHandler, NetmikoTimeoutException, NetmikoAuthenticationException

# 機器接続情報 (サンプル)
# 実際の運用では認証情報を安全に管理してください(環境変数、シークレットマネージャーなど)
DEVICE_IP = 'YOUR_DEVICE_IP' # 接続対象のネットワーク機器IP
USERNAME = 'YOUR_USERNAME'   # ユーザー名
PASSWORD = 'YOUR_PASSWORD'   # パスワード
DEVICE_TYPE = 'cisco_ios'    # 機器タイプ (例: cisco_ios, juniper_junosなど)

# Ping実行対象
TARGET_IP = '8.8.8.8' # Pingテストする宛先IPアドレス

def run_ping_test(device_ip, username, password, device_type, target_ip):
    """
    ネットワーク機器上でPingコマンドを実行し、結果を解析する関数
    """
    print(f"[{device_ip}] 宛先 {target_ip} へのPingテストを開始します...")

    device = {
        'device_type': device_type,
        'host': device_ip,
        'username': username,
        'password': password,
        'secret': password, # enableパスワードが必要な場合
    }

    try:
        # NetmikoでSSH接続
        with ConnectHandler(**device) as net_connect:
            print(f"[{device_ip}] 接続に成功しました。")

            # enableモードに移行 (必要であれば)
            # net_connect.enable()

            # Pingコマンド実行
            # コマンドは機器のOSによって異なります。ここではCisco IOSを想定。
            command = f"ping {target_ip}"
            print(f"[{device_ip}] コマンド実行: '{command}'")
            output = net_connect.send_command(command)

            print(f"[{device_ip}] コマンド出力:\n---\n{output}\n---")

            # Ping結果の解析 (Cisco IOSの出力形式を想定)
            # 解析部分は機器やOSによって適宜変更が必要です
            loss_match = re.search(r'(\d+)\% packet loss', output)
            sent_match = re.search(r'Sent: (\d+)', output)
            received_match = re.search(r'Received: (\d+)', output)
            min_rtt_match = re.search(r'min/avg/max = ([\d\.]+)/([\d\.]+)/([\d\.]+) ms', output)

            result = {
                'device': device_ip,
                'target': target_ip,
                'status': 'Unknown',
                'sent': 0,
                'received': 0,
                'loss_percent': 100,
                'rtt_min_ms': None,
                'rtt_avg_ms': None,
                'rtt_max_ms': None,
                'raw_output': output.strip()
            }

            if loss_match:
                loss_percent = int(loss_match.group(1))
                result['loss_percent'] = loss_percent

                # パケットロス率で疎通成否を判定
                if loss_percent == 0:
                    result['status'] = 'Success'
                else:
                    result['status'] = 'Failure'
                    print(f"[{device_ip}] Ping失敗: {loss_percent}% packet loss")

            if sent_match:
                result['sent'] = int(sent_match.group(1))
            if received_match:
                result['received'] = int(received_match.group(1))

            if min_rtt_match:
                result['rtt_min_ms'] = float(min_rtt_match.group(1))
                result['rtt_avg_ms'] = float(min_rtt_match.group(2))
                result['rtt_max_ms'] = float(min_rtt_match.group(3))


            print(f"[{device_ip}] Pingテスト結果: ステータス={result['status']}, パケットロス={result['loss_percent']}%")
            return result

    except NetmikoTimeoutException:
        print(f"[{device_ip}] エラー: 接続タイムアウト")
        return {'device': device_ip, 'target': target_ip, 'status': 'Connection Timeout', 'raw_output': ''}
    except NetmikoAuthenticationException:
        print(f"[{device_ip}] エラー: 認証失敗")
        return {'device': device_ip, 'target': target_ip, 'status': 'Authentication Failure', 'raw_output': ''}
    except Exception as e:
        print(f"[{device_ip}] エラー: {e}")
        return {'device': device_ip, 'target': target_ip, 'status': f'Error: {e}', 'raw_output': ''}

if __name__ == "__main__":
    # 実際の機器情報に置き換えて実行してください
    # 複数機器に対して実行する場合は、リストなどで機器情報を管理します
    # Example usage:
    # device_info = {
    #     'device_ip': '192.168.1.1',
    #     'username': 'admin',
    #     'password': 'password',
    #     'device_type': 'cisco_ios'
    # }
    # target = '8.8.8.8'
    # ping_result = run_ping_test(**device_info, target_ip=target)
    # print("\n--- Ping Result ---")
    # print(ping_result)

    print("実行には、上記のサンプルコード中の接続情報と宛先IPを適切に設定してください。")
    print("また、mainブロック内のコメントアウトされた実行例を参考に、関数を呼び出してください。")

コード解説:

netmikoを使ったTracerouteコマンド実行と結果解析

Tracerouteも基本的な考え方はPingと同じですが、出力形式がより複雑になります。経路上の各ホップのIPアドレスや応答時間などを解析する必要があります。

Tracerouteの出力例(Cisco IOS):

Type escape sequence to abort.
Tracing the route to 8.8.8.8

  1 192.168.1.254 4 msec 4 msec 4 msec
  2 10.0.0.1 8 msec 7 msec 8 msec
  3 203.0.113.1 12 msec 11 msec 12 msec
  ...

以下のコードは、Tracerouteコマンドを実行し、各ホップの情報を抽出する例です。

import re
# Netmikoのインポート部分は上記のPingスクリプトを参照

# Traceroute実行対象
TARGET_IP_TRACEROUTE = '8.8.8.8'

def run_traceroute_test(device_ip, username, password, device_type, target_ip):
    """
    ネットワーク機器上でTracerouteコマンドを実行し、結果を解析する関数
    """
    print(f"[{device_ip}] 宛先 {target_ip} へのTracerouteテストを開始します...")

    device = {
        'device_type': device_type,
        'host': device_ip,
        'username': username,
        'password': password,
        'secret': password, # enableパスワードが必要な場合
    }

    try:
        with ConnectHandler(**device) as net_connect:
            print(f"[{device_ip}] 接続に成功しました。")

            # enableモードに移行 (必要であれば)
            # net_connect.enable()

            # Tracerouteコマンド実行
            # コマンドは機器のOSによって異なります。ここではCisco IOSを想定。
            command = f"traceroute {target_ip}"
            print(f"[{device_ip}] コマンド実行: '{command}'")
            # Tracerouteは完了まで時間がかかる可能性が高いため、timeoutを長めに設定することが推奨されます
            output = net_connect.send_command(command, cmd_timeout=120) # 例: 120秒タイムアウト

            print(f"[{device_ip}] コマンド出力:\n---\n{output}\n---")

            # Traceroute結果の解析 (Cisco IOSの出力形式を想定)
            # 各行がホップ情報を表すため、行ごとに処理します
            hops = []
            # 出力の中からホップ情報が記述されている行を抽出するための正規表現
            # 例: " 1 192.168.1.254 4 msec 4 msec 4 msec" のような行にマッチ
            # 注意: 出力形式は機器や設定により大きく異なる可能性があります
            hop_pattern = re.compile(r'^\s*(\d+)\s+([\w\.\*]+)\s+([\d\.\s]+)\s*msec') # シンプルなパターン例

            lines = output.splitlines()
            for line in lines:
                match = hop_pattern.match(line)
                if match:
                    hop_number = int(match.group(1))
                    ip_or_name = match.group(2)
                    rtt_values_str = match.group(3).strip()

                    # RTT値の解析 (複数の値がある場合がある)
                    rtt_values = [float(r) if r != '*' else None for r in rtt_values_str.split()]

                    hop_info = {
                        'hop': hop_number,
                        'target': ip_or_name,
                        'rtt_ms': rtt_values
                    }
                    hops.append(hop_info)

            if hops:
                print(f"[{device_ip}] Tracerouteテスト結果 ({len(hops)}ホップ):")
                for hop in hops:
                    print(f"  Hop {hop['hop']}: {hop['target']} (RTTs: {hop['rtt_ms']})")
                return {'device': device_ip, 'target': target_ip, 'hops': hops, 'raw_output': output.strip()}
            else:
                 print(f"[{device_ip}] Traceroute結果からホップ情報を解析できませんでした。")
                 return {'device': device_ip, 'target': target_ip, 'hops': [], 'raw_output': output.strip(), 'status': 'Parsing Error'}


    except NetmikoTimeoutException:
        print(f"[{device_ip}] エラー: 接続タイムアウト")
        return {'device': device_ip, 'target': target_ip, 'hops': [], 'raw_output': '', 'status': 'Connection Timeout'}
    except NetmikoAuthenticationException:
        print(f"[{device_ip}] エラー: 認証失敗")
        return {'device': device_ip, 'target': target_ip, 'hops': [], 'raw_output': '', 'status': 'Authentication Failure'}
    except Exception as e:
        print(f"[{device_ip}] エラー: {e}")
        return {'device': device_ip, 'target': target_ip, 'hops': [], 'raw_output': '', 'status': f'Error: {e}'}

# Tracerouteの実行例 (Pingと同様にコメントアウトを解除して実行)
# if __name__ == "__main__":
    # device_info = {
    #     'device_ip': '192.168.1.1',
    #     'username': 'admin',
    #     'password': 'password',
    #     'device_type': 'cisco_ios'
    # }
    # target = '8.8.8.8'
    # traceroute_result = run_traceroute_test(**device_info, target_ip=target)
    # print("\n--- Traceroute Result ---")
    # print(traceroute_result)

コード解説:

実践的な考慮点

現場でこれらのスクリプトを利用する際には、さらに以下の点を考慮することが重要です。

インフラ自動化における位置づけ

ネットワーク疎通確認の自動化スクリプトは、インフラ自動化のワークフローにおいて様々な場面で活用できます。

まとめ

本記事では、Pythonとnetmikoライブラリを使用して、ネットワーク機器上でPingおよびTracerouteコマンドを実行し、その結果を自動的に解析する基本的なスクリプトの実装方法について解説しました。

ネットワーク機器への直接的な操作経験が限定的であっても、Pythonの豊富なライブラリと文字列処理、正規表現のスキルを組み合わせることで、現場で役立つ実用的なネットワーク自動化スクリプトを作成することが可能です。

ここで示したコード例は基本的なものですが、これを基に、認証情報の安全な管理、複数機器への対応、詳細なエラーハンドリング、そして結果の多様な出力形式への対応などを実装していくことで、より堅牢で実践的な自動化ツールへと発展させることができます。

ネットワーク自動化は、インフラ運用の効率化と品質向上に不可欠な要素です。ぜひPythonスキルを活かして、ネットワークの自動化に取り組んでみてください。