実践ネット自動化スクリプト集

現場で使えるPython:インターフェースエラー・使用率の自動収集と閾値監視スクリプト

Tags: Python, Netmiko, ネットワーク監視, インターフェース, 閾値監視

はじめに

ネットワークの安定稼働において、各機器のインターフェース状態を把握することは非常に重要です。インターフェースのエラーカウンタ増加や使用率の異常な上昇は、潜在的な問題やパフォーマンス低下の兆候である可能性があります。しかし、多数の機器やインターフェースを手動で監視するのは非効率であり、見落としが発生するリスクも伴います。

本記事では、Pythonを活用してネットワーク機器のインターフェース統計情報(エラーカウンタや使用率など)を自動的に収集し、事前に設定した閾値を超過した場合に異常を検知・通知するスクリプトの実装方法について解説します。Pythonの高いスキルをお持ちでありながら、ネットワーク機器の直接的な操作に不慣れな読者の皆様が、現場での運用効率化に役立てられるような実践的な内容を目指します。

なぜインターフェース統計情報の自動収集が必要か

ネットワーク機器のインターフェースには、送受信されたパケット数だけでなく、エラーパケット数、ドロップパケット数、帯域使用率などの様々な統計情報が含まれています。これらの情報は、以下のような状況の早期発見に繋がります。

これらの異常を手動で定期的に確認することは現実的ではありません。Pythonによる自動化を取り入れることで、監視漏れを防ぎつつ、異常発生時に迅速な対応が可能となります。

必要なライブラリと基本的な手法

ネットワーク機器から情報を取得するためには、主にSSH経由でCLIコマンドを実行する方法が一般的です。Pythonには、SSH接続を容易にするための強力なライブラリがいくつか存在します。

本記事では、多くのネットワーク機器に対応しており、CLIコマンドの実行結果取得に特化したnetmikoライブラリを中心に解説を進めます。また、取得したテキストデータを解析し、必要な数値を抽出するためにPythonの文字列操作や正規表現を使用します。より複雑なパースが必要な場合は、TextFSMやTTPといったパーサーツールも検討できますが、今回は基本的な方法で解説します。

その他、数値データを処理するために標準ライブラリ、通知にはSlack APIなどを利用するケースを想定します(通知部分の具体的な実装例は、環境依存が大きいため概念的な説明に留める場合があります)。

必要なライブラリはpipでインストールできます。

pip install netmiko paramiko
# 必要に応じて
# pip install slack_sdk

インターフェース統計情報を取得する

まずは、Netmikoを使ってネットワーク機器に接続し、インターフェースの統計情報を取得する基本的なスクリプトを作成します。多くの機器ではshow interface <interface_name>またはshow interfacesコマンドで詳細な情報を確認できます。

ここでは、特定のインターフェースの情報を取得する例を示します。

import os
from netmiko import ConnectHandler
from getpass import getpass

# 接続情報
# 実際の運用では認証情報は安全な方法で管理してください
device_info = {
    "device_type": "cisco_ios", # 適切なデバイスタイプを指定
    "host": "your_device_ip",
    "username": os.environ.get("NET_USERNAME", "admin"), # 環境変数からの読み込み例
    "password": os.environ.get("NET_PASSWORD", getpass("Enter password: ")),
    # "secret": "enable_password", # enableパスワードが必要な場合
}

interface_name = "GigabitEthernet1/0/1" # 監視対象のインターフェース名

def get_interface_stats(device, interface):
    """
    指定されたインターフェースの統計情報をCLIから取得する

    Args:
        device (dict): Netmiko接続情報を含む辞書
        interface (str): インターフェース名

    Returns:
        str: CLIコマンドの実行結果、またはエラーメッセージ
    """
    try:
        print(f"Connecting to {device['host']}...")
        with ConnectHandler(**device) as net_connect:
            # enableモードが必要な場合
            # net_connect.enable()

            command = f"show interface {interface}"
            print(f"Executing command: {command}")
            output = net_connect.send_command(command)

            # コマンド実行に失敗した場合などの基本的なチェック
            if "% Invalid input" in output or "command not found" in output:
                 print(f"Error executing command on {device['host']}: Invalid input or command not found")
                 return None
            if "Interface" not in output and interface not in output:
                 print(f"Error executing command on {device['host']}: Interface '{interface}' not found or output format unexpected")
                 return None

            return output

    except Exception as e:
        print(f"Error connecting or executing command on {device['host']}: {e}")
        return None

