Files
etl_code/etl_promotion_kpi.py
2026-06-03 17:24:38 +05:30

482 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "polars>=0.20.0",
# "sqlalchemy>=2.0.0",
# "pyodbc>=5.0.0",
# "clickhouse-connect>=0.7.0",
# "pyarrow>=18.0.0",
# "python-dotenv>=1.0.0",
# ]
# ///
"""
ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse
---------------------------------------------------------------
Run with: uv run etl_promotion_kpi.py
What is Promotion KPI?
Kellogg's runs in-store promotions — standees, special shelf displays,
"buy 2 get 1 free" offers. The salesperson visits a store and records:
- Is the promotion physically present? (Present = Y/N)
- If absent, what is the reason? (Reason)
- What category/brand does it apply to? (PromoValueName)
- Photo proof (Image1, Image2)
- Quiz-style question + answer about the promo (Question, Answer)
The correlated CASE subqueries in the SP resolve the PromoTable column
('Master_Category' / 'Master_Brand' etc.) into a human-readable name.
SQL Server does this at query time — Polars receives the final string.
Nothing special needed in the transform for those columns.
Architecture: same 6-step pattern as Coverage KPI
1. Build SQLAlchemy + pyodbc engine (URL.create + odbc_connect DSN)
2. Collect MIDs for yesterday from T_StoreCoverage
3. Fetch Promotion rows in MID chunks via pl.read_database()
4. Transform with Polars (types, nulls, flags)
5. Delete stale ClickHouse rows → insert_arrow() in batches
6. Verify row count
"""
import os
import sys
import logging
from datetime import date, timedelta
import polars as pl
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, URL
import clickhouse_connect
from dotenv import load_dotenv
load_dotenv()
# ── Logging ───────────────────────────────────────────────────────────────────
# Root logging setup: INFO and above, rendered as
# "timestamp | LEVEL    | message" with second-resolution timestamps.
_LOG_SETTINGS = {
    "datefmt": "%Y-%m-%d %H:%M:%S",
    "format": "%(asctime)s | %(levelname)-8s | %(message)s",
    "level": logging.INFO,
}
logging.basicConfig(**_LOG_SETTINGS)

# Logger shared by every step of this pipeline.
log = logging.getLogger("etl_promotion")
# ── Config ────────────────────────────────────────────────────────────────────
# SQL Server connection settings, overridable through the environment
# (.env is loaded before this point).
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "insight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")


def _odbc_value(value: str) -> str:
    """Return *value* made safe for use as an ODBC connection-string value.

    In an ODBC connection string ``;`` terminates a value and braces are the
    quoting characters, so a credential containing ``;``, ``{`` or ``}``
    (or leading/trailing whitespace) must be wrapped in ``{...}`` with any
    closing brace doubled. Values without such characters are returned
    unchanged, so ordinary credentials produce exactly the same string as
    plain interpolation.
    """
    needs_quoting = value != value.strip() or any(ch in value for ch in ";{}")
    if not needs_quoting:
        return value
    return "{" + value.replace("}", "}}") + "}"


# Build the raw ODBC connection string first, then hand the whole string to
# URL.create() through the "odbc_connect" query key. This avoids embedding the
# credentials in a hand-written URL; ODBC-level quoting of the credentials is
# handled by _odbc_value() above.
# NOTE(review): confirm that "timeout" is a keyword the ODBC driver honours in
# the connection string — TODO verify against the driver documentation.
_ODBC_DSN = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={_odbc_value(MSSQL_USER)};"
    f"PWD={_odbc_value(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
    "timeout=30;"
)
MSSQL_SA_URL = URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": _ODBC_DSN},
)

# ClickHouse connection settings, overridable through the environment.
CH_HOST = os.getenv("CH_HOST", "localhost")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER", "default")
CH_PASS = os.getenv("CH_PASS", "")
CH_DB = os.getenv("CH_DB", "kelloggs")
CH_TABLE = "promotion_kpi"

