diff --git a/etl_promotion_kpi.py b/etl_promotion_kpi.py new file mode 100644 index 0000000..5c4f6f6 --- /dev/null +++ b/etl_promotion_kpi.py @@ -0,0 +1,481 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "polars>=0.20.0", +# "sqlalchemy>=2.0.0", +# "pyodbc>=5.0.0", +# "clickhouse-connect>=0.7.0", +# "pyarrow>=18.0.0", +# "python-dotenv>=1.0.0", +# ] +# /// + +""" +ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse +--------------------------------------------------------------- +Run with: uv run etl_promotion_kpi.py + +What is Promotion KPI? + Kellogg's runs in-store promotions — standees, special shelf displays, + "buy 2 get 1 free" offers. The salesperson visits a store and records: + - Is the promotion physically present? (Present = Y/N) + - If absent, what is the reason? (Reason) + - What category/brand does it apply to? (PromoValueName) + - Photo proof (Image1, Image2) + - Quiz-style question + answer about the promo (Question, Answer) + + The correlated CASE subqueries in the SP resolve the PromoTable column + ('Master_Category' / 'Master_Brand' etc.) into a human-readable name. + SQL Server does this at query time — Polars receives the final string. + Nothing special needed in the transform for those columns. + +Architecture: same 6-step pattern as Coverage KPI + 1. Build SQLAlchemy + pyodbc engine (URL.create + odbc_connect DSN) + 2. Collect MIDs for yesterday from T_StoreCoverage + 3. Fetch Promotion rows in MID chunks via pl.read_database() + 4. Transform with Polars (types, nulls, flags) + 5. Delete stale ClickHouse rows → insert_arrow() in batches + 6. Verify row count +""" + +import os +import sys +import logging +from datetime import date, timedelta + +import polars as pl +from sqlalchemy import create_engine, text +from sqlalchemy.engine import Engine, URL +import clickhouse_connect +from dotenv import load_dotenv + +load_dotenv() + +# ── Logging ─────────────────────────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(levelname)-8s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("etl_promotion") + +# ── Config ──────────────────────────────────────────────────────────────────── +MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server") +MSSQL_DB = os.getenv("MSSQL_DB", "insight") +MSSQL_USER = os.getenv("MSSQL_USER", "sa") +MSSQL_PASS = os.getenv("MSSQL_PASS", "") + +# Build pyodbc DSN string first — keeps special chars in passwords safe. +# Then pass the whole DSN to URL.create() via "odbc_connect". +# URL.create() percent-encodes it internally so @ / + # never break parsing. +_ODBC_DSN = ( + f"DRIVER={{ODBC Driver 18 for SQL Server}};" + f"SERVER={MSSQL_SERVER};" + f"DATABASE={MSSQL_DB};" + f"UID={MSSQL_USER};" + f"PWD={MSSQL_PASS};" + "TrustServerCertificate=yes;" + "timeout=30;" +) + +MSSQL_SA_URL = URL.create( + drivername="mssql+pyodbc", + query={"odbc_connect": _ODBC_DSN}, +) + +CH_HOST = os.getenv("CH_HOST", "localhost") +CH_PORT = int(os.getenv("CH_PORT", "8123")) +CH_USER = os.getenv("CH_USER", "default") +CH_PASS = os.getenv("CH_PASS", "") +CH_DB = os.getenv("CH_DB", "kelloggs") +CH_TABLE = "promotion_kpi" + +BATCH_SIZE = 5000 # rows per ClickHouse insert +MID_CHUNK = 1000 # SQL Server IN-clause safe chunk size +PROJECT_ID = 40148 + + +# ── Step 1: Build SQLAlchemy Engine ─────────────────────────────────────────── +def build_engine() -> Engine: + """ + SQLAlchemy = smart manager (handles pooling, dialect, parameterisation) + pyodbc = actual wire (speaks ODBC to SQL Server driver) + URL.create = safe assembler (never mangles passwords with special chars) + + The engine is created ONCE and shared across all steps. + Each with engine.connect() borrows a slot from the pool and returns + it automatically when the block exits — no manual close() calls. + """ + log.info("Building SQLAlchemy + pyodbc engine...") + engine = create_engine( + MSSQL_SA_URL, + pool_size=3, + max_overflow=2, + pool_pre_ping=True, + echo=False, + connect_args={"fast_executemany": True}, + ) + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + log.info("Engine ready.") + return engine + + +# ── Step 2: Collect MIDs ────────────────────────────────────────────────────── +def collect_mids(engine: Engine, target_date: date) -> list[int]: + """ + Exact replica of the SP's @MID_TABLE_COV logic. + UNION of CreateDate + UpdateDate = catches both new visits and edits. + :target_date is a named SQLAlchemy parameter — safe, no injection risk. + """ + sql = text(""" + SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage + WHERE CONVERT(date, CreateDate) = :target_date + UNION + SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage + WHERE CONVERT(date, UpdateDate) = :target_date + """) + log.info(f"Collecting MIDs for: {target_date}") + with engine.connect() as conn: + result = conn.execute(sql, {"target_date": target_date}) + mids = [row[0] for row in result.fetchall()] + log.info(f"Found {len(mids):,} MIDs") + return mids + + +# ── Step 3: Fetch Promotion data ────────────────────────────────────────────── +def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame: + """ + The SQL here is a direct translation of the SP's Promotion INSERT SELECT. + Three things worth understanding: + + 1. Correlated CASE subqueries (PromoValueName) + The SP resolves the promo item name by looking up whichever master + table the PromoTable column points to. SQL Server runs these + subqueries at execution time and sends Polars a plain VARCHAR. + Nothing special needed in Python — they arrive as strings. + + 2. LEFT JOINs (images, reason, question) + These are optional — not every promo has a photo or a question. + NULLs from LEFT JOINs are handled in the transform step. + + 3. ISNULL(mpq.PromoQuestionName, '') and ISNULL(tpq.PromoAnswerName, '') + Already handled in SQL — arrive as empty strings, not NULLs. + We still do fill_null('') in Polars for safety. + + MID chunking: SQL Server's IN clause limit is ~2100 parameters. + We split into chunks of 1000 MIDs each and concat the Polars frames. + pl.read_database() opens a connection from the pool per chunk and + releases it back automatically after the query finishes. + """ + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + # Exact SELECT from the SP's Promotion INSERT — columns renamed to + # snake_case for ClickHouse convention + promotion_sql = """ + SELECT + sc.MID AS mid, + {project_id} AS project_id, + sc.StoreId AS store_id, + Em.EmpId AS employee_id, + CONVERT(date, sc.VisitDate) AS visit_date, + Em.SupervisorId AS supervisor_id, + sm.ChannelId AS channel_id, + sm.ChainId AS chain_id, + sm.StoreTypeId AS storetype_id, + msd.PromoDefinitionId AS promo_definition_id, + msd.PromoDefinitionName AS promo_definition_name, + + -- Resolve which dimension level the promo targets + -- (Category / SubCategory / Brand / SubBrand) + CASE + WHEN ISNULL(ts.PromoTable,'') = 'Master_Category' THEN 'Category' + WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory' + WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand' THEN 'Brand' + WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand' THEN 'SubBrand' + ELSE '' + END AS promotion_details, + + ts.PromoValue AS promotion_details_id, + + -- Resolve the actual name of that dimension item + -- SQL Server evaluates exactly ONE of these subqueries per row + CASE + WHEN ISNULL(ts.PromoTable,'') = 'Master_Category' + THEN (SELECT a.CategoryName + FROM OneApp_KelloggsMT.dbo.Master_Category a + WHERE a.CategoryId = ts.PromoValue) + WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' + THEN (SELECT a.SubCategoryName + FROM OneApp_KelloggsMT.dbo.Master_SubCategory a + WHERE a.SubCategoryId = ts.PromoValue) + WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand' + THEN (SELECT a.BrandName + FROM OneApp_KelloggsMT.dbo.Master_Brand a + WHERE a.BrandId = ts.PromoValue) + WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand' + THEN (SELECT a.SubBrandName + FROM OneApp_KelloggsMT.dbo.Master_SubBrand a + WHERE a.SubBrandId = ts.PromoValue) + END AS promo_value_name, + + -- Present flag: 0 → 'N', anything else → 'Y' + CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END AS is_present, + + -- Reason only populated when promo is ABSENT + CASE + WHEN ts.Present = 1 THEN '' + ELSE ISNULL(mnp.PromoReason, '') + END AS reason, + + ISNULL(mpq.PromoQuestionName, '') AS question, + ISNULL(tpq.PromoAnswerName, '') AS answer, + + -- Image URLs — empty string when no image + CASE + WHEN ISNULL(SHI.PromoImage1,'') = '' THEN '' + ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/' + + SHI.PromoImage1 + END AS image1, + CASE + WHEN ISNULL(SHI.PromoImage2,'') = '' THEN '' + ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/' + + SHI.PromoImage2 + END AS image2, + + GETDATE() AS update_date, + 'ETL-SQLAlchemy' AS update_by + + FROM OneApp_KelloggsMT.dbo.T_Promotion ts + INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc + ON ts.mid = sc.mid + INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm + ON sc.StoreId = sm.StoreId + INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em + ON sc.EmpId = Em.EmpId + INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd + ON msd.PromoDefinitionId = ts.PromoDefinitionId + LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI + ON ts.PId = SHI.PId + LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp + ON ts.PromoReasonId = mnp.PromoReasonId + LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq + ON ts.PId = tpq.PId + LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq + ON tpq.PromoQuestionId = mpq.PromoQuestionId + + WHERE Em.EmpName NOT LIKE 'test%' + AND sc.MID IN ({mid_placeholders}) + """ + + frames: list[pl.DataFrame] = [] + + for i in range(0, len(mids), MID_CHUNK): + chunk = mids[i : i + MID_CHUNK] + mid_list = ",".join(str(m) for m in chunk) # safe: all integers + sql = promotion_sql.format( + project_id=PROJECT_ID, + mid_placeholders=mid_list, + ) + + log.info( + f"Fetching chunk {i // MID_CHUNK + 1} " + f"(MIDs {i+1}–{min(i + MID_CHUNK, len(mids))} of {len(mids)})..." + ) + + # pl.read_database() borrows a connection from the SQLAlchemy pool, + # runs the query, converts the result directly to a Polars DataFrame, + # then returns the connection to the pool. One clean call per chunk. + df_chunk = pl.read_database(query=sql, connection=engine) + if len(df_chunk) > 0: + frames.append(df_chunk) + + if not frames: + log.warning("No promotion rows returned.") + return pl.DataFrame() + + df = pl.concat(frames) + log.info(f"Fetched {len(df):,} total rows from SQL Server") + return df + + +# ── Step 4: Transform with Polars ───────────────────────────────────────────── +def transform(df: pl.DataFrame) -> pl.DataFrame: + """ + Assembly-line approach: each .with_columns() block is one station. + No station does two different jobs — easy to debug and extend. + + Promotion-specific notes: + ┌──────────────────────┬──────────────────────────────────────────────┐ + │ Column │ Why / How │ + ├──────────────────────┼──────────────────────────────────────────────┤ + │ is_present │ 'Y'/'N' string → UInt8 (ClickHouse no Bool) │ + │ promo_value_name │ Can be NULL when PromoTable is unknown → │ + │ │ fill_null('') │ + │ promotion_details_id │ INT from SQL Server → cast to Int64 │ + │ image1 / image2 │ Already empty string from SQL, still safe │ + │ update_date │ GETDATE() comes as datetime → cast Datetime │ + └──────────────────────┴──────────────────────────────────────────────┘ + """ + log.info("Transforming Promotion data...") + + df = ( + df + + # ── IDs ─────────────────────────────────────────────────────────────── + .with_columns([ + pl.col("mid").cast(pl.Int64), + pl.col("project_id").cast(pl.Int32), + pl.col("store_id").cast(pl.Int64), + pl.col("employee_id").cast(pl.Int64), + pl.col("supervisor_id").cast(pl.Int64).fill_null(0), + pl.col("channel_id").cast(pl.Int32).fill_null(0), + pl.col("chain_id").cast(pl.Int64).fill_null(0), + pl.col("storetype_id").cast(pl.Int32).fill_null(0), + pl.col("promo_definition_id").cast(pl.Int32).fill_null(0), + # promotion_details_id = PromoValue (the FK into the chosen master) + pl.col("promotion_details_id").cast(pl.Int64).fill_null(0), + ]) + + # ── Date ────────────────────────────────────────────────────────────── + # SQL Server CONVERT(date, ...) may arrive as Python datetime via ODBC + .with_columns( + pl.col("visit_date").cast(pl.Date) + ) + + # ── Present flag: 'Y'/'N' → UInt8 (1/0) ───────────────────────────── + # ClickHouse uses UInt8 for booleans — no native Bool type + .with_columns( + pl.when(pl.col("is_present") == "Y") + .then(pl.lit(1, dtype=pl.UInt8)) + .otherwise(pl.lit(0, dtype=pl.UInt8)) + .alias("is_present") + ) + + # ── Strings: NULL → empty string ────────────────────────────────────── + # ClickHouse String columns reject NULL — every string col needs this + # promo_value_name is the most likely NULL (unknown PromoTable value) + .with_columns([ + pl.col("promo_definition_name").fill_null(""), + pl.col("promotion_details").fill_null(""), + pl.col("promo_value_name").fill_null(""), + pl.col("reason").fill_null(""), + pl.col("question").fill_null(""), + pl.col("answer").fill_null(""), + pl.col("image1").fill_null(""), + pl.col("image2").fill_null(""), + pl.col("update_by").fill_null("ETL-SQLAlchemy"), + ]) + + # ── Timestamp ───────────────────────────────────────────────────────── + .with_columns( + pl.col("update_date").cast(pl.Datetime) + ) + + # ── Dedup ───────────────────────────────────────────────────────────── + # A promo record is unique per MID + promo_definition_id + details_id. + # Chunked fetches can produce duplicates if MIDs overlap — drop them. + .unique(subset=["project_id", "mid", "promo_definition_id", "promotion_details_id"]) + ) + + log.info(f"After transform: {len(df):,} rows | schema: {df.schema}") + return df + + +# ── Step 5: Load to ClickHouse ──────────────────────────────────────────────── +def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]: + """ + Delete-then-insert mirrors the SP exactly: + DELETE FROM Promotion WHERE MID IN (...) AND project_id=40148 + INSERT INTO Promotion ... + + We delete by visit_date (cheaper than per-MID deletes in ClickHouse). + insert_arrow() uses Polars' native Arrow backing — zero-copy, no pandas. + """ + target_date: date = df["visit_date"].min() + + log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...") + client = clickhouse_connect.get_client( + host=CH_HOST, port=CH_PORT, + username=CH_USER, password=CH_PASS, + database=CH_DB, + ) + + log.info(f"Deleting: project_id={PROJECT_ID} visit_date={target_date}") + client.command( + f"ALTER TABLE {CH_TABLE} DELETE " + f"WHERE project_id = {PROJECT_ID} " + f"AND visit_date = '{target_date}'" + ) + + total = 0 + for start in range(0, len(df), BATCH_SIZE): + batch = df.slice(start, BATCH_SIZE) + client.insert_arrow(CH_TABLE, batch.to_arrow()) + total += len(batch) + log.info(f" Inserted {total:,} / {len(df):,} rows") + + log.info(f"ClickHouse load complete: {total:,} rows") + return total, target_date + + +# ── Step 6: Verify ──────────────────────────────────────────────────────────── +def verify(expected: int, target_date: date) -> bool: + """ + Final sanity check. Exit code 2 = data problem (not a crash). + Your cron/scheduler can distinguish: exit 0 = OK, exit 1 = crash, + exit 2 = ran fine but data didn't land correctly. + """ + client = clickhouse_connect.get_client( + host=CH_HOST, port=CH_PORT, + username=CH_USER, password=CH_PASS, + database=CH_DB, + ) + result = client.query( + f"SELECT count() FROM {CH_TABLE} " + f"WHERE project_id = {PROJECT_ID} " + f"AND visit_date = '{target_date}'" + ) + actual = result.result_rows[0][0] + log.info(f"Verify — expected={expected:,} clickhouse={actual:,}") + if actual != expected: + log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}") + return False + log.info("Verify passed.") + return True + + +# ── Main ────────────────────────────────────────────────────────────────────── +def main() -> None: + target_date = date.today() - timedelta(days=1) + log.info(f"=== Promotion KPI ETL | date={target_date} ===") + + engine = build_engine() + + mids = collect_mids(engine, target_date) + if not mids: + log.info("No MIDs for yesterday. Nothing to do.") + sys.exit(0) + + df_raw = fetch_promotion_data(engine, mids) + engine.dispose() # cleanly return all pooled connections to OS + + if df_raw.is_empty(): + log.warning("Empty result set. Exiting.") + sys.exit(0) + + df_clean = transform(df_raw) + + rows_inserted, inserted_date = load_to_clickhouse(df_clean) + + if not verify(rows_inserted, inserted_date): + sys.exit(2) + + log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===") + + +if __name__ == "__main__": + main()