#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#   "polars>=0.20.0",
#   "sqlalchemy>=2.0.0",
#   "pyodbc>=5.0.0",
#   "clickhouse-connect>=0.7.0",
#   "pyarrow>=18.0.0",
#   "python-dotenv>=1.0.0",
# ]
# ///
"""
ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse
---------------------------------------------------------------
Run with:  uv run etl_promotion_kpi.py

What is Promotion KPI?
  Kellogg's runs in-store promotions — standees, special shelf displays,
  "buy 2 get 1 free" offers. The salesperson visits a store and records:
    - Is the promotion physically present?        (Present = Y/N)
    - If absent, what is the reason?              (Reason)
    - What category/brand does it apply to?       (PromoValueName)
    - Photo proof                                 (Image1, Image2)
    - Quiz-style question + answer about the promo (Question, Answer)

  The correlated CASE subqueries in the SP resolve the PromoTable column
  ('Master_Category' / 'Master_Brand' etc.) into a human-readable name.
  SQL Server does this at query time — Polars receives the final string.
  Nothing special is needed in the transform for those columns.

Architecture: same 6-step pattern as Coverage KPI
  1. Build SQLAlchemy + pyodbc engine (URL.create + odbc_connect DSN)
  2. Collect MIDs for yesterday from T_StoreCoverage
  3. Fetch Promotion rows in MID chunks via pl.read_database()
  4. Transform with Polars (types, nulls, flags)
  5. Delete stale ClickHouse rows (by MID) → insert_arrow() in batches
  6. Verify row count
"""

import logging
import os
import sys
from collections.abc import Iterator, Sequence
from datetime import date, timedelta

import clickhouse_connect
import polars as pl
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

load_dotenv()

# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("etl_promotion")

# ── Config ────────────────────────────────────────────────────────────────────
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "insight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")

# Login timeout in seconds. It is handed to pyodbc.connect(timeout=...) through
# SQLAlchemy's connect_args; "timeout" is a pyodbc keyword argument, not an
# ODBC connection-string keyword, so it must not be written into the DSN.
MSSQL_LOGIN_TIMEOUT = 30


def _odbc_value(value: str) -> str:
    """Brace-quote *value* for use inside an ODBC connection string.

    Inside an ODBC connection string ';' terminates a value, so a password
    containing ';', '{' or '}' must be wrapped in braces, with any closing
    brace doubled.
    """
    return "{" + value.replace("}", "}}") + "}"


# Build the pyodbc connection string first, then pass it whole to URL.create()
# via "odbc_connect". URL.create() takes care of URL-level escaping; the
# ODBC-level quoting of UID / PWD is done by _odbc_value() above.
# NOTE(review): TrustServerCertificate=yes disables server certificate
# validation — confirm this is acceptable for the target environment.
_ODBC_DSN = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={_odbc_value(MSSQL_USER)};"
    f"PWD={_odbc_value(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
)

MSSQL_SA_URL = URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": _ODBC_DSN},
)

CH_HOST = os.getenv("CH_HOST", "localhost")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER", "default")
CH_PASS = os.getenv("CH_PASS", "")
CH_DB = os.getenv("CH_DB", "kelloggs")
CH_TABLE = "promotion_kpi"

BATCH_SIZE = 5000    # rows per ClickHouse insert
MID_CHUNK = 1000     # MIDs per SQL Server IN (...) list
CH_MID_CHUNK = 5000  # MIDs per ClickHouse IN (...) list (keeps query text small)
PROJECT_ID = 40148