if __name__ == "__main__":
    stats_output = get_interface_stats(device_info, interface_name)
    if stats_output:
        print("\n--- CLI Output ---")
        print(stats_output)
        print("------------------")
    else:
        print("Failed to get interface stats.")

このスクリプトは、指定したネットワーク機器にSSH接続し、show interfaceコマンドを実行して結果を表示します。認証情報は環境変数から取得するか、対話式に入力するようにしています。実際の運用では、よりセキュアな方法(例: Vaultなどの秘密情報管理ツール)で管理することを強く推奨します。

また、デバイスタイプはcisco_ios以外にも多数のベンダー/OSに対応していますので、対象機器に合わせて適宜変更してください。簡単なエラーハンドリングも加えていますが、現場ではより網羅的なエラー対応が必要です。

取得データから必要な値を抽出する

CLIコマンドの出力は整形されていないテキストデータです。この中から、エラーカウンタや使用率といった必要な数値を抽出する必要があります。インターフェースコマンドの出力形式はベンダーやOSのバージョンによって異なりますが、ここでは一般的なCisco IOSの出力を想定した簡易的なパース例を示します。

抽出したい情報は例えば以下の部分です。

...
Input queue: 0/375/0/0 (size/max/drops/flushes); Total output drops: 0
  5 minute input rate 0 bits/sec, 0 packets/sec
  5 minute output rate 0 bits/sec, 0 packets/sec
     1000000000 bits/sec, 10000 packets/sec
     Input errors: 0, CRC: 0, frame: 0, overrun: 0, ignored: 0, abort: 0
     Output errors: 0, collisions: 0, interface resets: 0
     Unknown protocol drops: 0, Rate-limit drops: 0
...

ここから、「Input errors」「Output errors」「5 minute input rate」「5 minute output rate」「帯域幅 (bits/sec)」などを抽出します。正規表現を使うと、特定のパターンに一致する行から数値を抽出できます。

import re

def parse_interface_stats(cli_output):
    """
    show interface コマンドの出力から必要な統計情報を抽出する(簡易版)

    Args:
        cli_output (str): show interface コマンドの実行結果テキスト

    Returns:
        dict: 抽出した統計情報を含む辞書、またはNone
    """
    if not cli_output:
        return None

    stats = {}

    # Input/Output Errors
    error_match = re.search(r"Input errors: (\d+).*?Output errors: (\d+)", cli_output, re.DOTALL)
    if error_match:
        stats['input_errors'] = int(error_match.group(1))
        stats['output_errors'] = int(error_match.group(2))

    # 5 minute rate (bps)
    input_rate_match = re.search(r"5 minute input rate (\d+) bits/sec", cli_output)
    if input_rate_match:
        stats['input_rate_bps'] = int(input_rate_match.group(1))

    output_rate_match = re.search(r"5 minute output rate (\d+) bits/sec", cli_output)
    if output_rate_match:
        stats['output_rate_bps'] = int(output_rate_match.group(1))

    # Bandwidth (bps)
    # 機器によって表示形式が異なる場合があるため注意
    bw_match = re.search(r"(\d+) bits/sec.*?, \d+ packets/sec", cli_output)
    if bw_match:
         stats['bandwidth_bps'] = int(bw_match.group(1))
    else:
         # descriptionやspeedコマンドで設定された速度が別行に表示される場合など、
         # 別のパターンや取得方法が必要になることがあります
         pass # 別のパースロジックを追加するか、手動設定などで補完

    # 使用率計算(簡易版:5分平均レート / 帯域幅 * 100)
    if 'input_rate_bps' in stats and 'bandwidth_bps' in stats and stats['bandwidth_bps'] > 0:
        stats['input_util_percent'] = round((stats['input_rate_bps'] / stats['bandwidth_bps']) * 100, 2)
    if 'output_rate_bps' in stats and 'bandwidth_bps' in stats and stats['bandwidth_bps'] > 0:
        stats['output_util_percent'] = round((stats['output_rate_bps'] / stats['bandwidth_bps']) * 100, 2)

    return stats

