Compare commits
2 Commits
2dd73490db
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c5a051772 | |||
| 279eed0d9d |
@@ -0,0 +1,97 @@
|
|||||||
|
-- ClickHouse DDL: promotion_kpi table
|
||||||
|
-- Run ONCE before the first ETL run
|
||||||
|
-- Connect: podman exec -it clickhouse-kelloggs clickhouse-client
|
||||||
|
|
||||||
|
CREATE DATABASE IF NOT EXISTS kelloggs;

-- Promotion KPI fact table: one row per promotion item recorded for a
-- store visit (mid). Populated by the Promotion KPI ETL script.
CREATE TABLE IF NOT EXISTS kelloggs.promotion_kpi
(
    -- ── Core identifiers ────────────────────────────────────────────────────
    mid                   Int64,
    project_id            Int32,
    store_id              Int64,
    employee_id           Int64,
    supervisor_id         Int64,

    -- ── Store dimension ─────────────────────────────────────────────────────
    channel_id            Int32,
    chain_id              Int64,
    storetype_id          Int32,

    -- ── Visit ───────────────────────────────────────────────────────────────
    visit_date            Date32,

    -- ── Promotion definition ────────────────────────────────────────────────
    promo_definition_id   Int32,
    promo_definition_name String,

    -- ── Promotion item (Category / SubCategory / Brand / SubBrand) ──────────
    -- LowCardinality = dictionary encoding — suited to columns with only a
    -- handful of distinct values.
    promotion_details     LowCardinality(String),  -- 'Category' | 'SubCategory' | 'Brand' | 'SubBrand'
    promotion_details_id  Int64,                   -- FK into the chosen master table
    promo_value_name      String,                  -- e.g. 'Chocos', 'Breakfast Cereals'

    -- ── Execution status ────────────────────────────────────────────────────
    is_present            UInt8,                   -- 1 = present (Y), 0 = absent (N)
    reason                String,                  -- populated only when is_present = 0

    -- ── Optional quiz ───────────────────────────────────────────────────────
    question              String,
    answer                String,

    -- ── Photo proof ─────────────────────────────────────────────────────────
    image1                String,                  -- full URL or empty string
    image2                String,

    -- ── Audit ───────────────────────────────────────────────────────────────
    update_date           DateTime DEFAULT now(),
    update_by             String,

    -- ── Data-skipping indexes ───────────────────────────────────────────────
    -- These must live INSIDE the column list; ClickHouse does not accept
    -- INDEX clauses after the ENGINE / PARTITION BY / ORDER BY clauses.
    -- Bloom filter on store_id — speeds up store-level promotion reports
    INDEX idx_store store_id    TYPE bloom_filter GRANULARITY 1,
    -- Bloom filter on employee_id — speeds up per-rep promotion audits
    INDEX idx_emp   employee_id TYPE bloom_filter GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(visit_date)
-- Sort key: queries always filter on project_id + visit_date first.
-- Adding promo_definition_id speeds up "show me all Promo X results".
ORDER BY (project_id, visit_date, mid, promo_definition_id)
SETTINGS index_granularity = 8192;
|
||||||
|
|
||||||
|
|
||||||
|
-- ── Verification queries ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
-- 1. How many promotion rows are loaded per visit date (10 most recent dates)?
|
||||||
|
-- SELECT count(), visit_date
|
||||||
|
-- FROM kelloggs.promotion_kpi
|
||||||
|
-- WHERE project_id = 40148
|
||||||
|
-- GROUP BY visit_date ORDER BY visit_date DESC LIMIT 10;
|
||||||
|
|
||||||
|
-- 2. Present vs absent breakdown
|
||||||
|
-- SELECT
|
||||||
|
-- visit_date,
|
||||||
|
-- countIf(is_present = 1) AS present,
|
||||||
|
-- countIf(is_present = 0) AS absent,
|
||||||
|
-- count() AS total,
|
||||||
|
-- round(present / total * 100, 1) AS present_pct
|
||||||
|
-- FROM kelloggs.promotion_kpi
|
||||||
|
-- WHERE project_id = 40148
|
||||||
|
-- GROUP BY visit_date ORDER BY visit_date DESC LIMIT 7;
|
||||||
|
|
||||||
|
-- 3. Top promotion definitions by absence
|
||||||
|
-- SELECT promo_definition_name, count() AS absent_count
|
||||||
|
-- FROM kelloggs.promotion_kpi
|
||||||
|
-- WHERE project_id = 40148
|
||||||
|
-- AND is_present = 0
|
||||||
|
-- AND visit_date >= today() - 7
|
||||||
|
-- GROUP BY promo_definition_name
|
||||||
|
-- ORDER BY absent_count DESC LIMIT 10;
|
||||||
|
|
||||||
|
-- 4. Absence reasons breakdown
|
||||||
|
-- SELECT reason, count() AS cnt
|
||||||
|
-- FROM kelloggs.promotion_kpi
|
||||||
|
-- WHERE project_id = 40148
|
||||||
|
-- AND is_present = 0
|
||||||
|
-- AND visit_date = yesterday()
|
||||||
|
-- GROUP BY reason ORDER BY cnt DESC;
|
||||||
@@ -0,0 +1,481 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# /// script
|
||||||
|
# requires-python = ">=3.11"
|
||||||
|
# dependencies = [
|
||||||
|
# "polars>=0.20.0",
|
||||||
|
# "sqlalchemy>=2.0.0",
|
||||||
|
# "pyodbc>=5.0.0",
|
||||||
|
# "clickhouse-connect>=0.7.0",
|
||||||
|
# "pyarrow>=18.0.0",
|
||||||
|
# "python-dotenv>=1.0.0",
|
||||||
|
# ]
|
||||||
|
# ///
|
||||||
|
|
||||||
|
"""
|
||||||
|
ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse
|
||||||
|
---------------------------------------------------------------
|
||||||
|
Run with: uv run etl_promotion_kpi.py
|
||||||
|
|
||||||
|
What is Promotion KPI?
|
||||||
|
Kellogg's runs in-store promotions — standees, special shelf displays,
|
||||||
|
"buy 2 get 1 free" offers. The salesperson visits a store and records:
|
||||||
|
- Is the promotion physically present? (Present = Y/N)
|
||||||
|
- If absent, what is the reason? (Reason)
|
||||||
|
- What category/brand does it apply to? (PromoValueName)
|
||||||
|
- Photo proof (Image1, Image2)
|
||||||
|
- Quiz-style question + answer about the promo (Question, Answer)
|
||||||
|
|
||||||
|
The correlated CASE subqueries in the SP resolve the PromoTable column
|
||||||
|
('Master_Category' / 'Master_Brand' etc.) into a human-readable name.
|
||||||
|
SQL Server does this at query time — Polars receives the final string.
|
||||||
|
Nothing special needed in the transform for those columns.
|
||||||
|
|
||||||
|
Architecture: same 6-step pattern as Coverage KPI
|
||||||
|
1. Build SQLAlchemy + pyodbc engine (URL.create + odbc_connect DSN)
|
||||||
|
2. Collect MIDs for yesterday from T_StoreCoverage
|
||||||
|
3. Fetch Promotion rows in MID chunks via pl.read_database()
|
||||||
|
4. Transform with Polars (types, nulls, flags)
|
||||||
|
5. Delete stale ClickHouse rows → insert_arrow() in batches
|
||||||
|
6. Verify row count
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
from datetime import date, timedelta
|
||||||
|
|
||||||
|
import polars as pl
|
||||||
|
from sqlalchemy import create_engine, text
|
||||||
|
from sqlalchemy.engine import Engine, URL
|
||||||
|
import clickhouse_connect
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
load_dotenv()

# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("etl_promotion")

# ── Config ────────────────────────────────────────────────────────────────────
# SQL Server connection settings, read from the environment (or .env file).
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "insight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")


def _odbc_quote(value: str) -> str:
    """Return `value` in a form that is safe inside an ODBC connection string.

    ODBC splits attributes on ';' and treats '{...}' as a quoted value, so a
    credential containing ';', '{' or '}' (or leading/trailing whitespace)
    must be wrapped in braces, with any '}' doubled. Plain values are
    returned unchanged so ordinary credentials produce the same string as
    before.
    """
    needs_braces = any(ch in value for ch in ";{}") or value != value.strip()
    if value and needs_braces:
        return "{" + value.replace("}", "}}") + "}"
    return value


# Build the pyodbc connection string first, quoting the credentials at the
# ODBC level, then hand the whole string to URL.create() via "odbc_connect".
# URL.create() takes care of URL-level escaping, so characters such as
# @ / + # never break URL parsing.
_ODBC_DSN = (
    f"DRIVER={{ODBC Driver 18 for SQL Server}};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={_odbc_quote(MSSQL_USER)};"
    f"PWD={_odbc_quote(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
    "timeout=30;"
)

MSSQL_SA_URL = URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": _ODBC_DSN},
)

# ClickHouse connection settings (HTTP interface).
CH_HOST = os.getenv("CH_HOST", "localhost")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER", "default")
CH_PASS = os.getenv("CH_PASS", "")
CH_DB = os.getenv("CH_DB", "kelloggs")
CH_TABLE = "promotion_kpi"

BATCH_SIZE = 5000    # rows per ClickHouse insert
MID_CHUNK = 1000     # number of MIDs inlined into one SQL Server IN (...) list
PROJECT_ID = 40148
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
|
||||||
|
def build_engine() -> Engine:
    """
    Create the shared SQLAlchemy engine for SQL Server and smoke-test it.

    SQLAlchemy = smart manager (handles pooling, dialect, parameterisation)
    pyodbc     = actual wire   (speaks ODBC to the SQL Server driver)
    URL.create = safe assembler for the connection URL

    The engine is created ONCE and shared across all steps. Each
    `with engine.connect()` block borrows a pooled connection and returns
    it automatically when the block exits — no manual close() calls.

    Returns:
        A ready-to-use Engine on which `SELECT 1` has already succeeded.

    Raises:
        Whatever SQLAlchemy/pyodbc raises if the test connection fails; the
        engine's pool is disposed before the exception propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        echo=False,
        # fast_executemany is an option of the mssql+pyodbc dialect and must
        # be passed to create_engine() itself. Inside connect_args it would be
        # forwarded to pyodbc.connect(), which is not where it is consumed.
        fast_executemany=True,
    )
    try:
        # Fail fast: surface bad credentials / unreachable host here rather
        # than in the middle of the pipeline.
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        engine.dispose()
        raise
    log.info("Engine ready.")
    return engine
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
|
||||||
|
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """
    Return the MIDs of store-coverage rows created or updated on `target_date`.

    Mirrors the stored procedure's @MID_TABLE_COV step (per the original
    author's note): the UNION of the CreateDate and UpdateDate matches picks
    up both new visits and edited ones, and removes duplicates.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        target_date: Calendar date to match; bound as the named parameter
            :target_date, never interpolated into the SQL text.

    Returns:
        The first column (MID) of every row returned by the query.
    """
    mid_query = text("""
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, CreateDate) = :target_date
        UNION
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, UpdateDate) = :target_date
    """)
    bind_values = {"target_date": target_date}

    log.info(f"Collecting MIDs for: {target_date}")
    with engine.connect() as connection:
        fetched_rows = connection.execute(mid_query, bind_values).fetchall()
        mids = [record[0] for record in fetched_rows]

    log.info(f"Found {len(mids):,} MIDs")
    return mids
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 3: Fetch Promotion data ──────────────────────────────────────────────
|
||||||
|
def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """
    Fetch the promotion rows for the given MIDs from SQL Server.

    The SQL is described by the original author as a direct translation of
    the stored procedure's Promotion INSERT ... SELECT. Points worth knowing:

    1. Correlated CASE subqueries (promo_value_name)
       The item name is looked up in whichever master table the PromoTable
       column names. SQL Server resolves this, so Python receives a plain
       string — or NULL when PromoTable matches none of the four branches.

    2. LEFT JOINs (images, reason, question)
       These are optional — not every promo has a photo or a question.
       NULLs coming from them are handled in the transform step.

    3. ISNULL(mpq.PromoQuestionName, '') / ISNULL(tpq.PromoAnswerName, '')
       Already defaulted to '' in SQL; transform() still fills nulls.

    MID chunking: the MIDs are inlined into the IN (...) list as integer
    literals (not bind parameters), MID_CHUNK at a time, to keep each
    statement to a manageable size. One pl.read_database() call is made per
    chunk and the resulting frames are concatenated.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        mids: MIDs to fetch. Every element must be convertible with int().

    Returns:
        One DataFrame with all fetched rows, or an empty pl.DataFrame() when
        `mids` is empty or no rows matched.

    Raises:
        TypeError / ValueError: if an element of `mids` is not an integer
            value (it is refused rather than spliced into the SQL text).
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # SELECT taken from the SP's Promotion INSERT — columns renamed to
    # snake_case for ClickHouse convention.
    promotion_sql = """
        SELECT
            sc.MID                                          AS mid,
            {project_id}                                    AS project_id,
            sc.StoreId                                      AS store_id,
            Em.EmpId                                        AS employee_id,
            CONVERT(date, sc.VisitDate)                     AS visit_date,
            Em.SupervisorId                                 AS supervisor_id,
            sm.ChannelId                                    AS channel_id,
            sm.ChainId                                      AS chain_id,
            sm.StoreTypeId                                  AS storetype_id,
            msd.PromoDefinitionId                           AS promo_definition_id,
            msd.PromoDefinitionName                         AS promo_definition_name,

            -- Resolve which dimension level the promo targets
            -- (Category / SubCategory / Brand / SubBrand)
            CASE
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'    THEN 'Category'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'       THEN 'Brand'
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'    THEN 'SubBrand'
                ELSE ''
            END                                             AS promotion_details,

            ts.PromoValue                                   AS promotion_details_id,

            -- Resolve the actual name of that dimension item
            -- SQL Server evaluates exactly ONE of these subqueries per row
            CASE
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'
                    THEN (SELECT a.CategoryName
                          FROM OneApp_KelloggsMT.dbo.Master_Category a
                          WHERE a.CategoryId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory'
                    THEN (SELECT a.SubCategoryName
                          FROM OneApp_KelloggsMT.dbo.Master_SubCategory a
                          WHERE a.SubCategoryId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'
                    THEN (SELECT a.BrandName
                          FROM OneApp_KelloggsMT.dbo.Master_Brand a
                          WHERE a.BrandId = ts.PromoValue)
                WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'
                    THEN (SELECT a.SubBrandName
                          FROM OneApp_KelloggsMT.dbo.Master_SubBrand a
                          WHERE a.SubBrandId = ts.PromoValue)
            END                                             AS promo_value_name,

            -- Present flag: 0 → 'N', anything else → 'Y'
            CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END  AS is_present,

            -- Reason only populated when promo is ABSENT
            CASE
                WHEN ts.Present = 1 THEN ''
                ELSE ISNULL(mnp.PromoReason, '')
            END                                             AS reason,

            ISNULL(mpq.PromoQuestionName, '')               AS question,
            ISNULL(tpq.PromoAnswerName, '')                 AS answer,

            -- Image URLs — empty string when no image
            CASE
                WHEN ISNULL(SHI.PromoImage1,'') = '' THEN ''
                ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
                     + SHI.PromoImage1
            END                                             AS image1,
            CASE
                WHEN ISNULL(SHI.PromoImage2,'') = '' THEN ''
                ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
                     + SHI.PromoImage2
            END                                             AS image2,

            GETDATE()                                       AS update_date,
            'ETL-SQLAlchemy'                                AS update_by

        FROM OneApp_KelloggsMT.dbo.T_Promotion ts
        INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
            ON ts.mid = sc.mid
        INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
            ON sc.StoreId = sm.StoreId
        INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
            ON sc.EmpId = Em.EmpId
        INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd
            ON msd.PromoDefinitionId = ts.PromoDefinitionId
        LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI
            ON ts.PId = SHI.PId
        LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp
            ON ts.PromoReasonId = mnp.PromoReasonId
        LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq
            ON ts.PId = tpq.PId
        LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq
            ON tpq.PromoQuestionId = mpq.PromoQuestionId

        WHERE Em.EmpName NOT LIKE 'test%'
          AND sc.MID IN ({mid_placeholders})
    """

    frames: list[pl.DataFrame] = []

    for i in range(0, len(mids), MID_CHUNK):
        chunk = mids[i : i + MID_CHUNK]
        # int() enforces that only integer literals are ever spliced into the
        # statement; anything else raises before the SQL is built.
        mid_list = ",".join(str(int(m)) for m in chunk)
        sql = promotion_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=mid_list,
        )

        log.info(
            f"Fetching chunk {i // MID_CHUNK + 1} "
            f"(MIDs {i+1}–{min(i + MID_CHUNK, len(mids))} of {len(mids)})..."
        )

        # One query per chunk; the result arrives as a Polars DataFrame.
        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No promotion rows returned.")
        return pl.DataFrame()

    # "vertical_relaxed" lets Polars upcast to a common supertype if a chunk
    # inferred a different dtype for a column (for example a column that was
    # entirely NULL in that chunk). Strict vertical concat would raise.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
|
||||||
|
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """
    Normalise the raw SQL Server frame to the types the ClickHouse table uses.

    Assembly-line approach: each .with_columns() block is one station, so a
    failure points at a single concern.

    Stations, in order:
      * IDs          — cast to Int64 / Int32; nullable IDs become 0.
      * visit_date   — cast to Date (it may arrive as a datetime via ODBC).
      * is_present   — 'Y' → 1, anything else → 0, stored as UInt8.
      * Strings      — cast to string and replace NULL with ''. The cast
                       matters when a column was entirely NULL and was
                       therefore inferred without a string dtype.
                       update_by falls back to 'ETL-SQLAlchemy'.
      * update_date  — cast to Datetime.
      * Dedup        — keep the FIRST row per (project_id, mid,
                       promo_definition_id, promotion_details_id), in input
                       order, so repeated runs on the same input keep the
                       same row.

    Args:
        df: Non-empty frame with the column names produced by the fetch step.

    Returns:
        The cleaned, de-duplicated frame.
    """
    log.info("Transforming Promotion data...")

    # String columns whose NULLs become the empty string.
    blank_when_null = [
        "promo_definition_name",
        "promotion_details",
        "promo_value_name",
        "reason",
        "question",
        "answer",
        "image1",
        "image2",
    ]

    df = (
        df

        # ── IDs ───────────────────────────────────────────────────────────────
        .with_columns([
            pl.col("mid").cast(pl.Int64),
            pl.col("project_id").cast(pl.Int32),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("channel_id").cast(pl.Int32).fill_null(0),
            pl.col("chain_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("promo_definition_id").cast(pl.Int32).fill_null(0),
            # promotion_details_id = PromoValue (the FK into the chosen master)
            pl.col("promotion_details_id").cast(pl.Int64).fill_null(0),
        ])

        # ── Date ──────────────────────────────────────────────────────────────
        # SQL Server CONVERT(date, ...) may arrive as Python datetime via ODBC
        .with_columns(
            pl.col("visit_date").cast(pl.Date)
        )

        # ── Present flag: 'Y'/'N' → UInt8 (1/0) ─────────────────────────────
        # The target column is UInt8, so the flag is stored as 1/0.
        .with_columns(
            pl.when(pl.col("is_present") == "Y")
            .then(pl.lit(1, dtype=pl.UInt8))
            .otherwise(pl.lit(0, dtype=pl.UInt8))
            .alias("is_present")
        )

        # ── Strings: NULL → empty string ──────────────────────────────────────
        # The target String columns are not Nullable, so every string column
        # is filled. promo_value_name is the most likely NULL (its CASE has no
        # ELSE branch for an unknown PromoTable value).
        .with_columns(
            [pl.col(name).cast(pl.Utf8).fill_null("") for name in blank_when_null]
            + [pl.col("update_by").cast(pl.Utf8).fill_null("ETL-SQLAlchemy")]
        )

        # ── Timestamp ─────────────────────────────────────────────────────────
        .with_columns(
            pl.col("update_date").cast(pl.Datetime)
        )

        # ── Dedup ─────────────────────────────────────────────────────────────
        # A promo record is treated as unique per
        # project_id + mid + promo_definition_id + promotion_details_id.
        # The optional LEFT JOINs in the source query can return the same
        # record more than once; keep="first" with maintain_order=True makes
        # the surviving row deterministic instead of arbitrary.
        .unique(
            subset=["project_id", "mid", "promo_definition_id", "promotion_details_id"],
            keep="first",
            maintain_order=True,
        )
    )

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
|
||||||
|
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """
    Replace one visit date's promotion rows in ClickHouse with `df`.

    Delete-then-insert:
      1. Delete every row for PROJECT_ID whose visit_date equals the
         earliest visit_date found in `df`.
      2. Insert `df` in BATCH_SIZE slices via insert_arrow().

    The delete is an ALTER TABLE ... DELETE mutation, which ClickHouse runs
    asynchronously by default. It is issued with mutations_sync=1 so the call
    returns only after the mutation has finished; otherwise a row count taken
    right after the load could still include the stale rows.

    NOTE(review): only the earliest visit_date is purged. If a batch contains
    several visit dates (e.g. a visit edited days after it happened), rows
    for the other dates are inserted without a prior delete, and the purged
    date loses rows belonging to MIDs that are not in this batch. A warning
    is logged in that case; confirm the intended delete key (the stored
    procedure is said to delete by MID).

    Args:
        df: Transformed, non-empty frame containing a `visit_date` column.

    Returns:
        (number of rows inserted, the visit date that was purged).

    Raises:
        ValueError: if `df` is empty.
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() needs a non-empty DataFrame")

    target_date: date = df["visit_date"].min()

    distinct_dates = df["visit_date"].n_unique()
    if distinct_dates > 1:
        log.warning(
            f"Batch spans {distinct_dates} visit dates; only {target_date} "
            f"is purged before insert — other dates may end up duplicated."
        )

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )

    try:
        log.info(f"Deleting: project_id={PROJECT_ID} visit_date={target_date}")
        client.command(
            f"ALTER TABLE {CH_TABLE} DELETE "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'",
            # Wait for the mutation to complete before inserting/verifying.
            settings={"mutations_sync": 1},
        )

        total = 0
        for start in range(0, len(df), BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f"  Inserted {total:,} / {len(df):,} rows")
    finally:
        # Release the HTTP connection(s) even if the delete or an insert fails.
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 6: Verify ────────────────────────────────────────────────────────────
|
||||||
|
def verify(expected: int, target_date: date) -> bool:
    """
    Compare the ClickHouse row count for one visit date with `expected`.

    Counts rows in CH_TABLE where project_id = PROJECT_ID and
    visit_date = target_date.

    Args:
        expected: Number of rows the caller believes it loaded.
        target_date: Visit date to count.

    Returns:
        True when the count equals `expected`; False (after logging an
        error) otherwise. The caller decides how to react — this function
        never exits the process itself.
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        result = client.query(
            f"SELECT count() FROM {CH_TABLE} "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'"
        )
        actual = result.result_rows[0][0]
    finally:
        # Always release the client, including when the query fails.
        client.close()

    log.info(f"Verify — expected={expected:,}  clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False
    log.info("Verify passed.")
    return True
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||||
|
def main(target_date: date | None = None) -> None:
    """
    Run the Promotion KPI pipeline end to end for one date.

    Steps: build engine → collect MIDs → fetch → transform → load → verify.

    Args:
        target_date: Date whose created/updated store-coverage MIDs are
            processed. Defaults to yesterday, which is what a plain `main()`
            call has always done; pass a date explicitly to backfill.

    Exit codes (raised via sys.exit):
        0 — nothing to do (no MIDs, or no promotion rows).
        2 — the load ran but verify() reported a row-count mismatch.
    Any uncaught exception propagates to the interpreter as usual.
    """
    if target_date is None:
        target_date = date.today() - timedelta(days=1)
    log.info(f"=== Promotion KPI ETL | date={target_date} ===")

    engine = build_engine()
    try:
        mids = collect_mids(engine, target_date)
        if not mids:
            log.info(f"No MIDs for {target_date}. Nothing to do.")
            sys.exit(0)

        df_raw = fetch_promotion_data(engine, mids)
    finally:
        # Runs on success, on sys.exit() and on errors alike, so pooled
        # SQL Server connections are always released.
        engine.dispose()

    if df_raw.is_empty():
        log.warning("Empty result set. Exiting.")
        sys.exit(0)

    df_clean = transform(df_raw)

    rows_inserted, inserted_date = load_to_clickhouse(df_clean)

    if not verify(rows_inserted, inserted_date):
        sys.exit(2)

    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")


if __name__ == "__main__":
    main()
|
||||||
@@ -428,37 +428,40 @@ def verify(expected: int, target_date: date) -> bool:
|
|||||||
|
|
||||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
target_date = date.today() - timedelta(days=1)
|
#target_date = date.today() - timedelta(days=1)
|
||||||
log.info(f"=== Coverage KPI ETL | date={target_date} ===")
|
for i in range(1, 10):
|
||||||
|
target_date = date(2023, 3, 9) + timedelta(days=i)
|
||||||
|
print(target_date)
|
||||||
|
log.info(f"=== Coverage KPI ETL | date={target_date} ===")
|
||||||
|
|
||||||
# Build the SQLAlchemy engine once — shared across all steps
|
# Build the SQLAlchemy engine once — shared across all steps
|
||||||
engine = build_engine()
|
engine = build_engine()
|
||||||
|
|
||||||
# Collect which MIDs need processing
|
# Collect which MIDs need processing
|
||||||
mids = collect_mids(engine, target_date)
|
mids = collect_mids(engine, target_date)
|
||||||
if not mids:
|
if not mids:
|
||||||
log.info("No MIDs for yesterday. Nothing to do.")
|
log.info("No MIDs for yesterday. Nothing to do.")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
# Fetch raw data using pl.read_database()
|
# Fetch raw data using pl.read_database()
|
||||||
df_raw = fetch_coverage_data(engine, mids)
|
df_raw = fetch_coverage_data(engine, mids)
|
||||||
engine.dispose() # return all pooled connections to OS cleanly
|
engine.dispose() # return all pooled connections to OS cleanly
|
||||||
|
|
||||||
if df_raw.is_empty():
|
if df_raw.is_empty():
|
||||||
log.warning("Empty result set. Exiting.")
|
log.warning("Empty result set. Exiting.")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
# Transform
|
# Transform
|
||||||
df_clean = transform(df_raw)
|
df_clean = transform(df_raw)
|
||||||
|
|
||||||
# Load
|
# Load
|
||||||
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
|
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
|
||||||
|
|
||||||
# Verify
|
# Verify
|
||||||
if not verify(rows_inserted, inserted_date):
|
if not verify(rows_inserted, inserted_date):
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
|
||||||
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
|
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user