# /// script
# requires-python = ">=3.11"
# dependencies = [
#   "polars>=0.20.0",
#   "sqlalchemy>=2.0.0",
#   "pyodbc>=5.0.0",
#   "clickhouse-connect>=0.7.0",
#   "python-dotenv>=1.0.0",
#   "pyarrow>=18.0.0",
# ]
# ///
"""
ETL Pipeline: Coverage KPI — SQL Server → Polars → ClickHouse
--------------------------------------------------------------
Run with:
    uv run etl_coverage_kpi.py                      # processes yesterday
    uv run etl_coverage_kpi.py --start 2023-03-10 --end 2023-03-18   # backfill

Why SQLAlchemy instead of raw pyodbc?
- pl.read_database() accepts a SQLAlchemy Engine directly
- Connection pooling is built in (connections are reused between queries)
- Parameterised queries use :param syntax — cleaner and SQL-injection safe
- One engine object is shared across all steps — no passing conn handles around

Pipeline steps (per target date):
1. Build one SQLAlchemy engine (the "pipe" to SQL Server)
2. Ask: which store-coverage rows were created/updated on the date? → MIDs
3. Pull Coverage rows for those MIDs in chunks → Polars DataFrames
4. Clean + type-cast everything in Polars
5. Delete the ClickHouse rows for those MIDs, insert the fresh batch
6. Verify the row count matches

Exit codes: 0 = success (or nothing to do), 2 = verification mismatch.
"""

import argparse
import logging
import os
import sys
from collections.abc import Iterable, Iterator
from datetime import date, timedelta

import clickhouse_connect
import polars as pl
import pyarrow  # noqa: F401  (not used directly; DataFrame.to_arrow() needs it installed)
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

load_dotenv()

# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("etl_coverage")

# ── Config ────────────────────────────────────────────────────────────────────
# SQL Server credentials — read from the environment / .env, never hardcoded.
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "CPMIndiaBusinessInsight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")

# Login timeout in seconds. Passed to pyodbc.connect(timeout=...) via
# connect_args in build_engine(); "timeout" is not an ODBC connection-string
# keyword, so it must not be embedded in the DSN string itself.
MSSQL_LOGIN_TIMEOUT = 30


def _odbc_value(value: str) -> str:
    """Return *value* quoted for use inside an ODBC connection string.

    ODBC splits attributes on ';', so a value containing ';', '{' or '}' (or
    leading/trailing whitespace) must be wrapped in braces, with any '}'
    doubled. Plain values are returned unchanged.
    """
    needs_quoting = value != value.strip() or any(ch in value for ch in ";{}")
    if value and needs_quoting:
        return "{" + value.replace("}", "}}") + "}"
    return value


# pyodbc connection string — built as a plain string and handed to
# URL.create() through the "odbc_connect" query key, so URL-special characters
# in the password (@ / + # ...) never have to be escaped by hand.
# UID/PWD are additionally brace-quoted so ODBC-special characters (; { })
# cannot truncate the string.
_ODBC_DSN = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={_odbc_value(MSSQL_USER)};"
    f"PWD={_odbc_value(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
)

# Final SQLAlchemy URL — credentials live inside odbc_connect, not in the URL.
MSSQL_SA_URL = URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": _ODBC_DSN},
)

CH_HOST = os.getenv("CH_HOST", "localhost")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER", "default")
CH_PASS = os.getenv("CH_PASS", "")
CH_DB = os.getenv("CH_DB", "kelloggs")
CH_TABLE = "coverage_kpi"

BATCH_SIZE = 5000  # rows per ClickHouse insert
MID_CHUNK = 1000  # MIDs per SQL Server IN (...) list
CH_MID_CHUNK = 5000  # MIDs per ClickHouse DELETE / count statement
PROJECT_ID = 40148