# 前述のget_interface_stats関数で取得した出力を使用
if __name__ == "__main__":
    # get_interface_statsで取得した stats_output を想定
    dummy_cli_output = """
GigabitEthernet1/0/1 is up, line protocol is up
  Hardware is Gigabit Ethernet, address is aaaa.bbbb.cccc (bia aaaa.bbbb.cccc)
  Description: Link to ServerX
  Internet address is 192.168.1.254/24
  MTU 1500 bytes, BW 1000000 Kbit/sec, DLY 10 usec,
     reliability 255/255, txload 1/255, rxload 1/255
  Encapsulation ARPA, Loopback not set
  Keepalive set (10 sec)
  Full Duplex, 1000Mbps, media type 10/100/1000BaseTX
  input flow-control is off, output flow-control is unsupported
  ARP type: ARPA, ARP Timeout 04:00:00
  Last input 00:00:05, output 00:00:00, output hang never
  Last clearing of "show interface" counters never
  Input queue: 0/375/0/0 (size/max/drops/flushes); Total output drops: 0
  Queueing strategy: fifo
  Output queue: 0/40 (size/max)
  5 minute input rate 1000000 bits/sec, 1 packets/sec
  5 minute output rate 500000000 bits/sec, 50000 packets/sec
     1000000000 bits/sec, 10000 packets/sec <--- ここからBandwidth取得
     Input errors: 15, CRC: 10, frame: 0, overrun: 0, ignored: 5, abort: 0
     Output errors: 2, collisions: 0, interface resets: 0
     Unknown protocol drops: 0, Rate-limit drops: 0
     Counters 64bit
     Last clearing of rate counters never
"""
    parsed_stats = parse_interface_stats(dummy_cli_output)
    if parsed_stats:
        print("\n--- Parsed Stats ---")
        print(parsed_stats)
        print("--------------------")
    else:
        print("Failed to parse stats.")

このparse_interface_stats関数は、正規表現を使ってCLI出力から特定の数値を抽出します。正規表現のパターンは、対象機器の実際の出力に合わせて調整が必要です。帯域幅(Bandwidth)の表示形式も機器や設定によって異なるため、複数のパターンに対応したり、別のコマンド(例: show interface <interface_name> | include Bandwidth)で補完することも考えられます。

使用率の計算は、多くの機器で表示される「5 minute input/output rate」と「Bandwidth」から単純計算しています。これはあくまで簡易的な指標であり、実際の帯域使用状況を正確に把握するには、SNMPによる詳細なデータ収集や、トラフィックフロー情報の活用などがより適している場合があります。

閾値判定と異常通知の実装

抽出した統計情報を使って、定義した閾値と比較し、異常があれば通知を行います。

閾値は、例えば以下のように定義できます。

エラーカウンタは通常リセットされないため、「増加したこと」を検知するのが一般的です。そのため、前回の実行時のカウンタ値をどこかに保存しておく必要があります。ファイルやデータベース、Redisなどに保存する方法が考えられます。ここでは簡単のため、メモリ上で前回の値を保持する例(ただし、スクリプト実行ごとにリセットされるため、定期実行には不向き)として概念を示しつつ、永続化の必要性に言及します。

使用率の閾値は、帯域枯渇の兆候を捉えるために設定します。

異常検出時の通知方法は様々ですが、開発・運用チームが日常的に利用しているSlackやMicrosoft Teams、または監視システムへのイベント連携(ZabbixのAPI、PrometheusのPushgatewayなど)が考えられます。ここではSlackへの通知を想定したコードの骨子を示します。

import os
import json # 設定ファイル例として使用
# from slack_sdk import WebClient # Slack通知ライブラリ

# 閾値設定 (本来は設定ファイル等から読み込む)
threshold_config = {
    "interface_thresholds": {
        "input_errors": "increase", # 'increase' または 具体的な数値
        "output_errors": "increase",
        "input_util_percent": 80,
        "output_util_percent": 80,
    }
}

# 前回の統計情報(永続化が必要な部分)
# 実際の運用ではファイルやDBに保存し、次回の実行時に読み込む
previous_stats = {} # 例: {"GigabitEthernet1/0/1": {"input_errors": 0, "output_errors": 0}}

