Compare commits
2 Commits
2dd73490db
...
2c5a051772
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c5a051772 | |||
| 279eed0d9d |
@@ -0,0 +1,97 @@
|
||||
-- ClickHouse DDL: promotion_kpi table
|
||||
-- Run ONCE before the first ETL run
|
||||
-- Connect: podman exec -it clickhouse-kelloggs clickhouse-client
|
||||
|
||||
CREATE DATABASE IF NOT EXISTS kelloggs;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS kelloggs.promotion_kpi
|
||||
(
|
||||
-- ── Core identifiers ────────────────────────────────────────────────────
|
||||
mid Int64,
|
||||
project_id Int32,
|
||||
store_id Int64,
|
||||
employee_id Int64,
|
||||
supervisor_id Int64,
|
||||
|
||||
-- ── Store dimension ─────────────────────────────────────────────────────
|
||||
channel_id Int32,
|
||||
chain_id Int64,
|
||||
storetype_id Int32,
|
||||
|
||||
-- ── Visit ───────────────────────────────────────────────────────────────
|
||||
visit_date Date32,
|
||||
|
||||
-- ── Promotion definition ────────────────────────────────────────────────
|
||||
promo_definition_id Int32,
|
||||
promo_definition_name String,
|
||||
|
||||
-- ── Promotion item (Category / SubCategory / Brand / SubBrand) ──────────
|
||||
-- LowCardinality = dictionary-encoding — ideal for columns with few
|
||||
-- distinct values (only 4 possible values here)
|
||||
promotion_details LowCardinality(String), -- 'Category' | 'SubCategory' | 'Brand' | 'SubBrand'
|
||||
promotion_details_id Int64, -- FK into the chosen master table
|
||||
promo_value_name String, -- e.g. 'Chocos', 'Breakfast Cereals'
|
||||
|
||||
-- ── Execution status ────────────────────────────────────────────────────
|
||||
is_present UInt8, -- 1 = present (Y), 0 = absent (N)
|
||||
reason String, -- populated only when is_present = 0
|
||||
|
||||
-- ── Optional quiz ───────────────────────────────────────────────────────
|
||||
question String,
|
||||
answer String,
|
||||
|
||||
-- ── Photo proof ─────────────────────────────────────────────────────────
|
||||
image1 String, -- full URL or empty string
|
||||
image2 String,
|
||||
|
||||
-- ── Audit ───────────────────────────────────────────────────────────────
|
||||
update_date DateTime DEFAULT now(),
|
||||
update_by String
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
-- Sort key: queries always filter on project_id + visit_date first
|
||||
-- Adding promo_definition_id speeds up "show me all Promo X results"
|
||||
ORDER BY (project_id, visit_date, mid, promo_definition_id)
|
||||
PARTITION BY toYYYYMM(visit_date)
|
||||
-- Bloom filter on store_id — speeds up store-level promotion reports
|
||||
INDEX idx_store store_id TYPE bloom_filter GRANULARITY 1
|
||||
-- Bloom filter on employee_id — speeds up per-rep promotion audits
|
||||
INDEX idx_emp employee_id TYPE bloom_filter GRANULARITY 1
|
||||
SETTINGS index_granularity = 8192;
|
||||
|
||||
|
||||
-- ── Verification queries ─────────────────────────────────────────────────────
|
||||
|
||||
-- 1. How many promotions loaded today?
|
||||
-- SELECT count(), visit_date
|
||||
-- FROM kelloggs.promotion_kpi
|
||||
-- WHERE project_id = 40148
|
||||
-- GROUP BY visit_date ORDER BY visit_date DESC LIMIT 10;
|
||||
|
||||
-- 2. Present vs absent breakdown
|
||||
-- SELECT
|
||||
-- visit_date,
|
||||
-- countIf(is_present = 1) AS present,
|
||||
-- countIf(is_present = 0) AS absent,
|
||||
-- count() AS total,
|
||||
-- round(present / total * 100, 1) AS present_pct
|
||||
-- FROM kelloggs.promotion_kpi
|
||||
-- WHERE project_id = 40148
|
||||
-- GROUP BY visit_date ORDER BY visit_date DESC LIMIT 7;
|
||||
|
||||
-- 3. Top promotion definitions by absence
|
||||
-- SELECT promo_definition_name, count() AS absent_count
|
||||
-- FROM kelloggs.promotion_kpi
|
||||
-- WHERE project_id = 40148
|
||||
-- AND is_present = 0
|
||||
-- AND visit_date >= today() - 7
|
||||
-- GROUP BY promo_definition_name
|
||||
-- ORDER BY absent_count DESC LIMIT 10;
|
||||
|
||||
-- 4. Absence reasons breakdown
|
||||
-- SELECT reason, count() AS cnt
|
||||
-- FROM kelloggs.promotion_kpi
|
||||
-- WHERE project_id = 40148
|
||||
-- AND is_present = 0
|
||||
-- AND visit_date = yesterday()
|
||||
-- GROUP BY reason ORDER BY cnt DESC;
|
||||
@@ -0,0 +1,481 @@
|
||||
#!/usr/bin/env python3
|
||||
# /// script
|
||||
# requires-python = ">=3.11"
|
||||
# dependencies = [
|
||||
# "polars>=0.20.0",
|
||||
# "sqlalchemy>=2.0.0",
|
||||
# "pyodbc>=5.0.0",
|
||||
# "clickhouse-connect>=0.7.0",
|
||||
# "pyarrow>=18.0.0",
|
||||
# "python-dotenv>=1.0.0",
|
||||
# ]
|
||||
# ///
|
||||
|
||||
"""
|
||||
ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse
|
||||
---------------------------------------------------------------
|
||||
Run with: uv run etl_promotion_kpi.py
|
||||
|
||||
What is Promotion KPI?
|
||||
Kellogg's runs in-store promotions — standees, special shelf displays,
|
||||
"buy 2 get 1 free" offers. The salesperson visits a store and records:
|
||||
- Is the promotion physically present? (Present = Y/N)
|
||||
- If absent, what is the reason? (Reason)
|
||||
- What category/brand does it apply to? (PromoValueName)
|
||||
- Photo proof (Image1, Image2)
|
||||
- Quiz-style question + answer about the promo (Question, Answer)
|
||||
|
||||
The correlated CASE subqueries in the SP resolve the PromoTable column
|
||||
('Master_Category' / 'Master_Brand' etc.) into a human-readable name.
|
||||
SQL Server does this at query time — Polars receives the final string.
|
||||
Nothing special needed in the transform for those columns.
|
||||
|
||||
Architecture: same 6-step pattern as Coverage KPI
|
||||
1. Build SQLAlchemy + pyodbc engine (URL.create + odbc_connect DSN)
|
||||
2. Collect MIDs for yesterday from T_StoreCoverage
|
||||
3. Fetch Promotion rows in MID chunks via pl.read_database()
|
||||
4. Transform with Polars (types, nulls, flags)
|
||||
5. Delete stale ClickHouse rows → insert_arrow() in batches
|
||||
6. Verify row count
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
from datetime import date, timedelta
|
||||
|
||||
import polars as pl
|
||||
from sqlalchemy import create_engine, text
|
||||
from sqlalchemy.engine import Engine, URL
|
||||
import clickhouse_connect
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# ── Logging ───────────────────────────────────────────────────────────────────
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s | %(levelname)-8s | %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
log = logging.getLogger("etl_promotion")
|
||||
|
||||
# ── Config ────────────────────────────────────────────────────────────────────
|
||||
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
|
||||
MSSQL_DB = os.getenv("MSSQL_DB", "insight")
|
||||
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
|
||||
MSSQL_PASS = os.getenv("MSSQL_PASS", "")
|
||||
|
||||
# Build pyodbc DSN string first — keeps special chars in passwords safe.
|
||||
# Then pass the whole DSN to URL.create() via "odbc_connect".
|
||||
# URL.create() percent-encodes it internally so @ / + # never break parsing.
|
||||
_ODBC_DSN = (
|
||||
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
|
||||
f"SERVER={MSSQL_SERVER};"
|
||||
f"DATABASE={MSSQL_DB};"
|
||||
f"UID={MSSQL_USER};"
|
||||
f"PWD={MSSQL_PASS};"
|
||||
"TrustServerCertificate=yes;"
|
||||
"timeout=30;"
|
||||
)
|
||||
|
||||
MSSQL_SA_URL = URL.create(
|
||||
drivername="mssql+pyodbc",
|
||||
query={"odbc_connect": _ODBC_DSN},
|
||||
)
|
||||
|
||||
CH_HOST = os.getenv("CH_HOST", "localhost")
|
||||
CH_PORT = int(os.getenv("CH_PORT", "8123"))
|
||||
CH_USER = os.getenv("CH_USER", "default")
|
||||
CH_PASS = os.getenv("CH_PASS", "")
|
||||
CH_DB = os.getenv("CH_DB", "kelloggs")
|
||||
CH_TABLE = "promotion_kpi"
|
||||
|
||||
BATCH_SIZE = 5000 # rows per ClickHouse insert
|
||||
MID_CHUNK = 1000 # SQL Server IN-clause safe chunk size
|
||||
PROJECT_ID = 40148
|
||||
|
||||
|
||||
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
|
||||
def build_engine() -> Engine:
|
||||
"""
|
||||
SQLAlchemy = smart manager (handles pooling, dialect, parameterisation)
|
||||
pyodbc = actual wire (speaks ODBC to SQL Server driver)
|
||||
URL.create = safe assembler (never mangles passwords with special chars)
|
||||
|
||||
The engine is created ONCE and shared across all steps.
|
||||
Each with engine.connect() borrows a slot from the pool and returns
|
||||
it automatically when the block exits — no manual close() calls.
|
||||
"""
|
||||
log.info("Building SQLAlchemy + pyodbc engine...")
|
||||
engine = create_engine(
|
||||
MSSQL_SA_URL,
|
||||
pool_size=3,
|
||||
max_overflow=2,
|
||||
pool_pre_ping=True,
|
||||
echo=False,
|
||||
connect_args={"fast_executemany": True},
|
||||
)
|
||||
with engine.connect() as conn:
|
||||
conn.execute(text("SELECT 1"))
|
||||
log.info("Engine ready.")
|
||||
return engine
|
||||
|
||||
|
||||
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
|
||||
def collect_mids(engine: Engine, target_date: date) -> list[int]:
|
||||
"""
|
||||
Exact replica of the SP's @MID_TABLE_COV logic.
|
||||
UNION of CreateDate + UpdateDate = catches both new visits and edits.
|
||||
:target_date is a named SQLAlchemy parameter — safe, no injection risk.
|
||||
"""
|
||||
sql = text("""
|
||||
SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
|
||||
WHERE CONVERT(date, CreateDate) = :target_date
|
||||
UNION
|
||||
SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
|
||||
WHERE CONVERT(date, UpdateDate) = :target_date
|
||||
""")
|
||||
log.info(f"Collecting MIDs for: {target_date}")
|
||||
with engine.connect() as conn:
|
||||
result = conn.execute(sql, {"target_date": target_date})
|
||||
mids = [row[0] for row in result.fetchall()]
|
||||
log.info(f"Found {len(mids):,} MIDs")
|
||||
return mids
|
||||
|
||||
|
||||
# ── Step 3: Fetch Promotion data ──────────────────────────────────────────────
|
||||
def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
|
||||
"""
|
||||
The SQL here is a direct translation of the SP's Promotion INSERT SELECT.
|
||||
Three things worth understanding:
|
||||
|
||||
1. Correlated CASE subqueries (PromoValueName)
|
||||
The SP resolves the promo item name by looking up whichever master
|
||||
table the PromoTable column points to. SQL Server runs these
|
||||
subqueries at execution time and sends Polars a plain VARCHAR.
|
||||
Nothing special needed in Python — they arrive as strings.
|
||||
|
||||
2. LEFT JOINs (images, reason, question)
|
||||
These are optional — not every promo has a photo or a question.
|
||||
NULLs from LEFT JOINs are handled in the transform step.
|
||||
|
||||
3. ISNULL(mpq.PromoQuestionName, '') and ISNULL(tpq.PromoAnswerName, '')
|
||||
Already handled in SQL — arrive as empty strings, not NULLs.
|
||||
We still do fill_null('') in Polars for safety.
|
||||
|
||||
MID chunking: SQL Server's IN clause limit is ~2100 parameters.
|
||||
We split into chunks of 1000 MIDs each and concat the Polars frames.
|
||||
pl.read_database() opens a connection from the pool per chunk and
|
||||
releases it back automatically after the query finishes.
|
||||
"""
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
# Exact SELECT from the SP's Promotion INSERT — columns renamed to
|
||||
# snake_case for ClickHouse convention
|
||||
promotion_sql = """
|
||||
SELECT
|
||||
sc.MID AS mid,
|
||||
{project_id} AS project_id,
|
||||
sc.StoreId AS store_id,
|
||||
Em.EmpId AS employee_id,
|
||||
CONVERT(date, sc.VisitDate) AS visit_date,
|
||||
Em.SupervisorId AS supervisor_id,
|
||||
sm.ChannelId AS channel_id,
|
||||
sm.ChainId AS chain_id,
|
||||
sm.StoreTypeId AS storetype_id,
|
||||
msd.PromoDefinitionId AS promo_definition_id,
|
||||
msd.PromoDefinitionName AS promo_definition_name,
|
||||
|
||||
-- Resolve which dimension level the promo targets
|
||||
-- (Category / SubCategory / Brand / SubBrand)
|
||||
CASE
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_Category' THEN 'Category'
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory'
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand' THEN 'Brand'
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand' THEN 'SubBrand'
|
||||
ELSE ''
|
||||
END AS promotion_details,
|
||||
|
||||
ts.PromoValue AS promotion_details_id,
|
||||
|
||||
-- Resolve the actual name of that dimension item
|
||||
-- SQL Server evaluates exactly ONE of these subqueries per row
|
||||
CASE
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'
|
||||
THEN (SELECT a.CategoryName
|
||||
FROM OneApp_KelloggsMT.dbo.Master_Category a
|
||||
WHERE a.CategoryId = ts.PromoValue)
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory'
|
||||
THEN (SELECT a.SubCategoryName
|
||||
FROM OneApp_KelloggsMT.dbo.Master_SubCategory a
|
||||
WHERE a.SubCategoryId = ts.PromoValue)
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'
|
||||
THEN (SELECT a.BrandName
|
||||
FROM OneApp_KelloggsMT.dbo.Master_Brand a
|
||||
WHERE a.BrandId = ts.PromoValue)
|
||||
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'
|
||||
THEN (SELECT a.SubBrandName
|
||||
FROM OneApp_KelloggsMT.dbo.Master_SubBrand a
|
||||
WHERE a.SubBrandId = ts.PromoValue)
|
||||
END AS promo_value_name,
|
||||
|
||||
-- Present flag: 0 → 'N', anything else → 'Y'
|
||||
CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END AS is_present,
|
||||
|
||||
-- Reason only populated when promo is ABSENT
|
||||
CASE
|
||||
WHEN ts.Present = 1 THEN ''
|
||||
ELSE ISNULL(mnp.PromoReason, '')
|
||||
END AS reason,
|
||||
|
||||
ISNULL(mpq.PromoQuestionName, '') AS question,
|
||||
ISNULL(tpq.PromoAnswerName, '') AS answer,
|
||||
|
||||
-- Image URLs — empty string when no image
|
||||
CASE
|
||||
WHEN ISNULL(SHI.PromoImage1,'') = '' THEN ''
|
||||
ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
|
||||
+ SHI.PromoImage1
|
||||
END AS image1,
|
||||
CASE
|
||||
WHEN ISNULL(SHI.PromoImage2,'') = '' THEN ''
|
||||
ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
|
||||
+ SHI.PromoImage2
|
||||
END AS image2,
|
||||
|
||||
GETDATE() AS update_date,
|
||||
'ETL-SQLAlchemy' AS update_by
|
||||
|
||||
FROM OneApp_KelloggsMT.dbo.T_Promotion ts
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
|
||||
ON ts.mid = sc.mid
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
|
||||
ON sc.StoreId = sm.StoreId
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
|
||||
ON sc.EmpId = Em.EmpId
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd
|
||||
ON msd.PromoDefinitionId = ts.PromoDefinitionId
|
||||
LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI
|
||||
ON ts.PId = SHI.PId
|
||||
LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp
|
||||
ON ts.PromoReasonId = mnp.PromoReasonId
|
||||
LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq
|
||||
ON ts.PId = tpq.PId
|
||||
LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq
|
||||
ON tpq.PromoQuestionId = mpq.PromoQuestionId
|
||||
|
||||
WHERE Em.EmpName NOT LIKE 'test%'
|
||||
AND sc.MID IN ({mid_placeholders})
|
||||
"""
|
||||
|
||||
frames: list[pl.DataFrame] = []
|
||||
|
||||
for i in range(0, len(mids), MID_CHUNK):
|
||||
chunk = mids[i : i + MID_CHUNK]
|
||||
mid_list = ",".join(str(m) for m in chunk) # safe: all integers
|
||||
sql = promotion_sql.format(
|
||||
project_id=PROJECT_ID,
|
||||
mid_placeholders=mid_list,
|
||||
)
|
||||
|
||||
log.info(
|
||||
f"Fetching chunk {i // MID_CHUNK + 1} "
|
||||
f"(MIDs {i+1}–{min(i + MID_CHUNK, len(mids))} of {len(mids)})..."
|
||||
)
|
||||
|
||||
# pl.read_database() borrows a connection from the SQLAlchemy pool,
|
||||
# runs the query, converts the result directly to a Polars DataFrame,
|
||||
# then returns the connection to the pool. One clean call per chunk.
|
||||
df_chunk = pl.read_database(query=sql, connection=engine)
|
||||
if len(df_chunk) > 0:
|
||||
frames.append(df_chunk)
|
||||
|
||||
if not frames:
|
||||
log.warning("No promotion rows returned.")
|
||||
return pl.DataFrame()
|
||||
|
||||
df = pl.concat(frames)
|
||||
log.info(f"Fetched {len(df):,} total rows from SQL Server")
|
||||
return df
|
||||
|
||||
|
||||
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
|
||||
def transform(df: pl.DataFrame) -> pl.DataFrame:
|
||||
"""
|
||||
Assembly-line approach: each .with_columns() block is one station.
|
||||
No station does two different jobs — easy to debug and extend.
|
||||
|
||||
Promotion-specific notes:
|
||||
┌──────────────────────┬──────────────────────────────────────────────┐
|
||||
│ Column │ Why / How │
|
||||
├──────────────────────┼──────────────────────────────────────────────┤
|
||||
│ is_present │ 'Y'/'N' string → UInt8 (ClickHouse no Bool) │
|
||||
│ promo_value_name │ Can be NULL when PromoTable is unknown → │
|
||||
│ │ fill_null('') │
|
||||
│ promotion_details_id │ INT from SQL Server → cast to Int64 │
|
||||
│ image1 / image2 │ Already empty string from SQL, still safe │
|
||||
│ update_date │ GETDATE() comes as datetime → cast Datetime │
|
||||
└──────────────────────┴──────────────────────────────────────────────┘
|
||||
"""
|
||||
log.info("Transforming Promotion data...")
|
||||
|
||||
df = (
|
||||
df
|
||||
|
||||
# ── IDs ───────────────────────────────────────────────────────────────
|
||||
.with_columns([
|
||||
pl.col("mid").cast(pl.Int64),
|
||||
pl.col("project_id").cast(pl.Int32),
|
||||
pl.col("store_id").cast(pl.Int64),
|
||||
pl.col("employee_id").cast(pl.Int64),
|
||||
pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
|
||||
pl.col("channel_id").cast(pl.Int32).fill_null(0),
|
||||
pl.col("chain_id").cast(pl.Int64).fill_null(0),
|
||||
pl.col("storetype_id").cast(pl.Int32).fill_null(0),
|
||||
pl.col("promo_definition_id").cast(pl.Int32).fill_null(0),
|
||||
# promotion_details_id = PromoValue (the FK into the chosen master)
|
||||
pl.col("promotion_details_id").cast(pl.Int64).fill_null(0),
|
||||
])
|
||||
|
||||
# ── Date ──────────────────────────────────────────────────────────────
|
||||
# SQL Server CONVERT(date, ...) may arrive as Python datetime via ODBC
|
||||
.with_columns(
|
||||
pl.col("visit_date").cast(pl.Date)
|
||||
)
|
||||
|
||||
# ── Present flag: 'Y'/'N' → UInt8 (1/0) ─────────────────────────────
|
||||
# ClickHouse uses UInt8 for booleans — no native Bool type
|
||||
.with_columns(
|
||||
pl.when(pl.col("is_present") == "Y")
|
||||
.then(pl.lit(1, dtype=pl.UInt8))
|
||||
.otherwise(pl.lit(0, dtype=pl.UInt8))
|
||||
.alias("is_present")
|
||||
)
|
||||
|
||||
# ── Strings: NULL → empty string ──────────────────────────────────────
|
||||
# ClickHouse String columns reject NULL — every string col needs this
|
||||
# promo_value_name is the most likely NULL (unknown PromoTable value)
|
||||
.with_columns([
|
||||
pl.col("promo_definition_name").fill_null(""),
|
||||
pl.col("promotion_details").fill_null(""),
|
||||
pl.col("promo_value_name").fill_null(""),
|
||||
pl.col("reason").fill_null(""),
|
||||
pl.col("question").fill_null(""),
|
||||
pl.col("answer").fill_null(""),
|
||||
pl.col("image1").fill_null(""),
|
||||
pl.col("image2").fill_null(""),
|
||||
pl.col("update_by").fill_null("ETL-SQLAlchemy"),
|
||||
])
|
||||
|
||||
# ── Timestamp ─────────────────────────────────────────────────────────
|
||||
.with_columns(
|
||||
pl.col("update_date").cast(pl.Datetime)
|
||||
)
|
||||
|
||||
# ── Dedup ─────────────────────────────────────────────────────────────
|
||||
# A promo record is unique per MID + promo_definition_id + details_id.
|
||||
# Chunked fetches can produce duplicates if MIDs overlap — drop them.
|
||||
.unique(subset=["project_id", "mid", "promo_definition_id", "promotion_details_id"])
|
||||
)
|
||||
|
||||
log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
|
||||
return df
|
||||
|
||||
|
||||
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
|
||||
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
|
||||
"""
|
||||
Delete-then-insert mirrors the SP exactly:
|
||||
DELETE FROM Promotion WHERE MID IN (...) AND project_id=40148
|
||||
INSERT INTO Promotion ...
|
||||
|
||||
We delete by visit_date (cheaper than per-MID deletes in ClickHouse).
|
||||
insert_arrow() uses Polars' native Arrow backing — zero-copy, no pandas.
|
||||
"""
|
||||
target_date: date = df["visit_date"].min()
|
||||
|
||||
log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
|
||||
client = clickhouse_connect.get_client(
|
||||
host=CH_HOST, port=CH_PORT,
|
||||
username=CH_USER, password=CH_PASS,
|
||||
database=CH_DB,
|
||||
)
|
||||
|
||||
log.info(f"Deleting: project_id={PROJECT_ID} visit_date={target_date}")
|
||||
client.command(
|
||||
f"ALTER TABLE {CH_TABLE} DELETE "
|
||||
f"WHERE project_id = {PROJECT_ID} "
|
||||
f"AND visit_date = '{target_date}'"
|
||||
)
|
||||
|
||||
total = 0
|
||||
for start in range(0, len(df), BATCH_SIZE):
|
||||
batch = df.slice(start, BATCH_SIZE)
|
||||
client.insert_arrow(CH_TABLE, batch.to_arrow())
|
||||
total += len(batch)
|
||||
log.info(f" Inserted {total:,} / {len(df):,} rows")
|
||||
|
||||
log.info(f"ClickHouse load complete: {total:,} rows")
|
||||
return total, target_date
|
||||
|
||||
|
||||
# ── Step 6: Verify ────────────────────────────────────────────────────────────
|
||||
def verify(expected: int, target_date: date) -> bool:
|
||||
"""
|
||||
Final sanity check. Exit code 2 = data problem (not a crash).
|
||||
Your cron/scheduler can distinguish: exit 0 = OK, exit 1 = crash,
|
||||
exit 2 = ran fine but data didn't land correctly.
|
||||
"""
|
||||
client = clickhouse_connect.get_client(
|
||||
host=CH_HOST, port=CH_PORT,
|
||||
username=CH_USER, password=CH_PASS,
|
||||
database=CH_DB,
|
||||
)
|
||||
result = client.query(
|
||||
f"SELECT count() FROM {CH_TABLE} "
|
||||
f"WHERE project_id = {PROJECT_ID} "
|
||||
f"AND visit_date = '{target_date}'"
|
||||
)
|
||||
actual = result.result_rows[0][0]
|
||||
log.info(f"Verify — expected={expected:,} clickhouse={actual:,}")
|
||||
if actual != expected:
|
||||
log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
|
||||
return False
|
||||
log.info("Verify passed.")
|
||||
return True
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
def main() -> None:
|
||||
target_date = date.today() - timedelta(days=1)
|
||||
log.info(f"=== Promotion KPI ETL | date={target_date} ===")
|
||||
|
||||
engine = build_engine()
|
||||
|
||||
mids = collect_mids(engine, target_date)
|
||||
if not mids:
|
||||
log.info("No MIDs for yesterday. Nothing to do.")
|
||||
sys.exit(0)
|
||||
|
||||
df_raw = fetch_promotion_data(engine, mids)
|
||||
engine.dispose() # cleanly return all pooled connections to OS
|
||||
|
||||
if df_raw.is_empty():
|
||||
log.warning("Empty result set. Exiting.")
|
||||
sys.exit(0)
|
||||
|
||||
df_clean = transform(df_raw)
|
||||
|
||||
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
|
||||
|
||||
if not verify(rows_inserted, inserted_date):
|
||||
sys.exit(2)
|
||||
|
||||
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -428,37 +428,40 @@ def verify(expected: int, target_date: date) -> bool:
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
def main() -> None:
|
||||
target_date = date.today() - timedelta(days=1)
|
||||
log.info(f"=== Coverage KPI ETL | date={target_date} ===")
|
||||
#target_date = date.today() - timedelta(days=1)
|
||||
for i in range(1, 10):
|
||||
target_date = date(2023, 3, 9) + timedelta(days=i)
|
||||
print(target_date)
|
||||
log.info(f"=== Coverage KPI ETL | date={target_date} ===")
|
||||
|
||||
# Build the SQLAlchemy engine once — shared across all steps
|
||||
engine = build_engine()
|
||||
# Build the SQLAlchemy engine once — shared across all steps
|
||||
engine = build_engine()
|
||||
|
||||
# Collect which MIDs need processing
|
||||
mids = collect_mids(engine, target_date)
|
||||
if not mids:
|
||||
log.info("No MIDs for yesterday. Nothing to do.")
|
||||
sys.exit(0)
|
||||
# Collect which MIDs need processing
|
||||
mids = collect_mids(engine, target_date)
|
||||
if not mids:
|
||||
log.info("No MIDs for yesterday. Nothing to do.")
|
||||
sys.exit(0)
|
||||
|
||||
# Fetch raw data using pl.read_database()
|
||||
df_raw = fetch_coverage_data(engine, mids)
|
||||
engine.dispose() # return all pooled connections to OS cleanly
|
||||
# Fetch raw data using pl.read_database()
|
||||
df_raw = fetch_coverage_data(engine, mids)
|
||||
engine.dispose() # return all pooled connections to OS cleanly
|
||||
|
||||
if df_raw.is_empty():
|
||||
log.warning("Empty result set. Exiting.")
|
||||
sys.exit(0)
|
||||
if df_raw.is_empty():
|
||||
log.warning("Empty result set. Exiting.")
|
||||
sys.exit(0)
|
||||
|
||||
# Transform
|
||||
df_clean = transform(df_raw)
|
||||
# Transform
|
||||
df_clean = transform(df_raw)
|
||||
|
||||
# Load
|
||||
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
|
||||
# Load
|
||||
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
|
||||
|
||||
# Verify
|
||||
if not verify(rows_inserted, inserted_date):
|
||||
sys.exit(2)
|
||||
# Verify
|
||||
if not verify(rows_inserted, inserted_date):
|
||||
sys.exit(2)
|
||||
|
||||
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
|
||||
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user