osnova-api-alert/telegram/message.py

54 lines
1.6 KiB
Python

import logging as log
import aiohttp
from config import conf
async def send_message(
    text: str,
    chat_id: int,
    message_thread_id: int | None = None,
) -> dict | None:
    """Send a text message via the Telegram Bot API ``sendMessage`` method.

    Args:
        text: Message text to send.
        chat_id: Identifier of the target chat.
        message_thread_id: Optional thread identifier; it is added to the
            request only when it is truthy.

    Returns:
        ``{"status": <HTTP status>, "msg_id": <message id>}`` when the API
        answers with HTTP 200, otherwise ``None``.

    Exceptions raised by aiohttp while performing the request are not caught
    here and propagate to the caller.
    """
    url = f"https://api.telegram.org/bot{conf.tg.bot_token}/sendMessage"
    params = {
        "chat_id": chat_id,
        "text": text,
    }
    if message_thread_id:
        params["message_thread_id"] = message_thread_id

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=params) as response:
            status = response.status
            log.info(f"Response status: {status}")

            if status != 200:
                # Do not decode the body on failure: an error response is not
                # guaranteed to be JSON, and response.json() raises on a
                # non-JSON content type, which would turn a reportable failure
                # into an unexpected exception.
                log.warning(f"Message not send. Response status: {status}")
                return None

            resp = await response.json()
            msg_id = resp["result"]["message_id"]
            log.info(f"Message with ID: {msg_id} send")
            return {
                "status": status,
                "msg_id": msg_id,
            }
async def del_message(
    message_id: int,
    chat_id: int,
) -> dict | None:
    """Delete a message via the Telegram Bot API ``deleteMessage`` method.

    Args:
        message_id: Identifier of the message to delete.
        chat_id: Identifier of the chat that contains the message.

    Returns:
        ``{"status": <HTTP status>}`` when the API answers with HTTP 200,
        otherwise ``None`` (the status is logged as a warning).
    """
    endpoint = f"https://api.telegram.org/bot{conf.tg.bot_token}/deleteMessage"
    payload = {
        "chat_id": chat_id,
        "message_id": message_id,
    }

    async with (
        aiohttp.ClientSession() as http,
        http.post(endpoint, json=payload) as reply,
    ):
        status = reply.status

        # Guard clause: anything other than 200 is reported and yields None.
        if status != 200:
            log.warning(f"Response status: {status}")
            return None

        log.info(f"Message ID {message_id} deleted")
        return {"status": status}