from netmiko import ConnectHandler, exceptions from config import cfg import logging as log import os import re from repo import add_file_and_commit def read_device_list() -> dict[str, dict[str, str]]: bcp_dev_file = os.path.join( os.path.join( os.path.abspath(os.path.dirname(__file__)), "config", ), cfg.bcp.file, ) log.info('start load device from "%s"', bcp_dev_file) all_devices = {} with open(bcp_dev_file, "r") as f: dev_list_from_file: list[str] = f.read().splitlines() for line in dev_list_from_file: match = re.match( cfg.bcp.pattern, line, ) if match: all_devices[match.group("name")] = { "name": match.group("name"), "ip": match.group("ip"), "vendor": match.group("vendor"), "model": match.group("model"), } log.info("success load %s devices", len(all_devices)) return all_devices def connect_to_device(vendor: str, host: str, name: str="") -> ConnectHandler: log_dir: str = os.path.join(os.path.abspath(os.path.dirname(__file__)), "logs") conn_conf: dict = { "host": host, "username": cfg.net_dev.user, "password": cfg.net_dev.pwd, "port": cfg.net_dev.ssh_port, } if cfg.net_dev.debug: conn_conf["session_log"] = os.path.join(log_dir, 'session_log.log') if vendor.lower() == "mikrotik": conn_conf["device_type"] = "mikrotik_routeros" if cfg.net_dev.domain is not None: conn_conf["username"] = cfg.net_dev.user + "@" + cfg.net_dev.domain elif vendor.lower() == "cisco_sb": conn_conf["device_type"] = "cisco_s300" if cfg.net_dev.domain is not None: conn_conf["username"] = cfg.net_dev.domain + "\\" + cfg.net_dev.user elif vendor.lower() == "snr": conn_conf["device_type"] = "cisco_ios" if cfg.net_dev.domain is not None: conn_conf["username"] = cfg.net_dev.domain + "\\" + cfg.net_dev.user elif vendor.lower() == "cisco": conn_conf["device_type"] = "cisco_ios" if cfg.net_dev.domain is not None: conn_conf["username"] = cfg.net_dev.domain + "\\" + cfg.net_dev.user try: connection: ConnectHandler = ConnectHandler(**conn_conf) log.info("connect to %s %s", name, host) return connection except exceptions.NetmikoAuthenticationException: log.warning("Authentication error %s %s ", name, host) except exceptions.NetmikoTimeoutException: log.warning("Connection time out error %s %s ", name, host) except Exception as e: log.warning("Connection error %s %s: %s", name, host, e) def send_command(connection: ConnectHandler, command: str) -> str: log.info("send command") try: result: str = connection.send_command(command, read_timeout=cfg.net_dev.read_timeout, delay_factor=2) log.info('command send success') except exceptions.NetmikoTimeoutException: result = "NetmikoTimeoutException" except exceptions.ReadTimeout: result = "ReadTimeout" return result def save_mikrotik_bcp(host: str, name: str) -> None: connection = connect_to_device( vendor="mikrotik", host=host, name=name, ) if connection is None: return result = send_command(connection, "export terse show-sensitive") connection.disconnect() log.info("disconnected from %r", name) if result == "NetmikoTimeoutException": log.warning("Timeout error %r", name) return elif result == "ReadTimeout": log.warning("Timeout read config from %r", name) return result = "\n".join(result.split("\n")[1:]) with open(os.path.join(cfg.bcp.dir, name), "w") as f: f.write(result) log.info("Backup saved") add_file_and_commit(file_name=name) def save_snr_bcp(host: str, name: str) -> None: connection = connect_to_device( vendor="snr", host=host, ) if connection is None: return result = send_command(connection, "show running-config") connection.disconnect() log.info("disconnected from %r", name) if result == "NetmikoTimeoutException": log.warning("Timeout error %r", name) return elif result == "ReadTimeout": log.warning("Timeout read config from %r", name) return with open(os.path.join(cfg.bcp.dir, name), "w") as f: f.write(result) log.info("Backup saved") add_file_and_commit(file_name=name) def save_cisco_sb_bcp(host: str, name: str) -> None: connection = connect_to_device( vendor="cisco_sb", host=host, ) if connection is None: return result = send_command(connection, "show running-config") connection.disconnect() log.info("disconnected from %s", name) if result == "NetmikoTimeoutException": log.warning("Timeout error %r", name) return elif result == "ReadTimeout": log.warning("Timeout read config from %r", name) return with open(os.path.join(cfg.bcp.dir, name), "w") as f: f.write(result) log.info("Backup saved") add_file_and_commit(file_name=name) def save_cisco_bcp(host: str, name: str) -> None: connection = connect_to_device( vendor="cisco", host=host, ) if connection is None: return result = send_command(connection, "show running-config") connection.disconnect() log.info("disconnected from %r", name) if result == "NetmikoTimeoutException": log.warning("Timeout error %r", name) return elif result == "ReadTimeout": log.warning("Timeout read config from %r", name) return with open(os.path.join(cfg.bcp.dir, name), "w") as f: f.write(result) log.info("Backup saved") add_file_and_commit(file_name=name)