"""Connection factories for SQL Server and ClickHouse.

Connection settings are read from environment variables (optionally loaded
from a ``.env`` file via ``load_dotenv()``) once, at import time.
"""
import os

import clickhouse_connect
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

from log import *  # supplies the ``log`` object used below

load_dotenv()

# SQL Server
MSSQL_SERVER = os.getenv("MSSQL_SERVER")
MSSQL_DB = os.getenv("MSSQL_DB")
MSSQL_USER = os.getenv("MSSQL_USER")
MSSQL_PASS = os.getenv("MSSQL_PASS")

# ClickHouse
CH_HOST = os.getenv("CH_HOST")
CH_PORT = int(os.getenv("CH_PORT", "8123"))  # falls back to 8123 when unset
CH_USER = os.getenv("CH_USER")
CH_PASS = os.getenv("CH_PASS")
CH_DB = os.getenv("CH_DB")


def build_sql_server_engine() -> Engine:
    """Create a SQLAlchemy engine for SQL Server and verify it with ``SELECT 1``.

    The engine uses the ``mssql+pyodbc`` dialect with a raw ODBC connection
    string built from the ``MSSQL_*`` module constants.

    Returns:
        The configured engine, after one successful probe query.

    Raises:
        Whatever SQLAlchemy / pyodbc raise when the connection or the probe
        query fails; nothing is caught here.
    """
    # NOTE(review): the values are interpolated verbatim; a password that
    # contains ';' or '}' would corrupt the connection string — confirm
    # whether credentials can contain such characters.
    odbc_dsn = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={MSSQL_SERVER};"
        f"DATABASE={MSSQL_DB};"
        f"UID={MSSQL_USER};"
        f"PWD={MSSQL_PASS};"
        "TrustServerCertificate=yes;"
        "Connect Timeout=30;"
        "Query Timeout=0;"
        "Keep Alive=30;"
    )

    sql_url = URL.create(
        drivername="mssql+pyodbc",
        query={"odbc_connect": odbc_dsn},
    )

    # ``fast_executemany`` is an option of the mssql+pyodbc dialect and must
    # be given to ``create_engine`` directly. Inside ``connect_args`` it would
    # be forwarded to ``pyodbc.connect()`` as an unknown keyword and would not
    # switch on the cursor's fast-executemany mode.
    engine = create_engine(
        sql_url,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        fast_executemany=True,
    )

    # Fail fast: open one connection and run a trivial query before handing
    # the engine to the caller. Log the returned value, not the result object.
    with engine.connect() as conn:
        probe = conn.execute(text("SELECT 1")).scalar()
        log.info(f"SQL Server connectivity check: SELECT 1 returned {probe}")

    return engine


def build_clickhouse_engine() -> Engine:
    """Create a SQLAlchemy engine for ClickHouse and verify it with ``SELECT 1``.

    The URL uses the ``clickhouse+http`` driver name and the ``CH_*`` module
    constants.

    Returns:
        The configured engine, after one successful probe query.

    Raises:
        Whatever SQLAlchemy / the ClickHouse dialect raise when the
        connection or the probe query fails; nothing is caught here.
    """
    clickhouse_url = URL.create(
        drivername="clickhouse+http",
        username=CH_USER,
        password=CH_PASS,
        host=CH_HOST,
        port=CH_PORT,
        database=CH_DB,
    )

    engine = create_engine(
        clickhouse_url,
        pool_pre_ping=True,
    )

    # Same fail-fast probe as for SQL Server; log the scalar value.
    with engine.connect() as conn:
        probe = conn.execute(text("SELECT 1")).scalar()
        log.info(f"ClickHouse connectivity check: SELECT 1 returned {probe}")

    return engine


def get_clickhouse_client():
    """Return a new ``clickhouse_connect`` client built from the ``CH_*`` settings.

    This bypasses SQLAlchemy and calls ``clickhouse_connect.get_client``
    directly with the same host, port, credentials and database.
    """
    return clickhouse_connect.get_client(
        host=CH_HOST,
        port=CH_PORT,
        username=CH_USER,
        password=CH_PASS,
        database=CH_DB,
    )