173 lines
5.5 KiB
Python
173 lines
5.5 KiB
Python
from netmiko import ConnectHandler, exceptions
|
|
from config import cfg
|
|
import logging as log
|
|
import os
|
|
import re
|
|
from repo import add_file_and_commit
|
|
|
|
|
|
def read_device_list() -> dict[str, dict[str, str]]:
|
|
|
|
bcp_dev_file = os.path.join(
|
|
os.path.join(
|
|
os.path.abspath(os.path.dirname(__file__)),
|
|
"config",
|
|
),
|
|
cfg.bcp.file,
|
|
)
|
|
log.info('start load device from "%s"', bcp_dev_file)
|
|
|
|
all_devices = {}
|
|
with open(bcp_dev_file, "r") as f:
|
|
dev_list_from_file: list[str] = f.read().splitlines()
|
|
for line in dev_list_from_file:
|
|
match = re.match(
|
|
cfg.bcp.pattern,
|
|
line,
|
|
)
|
|
if match:
|
|
all_devices[match.group("name")] = {
|
|
"name": match.group("name"),
|
|
"ip": match.group("ip"),
|
|
"vendor": match.group("vendor"),
|
|
"model": match.group("model"),
|
|
}
|
|
log.info("success load %s devices", len(all_devices))
|
|
return all_devices
|
|
|
|
|
|
def connect_to_device(vendor: str, host: str, name: str="") -> ConnectHandler:
|
|
log_dir: str = os.path.join(os.path.abspath(os.path.dirname(__file__)), "logs")
|
|
|
|
conn_conf: dict = {
|
|
"host": host,
|
|
"username": cfg.net_dev.user,
|
|
"password": cfg.net_dev.pwd,
|
|
"port": cfg.net_dev.ssh_port,
|
|
}
|
|
if cfg.net_dev.debug:
|
|
conn_conf["session_log"] = os.path.join(log_dir, 'session_log.log')
|
|
if vendor.lower() == "mikrotik":
|
|
conn_conf["device_type"] = "mikrotik_routeros"
|
|
if cfg.net_dev.domain is not None:
|
|
conn_conf["username"] = cfg.net_dev.user + "@" + cfg.net_dev.domain
|
|
elif vendor.lower() == "cisco_sb":
|
|
conn_conf["device_type"] = "cisco_s300"
|
|
if cfg.net_dev.domain is not None:
|
|
conn_conf["username"] = cfg.net_dev.domain + "\\" + cfg.net_dev.user
|
|
elif vendor.lower() == "snr":
|
|
conn_conf["device_type"] = "cisco_ios"
|
|
if cfg.net_dev.domain is not None:
|
|
conn_conf["username"] = cfg.net_dev.domain + "\\" + cfg.net_dev.user
|
|
elif vendor.lower() == "cisco":
|
|
conn_conf["device_type"] = "cisco_ios"
|
|
if cfg.net_dev.domain is not None:
|
|
conn_conf["username"] = cfg.net_dev.domain + "\\" + cfg.net_dev.user
|
|
try:
|
|
connection: ConnectHandler = ConnectHandler(**conn_conf)
|
|
log.info("connect to %s %s", name, host)
|
|
return connection
|
|
except exceptions.NetmikoAuthenticationException:
|
|
log.warning("Authentication error %s %s ", name, host)
|
|
except exceptions.NetmikoTimeoutException:
|
|
log.warning("Connection time out error %s %s ", name, host)
|
|
except Exception as e:
|
|
log.warning("Connection error %s %s: %s", name, host, e)
|
|
|
|
|
|
def send_command(connection: ConnectHandler, command: str) -> str:
|
|
log.info("send command")
|
|
try:
|
|
result: str = connection.send_command(command, read_timeout=cfg.net_dev.read_timeout)
|
|
log.info('command send success')
|
|
except exceptions.NetmikoTimeoutException:
|
|
result = "NetmikoTimeoutException"
|
|
except exceptions.ReadTimeout:
|
|
result = "ReadTimeout"
|
|
return result
|
|
|
|
|
|
def save_mikrotik_bcp(host: str, name: str) -> None:
|
|
|
|
connection = connect_to_device(
|
|
vendor="mikrotik",
|
|
host=host,
|
|
name=name,
|
|
)
|
|
if connection is None:
|
|
return
|
|
result = send_command(connection, "export terse show-sensitive")
|
|
connection.disconnect()
|
|
log.info("disconnected from %r", name)
|
|
if result == "NetmikoTimeoutException":
|
|
log.warning("Timeout error %r", name)
|
|
return
|
|
elif result == "ReadTimeout":
|
|
log.warning("Timeout read config from %r", name)
|
|
return
|
|
|
|
result = "\n".join(result.split("\n")[1:])
|
|
with open(os.path.join(cfg.bcp.dir, name), "w") as f:
|
|
f.write(result)
|
|
log.info("Backup saved")
|
|
|
|
add_file_and_commit(file_name=name)
|
|
|
|
|
|
def save_snr_bcp(host: str, name: str) -> None:
|
|
connection = connect_to_device(
|
|
vendor="snr",
|
|
host=host,
|
|
)
|
|
if connection is None:
|
|
return
|
|
result = send_command(connection, "show running-config")
|
|
connection.disconnect()
|
|
log.info("disconnected from %r", name)
|
|
if result == "NetmikoTimeoutException":
|
|
log.warning("Timeout read config from %s", name)
|
|
return
|
|
with open(os.path.join(cfg.bcp.dir, name), "w") as f:
|
|
f.write(result)
|
|
log.info("Backup saved")
|
|
|
|
add_file_and_commit(file_name=name)
|
|
|
|
|
|
def save_cisco_sb_bcp(host: str, name: str) -> None:
|
|
connection = connect_to_device(
|
|
vendor="cisco_sb",
|
|
host=host,
|
|
)
|
|
if connection is None:
|
|
return
|
|
result = send_command(connection, "show running-config")
|
|
connection.disconnect()
|
|
log.info("disconnected from %s", name)
|
|
if result == "NetmikoTimeoutException":
|
|
log.warning("Timeout read config from %s", name)
|
|
return
|
|
with open(os.path.join(cfg.bcp.dir, name), "w") as f:
|
|
f.write(result)
|
|
log.info("Backup saved")
|
|
add_file_and_commit(file_name=name)
|
|
|
|
|
|
def save_cisco_bcp(host: str, name: str) -> None:
|
|
connection = connect_to_device(
|
|
vendor="cisco",
|
|
host=host,
|
|
)
|
|
if connection is None:
|
|
return
|
|
result = send_command(connection, "show running-config")
|
|
connection.disconnect()
|
|
log.info("disconnected from %r", name)
|
|
if result == "NetmikoTimeoutException":
|
|
log.warning("Timeout read config from %r", name)
|
|
return
|
|
with open(os.path.join(cfg.bcp.dir, name), "w") as f:
|
|
f.write(result)
|
|
log.info("Backup saved")
|
|
add_file_and_commit(file_name=name)
|