#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "polars>=0.20.0",
#     "sqlalchemy>=2.0.0",
#     "pyodbc>=5.0.0",
#     "clickhouse-connect>=0.7.0",
#     "pyarrow>=18.0.0",
#     "python-dotenv>=1.0.0",
# ]
# ///
|
||
"""
ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse
---------------------------------------------------------------
Run with: uv run etl_promotion_kpi.py

What is Promotion KPI?
  Kellogg's runs in-store promotions — standees, special shelf displays,
  "buy 2 get 1 free" offers. The salesperson visits a store and records:
    - Is the promotion physically present?         (Present = Y/N)
    - If absent, what is the reason?               (Reason)
    - What category/brand does it apply to?        (PromoValueName)
    - Photo proof                                  (Image1, Image2)
    - Quiz-style question + answer about the promo (Question, Answer)

  The correlated CASE subqueries in the SP resolve the PromoTable column
  ('Master_Category' / 'Master_Brand' etc.) into a human-readable name.
  SQL Server does this at query time — Polars receives the final string.
  Nothing special needed in the transform for those columns.

Architecture: same 6-step pattern as Coverage KPI
  1. Build SQLAlchemy + pyodbc engine  (URL.create + odbc_connect DSN)
  2. Collect MIDs for yesterday from T_StoreCoverage
  3. Fetch Promotion rows in MID chunks via pl.read_database()
  4. Transform with Polars (types, nulls, flags)
  5. Delete stale ClickHouse rows → insert_arrow() in batches
  6. Verify row count
"""
|
||
|
||
import os
|
||
import sys
|
||
import logging
|
||
from datetime import date, timedelta
|
||
|
||
import polars as pl
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.engine import Engine, URL
|
||
import clickhouse_connect
|
||
from dotenv import load_dotenv
|
||
|
||
# Load variables from a .env file (if one exists) before the os.getenv()
# lookups in the Config section below are evaluated.
load_dotenv()

# ── Logging ───────────────────────────────────────────────────────────────────
# Root logger at INFO; every record is rendered as
# "<timestamp> | <LEVEL padded to 8> | <message>".
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("etl_promotion")
|
||
|
||
# ── Config ────────────────────────────────────────────────────────────────────
|
||
def odbc_quote(value: str) -> str:
    """Return *value* wrapped in braces for use inside an ODBC connection string.

    ODBC treats ``;`` as the attribute separator, so a credential containing
    ``;`` (or starting with ``{``) must be enclosed in ``{...}``.  A literal
    ``}`` inside the braces is escaped by doubling it.
    """
    return "{" + value.replace("}", "}}") + "}"


MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "insight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")

# Build the pyodbc connection string first, then hand the whole string to
# URL.create() via "odbc_connect" (see MSSQL_SA_URL below).
# Two layers of escaping are involved:
#   * URL.create() percent-encodes the string, so @ / + # never break URL parsing.
#   * UID / PWD are brace-quoted here, so ';' and '}' inside a credential do
#     not terminate or corrupt the ODBC attribute list.
_ODBC_DSN = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={odbc_quote(MSSQL_USER)};"
    f"PWD={odbc_quote(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
    "timeout=30;"
)
|
||
|
||
# SQLAlchemy URL for the mssql+pyodbc dialect; the full ODBC connection string
# travels in the "odbc_connect" query parameter.
MSSQL_SA_URL = URL.create(
    "mssql+pyodbc",
    query=dict(odbc_connect=_ODBC_DSN),
)

# ClickHouse target — each value can be overridden through the environment.
CH_TABLE = "promotion_kpi"
CH_DB = os.getenv("CH_DB", "kelloggs")
CH_HOST = os.getenv("CH_HOST", "localhost")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER", "default")
CH_PASS = os.getenv("CH_PASS", "")

