実践ネット自動化スクリプト集

現場で使えるPython:ネットワーク機器への並列操作で作業時間を短縮

Tags: Python, ネットワーク自動化, 並列処理, Nornir, concurrent.futures

はじめに:なぜネットワーク自動化に並列処理が必要なのか

近年、インフラストラクチャの規模は拡大し、管理対象となるネットワーク機器の数も増加の一途をたどっています。設定変更、状態確認、ログ収集といった日常的な運用タスクを、機器ごとに手作業で実行することは非現実的になりつつあります。このような背景から、Pythonを活用したネットワーク自動化への関心が高まっています。

Pythonによる自動化は多くのメリットをもたらしますが、単にスクリプトを作成して機器ごとに順番に実行するだけでは、対象機器数が増えるにつれて全体の処理時間が膨大になるという課題に直面します。特に、ネットワーク機器への接続やコマンド実行には一定の待ち時間が発生するため、直列処理ではこの待ち時間の合計が大きなボトルネックとなります。

この課題を解決するために、ネットワーク自動化においては「並列処理」が非常に重要になります。複数の機器に対して同時に接続し、コマンドを実行することで、全体の処理時間を大幅に短縮することが可能です。

この記事では、Pythonを用いてネットワーク機器への操作を並列化するための基本的な考え方と、具体的なライブラリやフレームワークを活用した実践的な手法をご紹介します。Pythonスキルを活かして、ネットワーク運用作業の効率を飛躍的に向上させるためのヒントを提供します。

並列処理の基本的な考え方とネットワーク自動化への適用

計算機科学における並列処理には、主に「マルチプロセス」と「マルチスレッド」、そして「非同期IO」といったアプローチがあります。

ネットワーク機器への操作は、機器との接続確立、コマンド送信、結果受信など、I/O待ち時間が大半を占める典型的なI/Oバウンドな処理です。そのため、マルチスレッドや非同期IOといった手法がネットワーク自動化における並列処理には特に有効です。

Python標準ライブラリやサードパーティライブラリは、これらの並列処理を実現するための様々なツールを提供しています。

Python標準ライブラリによる並列実行の検討

Python標準ライブラリに含まれる concurrent.futuresasyncio は、並列処理や並行処理を実現するための強力な機能を提供します。

concurrent.futures を使った簡単な並列処理

concurrent.futures は、スレッドプールやプロセスプールを使ってタスクを並列実行するための高レベルなインターフェースを提供します。ThreadPoolExecutorを使用すれば、I/Oバウンドなネットワーク処理を比較的容易に並列化できます。

import concurrent.futures
import time
import subprocess # 簡単な例としてpingを使用

# ネットワーク機器のリスト(IPアドレスやホスト名)を想定
devices = ["8.8.8.8", "1.1.1.1", "google.com", "yahoo.co.jp", "invalid.domain"]

def check_ping(device):
    """指定された機器にpingを実行し、結果を返す関数"""
    print(f"{device} へのpingを開始します...")
    try:
        # OSコマンドとしてpingを実行
        # timeout秒以内に完了しない場合は例外発生
        # Windowsの場合は '-n 1', Linux/macOSは '-c 1' を使用
        param = '-n' if platform.system().lower() == 'windows' else '-c'
        command = ['ping', param, '1', device]
        result = subprocess.run(command, capture_output=True, text=True, timeout=5)

        if result.returncode == 0:
            status = "Success"
        else:
            status = f"Failed (Return Code: {result.returncode})"
            # エラー出力も取得
            if result.stderr:
                status += f", Error: {result.stderr.strip()}"
            elif result.stdout:
                 status += f", Output: {result.stdout.strip()}"

        print(f"{device} のpingが完了しました ({status}).")
        return device, status

    except subprocess.TimeoutExpired:
        print(f"{device} へのpingがタイムアウトしました.")
        return device, "Timeout"
    except Exception as e:
        print(f"{device} へのpingでエラーが発生しました: {e}")
        return device, f"Error: {e}"

import platform

if __name__ == "__main__":
    start_time = time.time()
    results = {}

    # 最大worker数を指定してThreadPoolExecutorを作成
    # worker数は同時に実行するタスク数に相当
    max_workers = 3 # 例として同時に3つまで実行

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 各機器に対するタスクをexecutorに投入
        future_to_device = {executor.submit(check_ping, device): device for device in devices}

        # 完了したタスクから順に結果を取得
        for future in concurrent.futures.as_completed(future_to_device):
            device = future_to_device[future]
            try:
                device, status = future.result() # ここでタスクの完了を待つ
                results[device] = status
            except Exception as exc:
                # タスク実行中に例外が発生した場合
                results[device] = f"Generated an exception: {exc}"

    end_time = time.time()

    print("\n--- 結果 ---")
    for device, status in results.items():
        print(f"{device}: {status}")

    print(f"\n全処理時間: {end_time - start_time:.2f}秒")