# ── Small shared helpers ──────────────────────────────────────────────────────
def _chunked(items: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of *items* holding at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


def _int_csv(values: Sequence[int]) -> str:
    """Render *values* as a comma-separated list of integer literals.

    Each value goes through int(), so the result is safe to splice into SQL
    text: anything that is not an integer raises instead of being embedded.
    """
    return ",".join(str(int(v)) for v in values)


def _clickhouse_client():
    """Open a ClickHouse client using the CH_* settings above."""
    return clickhouse_connect.get_client(
        host=CH_HOST,
        port=CH_PORT,
        username=CH_USER,
        password=CH_PASS,
        database=CH_DB,
    )


# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
def build_engine() -> Engine:
    """
    Create the SQL Server engine and prove it works with a `SELECT 1`.

    SQLAlchemy = smart manager (handles pooling, dialect, parameterisation)
    pyodbc     = actual wire   (speaks ODBC to the SQL Server driver)
    URL.create = safe assembler of the SQLAlchemy URL

    The engine is created ONCE and shared across all steps. Each
    `with engine.connect()` borrows a connection from the pool and returns
    it when the block exits — no manual close() calls.

    Returns:
        A ready-to-use Engine. The caller owns it and must dispose() it.

    Raises:
        Whatever SQLAlchemy / pyodbc raises when the connection test fails;
        the half-built engine is disposed before the error propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        echo=False,
        # Only genuine pyodbc.connect() keywords belong in connect_args.
        # (fast_executemany is a create_engine() option that affects bulk
        # INSERTs only; this pipeline never writes to SQL Server.)
        connect_args={"timeout": MSSQL_LOGIN_TIMEOUT},
    )
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        engine.dispose()
        raise
    log.info("Engine ready.")
    return engine


# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """
    Return the distinct MIDs created or updated on *target_date*.

    Described as a replica of the SP's @MID_TABLE_COV logic: the UNION of
    the CreateDate and UpdateDate matches catches both new visits and edits,
    and UNION (not UNION ALL) removes duplicates.
    :target_date is a bound SQLAlchemy parameter — no injection risk.

    Args:
        engine: Engine connected to SQL Server.
        target_date: Calendar day to match against CreateDate / UpdateDate.

    Returns:
        MIDs as plain ints; NULL MIDs are skipped.
    """
    sql = text("""
        SELECT MID
        FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, CreateDate) = :target_date
        UNION
        SELECT MID
        FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, UpdateDate) = :target_date
    """)
    log.info(f"Collecting MIDs for: {target_date}")
    with engine.connect() as conn:
        raw_mids = conn.execute(sql, {"target_date": target_date}).scalars().all()
    # int() guarantees the values later spliced into IN (...) lists are integers.
    mids = [int(mid) for mid in raw_mids if mid is not None]
    log.info(f"Found {len(mids):,} MIDs")
    return mids


# ── Step 3: Fetch Promotion data ──────────────────────────────────────────────
def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """
    Fetch the promotion rows belonging to *mids* from SQL Server.

    The SQL is described as a direct translation of the SP's Promotion
    INSERT ... SELECT. Three things worth understanding:

    1. Correlated CASE subqueries (promo_value_name)
       The promo item name is looked up in whichever master table the
       PromoTable column points to. SQL Server resolves this and sends a
       plain string; it is NULL when PromoTable matches none of the four
       known tables (the CASE has no ELSE).

    2. LEFT JOINs (images, reason, question)
       These are optional — not every promo has a photo or a question.
       NULLs from LEFT JOINs are handled in the transform step.

    3. ISNULL(mpq.PromoQuestionName, '') and ISNULL(tpq.PromoAnswerName, '')
       Already handled in SQL — they arrive as empty strings, not NULLs.
       The transform still applies fill_null('') for safety.

    MID chunking:
       The MIDs are spliced in as integer literals, MID_CHUNK per query, so
       a single statement never becomes unreasonably large. Each
       pl.read_database() call is handed the engine and runs one chunk.

    Args:
        engine: Engine connected to SQL Server.
        mids: MIDs to fetch; every element must be convertible to int.

    Returns:
        All fetched rows in one DataFrame, or an empty (column-less)
        DataFrame when *mids* is empty or no rows match.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # SELECT list from the SP's Promotion INSERT — columns renamed to
    # snake_case for ClickHouse convention.
    promotion_sql = """
        SELECT
            sc.MID                                  AS mid,
            {project_id}                            AS project_id,
            sc.StoreId                              AS store_id,
            Em.EmpId                                AS employee_id,
            CONVERT(date, sc.VisitDate)             AS visit_date,
            Em.SupervisorId                         AS supervisor_id,
            sm.ChannelId                            AS channel_id,
            sm.ChainId                              AS chain_id,
            sm.StoreTypeId                          AS storetype_id,
            msd.PromoDefinitionId                   AS promo_definition_id,
            msd.PromoDefinitionName                 AS promo_definition_name,

            -- Resolve which dimension level the promo targets
            -- (Category / SubCategory / Brand / SubBrand)
            CASE
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'    THEN 'Category'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'       THEN 'Brand'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'    THEN 'SubBrand'
                ELSE ''
            END                                     AS promotion_details,

            ts.PromoValue                           AS promotion_details_id,

            -- Resolve the actual name of that dimension item
            -- SQL Server evaluates exactly ONE of these subqueries per row
            CASE
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'
                    THEN (SELECT a.CategoryName
                          FROM OneApp_KelloggsMT.dbo.Master_Category a
                          WHERE a.CategoryId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory'
                    THEN (SELECT a.SubCategoryName
                          FROM OneApp_KelloggsMT.dbo.Master_SubCategory a
                          WHERE a.SubCategoryId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'
                    THEN (SELECT a.BrandName
                          FROM OneApp_KelloggsMT.dbo.Master_Brand a
                          WHERE a.BrandId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'
                    THEN (SELECT a.SubBrandName
                          FROM OneApp_KelloggsMT.dbo.Master_SubBrand a
                          WHERE a.SubBrandId = ts.PromoValue)
            END                                     AS promo_value_name,

            -- Present flag: 0 → 'N', anything else → 'Y'
            CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END AS is_present,

            -- Reason only populated when promo is ABSENT
            CASE
                WHEN ts.Present = 1 THEN ''
                ELSE ISNULL(mnp.PromoReason, '')
            END                                     AS reason,

            ISNULL(mpq.PromoQuestionName, '')       AS question,
            ISNULL(tpq.PromoAnswerName, '')         AS answer,

            -- Image URLs — empty string when no image
            CASE
                WHEN ISNULL(SHI.PromoImage1,'') = '' THEN ''
                ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/' + SHI.PromoImage1
            END                                     AS image1,
            CASE
                WHEN ISNULL(SHI.PromoImage2,'') = '' THEN ''
                ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/' + SHI.PromoImage2
            END                                     AS image2,

            GETDATE()                               AS update_date,
            'ETL-SQLAlchemy'                        AS update_by

        FROM OneApp_KelloggsMT.dbo.T_Promotion ts
        INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
            ON ts.mid = sc.mid
        INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
            ON sc.StoreId = sm.StoreId
        INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
            ON sc.EmpId = Em.EmpId
        INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd
            ON msd.PromoDefinitionId = ts.PromoDefinitionId
        LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI
            ON ts.PId = SHI.PId
        LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp
            ON ts.PromoReasonId = mnp.PromoReasonId
        LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq
            ON ts.PId = tpq.PId
        LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq
            ON tpq.PromoQuestionId = mpq.PromoQuestionId

        WHERE Em.EmpName NOT LIKE 'test%'
          AND sc.MID IN ({mid_placeholders})
    """

    total_mids = len(mids)
    frames: list[pl.DataFrame] = []
    for chunk_no, chunk in enumerate(_chunked(mids, MID_CHUNK), start=1):
        first = (chunk_no - 1) * MID_CHUNK + 1
        last = first + len(chunk) - 1
        sql = promotion_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=_int_csv(chunk),  # int()-coerced → injection-safe
        )
        log.info(
            f"Fetching chunk {chunk_no} "
            f"(MIDs {first}–{last} of {total_mids})..."
        )
        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No promotion rows returned.")
        return pl.DataFrame()

    # Dtypes are inferred per chunk, so a column that is entirely NULL in one
    # chunk can come back with a different dtype than in another. The relaxed
    # strategy casts to the common supertype instead of raising.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df