# ── Small helpers ─────────────────────────────────────────────────────────────
def _chunked(items: list[int], size: int) -> Iterator[list[int]]:
    """Yield consecutive slices of *items*, each at most *size* long."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


def _int_csv(values: Iterable[int]) -> str:
    """Render *values* as a comma-separated list of integer literals.

    Every value goes through int(), so the result is safe to splice into SQL:
    anything that is not an integer raises instead of reaching the query text.
    """
    return ",".join(str(int(v)) for v in values)


def _clickhouse_client():
    """Open a ClickHouse client from the CH_* settings; the caller closes it."""
    return clickhouse_connect.get_client(
        host=CH_HOST,
        port=CH_PORT,
        username=CH_USER,
        password=CH_PASS,
        database=CH_DB,
    )


# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
def build_engine() -> Engine:
    """
    Create the SQLAlchemy engine for SQL Server and smoke-test it.

    The Engine is not a connection — it is the factory (and pool) that hands
    out connections. SQLAlchemy's mssql+pyodbc dialect generates the SQL;
    pyodbc talks ODBC to the SQL Server driver.

    Settings:
    - pool_size=3, max_overflow=2: keep up to 3 pooled connections and allow
      2 extra under burst load. Leaving a `with engine.connect()` block
      returns the connection to the pool instead of closing it.
    - pool_pre_ping=True: test a pooled connection before handing it out.
    - fast_executemany=True: a mssql+pyodbc *dialect* option, so it is passed
      to create_engine() itself (not through connect_args, which is forwarded
      to pyodbc.connect()). This pipeline only reads from SQL Server, so it
      only matters if writes are added later.
    - connect_args={"timeout": ...}: pyodbc's login timeout in seconds.

    Returns:
        A ready-to-use Engine. Any connection error raised by the
        `SELECT 1` smoke test propagates to the caller.
    """
    log.info("Building SQLAlchemy + pyodbc engine for SQL Server...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        echo=False,  # True = print every SQL statement (debug)
        fast_executemany=True,
        connect_args={"timeout": MSSQL_LOGIN_TIMEOUT},
    )

    # Smoke test — borrow one connection, run SELECT 1, return it to the pool.
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

    log.info("SQL Server engine ready.")
    return engine


# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """
    Return the MIDs of T_StoreCoverage rows created or updated on *target_date*.

    The date is bound with SQLAlchemy's :param syntax, so it is never
    concatenated into the SQL text. UNION (not UNION ALL) removes MIDs that
    match on both CreateDate and UpdateDate.

    NULL MIDs are skipped and every value is coerced to int, because
    fetch_coverage_data() later splices these values into an IN (...) list.
    """
    sql = text("""
        SELECT MID
        FROM   OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE  CONVERT(date, CreateDate) = :target_date
        UNION
        SELECT MID
        FROM   OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE  CONVERT(date, UpdateDate) = :target_date
    """)

    log.info(f"Collecting MIDs for: {target_date}")
    with engine.connect() as conn:
        result = conn.execute(sql, {"target_date": target_date})
        mids = [int(row[0]) for row in result.fetchall() if row[0] is not None]

    log.info(f"Found {len(mids):,} MIDs")
    return mids


# ── Step 3: Fetch Coverage data via pl.read_database() ───────────────────────
def fetch_coverage_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """
    Fetch the Coverage rows for *mids* from SQL Server as one Polars DataFrame.

    The MID list is processed in chunks of MID_CHUNK so each statement stays
    small; every chunk is a single pl.read_database() call against the shared
    engine. Chunks that return no rows are dropped.

    Returns:
        The concatenated result, or an empty `pl.DataFrame()` (no columns)
        when *mids* is empty or no chunk returned any rows.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # SQL mirrors the Coverage INSERT from the stored procedure.
    # {project_id} and {mid_placeholders} are filled in per chunk below.
    # NOTE(review): distance_meters uses 69.1 (miles per degree) * 1000, which
    # yields thousandths of a mile rather than metres — kept as-is to match
    # the stored procedure; confirm the intended unit.
    coverage_sql = """
        SELECT
            {project_id}                       AS project_id,
            JP.MID,
            sm.StoreId                         AS store_id,
            JP.EmpId                           AS employee_id,
            CONVERT(date, JP.VisitDate)        AS visit_date,
            JP.InTime                          AS in_time,
            JP.OutTime                         AS out_time,
            CASE
                WHEN JP.OutTime IS NULL OR JP.InTime IS NULL THEN NULL
                WHEN JP.OutTime < JP.InTime THEN 0
                ELSE DATEDIFF(SECOND, JP.InTime, JP.OutTime) / 60
            END                                AS duration_minutes,
            CASE
                WHEN (
                    SELECT TOP 1 EmpId
                    FROM   OneApp_KelloggsMT.dbo.T_StoreCoverage SC
                    WHERE  SC.EmpId     = JP.EmpId
                      AND  SC.StoreId   = JP.StoreId
                      AND  SC.VisitDate = JP.VisitDate
                      AND  SC.ReasonId IN (0,1,3,9,10,19,20)
                ) > 0 THEN 'Y'
                ELSE 'N'
            END                                AS is_covered,
            CASE JP.Deviation
                WHEN 0 THEN 'Planned'
                WHEN 1 THEN 'Adhoc'
                WHEN 2 THEN 'Beat Plan'
                WHEN 3 THEN 'Non Merchandised'
                WHEN 4 THEN 'Add New Store'
                WHEN 5 THEN 'Non Program'
                ELSE ''
            END                                AS coverage_type,
            sm.StoreTypeId                     AS storetype_id,
            Em.SupervisorId                    AS supervisor_id,
            ISNULL(JP.ReasonId, 0)             AS reason_id,
            sm.CameraAllow                     AS camera_allow,
            CAST(
                CASE
                    WHEN sm.Latitude  IS NULL OR sm.Latitude  = 0 THEN 0
                    WHEN sm.Longitude IS NULL OR sm.Longitude = 0 THEN 0
                    WHEN JP.Latitude  IS NULL OR JP.Latitude  = 0 THEN 0
                    WHEN JP.Longitude IS NULL OR JP.Longitude = 0 THEN 0
                    ELSE SQRT(
                        POWER(69.1 * (JP.Latitude - sm.Latitude), 2)
                        + POWER(69.1 * (sm.Longitude - JP.Longitude)
                                * COS(JP.Latitude / 57.3), 2)
                    ) * 1000
                END AS FLOAT
            )                                  AS distance_meters,
            GETDATE()                          AS update_date,
            'ETL-SQLAlchemy'                   AS update_by
        FROM OneApp_KelloggsMT.dbo.T_StoreCoverage JP WITH (NOLOCK)
        INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
            ON JP.StoreId = sm.StoreId
        INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
            ON JP.EmpId = Em.EmpId
        WHERE JP.MID IN ({mid_placeholders})
          AND Em.UserName NOT LIKE 'test%'
    """

    total_mids = len(mids)
    frames: list[pl.DataFrame] = []
    for chunk_no, chunk in enumerate(_chunked(mids, MID_CHUNK), start=1):
        first = (chunk_no - 1) * MID_CHUNK + 1
        last = first + len(chunk) - 1

        # The IN list is spliced in as literals because its length varies per
        # chunk. _int_csv() forces every value through int(), so nothing but
        # integer literals can reach the SQL text.
        sql = coverage_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=_int_csv(chunk),
        )

        log.info(
            f"Fetching chunk {chunk_no} "
            f"(MIDs {first}–{last} of {total_mids})..."
        )

        df_chunk = pl.read_database(query=sql, connection=engine)
        if len(df_chunk) > 0:
            frames.append(df_chunk)

    if not frames:
        log.warning("No rows returned from SQL Server.")
        return pl.DataFrame()

    # vertical_relaxed: chunks whose inferred dtypes differ (for example a
    # column that is entirely NULL in one chunk) are coerced to a common
    # supertype instead of raising a schema mismatch.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df


