55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
|
import socket
|
||
|
import re
|
||
|
from typing import List, Dict, Union
|
||
|
import logging
|
||
|
|
||
|
|
||
|
def check_conn(ip: str, port: int) -> bool:
    """Report whether a TCP connection to ``ip:port`` can be opened.

    A fresh IPv4 stream socket with a 3-second timeout is used for the
    attempt and is always closed before the function returns.

    Args:
        ip: Address of the host to connect to.
        port: TCP port number to connect to.

    Returns:
        True if the connection was established. False if the attempt timed
        out or failed with any other socket-level error (for example a
        refused connection or an unreachable host).
    """
    logging.info("")
    # The context manager closes the socket on every exit path, so probing
    # many endpoints does not leak file descriptors.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
        conn.settimeout(3)
        try:
            conn.connect((ip, port))
        except TimeoutError:
            logging.warning(f"{ip}:{port} - Timeout")
            return False
        except OSError as err:
            # Refused / unreachable / unresolvable targets are an ordinary
            # "not reachable" outcome for a probe, not a reason to abort
            # the caller's whole run.
            logging.warning(f"{ip}:{port} - {err}")
            return False
    logging.info(f"{ip}:{port} - True")
    return True
|
||
|
|
||
|
|
||
|
def parse_input(text: str) -> List[Dict[str, Union[int, str]]]:
    """Extract ``ip:port/protocol`` endpoints from the "Error;" entries of *text*.

    An endpoint is picked up only when it follows the literal ``Error;`` on
    the same line. The address is matched as four dot-separated groups of
    one to three digits; the groups are not range-checked.

    Args:
        text: Raw text to scan.

    Returns:
        One dict per match, in order of appearance, with the keys ``"ip"``
        (str), ``"port"`` (int) and ``"protocol"`` (str). Empty list when
        nothing matches.
    """
    logging.info("")
    endpoint_re = re.compile(
        r"Error;.*?(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)/(\w+)"
    )
    endpoints = []
    for found in endpoint_re.finditer(text):
        ip, port, protocol = found.groups()
        endpoints.append({"ip": ip, "port": int(port), "protocol": protocol})
    return endpoints
|
||
|
|
||
|
|
||
|
def start_check(
    text: str,
    result_type: str = "json",
) -> List[Dict[str, Union[int, str, bool]]] | List[str]:
    """Probe every TCP endpoint found in *text* and report the outcome.

    Endpoints are taken from ``parse_input(text)`` and each TCP one is
    tested with ``check_conn``.

    Args:
        text: Raw text handed to ``parse_input``.
        result_type: ``"text"`` selects one formatted line per endpoint;
            any other value returns the list of dicts.

    Returns:
        Either the endpoint dicts, each extended with a boolean
        ``"result"`` key, or (for ``result_type == "text"``) strings of
        the form ``"<ip>:<port>/<protocol> - OK"`` / ``"... - FAIL"``.
    """
    logging.info("")
    checked = []
    for entry in parse_input(text):
        # NOTE(review): entries whose protocol is not "tcp" are skipped and
        # do not appear in the output at all -- confirm this is intended.
        if entry["protocol"] != "tcp":
            continue
        entry["result"] = bool(check_conn(entry["ip"], entry["port"]))
        checked.append(entry)

    if result_type != "text":
        return checked

    return [
        f"{entry['ip']}:{entry['port']}/{entry['protocol']} - OK"
        if entry["result"]
        else f"{entry['ip']}:{entry['port']}/{entry['protocol']} - FAIL"
        for entry in checked
    ]
|