def check_thresholds(interface_name, current_stats, previous_stats_data, thresholds):
    """
    現在の統計情報を閾値と比較し、異常があれば通知メッセージを生成する

    Args:
        interface_name (str): インターフェース名
        current_stats (dict): 現在の統計情報辞書
        previous_stats_data (dict): 前回の統計情報辞書
        thresholds (dict): 閾値設定辞書

    Returns:
        list: 通知すべき異常メッセージのリスト
    """
    alerts = []
    prev_stats = previous_stats_data.get(interface_name, {})

    # エラーカウンタのチェック
    for error_type in ['input_errors', 'output_errors']:
        if error_type in current_stats and error_type in thresholds:
            current_value = current_stats[error_type]
            threshold_setting = thresholds[error_type]

            if threshold_setting == "increase":
                prev_value = prev_stats.get(error_type, current_value) # 初回は増加なしとみなす
                if current_value > prev_value:
                    alerts.append(f"[{interface_name}] {error_type.replace('_', ' ').title()} increased from {prev_value} to {current_value}")
            elif isinstance(threshold_setting, int):
                 # 特定の数値を超えた場合を検知することも可能(あまり一般的ではないが)
                 if current_value > threshold_setting:
                     alerts.append(f"[{interface_name}] {error_type.replace('_', ' ').title()} ({current_value}) exceeded threshold ({threshold_setting})")

    # 使用率のチェック
    for util_type in ['input_util_percent', 'output_util_percent']:
         if util_type in current_stats and util_type in thresholds:
             current_value = current_stats[util_type]
             threshold_value = thresholds[util_type]
             if current_value > threshold_value:
                 alerts.append(f"[{interface_name}] {util_type.replace('_percent', '%').replace('_', ' ').title()} ({current_value:.2f}%) exceeded threshold ({threshold_value}%)")

    return alerts

# 通知関数(Slackを想定)
def send_alert_notification(messages):
    """
    異常メッセージを通知先に送信する(例:Slack)

    Args:
        messages (list): 通知メッセージのリスト
    """
    if not messages:
        return

    # Slack通知の例(slack_sdkを使用する場合)
    # try:
    #     slack_token = os.environ.get("SLACK_BOT_TOKEN")
    #     if not slack_token:
    #         print("SLACK_BOT_TOKEN environment variable not set. Skipping Slack notification.")
    #         return
    #
    #     client = WebClient(token=slack_token)
    #     # 通知先のチャンネルID
    #     channel_id = "your_slack_channel_id"
    #
    #     notification_text = "ネットワークインターフェース異常検知:\n" + "\n".join(messages)
    #
    #     response = client.chat_postMessage(channel=channel_id, text=notification_text)
    #     print(f"Slack notification sent: {response['ts']}")
    #
    # except Exception as e:
    #     print(f"Failed to send Slack notification: {e}")

    # ここでは標準出力に表示する代替処理
    print("\n--- ALERTS ---")
    for msg in messages:
        print(msg)
    print("--------------")


if __name__ == "__main__":
    # 実際のスクリプトでは、前回の統計情報をファイルやDBから読み込む
    # 例:前回の情報ファイルが存在すれば読み込む
    previous_stats_file = "previous_stats.json"
    if os.path.exists(previous_stats_file):
        try:
            with open(previous_stats_file, "r") as f:
                previous_stats = json.load(f)
            print(f"Loaded previous stats from {previous_stats_file}")
        except Exception as e:
            print(f"Failed to load previous stats: {e}")
            previous_stats = {} # ロード失敗時は空にする

    # === ここからメインの処理 ===
    # 実際の運用では、監視対象の全機器/全インターフェースに対してループ処理を行います
    # 例として、前述の dummy_cli_output と device_info を使用

    device_info = {
        "device_type": "cisco_ios",
        "host": "your_device_ip",
        "username": os.environ.get("NET_USERNAME", "admin"),
        "password": os.environ.get("NET_PASSWORD", "password"), # Example
    }
    interface_name = "GigabitEthernet1/0/1"

    # 1. 統計情報を取得
    # 実際のget_interface_stats関数呼び出し
    # stats_output = get_interface_stats(device_info, interface_name)
    # デモ用にダミー出力を使用
    dummy_cli_output = """... (前述のダミー出力テキスト) ..."""
    stats_output = dummy_cli_output

    current_stats = None
    if stats_output:
        # 2. 取得データをパース
        current_stats = parse_interface_stats(stats_output)

    if current_stats:
        print(f"Current stats for {interface_name}: {current_stats}")

        # 3. 閾値判定
        alerts = check_thresholds(interface_name, current_stats, previous_stats, threshold_config["interface_thresholds"])

        # 4. 異常があれば通知
        send_alert_notification(alerts)

        # 5. 現在の統計情報を次回の実行のために保存(永続化処理)
        # 実際の運用では、複数インターフェース/機器の情報をまとめて保存
        previous_stats[interface_name] = current_stats
        try:
             with open(previous_stats_file, "w") as f:
                 json.dump(previous_stats, f, indent=4)
             print(f"Saved current stats to {previous_stats_file}")
        except Exception as e:
             print(f"Failed to save previous stats: {e}")

    else:
        print(f"Could not process stats for {interface_name}")


