108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
import logging as log
|
|
|
|
import aiohttp
|
|
from config import conf
|
|
import json
|
|
|
|
|
|
async def send_message(message: str, event_id: int) -> dict:
    """Post *message* to the configured Telegram chat with an inline keyboard.

    The request goes to the Bot API ``sendMessage`` method using the token,
    chat id and thread id read from ``conf.tgbot``. One row of three inline
    buttons is attached; their callback data is ``h<event_id>``,
    ``d<event_id>`` and ``c<event_id>``.

    Args:
        message: Text of the message to send.
        event_id: Identifier embedded into each button's callback data.

    Returns:
        ``{"status": 200, "msg_id": <message_id>}`` on success,
        ``{"status": <http status>}`` for any other HTTP status, or
        ``{"status": <exception instance>}`` if the request or the parsing
        of a successful response raised.
    """
    url = f"https://api.telegram.org/bot{conf.tgbot.token}/sendMessage"
    # Button labels are Russian: "for 1 hour", "for 1 day", "Close".
    inline_buttons = [
        [
            {"text": "🛠 на 1 час", "callback_data": f"h{event_id}"},
            {"text": "🛠 на 1 день", "callback_data": f"d{event_id}"},
            {"text": "✅ Закрыть", "callback_data": f"c{event_id}"},
        ],
    ]
    reply_markup = {"inline_keyboard": inline_buttons}

    params = {
        "chat_id": conf.tgbot.chat_id,
        "message_thread_id": conf.tgbot.tread_id,
        "text": message,
        "reply_markup": json.dumps(reply_markup),
    }

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, json=params) as response:
                log.info(f"Response status: {response.status}")
                # Check the status BEFORE decoding the body: an error reply
                # may not be JSON, and decoding it would raise and hide the
                # real HTTP status from the caller.
                if response.status != 200:
                    log.warning(f"Message not send. Response status: {response.status}")
                    return {"status": response.status}

                resp = await response.json()
                log.info(f"Message with ID: {resp['result']['message_id']} send")
                return {
                    "status": response.status,
                    "msg_id": resp["result"]["message_id"],
                }
        except Exception as e:
            # Best-effort helper: report the failure to the caller instead of
            # raising; the exception object itself is returned as the status.
            log.warning(f"Exception: {e}")
            return {"status": e}
|
|
|
|
|
|
async def edit_message(message_id: int, new_message: str) -> dict:
    """Replace the text of an existing message in the configured chat.

    Calls the Bot API ``editMessageText`` method with the chat id taken from
    ``conf.tgbot`` and logs whether the edit succeeded.

    Args:
        message_id: Identifier of the message to edit.
        new_message: Text that replaces the current message text.

    Returns:
        ``{"status": <http status>}`` once a response is received, or
        ``{"status": <exception instance>}`` if anything raised.
    """
    url = f"https://api.telegram.org/bot{conf.tgbot.token}/editMessageText"
    async with aiohttp.ClientSession() as client:
        try:
            # Built inside the try so config lookups are covered by the
            # same handler as the request itself.
            payload = {
                "chat_id": conf.tgbot.chat_id,
                "message_id": message_id,
                "text": new_message,
            }
            async with client.post(url, json=payload) as response:
                if response.status != 200:
                    log.warning(
                        f"Message ID {message_id} NOT edited. Response status: {response.status}"
                    )
                else:
                    log.info(f"Message ID {message_id} edited")
                # Both outcomes report the HTTP status to the caller.
                return {"status": response.status}
        except Exception as e:
            log.warning(f"Exception: {e}")
            return {"status": e}
|
|
|
|
|
|
async def del_message(message_id: int) -> dict:
    """Delete a message from the configured chat.

    Calls the Bot API ``deleteMessage`` method with the chat id taken from
    ``conf.tgbot`` and logs whether the deletion succeeded.

    Args:
        message_id: Identifier of the message to delete.

    Returns:
        ``{"status": <http status>}`` once a response is received, or
        ``{"status": <exception instance>}`` if anything raised.
    """
    url = f"https://api.telegram.org/bot{conf.tgbot.token}/deleteMessage"
    async with aiohttp.ClientSession() as client:
        try:
            # Built inside the try so config lookups are covered by the
            # same handler as the request itself.
            payload = {
                "chat_id": conf.tgbot.chat_id,
                "message_id": message_id,
            }
            async with client.post(url, json=payload) as response:
                if response.status != 200:
                    log.warning(
                        f"Message ID {message_id} NOT deleted. Response status: {response.status}"
                    )
                else:
                    log.info(f"Message ID {message_id} deleted")
                # Both outcomes report the HTTP status to the caller.
                return {"status": response.status}
        except Exception as e:
            log.warning(f"Exception: {e}")
            return {"status": e}
|