# ── Step 4: Transform with Polars ─────────────────────────────────────────────
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """
    Normalise column types and NULLs before loading into ClickHouse.

    Every column is cast explicitly so the load never depends on whatever
    dtype the driver happened to infer. Finally, rows are de-duplicated on
    (project_id, MID).

    Args:
        df: Non-empty frame as returned by fetch_coverage_data(). An empty
            `pl.DataFrame()` has no columns and will raise here.

    Returns:
        A new DataFrame with the normalised schema.
    """
    log.info("Transforming...")

    df = (
        df
        # ── IDs ──────────────────────────────────────────────────────────────
        .with_columns([
            pl.col("project_id").cast(pl.Int32),
            pl.col("MID").cast(pl.Int64),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("reason_id").cast(pl.Int32).fill_null(0),
        ])
        # ── Dates ────────────────────────────────────────────────────────────
        # The driver may return visit_date as a datetime — force Date.
        .with_columns(
            pl.col("visit_date").cast(pl.Date)
        )
        # ── Numerics ─────────────────────────────────────────────────────────
        # Go through Float64 first so a Decimal coming back from ODBC cannot
        # fail the integer cast; NULL durations become 0.
        .with_columns(
            pl.col("duration_minutes")
            .cast(pl.Float64)
            .fill_null(0.0)
            .cast(pl.Int32)
        )
        .with_columns(
            pl.col("distance_meters")
            .cast(pl.Float64)
            .fill_null(0.0)
            .round(2)
        )
        # ── Flags ────────────────────────────────────────────────────────────
        # is_covered: 'Y' → 1, anything else (including NULL) → 0, as UInt8.
        .with_columns(
            pl.when(pl.col("is_covered") == "Y")
            .then(pl.lit(1, dtype=pl.UInt8))
            .otherwise(pl.lit(0, dtype=pl.UInt8))
            .alias("is_covered")
        )
        # camera_allow: cast to UInt8, NULL → 0.
        .with_columns(
            pl.col("camera_allow").cast(pl.UInt8).fill_null(0)
        )
        # ── Strings ──────────────────────────────────────────────────────────
        # Replace NULL strings so non-nullable String columns accept them.
        .with_columns([
            pl.col("coverage_type").fill_null(""),
            pl.col("update_by").fill_null("ETL-SQLAlchemy"),
        ])
        # ── Timestamps ───────────────────────────────────────────────────────
        .with_columns(
            pl.col("update_date").cast(pl.Datetime)
        )
        # ── Dedup ────────────────────────────────────────────────────────────
        # (project_id, MID) is treated as the row key — drop duplicates that
        # the joins may have produced.
        .unique(subset=["project_id", "MID"])
    )

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df


# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """
    Replace the ClickHouse rows for the MIDs in *df* with the fresh batch.

    Existing rows are deleted by (project_id, MID) — the same key transform()
    de-duplicates on — and then *df* is inserted in BATCH_SIZE slices through
    insert_arrow(). Deleting by MID rather than by a single visit date keeps
    the load idempotent even when the batch spans several visit dates (MIDs
    are selected by create/update date, not by visit date) and never touches
    rows that are not being reloaded.

    The DELETE is an ALTER TABLE mutation; `mutations_sync` makes ClickHouse
    wait for it to finish so the verification count does not see stale rows.

    Args:
        df: Transformed, non-empty frame containing at least the "MID" and
            "visit_date" columns.

    Returns:
        (rows inserted, earliest visit_date in the batch).

    Raises:
        ValueError: if *df* is empty.
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() requires a non-empty DataFrame")

    target_date: date = df["visit_date"].min()
    mids = df["MID"].to_list()

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = _clickhouse_client()
    try:
        log.info(
            f"Deleting existing rows: project_id={PROJECT_ID} "
            f"for {len(mids):,} MIDs"
        )
        for chunk in _chunked(mids, CH_MID_CHUNK):
            client.command(
                f"ALTER TABLE {CH_TABLE} DELETE "
                f"WHERE project_id = {PROJECT_ID} "
                f"AND MID IN ({_int_csv(chunk)})",
                settings={"mutations_sync": 2},  # wait for the mutation
            )

        total = 0
        for start in range(0, len(df), BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            # Polars → Arrow table → ClickHouse; no pandas, no row loop.
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f"  Inserted {total:,} / {len(df):,} rows")
    finally:
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date


