Files
data_pipeline/db_con/connection.py
T
Ankit Malik 8aaae1e27d 3rd commit
2026-06-12 15:39:39 +05:30

87 lines
1.8 KiB
Python

from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine
import os
import clickhouse_connect
from log import *
load_dotenv()
# SQL Server
MSSQL_SERVER = os.getenv("MSSQL_SERVER")
MSSQL_DB = os.getenv("MSSQL_DB")
MSSQL_USER = os.getenv("MSSQL_USER")
MSSQL_PASS = os.getenv("MSSQL_PASS")
# ClickHouse
CH_HOST = os.getenv("CH_HOST")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER")
CH_PASS = os.getenv("CH_PASS")
CH_DB = os.getenv("CH_DB")
def build_sql_server_engine() -> Engine:
odbc_dsn = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={MSSQL_SERVER};"
f"DATABASE={MSSQL_DB};"
f"UID={MSSQL_USER};"
f"PWD={MSSQL_PASS};"
"TrustServerCertificate=yes;"
)
sql_url = URL.create(
drivername="mssql+pyodbc",
query={"odbc_connect": odbc_dsn},
)
engine = create_engine(
sql_url,
pool_size=3,
max_overflow=2,
pool_pre_ping=True,
connect_args={
"fast_executemany": True,
},
)
with engine.connect() as conn:
log.info(conn.execute(text("SELECT 1")))
return engine
def build_clickhouse_engine() -> Engine:
clickhouse_url = URL.create(
drivername="clickhouse+http",
username=CH_USER,
password=CH_PASS,
host=CH_HOST,
port=CH_PORT,
database=CH_DB,
)
engine = create_engine(
clickhouse_url,
pool_pre_ping=True,
)
with engine.connect() as conn:
log.info( conn.execute(text("SELECT 1")))
return engine
def get_clickhouse_client():
return clickhouse_connect.get_client(
host=CH_HOST,
port=CH_PORT,
username=CH_USER,
password=CH_PASS,
database=CH_DB
)