このスクリプトでは、取得した統計情報と定義した閾値を比較し、異常が見つかればメッセージを生成します。check_thresholds関数は、エラーカウンタの増加(前回の値との比較)と使用率の閾値超過をチェックします。

重要な点として、エラーカウンタの「増加」を検知するためには、前回のスクリプト実行時のカウンタ値を保持しておく必要があります。上記のコードでは、previous_statsという辞書を使用していますが、スクリプトが終了するとメモリ上のデータは失われます。定期実行スクリプトとして運用するためには、このデータをファイル、データベース、またはKVS(Key-Value Store)などに永続化し、次回の実行時に読み込む実装が必要です。例としてJSONファイルへの保存/読み込みの骨子を追加しました。

使用率の計算に使用する帯域幅(Bandwidth)も、コマンド出力から正確に取得できない場合があるため、設計時には代替手段(例: インベントリ情報として手動で管理、SNMP情報の利用など)も考慮する必要があります。

通知部分(send_alert_notification関数)は、コメントアウトされたSlack通知の例として示しています。実際の通知方法は、利用されている監視ツールや連携システムに応じて実装してください。標準出力への表示は、デバッグや簡易的な実行確認に役立ちます。

スクリプトの定期実行と運用

作成したスクリプトを定期的に実行するためには、OSのスケジューリング機能を利用します。Linux/macOSではcron、Windowsではタスクスケジューラが一般的です。

例えば、Linuxで5分ごとに実行する場合のcronエントリは以下のようになります。

*/5 * * * * /usr/bin/python3 /path/to/your_script.py

定期実行する際には、以下の点に注意が必要です。

これらの考慮点を踏まえることで、より現場で利用可能な実用的な自動化スクリプトとなります。

より高度な監視システムとの連携

本記事で紹介したようなPythonスクリプトは、小規模な環境での監視や、特定のニッチな要件を満たすためのカスタム監視として非常に有効です。しかし、大規模な環境や、長期的なメトリクス蓄積、グラフ化、依存関係の管理といった機能が必要な場合は、Zabbix、Prometheus、Nagiosといった専用の監視システムを導入することが一般的です。

作成したスクリプトは、これらの監視システムへのデータ連携やイベント連携の手段としても活用できます。例えば、

といった連携パターンが考えられます。このように、Pythonスクリプトは単体での自動化ツールとしてだけでなく、既存の運用基盤と連携するGlueコードとしても重要な役割を果たします。

まとめ

本記事では、Pythonとnetmikoライブラリを使用して、ネットワーク機器のインターフェース統計情報(エラーカウンタ、使用率)を自動収集し、閾値監視と異常通知を行うスクリプトの基本的な実装方法について解説しました。

CLIコマンドの実行結果から必要な情報をパースし、Pythonで数値データとして扱うことで、手動では困難な継続的な状態監視を効率化できます。エラーカウンタの増加や使用率の閾値超過を早期に検知し通知することで、潜在的なネットワーク問題を迅速に把握し、サービス影響を最小限に抑えることが期待できます。

実践的な運用には、認証情報の安全な管理、堅牢なエラーハンドリング、詳細なロギング、そして定期実行のための仕組みが必要です。さらに、複数機器への対応や、より高度な監視ツールとの連携を検討することで、自動化の効果を最大限に高めることができます。

今回解説した内容は、ネットワーク自動化のほんの一例です。取得する情報の種類や監視のロジックを応用することで、様々なネットワーク状態の自動チェックに応用可能です。ぜひ、皆様の現場の課題解決にPythonによるネットワーク自動化をご活用ください。