PROJECT_ID = 40148
MID_CHUNK = 1_000    # MIDs per SQL Server IN (...) list
BATCH_SIZE = 5_000   # rows per ClickHouse insert
|
||
|
||
|
||
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
|
||
def build_engine() -> Engine:
    """Create the shared SQL Server engine and prove it can connect.

    SQLAlchemy  = smart manager  (handles pooling, dialect, parameterisation)
    pyodbc      = actual wire    (speaks ODBC to the SQL Server driver)
    URL.create  = safe assembler (percent-encodes the ODBC string)

    The engine is created ONCE and shared across all steps.  Each
    ``with engine.connect()`` borrows a pooled connection and returns it
    when the block exits — no manual close() calls.

    Returns:
        A ready-to-use Engine (pool of 3, up to 2 overflow connections,
        pre-ping enabled).

    Raises:
        Whatever SQLAlchemy/pyodbc raises if the ``SELECT 1`` probe fails;
        the half-built engine is disposed before the error propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine...")
    # NOTE: no connect_args={"fast_executemany": True} here.  That flag is a
    # create_engine() option of the mssql+pyodbc dialect; inside connect_args
    # it is forwarded to pyodbc.connect(), which appends it to the connection
    # string as an unknown attribute.  This pipeline only reads from SQL
    # Server, so the executemany optimisation is not needed at all.
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        echo=False,
    )
    try:
        # Fail fast: surface bad credentials / unreachable server right now.
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        engine.dispose()
        raise
    log.info("Engine ready.")
    return engine
|
||
|
||
|
||
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
|
||
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """Return the MIDs of store visits created or updated on *target_date*.

    Mirrors the SP's @MID_TABLE_COV logic: a UNION over CreateDate and
    UpdateDate, so both new visits and edits are picked up and each MID is
    returned once.  ``:target_date`` is a bound SQLAlchemy parameter, not
    string-interpolated.

    Args:
        engine: Shared SQL Server engine.
        target_date: Calendar day to match against CreateDate / UpdateDate.

    Returns:
        First column of every result row (the MID values), as a list.
    """
    query = text("""
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, CreateDate) = :target_date
        UNION
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, UpdateDate) = :target_date
    """)
    log.info(f"Collecting MIDs for: {target_date}")

    bind_params = {"target_date": target_date}
    with engine.connect() as connection:
        rows = connection.execute(query, bind_params).fetchall()

    found = [record[0] for record in rows]
    log.info(f"Found {len(found):,} MIDs")
    return found
|
||
|
||
|
||
# ── Step 3: Fetch Promotion data ──────────────────────────────────────────────
|
||
def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """Fetch Promotion rows for *mids* from SQL Server as one Polars frame.

    The SQL is a direct translation of the SP's Promotion INSERT SELECT,
    with columns renamed to snake_case.  Three things worth understanding:

    1. Correlated CASE subqueries (promo_value_name)
       The promo item name is looked up in whichever master table the
       PromoTable column points to.  SQL Server evaluates this and sends
       a plain string; an unrecognised PromoTable yields NULL.

    2. LEFT JOINs (images, reason, question)
       These are optional — not every promo has a photo or a question.
       NULLs from the LEFT JOINs are handled in the transform step.

    3. ISNULL(mpq.PromoQuestionName, '') and ISNULL(tpq.PromoAnswerName, '')
       Already handled in SQL — they arrive as empty strings, not NULLs.

    MID chunking: the MIDs are inlined as integer literals into the IN (...)
    list, MID_CHUNK at a time, so no single statement grows without bound.
    One pl.read_database() call is issued per chunk.

    Args:
        engine: Shared SQL Server engine.
        mids: MIDs to fetch; every element must be convertible with int().

    Returns:
        All chunk results concatenated, or an empty ``pl.DataFrame()`` when
        *mids* is empty or no chunk returned rows.

    Raises:
        TypeError / ValueError: if an element of *mids* is not an integer
        value (guards the string-built IN list against injection).
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # Exact SELECT from the SP's Promotion INSERT — columns renamed to
    # snake_case for ClickHouse convention
    promotion_sql = """
        SELECT
            sc.MID                              AS mid,
            {project_id}                        AS project_id,
            sc.StoreId                          AS store_id,
            Em.EmpId                            AS employee_id,
            CONVERT(date, sc.VisitDate)         AS visit_date,
            Em.SupervisorId                     AS supervisor_id,
            sm.ChannelId                        AS channel_id,
            sm.ChainId                          AS chain_id,
            sm.StoreTypeId                      AS storetype_id,
            msd.PromoDefinitionId               AS promo_definition_id,
            msd.PromoDefinitionName             AS promo_definition_name,

            -- Resolve which dimension level the promo targets
            -- (Category / SubCategory / Brand / SubBrand)
            CASE
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'    THEN 'Category'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'       THEN 'Brand'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'    THEN 'SubBrand'
                ELSE ''
            END                                 AS promotion_details,

            ts.PromoValue                       AS promotion_details_id,

            -- Resolve the actual name of that dimension item
            -- SQL Server evaluates exactly ONE of these subqueries per row
            CASE
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'
                    THEN (SELECT a.CategoryName
                          FROM OneApp_KelloggsMT.dbo.Master_Category a
                          WHERE a.CategoryId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory'
                    THEN (SELECT a.SubCategoryName
                          FROM OneApp_KelloggsMT.dbo.Master_SubCategory a
                          WHERE a.SubCategoryId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'
                    THEN (SELECT a.BrandName
                          FROM OneApp_KelloggsMT.dbo.Master_Brand a
                          WHERE a.BrandId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'
                    THEN (SELECT a.SubBrandName
                          FROM OneApp_KelloggsMT.dbo.Master_SubBrand a
                          WHERE a.SubBrandId = ts.PromoValue)
            END                                 AS promo_value_name,

            -- Present flag: 0 → 'N', anything else → 'Y'
            CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END AS is_present,

            -- Reason only populated when promo is ABSENT
            CASE
                WHEN ts.Present = 1 THEN ''
                ELSE ISNULL(mnp.PromoReason, '')
            END                                 AS reason,

            ISNULL(mpq.PromoQuestionName, '')   AS question,
            ISNULL(tpq.PromoAnswerName, '')     AS answer,

            -- Image URLs — empty string when no image
            CASE
                WHEN ISNULL(SHI.PromoImage1,'') = '' THEN ''
                ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
                     + SHI.PromoImage1
            END                                 AS image1,
            CASE
                WHEN ISNULL(SHI.PromoImage2,'') = '' THEN ''
                ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
                     + SHI.PromoImage2
            END                                 AS image2,

            GETDATE()                           AS update_date,
            'ETL-SQLAlchemy'                    AS update_by

        FROM OneApp_KelloggsMT.dbo.T_Promotion ts
        INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
            ON ts.mid = sc.mid
        INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
            ON sc.StoreId = sm.StoreId
        INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
            ON sc.EmpId = Em.EmpId
        INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd
            ON msd.PromoDefinitionId = ts.PromoDefinitionId
        LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI
            ON ts.PId = SHI.PId
        LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp
            ON ts.PromoReasonId = mnp.PromoReasonId
        LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq
            ON ts.PId = tpq.PId
        LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq
            ON tpq.PromoQuestionId = mpq.PromoQuestionId

        WHERE Em.EmpName NOT LIKE 'test%'
          AND sc.MID IN ({mid_placeholders})
    """

    frames: list[pl.DataFrame] = []
    total_mids = len(mids)

    for i in range(0, total_mids, MID_CHUNK):
        chunk = mids[i : i + MID_CHUNK]
        # int() enforces that only integer literals reach the SQL text;
        # anything else raises here instead of being interpolated.
        mid_list = ",".join(str(int(m)) for m in chunk)
        sql = promotion_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=mid_list,
        )

        log.info(
            f"Fetching chunk {i // MID_CHUNK + 1} "
            f"(MIDs {i+1}–{min(i + MID_CHUNK, total_mids)} of {total_mids})..."
        )

        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No promotion rows returned.")
        return pl.DataFrame()

    # Dtypes are inferred per chunk, so a column that is entirely NULL in one
    # chunk (e.g. promo_value_name, supervisor_id) can come back with a
    # different dtype than in another.  "vertical_relaxed" coerces to the
    # common supertype instead of raising on the mismatch.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df