# ── Step 4: Transform with Polars ─────────────────────────────────────────────
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """
    Normalise dtypes, replace NULLs and de-duplicate the fetched rows.

    Assembly-line approach: each .with_columns() block is one station.

    Per-column handling:
      ID columns            cast to Int64 / Int32; nullable ones get 0
      visit_date            cast to Date
      is_present            'Y' → 1, anything else (incl. NULL) → 0, as UInt8
      string columns        NULL → '' (update_by → 'ETL-SQLAlchemy')
      update_date           cast to Datetime

    NOTE(review): the UInt8 flag and the "no NULL strings" rule assume the
    ClickHouse table uses UInt8 and non-Nullable String columns — the table
    DDL is not visible here, confirm against it.

    Args:
        df: Frame produced by fetch_promotion_data().

    Returns:
        The cleaned frame. A column-less frame is returned unchanged.
    """
    if df.width == 0:
        # fetch_promotion_data() returns a column-less frame for "no data";
        # there is nothing to cast and the column lookups would fail.
        return df

    log.info("Transforming Promotion data...")

    df = (
        df
        # ── IDs ───────────────────────────────────────────────────────────────
        .with_columns([
            pl.col("mid").cast(pl.Int64),
            pl.col("project_id").cast(pl.Int32),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("channel_id").cast(pl.Int32).fill_null(0),
            pl.col("chain_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("promo_definition_id").cast(pl.Int32).fill_null(0),
            # promotion_details_id = PromoValue (the FK into the chosen master)
            pl.col("promotion_details_id").cast(pl.Int64).fill_null(0),
        ])
        # ── Date ──────────────────────────────────────────────────────────────
        # CONVERT(date, ...) may arrive as a datetime; normalise to Date.
        .with_columns(
            pl.col("visit_date").cast(pl.Date)
        )
        # ── Present flag: 'Y'/'N' → UInt8 (1/0) ──────────────────────────────
        .with_columns(
            pl.when(pl.col("is_present") == "Y")
              .then(pl.lit(1, dtype=pl.UInt8))
              .otherwise(pl.lit(0, dtype=pl.UInt8))
              .alias("is_present")
        )
        # ── Strings: NULL → empty string ──────────────────────────────────────
        # promo_value_name is the most likely NULL (unknown PromoTable value).
        .with_columns([
            pl.col("promo_definition_name").fill_null(""),
            pl.col("promotion_details").fill_null(""),
            pl.col("promo_value_name").fill_null(""),
            pl.col("reason").fill_null(""),
            pl.col("question").fill_null(""),
            pl.col("answer").fill_null(""),
            pl.col("image1").fill_null(""),
            pl.col("image2").fill_null(""),
            pl.col("update_by").fill_null("ETL-SQLAlchemy"),
        ])
        # ── Timestamp ─────────────────────────────────────────────────────────
        .with_columns(
            pl.col("update_date").cast(pl.Datetime)
        )
        # ── Dedup ─────────────────────────────────────────────────────────────
        # One row is kept per project + MID + promo definition + details id.
        # keep="first" with maintain_order=True makes the surviving row
        # deterministic (the default may keep any of the duplicates).
        # NOTE(review): the LEFT JOINs on t_promotionquestion and
        # T_PromotionImages are keyed on PId only, so a promo with several
        # questions/images yields several rows and this step keeps just one.
        # Confirm that is intended; otherwise widen the subset.
        .unique(
            subset=["project_id", "mid", "promo_definition_id", "promotion_details_id"],
            keep="first",
            maintain_order=True,
        )
    )

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df


# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
def load_to_clickhouse(
    df: pl.DataFrame,
    mids: Sequence[int] | None = None,
) -> tuple[int, date]:
    """
    Replace the ClickHouse rows of the affected MIDs with the rows in *df*.

    Delete-then-insert, keyed on MID as in the SP:
        DELETE FROM Promotion WHERE MID IN (...) AND project_id=40148
        INSERT INTO Promotion ...

    Why by MID and not by visit_date: the MIDs are selected by Create/Update
    date, so one run can contain visits from several visit dates, and only
    some of the MIDs of any given visit date. Deleting "everything for
    visit_date X" would wipe rows of MIDs that were not re-fetched, and
    would leave stale copies for every other visit date in the batch.

    The delete is an ALTER TABLE ... DELETE mutation, which ClickHouse runs
    asynchronously by default; mutations_sync=1 makes each call wait until
    the mutation has finished so the insert and the later count see a clean
    state.

    Args:
        df: Transformed, non-empty frame whose columns match the table.
        mids: Optional extra MIDs whose existing rows must be removed (for
            example every MID collected for the run, including ones that no
            longer have promotion rows). The MIDs present in *df* are always
            included.

    Returns:
        (rows inserted, earliest visit_date in *df*).

    Raises:
        ValueError: if *df* is empty.
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() needs a non-empty DataFrame")

    target_date: date = df["visit_date"].min()

    stale = set(df["mid"].drop_nulls().unique().to_list())
    if mids is not None:
        stale.update(int(mid) for mid in mids)
    stale_mids = sorted(stale)

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = _clickhouse_client()
    try:
        log.info(
            f"Deleting: project_id={PROJECT_ID} mids={len(stale_mids):,} "
            f"(earliest visit_date={target_date})"
        )
        for chunk in _chunked(stale_mids, CH_MID_CHUNK):
            client.command(
                f"ALTER TABLE {CH_TABLE} DELETE "
                f"WHERE project_id = {PROJECT_ID} "
                f"AND mid IN ({_int_csv(chunk)})",
                settings={"mutations_sync": 1},
            )

        total = 0
        for start in range(0, len(df), BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f"  Inserted {total:,} / {len(df):,} rows")
    finally:
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date


# ── Step 6: Verify ────────────────────────────────────────────────────────────
def verify(
    expected: int,
    target_date: date,
    mids: Sequence[int] | None = None,
) -> bool:
    """
    Check that ClickHouse holds exactly *expected* rows for this load.

    main() turns a False result into exit code 2, so a scheduler can tell
    "ran fine but the data did not land correctly" apart from exit 0 (OK)
    and exit 1 (crash).

    Args:
        expected: Number of rows that were inserted.
        target_date: Visit date used by the legacy check (see *mids*).
        mids: MIDs that were loaded. When given, the rows of exactly these
            MIDs are counted — the same key the load deletes by. When
            omitted, the rows with visit_date == *target_date* are counted,
            which is only meaningful if the whole load shares one visit date.

    Returns:
        True when the count matches *expected*, False otherwise.
    """
    client = _clickhouse_client()
    try:
        if mids is not None:
            actual = 0
            for chunk in _chunked(sorted({int(mid) for mid in mids}), CH_MID_CHUNK):
                result = client.query(
                    f"SELECT count() FROM {CH_TABLE} "
                    f"WHERE project_id = {PROJECT_ID} "
                    f"AND mid IN ({_int_csv(chunk)})"
                )
                actual += result.result_rows[0][0]
        else:
            result = client.query(
                f"SELECT count() FROM {CH_TABLE} "
                f"WHERE project_id = {PROJECT_ID} "
                f"AND visit_date = '{target_date}'"
            )
            actual = result.result_rows[0][0]
    finally:
        client.close()

    log.info(f"Verify — expected={expected:,}  clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False

    log.info("Verify passed.")
    return True


# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """
    Run the pipeline for yesterday: collect → fetch → transform → load → verify.

    Exits with status 0 when there is nothing to do and with status 2 when
    the verification count does not match.
    """
    target_date = date.today() - timedelta(days=1)
    log.info(f"=== Promotion KPI ETL | date={target_date} ===")

    engine = build_engine()
    try:
        mids = collect_mids(engine, target_date)
        if not mids:
            log.info("No MIDs for yesterday. Nothing to do.")
            sys.exit(0)
        df_raw = fetch_promotion_data(engine, mids)
    finally:
        # Runs on every path (including sys.exit above) so pooled
        # connections are always closed.
        engine.dispose()

    if df_raw.is_empty():
        # NOTE(review): rows already in ClickHouse for these MIDs are left
        # untouched in this case — confirm whether they should be purged.
        log.warning("Empty result set. Exiting.")
        sys.exit(0)

    df_clean = transform(df_raw)
    rows_inserted, inserted_date = load_to_clickhouse(df_clean, mids)

    # Count by the MIDs that actually received rows — the key the load used.
    loaded_mids = df_clean["mid"].drop_nulls().unique().to_list()
    if not verify(rows_inserted, inserted_date, loaded_mids):
        sys.exit(2)

    log.info(
        f"=== Done. {rows_inserted:,} rows loaded for run date {target_date} "
        f"(earliest visit_date {inserted_date}) ==="
    )


if __name__ == "__main__":
    main()