zbx-tg-bot/telegram/message.py

53 lines
1.6 KiB
Python

import logging as log
import aiohttp
from config import conf
async def send_message(
    message: str,
) -> dict | None:
    """Send *message* to the configured Telegram chat via the Bot API.

    Posts a ``sendMessage`` request with ``conf.tgbot.chat_id`` as the
    chat, ``conf.tgbot.tread_id`` as ``message_thread_id`` and *message*
    as the text.

    Args:
        message: Text of the message to send.

    Returns:
        ``{"status": 200, "msg_id": <message_id from the response>}`` when
        the API answers with HTTP 200; ``None`` for any other status (a
        warning is logged).
    """
    url = f"https://api.telegram.org/bot{conf.tgbot.token}/sendMessage"
    params = {
        "chat_id": conf.tgbot.chat_id,
        "message_thread_id": conf.tgbot.tread_id,
        "text": message,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            json=params,
        ) as response:
            log.info(f"Response status: {response.status}")
            if response.status == 200:
                # Decode the body only on success: an error reply is not
                # guaranteed to be JSON (e.g. an HTML page from a gateway),
                # and decoding it would raise instead of returning None.
                resp = await response.json()
                msg_id = resp["result"]["message_id"]
                log.info(f"Message with ID: {msg_id} send")
                return {
                    "status": response.status,
                    "msg_id": msg_id,
                }
            log.warning(f"Message not send. Response status: {response.status}")
    return None
async def del_message(
    message_id: int,
) -> dict | None:
    """Delete a message from the configured Telegram chat.

    Posts a ``deleteMessage`` request for *message_id* in
    ``conf.tgbot.chat_id``.

    Args:
        message_id: Identifier of the message to delete.

    Returns:
        ``{"status": 200}`` when the API answers with HTTP 200; ``None``
        for any other status (a warning with the status is logged).
    """
    endpoint = f"https://api.telegram.org/bot{conf.tgbot.token}/deleteMessage"
    async with aiohttp.ClientSession() as session, session.post(
        endpoint,
        json={
            "chat_id": conf.tgbot.chat_id,
            "message_id": message_id,
        },
    ) as reply:
        status_code = reply.status
        # Guard clause: anything other than 200 is reported and yields None.
        if status_code != 200:
            log.warning(f"Response status: {status_code}")
            return None
        log.info(f"Message ID {message_id} deleted")
        return {
            "status": status_code,
        }