Source code for src.connection

"""Redis connection from inside container network."""

import logging
import os
import ssl as ssl_module
from pathlib import Path

import redis

logger = logging.getLogger(__name__)


[docs] def docker_redis_connect() -> redis.Redis: """Connect to redis from inside sandbox container.""" conn_args = { "host": os.getenv("REDIS_HOST"), "port": int(os.getenv("REDIS_PORT")), "db": int(os.getenv("REDIS_DB")), "username": "app", "password": Path("/run/secrets/app_password").read_text().strip(), "decode_responses": True, } mtls_kwargs = { "ssl": True, "ssl_certfile": "/run/secrets/app.pem", "ssl_keyfile": "/run/secrets/app.key", "ssl_ca_certs": "/run/secrets/ca.pem", "ssl_check_hostname": True, "ssl_cert_reqs": ssl_module.CERT_REQUIRED, } logger.info( f"Connecting to redis://{conn_args['username']}:***@{conn_args['host']}:{conn_args['port']}/{conn_args['db']}" ) return redis.Redis( **conn_args, **mtls_kwargs, )