net-backup/app/backup.py

220 lines
7.3 KiB
Python

from netmiko import ConnectHandler, exceptions
from config import cfg
import logging as log
import os
import re
from repo import add_file_and_commit
def get_bcp_file_path(name: str) -> tuple[str, str | None]:
    """Return the backup file path for a device and its sub-folder, if any.

    When ``cfg.bcp.in_folder`` is enabled and ``cfg.bcp.folder_name_pattern``
    matches ``name``, the first capture group of the match is used as a
    sub-folder of ``cfg.bcp.dir`` and that folder is created when missing.
    In every other case the file is placed directly in ``cfg.bcp.dir``.

    Args:
        name: Device name; it is also used as the backup file name.

    Returns:
        A ``(path, folder)`` tuple. ``folder`` is the sub-folder name, or
        ``None`` when the file lives directly in ``cfg.bcp.dir``.
    """
    # The pattern is only consulted when per-folder layout is switched on.
    match = re.match(cfg.bcp.folder_name_pattern, name) if cfg.bcp.in_folder else None
    if match is None:
        return os.path.join(cfg.bcp.dir, name), None

    folder = match.group(1)
    folder_path = os.path.join(cfg.bcp.dir, folder)
    # Create-and-catch instead of exists()-then-create: no window in which
    # another process can create the folder between the check and makedirs().
    try:
        os.makedirs(folder_path)
    except FileExistsError:
        log.info("folder %r exists", folder)
    else:
        log.info("create folder %r", folder)
    return os.path.join(folder_path, name), folder
def read_device_list() -> dict[str, dict[str, str]]:
    """Load the device inventory from the backup device list file.

    The file named by ``cfg.bcp.file`` is read from the ``config`` directory
    located next to this module. Every line is matched against
    ``cfg.bcp.pattern``; lines that do not match are ignored, and a later
    line with the same device name replaces an earlier one.

    Returns:
        Mapping of device name to a dict with the keys ``name``, ``ip``,
        ``vendor`` and ``model``, taken from the pattern's named groups.
    """
    module_dir = os.path.abspath(os.path.dirname(__file__))
    inventory_path = os.path.join(module_dir, "config", cfg.bcp.file)
    log.info('start load device from "%s"', inventory_path)

    devices = {}
    with open(inventory_path, "r") as inventory:
        lines: list[str] = inventory.read().splitlines()

    fields = ("name", "ip", "vendor", "model")
    for entry in lines:
        parsed = re.match(cfg.bcp.pattern, entry)
        if not parsed:
            # Not a device line (does not fit the configured pattern).
            continue
        devices[parsed.group("name")] = {
            field: parsed.group(field) for field in fields
        }

    log.info("success load %s devices", len(devices))
    return devices
def connect_to_device(vendor: str, host: str, name: str = "") -> "ConnectHandler | None":
    """Open a netmiko session to a network device.

    Credentials and port come from ``cfg.net_dev``. When
    ``cfg.net_dev.domain`` is set, the login name is qualified with it:
    ``user@domain`` for MikroTik, ``domain\\user`` for the other vendors.
    With ``cfg.net_dev.debug`` enabled, netmiko is given a session log path
    of ``logs/session_log.log`` next to this module.

    Args:
        vendor: Vendor key, case-insensitive: ``mikrotik``, ``cisco_sb``,
            ``snr`` or ``cisco``.
        host: Address of the device.
        name: Device name, used only in log messages.

    Returns:
        The established connection, or ``None`` when the vendor is not
        supported or the connection attempt fails (the reason is logged).
    """
    # Vendor key -> netmiko device_type.
    device_types = {
        "mikrotik": "mikrotik_routeros",
        "cisco_sb": "cisco_s300",
        "snr": "cisco_ios",
        "cisco": "cisco_ios",
    }
    vendor_key = vendor.lower()
    device_type = device_types.get(vendor_key)
    if device_type is None:
        # Fail with a clear message instead of handing netmiko a config
        # that has no device_type and logging its generic error.
        log.warning("Unsupported vendor %r for %s %s", vendor, name, host)
        return None

    username = cfg.net_dev.user
    if cfg.net_dev.domain is not None:
        if vendor_key == "mikrotik":
            username = cfg.net_dev.user + "@" + cfg.net_dev.domain
        else:
            username = cfg.net_dev.domain + "\\" + cfg.net_dev.user

    conn_conf: dict = {
        "host": host,
        "username": username,
        "password": cfg.net_dev.pwd,
        "port": cfg.net_dev.ssh_port,
        "device_type": device_type,
    }
    if cfg.net_dev.debug:
        log_dir: str = os.path.join(os.path.abspath(os.path.dirname(__file__)), "logs")
        conn_conf["session_log"] = os.path.join(log_dir, 'session_log.log')

    try:
        connection = ConnectHandler(**conn_conf)
    except exceptions.NetmikoAuthenticationException:
        log.warning("Authentication error %s %s ", name, host)
    except exceptions.NetmikoTimeoutException:
        log.warning("Connection time out error %s %s ", name, host)
    except Exception as e:
        # Deliberately broad: any failure to connect is logged and reported
        # to the caller as None rather than raised.
        log.warning("Connection error %s %s: %s", name, host, e)
    else:
        log.info("connect to %s %s %s", vendor, name, host)
        return connection
    return None
