netadm/scripts/check_available_ports.py

55 lines
1.5 KiB
Python
Raw Permalink Normal View History

2024-10-20 15:53:02 +00:00
import socket
import re
from typing import List, Dict, Union
import logging
def check_conn(ip: str, port: int) -> bool:
    """Report whether a TCP connection to ``ip:port`` can be opened.

    A fresh ``AF_INET`` stream socket with a 3-second timeout is used for
    the attempt and is always closed before the function returns.

    Args:
        ip: Address to connect to (the socket is ``AF_INET``, i.e. IPv4).
        port: TCP port number to connect to.

    Returns:
        True if the connection was established. False if the attempt timed
        out or failed with any other socket-level error (for example a
        refused connection or an unreachable network).
    """
    logging.info("")
    # The context manager closes the socket on every path, so repeated
    # probes do not leak file descriptors.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
        conn.settimeout(3)
        try:
            conn.connect((ip, port))
        except TimeoutError:
            logging.warning(f"{ip}:{port} - Timeout")
            return False
        except OSError as exc:
            # Refused / unreachable / unresolvable targets are "port not
            # available" results, not reasons to abort the whole scan.
            logging.warning(f"{ip}:{port} - {exc}")
            return False
        logging.info(f"{ip}:{port} - True")
        return True
def parse_input(text: str) -> List[Dict[str, Union[int, str]]]:
    """Extract failed endpoints from error-report text.

    Each match starts at the literal ``Error;`` and captures the first
    following ``<a.b.c.d>:<port>/<protocol>`` token on the same line.

    Args:
        text: Raw text to scan.

    Returns:
        One dict per match, in order of appearance, with the keys ``"ip"``
        (str), ``"port"`` (int) and ``"protocol"`` (str). Empty list when
        nothing matches.
    """
    logging.info("")
    endpoint_re = re.compile(
        r"Error;.*?(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)/(\w+)"
    )
    endpoints = []
    for found in endpoint_re.finditer(text):
        address, port_text, proto = found.groups()
        endpoints.append(
            {"ip": address, "port": int(port_text), "protocol": proto}
        )
    return endpoints
def start_check(
    text: str,
    result_type: str = "json",
) -> List[Dict[str, Union[int, str, bool]]] | List[str]:
    """Probe every TCP endpoint found in ``text`` and report the outcome.

    Endpoints are taken from ``parse_input(text)``; entries whose protocol
    is not ``"tcp"`` are skipped. Each remaining entry is probed with
    ``check_conn`` and gets a boolean ``"result"`` key.

    Args:
        text: Raw text handed to ``parse_input``.
        result_type: ``"text"`` to get formatted lines; any other value
            (default ``"json"``) returns the list of dicts.

    Returns:
        Either the list of probed endpoint dicts, or, for ``"text"``, a
        list of ``"<ip>:<port>/<protocol> - OK"`` / ``"... - FAIL"`` strings.
    """
    logging.info("")
    probed = []
    for entry in parse_input(text):
        if entry["protocol"] != "tcp":
            continue
        entry["result"] = bool(check_conn(entry["ip"], entry["port"]))
        probed.append(entry)

    if result_type != "text":
        return probed

    return [
        f"{item['ip']}:{item['port']}/{item['protocol']} - OK"
        if item["result"]
        else f"{item['ip']}:{item['port']}/{item['protocol']} - FAIL"
        for item in probed
    ]