現場で使えるPython:ネットワーク機器への並列操作で作業時間を短縮
はじめに:なぜネットワーク自動化に並列処理が必要なのか
近年、インフラストラクチャの規模は拡大し、管理対象となるネットワーク機器の数も増加の一途をたどっています。設定変更、状態確認、ログ収集といった日常的な運用タスクを、機器ごとに手作業で実行することは非現実的になりつつあります。このような背景から、Pythonを活用したネットワーク自動化への関心が高まっています。
Pythonによる自動化は多くのメリットをもたらしますが、単にスクリプトを作成して機器ごとに順番に実行するだけでは、対象機器数が増えるにつれて全体の処理時間が膨大になるという課題に直面します。特に、ネットワーク機器への接続やコマンド実行には一定の待ち時間が発生するため、直列処理ではこの待ち時間の合計が大きなボトルネックとなります。
この課題を解決するために、ネットワーク自動化においては「並列処理」が非常に重要になります。複数の機器に対して同時に接続し、コマンドを実行することで、全体の処理時間を大幅に短縮することが可能です。
この記事では、Pythonを用いてネットワーク機器への操作を並列化するための基本的な考え方と、具体的なライブラリやフレームワークを活用した実践的な手法をご紹介します。Pythonスキルを活かして、ネットワーク運用作業の効率を飛躍的に向上させるためのヒントを提供します。
並列処理の基本的な考え方とネットワーク自動化への適用
計算機科学における並列処理には、主に「マルチプロセス」と「マルチスレッド」、そして「非同期IO」といったアプローチがあります。
- マルチプロセス: 複数の独立したプロセスを起動し、それぞれが異なるタスクを実行します。CPUコアを最大限に活用できるため、CPUバウンドな処理(計算量の多い処理)に適しています。ただし、プロセスの生成やプロセス間の通信にはオーバーヘッドがあります。
- マルチスレッド: 同一プロセス内で複数のスレッドを生成し、それぞれがタスクを実行します。プロセスの生成より軽量で、リソースの共有が容易です。I/Oバウンドな処理(外部との通信待ちなど)に適していますが、PythonのGlobal Interpreter Lock(GIL)により、CPUバウンドな処理の真の並列実行には制限があります。
- 非同期IO: タスクの完了を待つ間に別のタスクに切り替えることで、効率的にI/O待ち時間を活用する手法です。シングルプロセス/シングルスレッドでも高い並行性を実現できます。ネットワーク通信のようなI/O待ちが多い処理に非常に適しています。
ネットワーク機器への操作は、機器との接続確立、コマンド送信、結果受信など、I/O待ち時間が大半を占める典型的なI/Oバウンドな処理です。そのため、マルチスレッドや非同期IOといった手法がネットワーク自動化における並列処理には特に有効です。
Python標準ライブラリやサードパーティライブラリは、これらの並列処理を実現するための様々なツールを提供しています。
Python標準ライブラリによる並列実行の検討
Python標準ライブラリに含まれる concurrent.futures
や asyncio
は、並列処理や並行処理を実現するための強力な機能を提供します。
concurrent.futures
を使った簡単な並列処理
concurrent.futures
は、スレッドプールやプロセスプールを使ってタスクを並列実行するための高レベルなインターフェースを提供します。ThreadPoolExecutorを使用すれば、I/Oバウンドなネットワーク処理を比較的容易に並列化できます。
import concurrent.futures
import time
import subprocess # 簡単な例としてpingを使用
# ネットワーク機器のリスト(IPアドレスやホスト名)を想定
devices = ["8.8.8.8", "1.1.1.1", "google.com", "yahoo.co.jp", "invalid.domain"]
def check_ping(device):
"""指定された機器にpingを実行し、結果を返す関数"""
print(f"{device} へのpingを開始します...")
try:
# OSコマンドとしてpingを実行
# timeout秒以内に完了しない場合は例外発生
# Windowsの場合は '-n 1', Linux/macOSは '-c 1' を使用
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', device]
result = subprocess.run(command, capture_output=True, text=True, timeout=5)
if result.returncode == 0:
status = "Success"
else:
status = f"Failed (Return Code: {result.returncode})"
# エラー出力も取得
if result.stderr:
status += f", Error: {result.stderr.strip()}"
elif result.stdout:
status += f", Output: {result.stdout.strip()}"
print(f"{device} のpingが完了しました ({status}).")
return device, status
except subprocess.TimeoutExpired:
print(f"{device} へのpingがタイムアウトしました.")
return device, "Timeout"
except Exception as e:
print(f"{device} へのpingでエラーが発生しました: {e}")
return device, f"Error: {e}"
import platform
if __name__ == "__main__":
start_time = time.time()
results = {}
# 最大worker数を指定してThreadPoolExecutorを作成
# worker数は同時に実行するタスク数に相当
max_workers = 3 # 例として同時に3つまで実行
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 各機器に対するタスクをexecutorに投入
future_to_device = {executor.submit(check_ping, device): device for device in devices}
# 完了したタスクから順に結果を取得
for future in concurrent.futures.as_completed(future_to_device):
device = future_to_device[future]
try:
device, status = future.result() # ここでタスクの完了を待つ
results[device] = status
except Exception as exc:
# タスク実行中に例外が発生した場合
results[device] = f"Generated an exception: {exc}"
end_time = time.time()
print("\n--- 結果 ---")
for device, status in results.items():
print(f"{device}: {status}")
print(f"\n全処理時間: {end_time - start_time:.2f}秒")
この例では、concurrent.futures.ThreadPoolExecutor
を使用して、複数の機器へのpingチェックを並列で実行しています。executor.submit()
で関数と引数を渡すとタスクがキューイングされ、executorが管理するスレッドプールで順次実行されます。concurrent.futures.as_completed()
を使うと、完了したタスクから結果を非同期的に取得できます。
これはシンプルな例ですが、check_ping
関数をネットワーク機器へのSSH接続やAPI呼び出しを行う関数に置き換えれば、実際のネットワーク操作を並列化できます。例えば、Netmikoを使ってshow version
コマンドを実行する関数を並列実行するといった応用が考えられます。
asyncio
による非同期ネットワーク操作
asyncio
は非同期IOのためのフレームワークで、協調的マルチタスクを実現します。async/await
シンタックスを用いて、I/O待ちが発生する部分で処理を別のタスクに切り替えることができます。
asyncio
をネットワーク自動化に活用するには、対象のライブラリが asyncio
に対応している必要があります。例えば、非同期SSHライブラリである asyncssh
や、asyncio
に対応したネットワーク自動化ライブラリなどが利用できます。
# asyncsshを使った非同期SSH接続の概念的なコード例
import asyncio
import asyncssh
import time
# ネットワーク機器のリスト(IPアドレス、ユーザー名、パスワードなどを想定)
# 実際の認証情報は安全な方法で管理する必要があります
devices = [
{'host': '192.168.1.1', 'username': 'user', 'password': 'password'},
{'host': '192.168.1.2', 'username': 'user', 'password': 'password'},
{'host': '192.168.1.3', 'username': 'user', 'password': 'password'},
]
async def send_command(device, command):
"""指定された機器にSSH接続し、コマンドを実行する非同期関数"""
print(f"{device['host']} に接続を開始します...")
try:
async with asyncssh.connect(device['host'], username=device['username'], password=device['password']) as conn:
print(f"{device['host']} に接続しました。コマンド '{command}' を実行します。")
result = await conn.run(command)
print(f"{device['host']} でコマンド実行が完了しました。")
return device['host'], result.stdout.strip(), result.stderr.strip()
except asyncssh.Error as exc:
print(f"{device['host']} への接続またはコマンド実行でエラーが発生しました: {exc}")
return device['host'], None, f"Error: {exc}"
except Exception as e:
print(f"{device['host']} で予期せぬエラーが発生しました: {e}")
return device['host'], None, f"Unexpected Error: {e}"
async def main():
"""複数の機器に非同期でコマンドを実行するメイン関数"""
start_time = time.time()
tasks = []
command = "show ip interface brief" # 実行したいコマンド
# 各機器に対する非同期タスクを作成
for device in devices:
task = asyncio.create_task(send_command(device, command))
tasks.append(task)
# 全てのタスクが完了するのを待つ
results = await asyncio.gather(*tasks)
end_time = time.time()
print("\n--- 結果 ---")
for host, stdout, stderr in results:
print(f"\n--- {host} ---")
if stdout:
print("STDOUT:")
print(stdout)
if stderr:
print("STDERR:")
print(stderr)
if not stdout and not stderr:
print("No output or error.")
print(f"\n全処理時間: {end_time - start_time:.2f}秒")
if __name__ == "__main__":
# Windows環境でasyncioのイベントループポリシーを設定
# この設定がないと非同期処理が正しく動作しない場合があります
import platform
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
asyncio
を使用することで、ネットワーク通信の待ち時間中に他の機器への処理を進めることが可能になり、高いスループットを実現できます。ただし、asyncio
を使う場合は、使用するライブラリが非同期に対応している必要があります。Netmiko自体は同期ライブラリですが、Nornirのようなフレームワークが内部で非同期処理をラップして並列実行を可能にしているケースもあります。
ネットワーク自動化フレームワークによる並列処理 (Nornirの活用)
特定のネットワーク自動化フレームワークは、並列処理をフレームワークの機能として内包しており、より容易に複数機器への操作を並列化できます。代表的な例がNornirです。
NornirはPythonで書かれたネットワーク自動化フレームワークであり、インベントリ管理、プラグインによる拡張性、そしてデフォルトで並列処理をサポートしている点が特徴です。対象機器のリスト(インベントリ)と実行したいタスク(関数)を定義するだけで、Nornirが内部で並列処理を管理してくれます。
以下はNornirを使って複数機器から情報を並列取得する基本的なコード例です。
まず、Nornirの設定ファイル(config.yaml
)とインベントリファイル(inventory.yaml
)を準備します。
config.yaml
:
---
# シンプルな設定例
inventory:
plugin: SimpleInventory
options:
host_file: "inventory.yaml"
group_file: "groups.yaml" # 必要に応じてグループファイルも
# defaults_file: "defaults.yaml" # 必要に応じてデフォルトファイルも
# 並列実行の設定 (optional: 指定しない場合はCPU数などに基づいて自動調整)
runner:
plugin: threaded
options:
num_workers: 10 # 同時に接続・実行する最大機器数
inventory.yaml
:
---
# ホスト定義
host1.local:
hostname: 192.168.1.1
platform: cisco_ios # 機器の種類を指定
username: user
password: password
host2.local:
hostname: 192.168.1.2
platform: juniper_junos
username: user
password: password
# 必要に応じてさらに機器を追加
次に、Nornirを使ってタスクを実行するPythonスクリプトを作成します。NetmikoはNornirのプラグインとしてよく利用され、NornirがNetmikoへの接続を並列で処理します。
from nornir import InitNornir
from nornir_netmiko.tasks import netmiko_send_command
from nornir.core.filter import F
import time
import logging
# Nornirのログレベルを設定 (デフォルトはWARNING)
# DEBUGにすると詳細な実行ログを確認できます
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
def get_interface_status(task):
"""
Nornirタスク関数:
各機器で 'show ip interface brief' コマンドを実行し、結果を返す
"""
log.info(f"Connecting to {task.host.hostname} ({task.host.platform})...")
# netmiko_send_command タスクを実行
# Nornirが自動的に対象ホストへの接続を確立し、コマンドを実行
result = task.run(task=netmiko_send_command, command_string="show ip interface brief")
# 結果は Result オブジェクトの result 属性に格納される(通常は標準出力)
command_output = result.result
log.info(f"Received output from {task.host.hostname}.")
return command_output # この関数の戻り値がNornirのResultオブジェクトに格納される
if __name__ == "__main__":
start_time = time.time()
# Nornirを初期化 (config.yamlを読み込む)
nr = InitNornir(config_file="config.yaml")
# 必要に応じて特定の機器やグループにフィルタリング
# filtered_nr = nr.filter(F(platform="cisco_ios"))
# Nornirの run メソッドを使ってタスクを実行
# get_interface_status 関数がインベントリ内の各機器に対して並列実行される
# run() メソッドは Result オブジェクトを返す
results = nr.run(task=get_interface_status)
end_time = time.time()
print("\n--- 結果 ---")
# 結果をホストごとに表示
# results は AggregatedResult オブジェクトであり、各ホストの結果(Resultオブジェクト)を保持する
# 各ホストのResultオブジェクトは、タスク関数の戻り値を格納している
for host, result in results.items():
print(f"\n--- {host} ---")
# タスク関数の戻り値にアクセス
# ここでは get_interface_status 関数の戻り値(コマンド出力文字列)を取得
if result.failed:
print(f"Task failed: {result.exception()}")
else:
# 正常終了の場合、タスク関数の戻り値は Result オブジェクトの result 属性に格納される
print(result.result)
print(f"\n全処理時間: {end_time - start_time:.2f}秒")
# 失敗したタスクの情報を表示することも可能
if results.failed:
print("\n--- 失敗した機器 ---")
print(results.failed_hosts)
この例では、InitNornir
で設定を読み込み、nr.run()
メソッドに並列実行したいタスク関数 (get_interface_status
) を渡すだけです。Nornirがインベントリに基づいて対象機器を特定し、config.yaml
で指定された num_workers
に従って、netmiko_send_command
プラグイン(内部でNetmikoを使用)を使ったSSH接続とコマンド実行を並列で行います。
Nornirを利用することで、並列処理の低レベルな実装(スレッド/プロセス管理、非同期ループ)から解放され、ネットワークタスクそのものの記述に集中できます。また、インベントリ管理機能があるため、対象機器の管理も容易になります。
実践的な考慮点
ネットワーク自動化で並列処理を導入する際には、以下の実践的な考慮点が重要です。
- 並列数の決定: 同時に実行するタスク数(スレッド数、非同期タスク数、Nornirのworker数)は、実行元サーバのリソース(CPU、メモリ)とネットワーク環境(帯域幅、機器側の同時接続上限)を考慮して決定する必要があります。過剰な並列数は、かえってサーバやネットワークに負荷をかけ、処理速度の低下や機器の応答停止を引き起こす可能性があります。まずは少数から試し、徐々に増やしていくのが安全です。
- エラーハンドリングとリカバリ: 並列処理では、一部の機器でエラーが発生しても他の機器での処理は続行されることが一般的です。特定機器での接続失敗、コマンドエラー、タイムアウトなどが発生した場合に、全体処理を中断するか、エラー情報を記録して続行するかといったエラーハンドリングの戦略が必要です。Nornirのようなフレームワークは、失敗したホストの情報やエラーメッセージを自動的に収集する機能を提供しています。
- タイムアウト設定: ネットワーク通信は不安定になる可能性があります。接続確立、認証、コマンド実行、結果受信など、各ステップで適切なタイムアウトを設定することで、ハングアップを防ぎ、エラー発生時に早期に検知できます。Netmikoやasyncsshなどのライブラリはタイムアウトオプションを提供しています。
- 認証情報の安全な管理: 多数の機器への接続には認証情報が必要です。スクリプト内に直接ハードコードするのではなく、環境変数、設定ファイル(権限を制限)、外部のシークレット管理ツール(HashiCorp Vaultなど)を利用するなど、安全な方法で管理することが不可欠です。Nornirはインベントリファイルでの管理や外部プラグインとの連携でこの課題に対応できます。
- 冪等性: 特に設定変更を並列で行う場合、処理が複数回実行されてもシステムの状態が変わらない「冪等性」が重要です。冪等性のない操作を並列で行うと、意図しない結果を招く可能性があります。設定変更の自動化においては、冪等性を考慮した設計(例: 設定差分のみを適用、desired state model)を行うか、冪等性を保証するツール(Ansibleなど)を検討します。
- ログ出力: 並列処理の状況を把握するためには、適切なログ出力が不可欠です。どの機器に対してどのような処理が実行され、成功したか失敗したか、エラー内容は何かといった情報を詳細に記録することで、問題発生時の原因特定が容易になります。ホスト名を含めたログ出力や、ログレベルに応じた出力制御が役立ちます。
まとめ
Pythonを使ったネットワーク自動化において、複数機器に対する操作の並列化は、作業効率を劇的に向上させるための重要な手法です。Python標準ライブラリのconcurrent.futures
やasyncio
を活用することで、基本的な並列実行を実現できます。さらに、Nornirのようなネットワーク自動化フレームワークを利用することで、並列処理の複雑さを抽象化し、より効率的かつ堅牢な自動化スクリプトを開発することが可能です。
この記事で紹介した手法や考慮点を参考に、ぜひご自身のネットワーク環境における自動化スクリプトに並列処理を組み込んでみてください。並列処理は、ネットワーク運用業務における時間のかかる反復作業から解放され、より戦略的な業務に集中するための強力なツールとなるでしょう。
今後、ネットワークの規模がさらに拡大し、IaCツールとの連携が進むにつれて、大量のネットワーク機器を効率的に管理・操作する技術はますます重要になります。今回ご紹介した並列処理の概念と実践的なアプローチが、皆様の現場での自動化推進の一助となれば幸いです。