def send_command(connection: ConnectHandler, command: str) -> str:
    """Run ``command`` on the device and return its output or a failure marker.

    Args:
        connection: An open netmiko connection.
        command: The command to execute; ``cfg.net_dev.read_timeout`` is
            passed as the read timeout.

    Returns:
        The command output, or one of the marker strings
        ``"NetmikoTimeoutException"``, ``"ReadTimeout"`` (the respective
        netmiko exception was raised) or ``"NotEnoughLines"`` (the output
        has fewer than ``cfg.bcp.min_string_count`` lines).
    """
    log.info("send command")
    try:
        result: str = connection.send_command(command, read_timeout=cfg.net_dev.read_timeout)
    except exceptions.NetmikoTimeoutException:
        # Return immediately: the marker is a single line, so letting it
        # reach the line-count check below would turn it into
        # "NotEnoughLines" and hide the real cause from the caller.
        return "NetmikoTimeoutException"
    except exceptions.ReadTimeout:
        return "ReadTimeout"

    log.info('command send success')
    if len(result.split("\n")) < cfg.bcp.min_string_count:
        return 'NotEnoughLines'
    return result
def save_mikrotik_bcp(host: str, name: str) -> None:
    """Fetch a MikroTik device's configuration, save it and commit it.

    Connects with the ``mikrotik`` vendor profile, runs
    ``export terse show-sensitive``, writes the output (minus its first
    line) to the path given by ``get_bcp_file_path(name)`` and then calls
    ``add_file_and_commit``. Nothing is written when the connection fails
    or ``send_command`` returns one of its failure markers; a warning is
    logged instead.

    Args:
        host: Address of the device.
        name: Device name; also the backup file name.
    """
    connection = connect_to_device(vendor="mikrotik", host=host, name=name)
    if connection is None:
        return

    # NOTE(review): "show-sensitive" presumably includes secrets in the
    # export, which then end up in the backup file — confirm this is intended.
    try:
        result = send_command(connection, "export terse show-sensitive")
    finally:
        # Always close the session, even if the command raises something
        # send_command does not translate into a marker string.
        connection.disconnect()
        log.info("disconnected from %r", name)

    # Failure marker returned by send_command -> warning to log.
    failure_messages = {
        "NetmikoTimeoutException": "Timeout error %r",
        "ReadTimeout": "Timeout read config from %r",
        "NotEnoughLines": "Small number of lines %r",
    }
    if result in failure_messages:
        log.warning(failure_messages[result], name)
        return

    # Drop the first output line (presumably the export header, which would
    # differ between runs — TODO confirm).
    result = result.partition("\n")[2]

    bcp_file_name, file_folder = get_bcp_file_path(name)
    with open(bcp_file_name, "w") as f:
        f.write(result)
    log.info("Backup saved")
    add_file_and_commit(file_name=name, file_folder=file_folder)