この例では、concurrent.futures.ThreadPoolExecutor を使用して、複数の機器へのpingチェックを並列で実行しています。executor.submit() で関数と引数を渡すとタスクがキューイングされ、executorが管理するスレッドプールで順次実行されます。concurrent.futures.as_completed() を使うと、完了したタスクから結果を非同期的に取得できます。

これはシンプルな例ですが、check_ping 関数をネットワーク機器へのSSH接続やAPI呼び出しを行う関数に置き換えれば、実際のネットワーク操作を並列化できます。例えば、Netmikoを使ってshow versionコマンドを実行する関数を並列実行するといった応用が考えられます。

asyncio による非同期ネットワーク操作

asyncio は非同期IOのためのフレームワークで、協調的マルチタスクを実現します。async/await シンタックスを用いて、I/O待ちが発生する部分で処理を別のタスクに切り替えることができます。

asyncio をネットワーク自動化に活用するには、対象のライブラリが asyncio に対応している必要があります。例えば、非同期SSHライブラリである asyncssh や、asyncio に対応したネットワーク自動化ライブラリなどが利用できます。

# asyncsshを使った非同期SSH接続の概念的なコード例
import asyncio
import asyncssh
import time

# ネットワーク機器のリスト(IPアドレス、ユーザー名、パスワードなどを想定)
# 実際の認証情報は安全な方法で管理する必要があります
devices = [
    {'host': '192.168.1.1', 'username': 'user', 'password': 'password'},
    {'host': '192.168.1.2', 'username': 'user', 'password': 'password'},
    {'host': '192.168.1.3', 'username': 'user', 'password': 'password'},
]

async def send_command(device, command):
    """指定された機器にSSH接続し、コマンドを実行する非同期関数"""
    print(f"{device['host']} に接続を開始します...")
    try:
        async with asyncssh.connect(device['host'], username=device['username'], password=device['password']) as conn:
            print(f"{device['host']} に接続しました。コマンド '{command}' を実行します。")
            result = await conn.run(command)
            print(f"{device['host']} でコマンド実行が完了しました。")
            return device['host'], result.stdout.strip(), result.stderr.strip()
    except asyncssh.Error as exc:
        print(f"{device['host']} への接続またはコマンド実行でエラーが発生しました: {exc}")
        return device['host'], None, f"Error: {exc}"
    except Exception as e:
         print(f"{device['host']} で予期せぬエラーが発生しました: {e}")
         return device['host'], None, f"Unexpected Error: {e}"

async def main():
    """複数の機器に非同期でコマンドを実行するメイン関数"""
    start_time = time.time()
    tasks = []
    command = "show ip interface brief" # 実行したいコマンド

    # 各機器に対する非同期タスクを作成
    for device in devices:
        task = asyncio.create_task(send_command(device, command))
        tasks.append(task)

    # 全てのタスクが完了するのを待つ
    results = await asyncio.gather(*tasks)

    end_time = time.time()

    print("\n--- 結果 ---")
    for host, stdout, stderr in results:
        print(f"\n--- {host} ---")
        if stdout:
            print("STDOUT:")
            print(stdout)
        if stderr:
            print("STDERR:")
            print(stderr)
        if not stdout and not stderr:
             print("No output or error.")

    print(f"\n全処理時間: {end_time - start_time:.2f}秒")

if __name__ == "__main__":
    # Windows環境でasyncioのイベントループポリシーを設定
    # この設定がないと非同期処理が正しく動作しない場合があります
    import platform
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    asyncio.run(main())

asyncio を使用することで、ネットワーク通信の待ち時間中に他の機器への処理を進めることが可能になり、高いスループットを実現できます。ただし、asyncio を使う場合は、使用するライブラリが非同期に対応している必要があります。Netmiko自体は同期ライブラリですが、Nornirのようなフレームワークが内部で非同期処理をラップして並列実行を可能にしているケースもあります。

ネットワーク自動化フレームワークによる並列処理 (Nornirの活用)

特定のネットワーク自動化フレームワークは、並列処理をフレームワークの機能として内包しており、より容易に複数機器への操作を並列化できます。代表的な例がNornirです。

NornirはPythonで書かれたネットワーク自動化フレームワークであり、インベントリ管理、プラグインによる拡張性、そしてデフォルトで並列処理をサポートしている点が特徴です。対象機器のリスト(インベントリ)と実行したいタスク(関数)を定義するだけで、Nornirが内部で並列処理を管理してくれます。

以下はNornirを使って複数機器から情報を並列取得する基本的なコード例です。

まず、Nornirの設定ファイル(config.yaml)とインベントリファイル(inventory.yaml)を準備します。

config.yaml:

---
# シンプルな設定例
inventory:
  plugin: SimpleInventory
  options:
    host_file: "inventory.yaml"
    group_file: "groups.yaml" # 必要に応じてグループファイルも
    # defaults_file: "defaults.yaml" # 必要に応じてデフォルトファイルも

