osnova-api-alert/redis_db/r_helper.py

32 lines
903 B
Python

from redis.asyncio import Redis
from redis import ConnectionError
from config import conf
import logging as log
class RedisManager:
    """Async context manager that opens and closes a Redis client.

    Connection parameters (host, port and, when configured, password) are
    read from ``conf.redis`` when the manager is constructed. Entering the
    context creates the client and pings the server; leaving the context
    closes the client.

    Usage::

        async with RedisManager() as manager:
            if manager is not None:
                await manager.client.ping()
    """

    def __init__(self):
        # The client is only created in __aenter__; None means "no open client".
        self.client = None
        self.connect_params = {
            "host": conf.redis.host,
            "port": conf.redis.port,
        }
        # Pass a password only when one is configured.
        if conf.redis.pwd:
            self.connect_params["password"] = conf.redis.pwd

    async def __aenter__(self):
        """Create the client and verify the connection with a ping.

        Returns:
            This manager when the ping succeeds, or ``None`` when the ping
            raises ``ConnectionError`` -- callers must check the bound value
            before using it.
        """
        client = Redis(**self.connect_params)
        try:
            await client.ping()
        except ConnectionError:
            log.warning("failed to connect to Redis")
            self.client = None
            # Release the client's resources instead of just dropping the
            # reference; otherwise the unused client is never closed.
            await self._release(client)
            return None
        self.client = client
        log.info("connected to Redis")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the client if one is open.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        # Detach first so the manager never keeps a reference to a closed
        # client and a repeated exit does not close it twice.
        client, self.client = self.client, None
        if client is None:
            return
        await client.close()
        log.info("closed connection to Redis")

    @staticmethod
    async def _release(client):
        """Close a client that failed its ping without raising to the caller."""
        try:
            await client.close()
        except Exception:
            # Best-effort cleanup: the connection failure was already
            # reported, so a secondary close error must not turn the
            # "return None" contract of __aenter__ into an exception.
            log.exception("failed to release unused Redis client")