def save_snr_bcp(host: str, name: str) -> None:
    """Fetch an SNR device's running configuration, save it and commit it.

    Connects with the ``snr`` vendor profile, runs ``show running-config``,
    writes the output to the path given by ``get_bcp_file_path(name)`` and
    then calls ``add_file_and_commit``. Nothing is written when the
    connection fails or ``send_command`` returns one of its failure
    markers; a warning is logged instead.

    Args:
        host: Address of the device.
        name: Device name; also the backup file name.
    """
    connection = connect_to_device(vendor="snr", host=host, name=name)
    if connection is None:
        return

    try:
        result = send_command(connection, "show running-config")
    finally:
        # Always close the session, even if the command raises something
        # send_command does not translate into a marker string.
        connection.disconnect()
        log.info("disconnected from %r", name)

    # Failure marker returned by send_command -> warning to log.
    failure_messages = {
        "NetmikoTimeoutException": "Timeout error %r",
        "ReadTimeout": "Timeout read config from %r",
        "NotEnoughLines": "Small number of lines %r",
    }
    if result in failure_messages:
        log.warning(failure_messages[result], name)
        return

    bcp_file_name, file_folder = get_bcp_file_path(name)
    with open(bcp_file_name, "w") as f:
        f.write(result)
    log.info("Backup saved")
    add_file_and_commit(file_name=name, file_folder=file_folder)
def save_cisco_sb_bcp(host: str, name: str) -> None:
    """Fetch a Cisco SB device's running configuration, save and commit it.

    Connects with the ``cisco_sb`` vendor profile, runs
    ``show running-config``, writes the output to the path given by
    ``get_bcp_file_path(name)`` and then calls ``add_file_and_commit``.
    Nothing is written when the connection fails or ``send_command``
    returns one of its failure markers; a warning is logged instead.

    Args:
        host: Address of the device.
        name: Device name; also the backup file name.
    """
    connection = connect_to_device(vendor="cisco_sb", host=host, name=name)
    if connection is None:
        return

    try:
        result = send_command(connection, "show running-config")
    finally:
        # Always close the session, even if the command raises something
        # send_command does not translate into a marker string.
        connection.disconnect()
        log.info("disconnected from %s", name)

    # Failure marker returned by send_command -> warning to log.
    failure_messages = {
        "NetmikoTimeoutException": "Timeout error %r",
        "ReadTimeout": "Timeout read config from %r",
        "NotEnoughLines": "Small number of lines %r",
    }
    if result in failure_messages:
        log.warning(failure_messages[result], name)
        return

    bcp_file_name, file_folder = get_bcp_file_path(name)
    with open(bcp_file_name, "w") as f:
        f.write(result)
    log.info("Backup saved")
    add_file_and_commit(file_name=name, file_folder=file_folder)
def save_cisco_bcp(host: str, name: str) -> None:
    """Fetch a Cisco device's running configuration, save it and commit it.

    Connects with the ``cisco`` vendor profile, runs
    ``show running-config``, writes the output to the path given by
    ``get_bcp_file_path(name)`` and then calls ``add_file_and_commit``.
    Nothing is written when the connection fails or ``send_command``
    returns one of its failure markers; a warning is logged instead.

    Args:
        host: Address of the device.
        name: Device name; also the backup file name.
    """
    connection = connect_to_device(vendor="cisco", host=host, name=name)
    if connection is None:
        return

    try:
        result = send_command(connection, "show running-config")
    finally:
        # Always close the session, even if the command raises something
        # send_command does not translate into a marker string.
        connection.disconnect()
        log.info("disconnected from %r", name)

    # Failure marker returned by send_command -> warning to log.
    failure_messages = {
        "NetmikoTimeoutException": "Timeout error %r",
        "ReadTimeout": "Timeout read config from %r",
        "NotEnoughLines": "Small number of lines %r",
    }
    if result in failure_messages:
        log.warning(failure_messages[result], name)
        return

    bcp_file_name, file_folder = get_bcp_file_path(name)
    with open(bcp_file_name, "w") as f:
        f.write(result)
    log.info("Backup saved")
    add_file_and_commit(file_name=name, file_folder=file_folder)