|
||
|
||
|
||
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
|
||
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """Normalise the raw Promotion frame into the shape loaded to ClickHouse.

    Assembly-line approach: each .with_columns() block is one station.

    Stations, in order:
      * IDs          — cast to fixed-width ints; NULL → 0 for the optional ones
                       (mid, project_id, store_id, employee_id keep NULLs).
      * visit_date   — cast to Date.
      * is_present   — 'Y' → 1, anything else (including NULL) → 0, as UInt8.
      * strings      — cast to Utf8, NULL → '' (update_by → 'ETL-SQLAlchemy').
                       promo_value_name is the likeliest NULL: the SQL CASE
                       has no ELSE for an unrecognised PromoTable.
      * update_date  — cast to Datetime.
      * dedup        — keep the first row per
                       (project_id, mid, promo_definition_id,
                       promotion_details_id), preserving input order.

    Args:
        df: Frame produced by fetch_promotion_data(); must be non-empty and
            contain all columns named above.

    Returns:
        The cleaned, de-duplicated frame.
    """
    log.info("Transforming Promotion data...")

    text_columns = [
        "promo_definition_name",
        "promotion_details",
        "promo_value_name",
        "reason",
        "question",
        "answer",
        "image1",
        "image2",
    ]

    df = (
        df

        # ── IDs ───────────────────────────────────────────────────────────────
        .with_columns([
            pl.col("mid").cast(pl.Int64),
            pl.col("project_id").cast(pl.Int32),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("channel_id").cast(pl.Int32).fill_null(0),
            pl.col("chain_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("promo_definition_id").cast(pl.Int32).fill_null(0),
            # promotion_details_id = PromoValue (the FK into the chosen master)
            pl.col("promotion_details_id").cast(pl.Int64).fill_null(0),
        ])

        # ── Date ──────────────────────────────────────────────────────────────
        # CONVERT(date, ...) may arrive as a Python datetime via ODBC
        .with_columns(
            pl.col("visit_date").cast(pl.Date)
        )

        # ── Present flag: 'Y'/'N' → UInt8 (1/0) ─────────────────────────────
        .with_columns(
            pl.when(pl.col("is_present") == "Y")
              .then(pl.lit(1, dtype=pl.UInt8))
              .otherwise(pl.lit(0, dtype=pl.UInt8))
              .alias("is_present")
        )

        # ── Strings: NULL → empty string ──────────────────────────────────────
        # The explicit Utf8 cast matters when a column is NULL in every row:
        # it would otherwise carry a null dtype into the Arrow table.
        .with_columns(
            [pl.col(name).cast(pl.Utf8).fill_null("") for name in text_columns]
            + [pl.col("update_by").cast(pl.Utf8).fill_null("ETL-SQLAlchemy")]
        )

        # ── Timestamp ─────────────────────────────────────────────────────────
        .with_columns(
            pl.col("update_date").cast(pl.Datetime)
        )

        # ── Dedup ─────────────────────────────────────────────────────────────
        # MID chunks are disjoint slices, so duplicates on this key do not come
        # from chunking; presumably they come from the LEFT JOINs fanning out
        # (several image / question rows per PId).
        # keep="first" + maintain_order=True makes the surviving row
        # reproducible between runs (the default keep="any" is not).
        # NOTE(review): question/answer are not part of the key, so a promo
        # with several question rows keeps only its first — confirm intended.
        .unique(
            subset=["project_id", "mid", "promo_definition_id", "promotion_details_id"],
            keep="first",
            maintain_order=True,
        )
    )

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df
|
||
|
||
|
||
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
|
||
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """Replace the ClickHouse rows for this batch's MIDs with *df*.

    Delete-then-insert, mirroring the SP:
        DELETE FROM Promotion WHERE MID IN (...) AND project_id=40148
        INSERT INTO Promotion ...

    The delete is keyed on MID, not on visit_date.  The batch is selected by
    CreateDate/UpdateDate, so it can contain visits from several visit dates;
    deleting a single visit_date would both wipe rows of MIDs that are not in
    this batch and leave rows of other dates undeleted (duplicated on the
    next insert).

    Each delete is an ``ALTER TABLE ... DELETE`` mutation issued with
    ``mutations_sync=1`` so it has finished before rows are inserted and
    later counted.  Only MIDs present in *df* are deleted; a MID whose
    promotion rows all disappeared at the source is not cleaned up here.

    insert_arrow() is fed Arrow tables produced by Polars — no pandas.

    Args:
        df: Transformed, non-empty frame with at least ``mid`` and
            ``visit_date`` columns; its column names must match the table's.

    Returns:
        ``(rows_inserted, earliest_visit_date_in_df)``.
    """
    target_date: date = df["visit_date"].min()
    batch_mids = df["mid"].unique().to_list()

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )

    try:
        log.info(f"Deleting: project_id={PROJECT_ID} mids={len(batch_mids):,}")
        for start in range(0, len(batch_mids), MID_CHUNK):
            # int() guarantees only integer literals are interpolated.
            mid_list = ",".join(
                str(int(m)) for m in batch_mids[start : start + MID_CHUNK]
            )
            client.command(
                f"ALTER TABLE {CH_TABLE} DELETE "
                f"WHERE project_id = {PROJECT_ID} "
                f"AND mid IN ({mid_list})",
                settings={"mutations_sync": 1},
            )

        total = 0
        for start in range(0, len(df), BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f"  Inserted {total:,} / {len(df):,} rows")
    finally:
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date
|
||
|
||
|
||
# ── Step 6: Verify ────────────────────────────────────────────────────────────
|
||
def verify(expected: int, target_date: date) -> bool:
    """Check that ClickHouse holds *expected* rows for *target_date*.

    Counts rows in CH_TABLE where project_id = PROJECT_ID and
    visit_date = target_date, and compares the count with *expected*.
    main() turns a False result into exit code 2, so a scheduler can tell
    apart: exit 0 = OK, exit 1 = crash, exit 2 = ran fine but the data did
    not land as expected.

    Args:
        expected: Number of rows the load step reported as inserted.
        target_date: visit_date to count.

    Returns:
        True when the counts match, False otherwise (the mismatch is logged).
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        result = client.query(
            f"SELECT count() FROM {CH_TABLE} "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'"
        )
    finally:
        # Release the client's connection even if the query raises.
        client.close()

    actual = result.result_rows[0][0]
    log.info(f"Verify — expected={expected:,}  clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False
    log.info("Verify passed.")
    return True
|
||
|
||
|
||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||
def main() -> None:
    """Run the six ETL steps for yesterday's date (local clock).

    Exit codes:
        0 — success, or nothing to load (no MIDs / no promotion rows)
        2 — load ran but verify() found a row-count mismatch
    Any unhandled exception propagates to the interpreter.
    """
    target_date = date.today() - timedelta(days=1)
    log.info(f"=== Promotion KPI ETL | date={target_date} ===")

    engine = build_engine()
    try:
        mids = collect_mids(engine, target_date)
        if not mids:
            log.info("No MIDs for yesterday. Nothing to do.")
            sys.exit(0)

        df_raw = fetch_promotion_data(engine, mids)
    finally:
        # SQL Server is not needed past this point.  Disposing in `finally`
        # also covers the early sys.exit(0) above and any fetch error.
        engine.dispose()

    if df_raw.is_empty():
        log.warning("Empty result set. Exiting.")
        sys.exit(0)

    df_clean = transform(df_raw)

    rows_inserted, inserted_date = load_to_clickhouse(df_clean)

    if not verify(rows_inserted, inserted_date):
        sys.exit(2)

    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")


if __name__ == "__main__":
    main()
|