BATCH_SIZE = 5000  # rows per ClickHouse insert
MID_CHUNK = 1000  # number of MIDs inlined into one SQL Server IN (...) list
PROJECT_ID = 40148  # constant project id stamped on every row and used in ClickHouse filters
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
def build_engine() -> Engine:
    """Create the SQL Server engine and prove it can connect.

    The engine is built from ``MSSQL_SA_URL`` with a small connection pool
    (3 pooled + 2 overflow connections) and ``pool_pre_ping`` so connections
    are checked before reuse. A ``SELECT 1`` round trip is executed before
    returning so a bad host or credential fails here rather than in the
    first real query.

    ``fast_executemany`` is passed as a ``create_engine()`` keyword, which is
    where the mssql+pyodbc dialect reads it. Entries of ``connect_args`` are
    forwarded to ``pyodbc.connect()`` instead and do not configure the
    dialect.

    Returns:
        A ready-to-use engine. The caller is responsible for ``dispose()``.

    Raises:
        Whatever SQLAlchemy / pyodbc raises when the smoke test fails; the
        engine is disposed before the exception propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        echo=False,
        fast_executemany=True,
    )
    try:
        # Smoke test: borrow one connection and run a trivial statement.
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        # Do not leave a half-initialised pool behind on failure.
        engine.dispose()
        raise
    log.info("Engine ready.")
    return engine
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """Return the MIDs of store-coverage rows created or updated on a date.

    The query UNIONs two selects over T_StoreCoverage — one matching
    CreateDate, one matching UpdateDate — so both new visits and edited
    visits are picked up, and UNION removes MIDs that match both.
    ``target_date`` is sent as the bound parameter ``:target_date`` rather
    than being formatted into the SQL text.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        target_date: Calendar date to match against CreateDate / UpdateDate.

    Returns:
        The first column of every result row, in the order returned.
    """
    log.info(f"Collecting MIDs for: {target_date}")
    query = text("""
SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
WHERE CONVERT(date, CreateDate) = :target_date
UNION
SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
WHERE CONVERT(date, UpdateDate) = :target_date
""")
    bind_params = {"target_date": target_date}
    with engine.connect() as connection:
        # Each row has exactly one column; unpack it straight into the list.
        found = [mid for (mid,) in connection.execute(query, bind_params)]
    log.info(f"Found {len(found):,} MIDs")
    return found
# ── Step 3: Fetch Promotion data ──────────────────────────────────────────────
def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """Fetch promotion rows for the given MIDs from SQL Server.

    What the query does:

    1. CASE expressions on ``ts.PromoTable`` produce both the dimension level
       (``promotion_details``) and, via a scalar subquery into the matching
       master table, the item name (``promo_value_name``). When PromoTable
       matches none of the four known tables the name CASE has no ELSE and
       therefore yields NULL.
    2. Images, reason and question/answer come from LEFT JOINs, so they are
       optional; the SQL already maps missing values to '' with ISNULL/CASE.
    3. ``update_date`` is the SQL Server clock (GETDATE()) and ``update_by``
       is the constant 'ETL-SQLAlchemy'.

    MID chunking: the MIDs are inlined into the statement as integer
    literals (they are not bound parameters), ``MID_CHUNK`` at a time, to
    keep each statement a bounded size. Every MID is passed through
    ``int()`` first so nothing but digits can reach the SQL text.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        mids: MIDs to fetch; may be empty.

    Returns:
        All chunks concatenated into one DataFrame, or an empty
        ``pl.DataFrame()`` when ``mids`` is empty or no rows matched.

    Raises:
        ValueError / TypeError: if an element of ``mids`` is not convertible
        to ``int``.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # SELECT taken from the stored procedure's Promotion INSERT; columns are
    # aliased to snake_case. {project_id} and {mid_placeholders} are filled
    # in per chunk with str.format().
    promotion_sql = """
SELECT
sc.MID AS mid,
{project_id} AS project_id,
sc.StoreId AS store_id,
Em.EmpId AS employee_id,
CONVERT(date, sc.VisitDate) AS visit_date,
Em.SupervisorId AS supervisor_id,
sm.ChannelId AS channel_id,
sm.ChainId AS chain_id,
sm.StoreTypeId AS storetype_id,
msd.PromoDefinitionId AS promo_definition_id,
msd.PromoDefinitionName AS promo_definition_name,
-- Resolve which dimension level the promo targets
-- (Category / SubCategory / Brand / SubBrand)
CASE
WHEN ISNULL(ts.PromoTable,'') = 'Master_Category' THEN 'Category'
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory'
WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand' THEN 'Brand'
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand' THEN 'SubBrand'
ELSE ''
END AS promotion_details,
ts.PromoValue AS promotion_details_id,
-- Resolve the actual name of that dimension item
-- SQL Server evaluates exactly ONE of these subqueries per row
CASE
WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'
THEN (SELECT a.CategoryName
FROM OneApp_KelloggsMT.dbo.Master_Category a
WHERE a.CategoryId = ts.PromoValue)
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory'
THEN (SELECT a.SubCategoryName
FROM OneApp_KelloggsMT.dbo.Master_SubCategory a
WHERE a.SubCategoryId = ts.PromoValue)
WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'
THEN (SELECT a.BrandName
FROM OneApp_KelloggsMT.dbo.Master_Brand a
WHERE a.BrandId = ts.PromoValue)
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'
THEN (SELECT a.SubBrandName
FROM OneApp_KelloggsMT.dbo.Master_SubBrand a
WHERE a.SubBrandId = ts.PromoValue)
END AS promo_value_name,
-- Present flag: 0 → 'N', anything else → 'Y'
CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END AS is_present,
-- Reason only populated when promo is ABSENT
CASE
WHEN ts.Present = 1 THEN ''
ELSE ISNULL(mnp.PromoReason, '')
END AS reason,
ISNULL(mpq.PromoQuestionName, '') AS question,
ISNULL(tpq.PromoAnswerName, '') AS answer,
-- Image URLs — empty string when no image
CASE
WHEN ISNULL(SHI.PromoImage1,'') = '' THEN ''
ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
+ SHI.PromoImage1
END AS image1,
CASE
WHEN ISNULL(SHI.PromoImage2,'') = '' THEN ''
ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
+ SHI.PromoImage2
END AS image2,
GETDATE() AS update_date,
'ETL-SQLAlchemy' AS update_by
FROM OneApp_KelloggsMT.dbo.T_Promotion ts
INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
ON ts.mid = sc.mid
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
ON sc.StoreId = sm.StoreId
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
ON sc.EmpId = Em.EmpId
INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd
ON msd.PromoDefinitionId = ts.PromoDefinitionId
LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI
ON ts.PId = SHI.PId
LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp
ON ts.PromoReasonId = mnp.PromoReasonId
LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq
ON ts.PId = tpq.PId
LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq
ON tpq.PromoQuestionId = mpq.PromoQuestionId
WHERE Em.EmpName NOT LIKE 'test%'
AND sc.MID IN ({mid_placeholders})
"""

    total_mids = len(mids)
    frames: list[pl.DataFrame] = []
    for chunk_no, start in enumerate(range(0, total_mids, MID_CHUNK), start=1):
        chunk = mids[start : start + MID_CHUNK]
        # int() guarantees only integer literals are formatted into the SQL.
        mid_list = ",".join(str(int(m)) for m in chunk)
        sql = promotion_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=mid_list,
        )
        log.info(
            f"Fetching chunk {chunk_no} "
            f"(MIDs {start + 1}-{min(start + MID_CHUNK, total_mids)} "
            f"of {total_mids})..."
        )
        # One query per chunk; the result is materialised as a Polars frame.
        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No promotion rows returned.")
        return pl.DataFrame()

    # "vertical_relaxed" lets chunks whose column dtypes were inferred
    # differently (presumably possible when a column is entirely NULL in one
    # chunk — TODO confirm) be combined by casting to a common supertype
    # instead of raising.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """Normalise fetched promotion rows into the shape loaded to ClickHouse.

    Each ``with_columns`` stage handles one concern:

    * IDs: cast to fixed-width integers; nullable IDs (supervisor, channel,
      chain, store type, promo definition, promotion details) become 0.
    * ``visit_date``: cast to ``pl.Date``.
    * ``is_present``: 'Y' becomes 1, anything else becomes 0, as UInt8.
    * Strings: NULL becomes '' (``update_by`` falls back to
      'ETL-SQLAlchemy'). The target columns are presumably non-Nullable
      String — TODO confirm against the table DDL.
    * ``update_date``: cast to ``pl.Datetime``.
    * Dedup on (project_id, mid, promo_definition_id, promotion_details_id).

    The dedup keeps the first row of each key in input order
    (``keep="first", maintain_order=True``). Polars' default ``keep="any"``
    may keep any of the duplicates, so which row survived could differ
    between runs whenever duplicates differ in non-key columns.

    NOTE(review): the key does not include question/answer or image columns.
    If one promotion can join to several question rows, all but the first
    are dropped here — confirm that is intended.

    Args:
        df: Raw frame with the column names produced by the promotion query.

    Returns:
        The cleaned, de-duplicated frame.
    """
    log.info("Transforming Promotion data...")

    dedup_key = ["project_id", "mid", "promo_definition_id", "promotion_details_id"]

    df = (
        df
        # ── IDs ──────────────────────────────────────────────────────────
        .with_columns([
            pl.col("mid").cast(pl.Int64),
            pl.col("project_id").cast(pl.Int32),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("channel_id").cast(pl.Int32).fill_null(0),
            pl.col("chain_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("promo_definition_id").cast(pl.Int32).fill_null(0),
            # promotion_details_id is PromoValue: the key into whichever
            # master table PromoTable names.
            pl.col("promotion_details_id").cast(pl.Int64).fill_null(0),
        ])
        # ── Date ─────────────────────────────────────────────────────────
        # Cast defensively in case the driver delivers a datetime.
        .with_columns(
            pl.col("visit_date").cast(pl.Date)
        )
        # ── Present flag: 'Y'/'N' → UInt8 (1/0) ──────────────────────────
        .with_columns(
            pl.when(pl.col("is_present") == "Y")
            .then(pl.lit(1, dtype=pl.UInt8))
            .otherwise(pl.lit(0, dtype=pl.UInt8))
            .alias("is_present")
        )
        # ── Strings: NULL → empty string ─────────────────────────────────
        # promo_value_name is NULL whenever PromoTable is not one of the
        # four known master tables.
        .with_columns([
            pl.col("promo_definition_name").fill_null(""),
            pl.col("promotion_details").fill_null(""),
            pl.col("promo_value_name").fill_null(""),
            pl.col("reason").fill_null(""),
            pl.col("question").fill_null(""),
            pl.col("answer").fill_null(""),
            pl.col("image1").fill_null(""),
            pl.col("image2").fill_null(""),
            pl.col("update_by").fill_null("ETL-SQLAlchemy"),
        ])
        # ── Timestamp ────────────────────────────────────────────────────
        .with_columns(
            pl.col("update_date").cast(pl.Datetime)
        )
        # ── Dedup ────────────────────────────────────────────────────────
        # Deterministic: first occurrence wins and row order is preserved.
        .unique(subset=dedup_key, keep="first", maintain_order=True)
    )

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """Replace one day's rows in ClickHouse with the contents of ``df``.

    Steps:

    1. Take the minimum ``visit_date`` in ``df`` as the target date.
    2. Delete existing rows for ``PROJECT_ID`` and that date. The delete is
       issued with ``mutations_sync = 1`` so the call waits for the mutation
       to finish; by default ``ALTER TABLE ... DELETE`` is asynchronous, and
       a row count taken right after the load could still include the rows
       being deleted.
    3. Insert ``df`` in ``BATCH_SIZE`` slices via ``insert_arrow()``.

    NOTE(review): only the minimum ``visit_date`` is deleted, but every row
    in ``df`` is inserted. When the frame spans several visit dates, rows
    for the other dates are inserted without any prior delete, and a count
    filtered on the returned date will not equal the returned total. A
    warning is logged in that case; the delete scope itself is left as
    designed.

    Args:
        df: Transformed frame; must be non-empty and contain ``visit_date``.

    Returns:
        ``(rows_inserted, target_date)``.

    Raises:
        ValueError: if ``df`` is empty or has no non-null ``visit_date``
            (previously this would have issued a delete for the literal
            date 'None').
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() received an empty DataFrame")

    visit_dates = df["visit_date"]
    target_date: date = visit_dates.min()
    if target_date is None:
        raise ValueError("load_to_clickhouse(): visit_date is NULL for every row")

    distinct_dates = visit_dates.n_unique()
    if distinct_dates > 1:
        log.warning(
            f"Frame spans {distinct_dates} distinct visit_date values; "
            f"only {target_date} is deleted before insert."
        )

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        log.info(f"Deleting: project_id={PROJECT_ID} visit_date={target_date}")
        client.command(
            f"ALTER TABLE {CH_TABLE} DELETE "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'",
            # Block until the delete mutation has been applied.
            settings={"mutations_sync": 1},
        )

        total = 0
        row_count = len(df)
        for start in range(0, row_count, BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f" Inserted {total:,} / {row_count:,} rows")
    finally:
        # Release the client's connections even if the load fails.
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date
# ── Step 6: Verify ────────────────────────────────────────────────────────────
def verify(expected: int, target_date: date) -> bool:
    """Check that ClickHouse holds the expected number of rows for a date.

    Counts rows in ``CH_TABLE`` for ``PROJECT_ID`` and ``target_date`` and
    compares the count with ``expected``. A mismatch is logged as an error
    and reported through the return value; no exception is raised for it.

    Args:
        expected: Row count the caller believes it loaded.
        target_date: ``visit_date`` to count.

    Returns:
        ``True`` when the counts match, ``False`` otherwise.
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        result = client.query(
            f"SELECT count() FROM {CH_TABLE} "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'"
        )
        # Single-row, single-column result: the count itself.
        actual = result.result_rows[0][0]
    finally:
        # The client was previously left open; release it on every path.
        client.close()

    log.info(f"Verify — expected={expected:,} clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False
    log.info("Verify passed.")
    return True
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Run the Promotion KPI pipeline for yesterday.

    Flow: build engine → collect MIDs → fetch → transform → load → verify.

    Exit codes:
        0 — success, or nothing to do (no MIDs / empty result set).
        2 — the load ran but the ClickHouse row count did not match.
        Any uncaught exception ends the process with Python's default
        non-zero status.

    The SQL Server engine is disposed in a ``finally`` block, so its pooled
    connections are released on the early-exit path and when a fetch raises,
    not only on the happy path.
    """
    # "Yesterday" relative to the local clock of the machine running the job.
    target_date = date.today() - timedelta(days=1)
    log.info(f"=== Promotion KPI ETL | date={target_date} ===")

    engine = build_engine()
    try:
        mids = collect_mids(engine, target_date)
        if not mids:
            log.info("No MIDs for yesterday. Nothing to do.")
            sys.exit(0)
        df_raw = fetch_promotion_data(engine, mids)
    finally:
        # SQL Server is not needed past this point; runs on SystemExit too.
        engine.dispose()

    if df_raw.is_empty():
        log.warning("Empty result set. Exiting.")
        sys.exit(0)

    df_clean = transform(df_raw)
    rows_inserted, inserted_date = load_to_clickhouse(df_clean)
    if not verify(rows_inserted, inserted_date):
        sys.exit(2)
    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()