osnova-api-alert/redis_db/r_helper.py

36 lines
1.1 KiB
Python

from redis.asyncio import Redis
from config import conf
import logging as log
class RedisClient:
    """Thin async helper around ``redis.asyncio.Redis``.

    Connection parameters are computed once, in the class body, from
    ``conf.redis``. The class can be used in two ways:

    * as an async context manager, ``async with RedisClient() as client:``,
      which yields a new ``Redis`` client and closes it on exit;
    * through ``set_value`` / ``get_value``, each of which opens its own
      short-lived client for a single command.
    """

    # NOTE: this statement lives in the class body, so it is logged once when
    # the class is defined (module import), not on every connection.
    log.info("Connecting to redis_db. %s:%s", conf.redis.host, conf.redis.port)

    # Keyword arguments passed to ``Redis(...)`` for every new client.
    connection_params = {
        "host": conf.redis.host,
        "port": conf.redis.port,
    }
    # Credentials are only passed when both login and password are configured.
    if conf.redis.login and conf.redis.pwd:
        connection_params["username"] = conf.redis.login
        connection_params["password"] = conf.redis.pwd
    # A falsy db (None / 0 / "") is omitted, leaving the choice to ``Redis``.
    if conf.redis.db:
        connection_params["db"] = conf.redis.db

    def _create_client(self) -> Redis:
        """Build a new ``Redis`` client from ``connection_params``."""
        client = Redis(**self.connection_params)
        log.info("Create client")
        return client

    @staticmethod
    async def _close_client(client: Redis) -> None:
        """Close *client*, logging the fact first."""
        log.info("Close client")
        await client.close()

    async def __aenter__(self) -> Redis:
        """Create a client, remember it on ``self.client`` and return it."""
        self.client = self._create_client()
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the client created by ``__aenter__``.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        await self._close_client(self.client)

    async def set_value(self, key, value) -> None:
        """Store *value* under *key* using a dedicated, short-lived client.

        Any exception raised by the ``set`` command propagates to the caller
        after the client has been closed.
        """
        # Deliberately not ``async with self``: that path keeps the client in
        # the shared ``self.client`` attribute, so overlapping calls on one
        # instance would overwrite each other's client, close the wrong one
        # and leave the other unclosed. A local variable keeps calls isolated.
        client = self._create_client()
        try:
            await client.set(key, value)
            log.info("Set %s = %s", key, value)
        finally:
            await self._close_client(client)

    async def get_value(self, key):
        """Return whatever ``Redis.get`` yields for *key*.

        Uses a dedicated, short-lived client that is closed before returning,
        including when the ``get`` command raises.
        """
        # See ``set_value`` for why a call-local client is used here.
        client = self._create_client()
        try:
            log.info("Get %s", key)
            return await client.get(key)
        finally:
            await self._close_client(client)