osnova-api-alert/routers/from_zbx.py

51 lines
1.4 KiB
Python
Raw Normal View History

2024-10-10 10:55:50 +00:00
from fastapi import APIRouter, Depends
import logging as log
from schemas import (
zbxMessageToDashboard,
zbxMessageToNetworkChat,
)
from auth import verify_token_zabbix
from redis_db import set_value, pop_value
from config import conf
import re
from telegram import send_message
from telegram import del_message
# Router instance on which the POST endpoints declared below are registered.
router = APIRouter()
@router.post("/send-to-dashboard")
async def send_message_to_dashboard(
    message: zbxMessageToDashboard,
    token: str = Depends(verify_token_zabbix),
):
    """Relay a Zabbix notification to the dashboard thread, or retract it.

    The notification text is searched with ``conf.zbx.close_alert_pattern``.

    * No hit: subject and text are posted to the dashboard thread of the
      configured chat; when the send result reports status 200, the
      returned message id is stored under ``message.problem_id``.
    * Hit: the id stored under ``message.problem_id`` is popped and, if one
      was present, the corresponding chat message is deleted.

    Args:
        message: Incoming payload; ``subject``, ``text`` and ``problem_id``
            are the fields read here.
        token: Result of the ``verify_token_zabbix`` dependency; it is not
            used inside the handler body.

    Returns:
        None on every path.
    """
    closing = re.search(conf.zbx.close_alert_pattern, message.text)
    log.info(f"match: {closing}")

    if closing is None:
        # Not a closing notification: publish it and remember which chat
        # message belongs to this problem so it can be removed later.
        sent = await send_message(
            text="\n\n".join((message.subject, message.text)),
            chat_id=conf.tg.chat_id,
            message_thread_id=conf.tg.dashboard_tred_id,
        )
        if sent and sent["status"] == 200:
            await set_value(message.problem_id, sent["msg_id"])
        return

    # Closing notification: nothing is posted.
    # NOTE(review): the early exit is taken to apply to every closing
    # notification, including ones with no stored id -- confirm against
    # the repository's indentation.
    stored_id = await pop_value(message.problem_id)
    if stored_id:
        # The stored value is treated as UTF-8 bytes holding a decimal id.
        await del_message(
            message_id=int(stored_id.decode("utf-8")),
            chat_id=conf.tg.chat_id,
        )
@router.post("/send-to-net-chat")
async def send_message_to_net_chat(
    message: zbxMessageToNetworkChat,
    token: str = Depends(verify_token_zabbix),
):
    """Post a Zabbix notification into the network thread of the chat.

    The subject and text are joined with a blank line between them and
    sent to ``conf.tg.chat_id`` using ``conf.tg.net_tred_id`` as the
    thread id. The send result is not inspected.

    Args:
        message: Incoming payload; ``subject`` and ``text`` are read.
        token: Result of the ``verify_token_zabbix`` dependency; it is not
            used inside the handler body.

    Returns:
        None.
    """
    # Keys are listed in the same order the call arguments are evaluated.
    outgoing = {
        "text": "\n\n".join((message.subject, message.text)),
        "chat_id": conf.tg.chat_id,
        "message_thread_id": conf.tg.net_tred_id,
    }
    await send_message(**outgoing)