zbx-tg-bot/telegram/message.py

76 lines
2.5 KiB
Python

import logging as log
import aiohttp
from config import conf
import json
async def send_message(message: str, event_id: int) -> dict:
    """Send *message* to the configured Telegram chat with action buttons.

    The text is posted through the Bot API ``sendMessage`` method to
    ``conf.tgbot.chat_id`` (thread ``conf.tgbot.tread_id``). One row of three
    inline buttons is attached; each button's callback data is a one-letter
    prefix (``h``, ``d`` or ``c``) followed by ``event_id``.

    Args:
        message: Text of the message to send.
        event_id: Identifier embedded in the callback data of every button.

    Returns:
        ``{"status": 200, "msg_id": <message id>}`` when the API answered 200,
        ``{"status": <http status>}`` for any other HTTP status, or
        ``{"status": <exception instance>}`` when the request or the decoding
        of the response raised.
    """
    url = f"https://api.telegram.org/bot{conf.tgbot.token}/sendMessage"
    keyboard = {
        "inline_keyboard": [
            [
                {"text": "🛠 на 1 час", "callback_data": f"h{event_id}"},
                {"text": "🛠 на 1 день", "callback_data": f"d{event_id}"},
                {"text": "✅ Закрыть", "callback_data": f"c{event_id}"},
            ],
        ],
    }
    payload = {
        "chat_id": conf.tgbot.chat_id,
        "message_thread_id": conf.tgbot.tread_id,
        "text": message,
        # The keyboard is sent as a JSON-serialized string inside the JSON body.
        "reply_markup": json.dumps(keyboard),
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, json=payload) as response:
                log.info(f"Response status: {response.status}")
                # Check the status before touching the body: an error reply
                # that is not JSON must still be reported by its HTTP status
                # instead of surfacing as a decoding exception.
                if response.status != 200:
                    log.warning(f"Message not send. Response status: {response.status}")
                    return {"status": response.status}
                resp = await response.json()
                msg_id = resp["result"]["message_id"]
                log.info(f"Message with ID: {msg_id} send")
                return {
                    "status": response.status,
                    "msg_id": msg_id,
                }
        except Exception as e:
            # Best-effort delivery: failures are reported to the caller through
            # the "status" key rather than raised.
            # NOTE(review): some aiohttp exception messages may include the
            # request URL, which contains the bot token — confirm the log
            # destination is trusted.
            log.warning(f"Exception: {e}")
            return {"status": e}
async def del_message(message_id: int) -> dict:
    """Delete a message from the configured Telegram chat.

    Calls the Bot API ``deleteMessage`` method for ``conf.tgbot.chat_id``.

    Args:
        message_id: Identifier of the message to delete.

    Returns:
        ``{"status": <http status>}`` for any HTTP response (200 means the
        message was deleted), or ``{"status": <exception instance>}`` when
        the request raised.
    """
    endpoint = f"https://api.telegram.org/bot{conf.tgbot.token}/deleteMessage"
    async with aiohttp.ClientSession() as session:
        try:
            # The payload is built inside the try block so that a failing
            # config lookup is reported through "status" like any other error.
            body = {
                "chat_id": conf.tgbot.chat_id,
                "message_id": message_id,
            }
            async with session.post(endpoint, json=body) as response:
                http_status = response.status
                if http_status != 200:
                    log.warning(
                        f"Message ID {message_id} NOT deleted. Response status: {http_status}"
                    )
                else:
                    log.info(f"Message ID {message_id} deleted")
                # Both outcomes report the raw HTTP status to the caller.
                return {"status": http_status}
        except Exception as err:
            log.warning(f"Exception: {err}")
            return {"status": err}