# 並列実行の設定 (optional: 指定しない場合はCPU数などに基づいて自動調整)
runner:
  plugin: threaded
  options:
    num_workers: 10 # 同時に接続・実行する最大機器数

inventory.yaml:

---
# ホスト定義
host1.local:
  hostname: 192.168.1.1
  platform: cisco_ios # 機器の種類を指定
  username: user
  password: password

host2.local:
  hostname: 192.168.1.2
  platform: juniper_junos
  username: user
  password: password

# 必要に応じてさらに機器を追加

次に、Nornirを使ってタスクを実行するPythonスクリプトを作成します。NetmikoはNornirのプラグインとしてよく利用され、NornirがNetmikoへの接続を並列で処理します。

from nornir import InitNornir
from nornir_netmiko.tasks import netmiko_send_command
from nornir.core.filter import F
import time
import logging

# Nornirのログレベルを設定 (デフォルトはWARNING)
# DEBUGにすると詳細な実行ログを確認できます
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

def get_interface_status(task):
    """
    Nornirタスク関数:
    各機器で 'show ip interface brief' コマンドを実行し、結果を返す
    """
    log.info(f"Connecting to {task.host.hostname} ({task.host.platform})...")
    # netmiko_send_command タスクを実行
    # Nornirが自動的に対象ホストへの接続を確立し、コマンドを実行
    result = task.run(task=netmiko_send_command, command_string="show ip interface brief")

    # 結果は Result オブジェクトの result 属性に格納される(通常は標準出力)
    command_output = result.result
    log.info(f"Received output from {task.host.hostname}.")
    return command_output # この関数の戻り値がNornirのResultオブジェクトに格納される

if __name__ == "__main__":
    start_time = time.time()

    # Nornirを初期化 (config.yamlを読み込む)
    nr = InitNornir(config_file="config.yaml")

    # 必要に応じて特定の機器やグループにフィルタリング
    # filtered_nr = nr.filter(F(platform="cisco_ios"))

    # Nornirの run メソッドを使ってタスクを実行
    # get_interface_status 関数がインベントリ内の各機器に対して並列実行される
    # run() メソッドは Result オブジェクトを返す
    results = nr.run(task=get_interface_status)

    end_time = time.time()

    print("\n--- 結果 ---")
    # 結果をホストごとに表示
    # results は AggregatedResult オブジェクトであり、各ホストの結果(Resultオブジェクト)を保持する
    # 各ホストのResultオブジェクトは、タスク関数の戻り値を格納している
    for host, result in results.items():
        print(f"\n--- {host} ---")
        # タスク関数の戻り値にアクセス
        # ここでは get_interface_status 関数の戻り値(コマンド出力文字列)を取得
        if result.failed:
            print(f"Task failed: {result.exception()}")
        else:
            # 正常終了の場合、タスク関数の戻り値は Result オブジェクトの result 属性に格納される
            print(result.result)

    print(f"\n全処理時間: {end_time - start_time:.2f}秒")

    # 失敗したタスクの情報を表示することも可能
    if results.failed:
        print("\n--- 失敗した機器 ---")
        print(results.failed_hosts)

この例では、InitNornir で設定を読み込み、nr.run() メソッドに並列実行したいタスク関数 (get_interface_status) を渡すだけです。Nornirがインベントリに基づいて対象機器を特定し、config.yaml で指定された num_workers に従って、netmiko_send_command プラグイン(内部でNetmikoを使用)を使ったSSH接続とコマンド実行を並列で行います。

Nornirを利用することで、並列処理の低レベルな実装(スレッド/プロセス管理、非同期ループ)から解放され、ネットワークタスクそのものの記述に集中できます。また、インベントリ管理機能があるため、対象機器の管理も容易になります。

実践的な考慮点

ネットワーク自動化で並列処理を導入する際には、以下の実践的な考慮点が重要です。

まとめ

Pythonを使ったネットワーク自動化において、複数機器に対する操作の並列化は、作業効率を劇的に向上させるための重要な手法です。Python標準ライブラリのconcurrent.futuresasyncioを活用することで、基本的な並列実行を実現できます。さらに、Nornirのようなネットワーク自動化フレームワークを利用することで、並列処理の複雑さを抽象化し、より効率的かつ堅牢な自動化スクリプトを開発することが可能です。

この記事で紹介した手法や考慮点を参考に、ぜひご自身のネットワーク環境における自動化スクリプトに並列処理を組み込んでみてください。並列処理は、ネットワーク運用業務における時間のかかる反復作業から解放され、より戦略的な業務に集中するための強力なツールとなるでしょう。

今後、ネットワークの規模がさらに拡大し、IaCツールとの連携が進むにつれて、大量のネットワーク機器を効率的に管理・操作する技術はますます重要になります。今回ご紹介した並列処理の概念と実践的なアプローチが、皆様の現場での自動化推進の一助となれば幸いです。