# ── Step 6: Verify ────────────────────────────────────────────────────────────
def verify(
    expected: int,
    target_date: date,
    mids: list[int] | None = None,
) -> bool:
    """
    Compare the number of rows now in ClickHouse with *expected*.

    Args:
        expected: Row count that was sent to ClickHouse.
        target_date: Visit date to count when *mids* is not given.
        mids: When provided, count the rows for exactly these MIDs (the rows
            that were loaded). This is the accurate check when a batch spans
            more than one visit date. When omitted, rows are counted by
            project_id and *target_date* only.

    Returns:
        True when the counts match, False otherwise (a mismatch is logged as
        an error; the caller decides the exit code).
    """
    client = _clickhouse_client()
    try:
        if mids is not None:
            actual = 0
            for chunk in _chunked(mids, CH_MID_CHUNK):
                result = client.query(
                    f"SELECT count() FROM {CH_TABLE} "
                    f"WHERE project_id = {PROJECT_ID} "
                    f"AND MID IN ({_int_csv(chunk)})"
                )
                actual += result.result_rows[0][0]
        else:
            result = client.query(
                f"SELECT count() FROM {CH_TABLE} "
                f"WHERE project_id = {PROJECT_ID} "
                f"AND visit_date = '{target_date}'"
            )
            actual = result.result_rows[0][0]
    finally:
        client.close()

    log.info(f"Verify — expected={expected:,}  clickhouse={actual:,}")

    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False

    log.info("Verify passed.")
    return True


# ── Main ──────────────────────────────────────────────────────────────────────
def _parse_args(argv: list[str] | None = None) -> tuple[date, date]:
    """Return the inclusive (start, end) date range requested on the CLI.

    With no arguments both are yesterday. --start alone processes that single
    day; --end alone processes yesterday through --end.
    """
    parser = argparse.ArgumentParser(
        description="Coverage KPI ETL: SQL Server -> Polars -> ClickHouse",
    )
    parser.add_argument(
        "--start",
        type=date.fromisoformat,
        default=None,
        help="first date to process, YYYY-MM-DD (default: yesterday)",
    )
    parser.add_argument(
        "--end",
        type=date.fromisoformat,
        default=None,
        help="last date to process, YYYY-MM-DD (default: same as --start)",
    )
    args = parser.parse_args(argv)

    start = args.start or (date.today() - timedelta(days=1))
    end = args.end or start
    if end < start:
        parser.error("--end must not be earlier than --start")
    return start, end


def _run_for_date(engine: Engine, target_date: date) -> bool:
    """Run collect → fetch → transform → load → verify for one date.

    Returns False only when verification fails; a date with no data is a
    successful no-op.
    """
    log.info(f"=== Coverage KPI ETL | date={target_date} ===")

    mids = collect_mids(engine, target_date)
    if not mids:
        log.info(f"No MIDs for {target_date}. Nothing to do.")
        return True

    df_raw = fetch_coverage_data(engine, mids)
    if df_raw.is_empty():
        log.warning(f"Empty result set for {target_date}. Skipping.")
        return True

    df_clean = transform(df_raw)
    rows_inserted, inserted_date = load_to_clickhouse(df_clean)

    if not verify(rows_inserted, inserted_date, mids=df_clean["MID"].to_list()):
        return False

    log.info(f"=== Done. {rows_inserted:,} rows loaded for {target_date} ===")
    return True


def main() -> None:
    """
    Entry point: process yesterday, or the --start/--end range if given.

    One engine is built and shared by every date in the range, and is always
    disposed at the end — even when a step raises. Dates with nothing to load
    are skipped rather than ending the run. Exits with status 2 if any date
    failed verification.
    """
    start, end = _parse_args()

    engine = build_engine()
    failed: list[date] = []
    try:
        day = start
        while day <= end:
            if not _run_for_date(engine, day):
                failed.append(day)
            day += timedelta(days=1)
    finally:
        engine.dispose()  # close all pooled connections

    if failed:
        log.error(
            "Verification failed for: "
            + ", ".join(d.isoformat() for d in failed)
        )
        sys.exit(2)


if __name__ == "__main__":
    main()