first commit
This commit is contained in:
@@ -0,0 +1,465 @@
|
||||
# /// script
|
||||
# requires-python = ">=3.11"
|
||||
# dependencies = [
|
||||
# "polars>=0.20.0",
|
||||
# "sqlalchemy>=2.0.0",
|
||||
# "pyodbc>=5.0.0",
|
||||
# "clickhouse-connect>=0.7.0",
|
||||
# "python-dotenv>=1.0.0",
|
||||
# "pyarrow>=18.0.0",
|
||||
# ]
|
||||
# ///
|
||||
|
||||
"""
|
||||
ETL Pipeline: Coverage KPI — SQL Server → Polars → ClickHouse
|
||||
--------------------------------------------------------------
|
||||
Run with: uv run etl_coverage_kpi.py
|
||||
|
||||
Why SQLAlchemy instead of raw pyodbc?
|
||||
- pl.read_database() speaks SQLAlchemy Engine natively
|
||||
- Connection pooling built-in (reuses connections, no reconnect overhead)
|
||||
- Parameterised queries use :param syntax — cleaner and SQL-injection safe
|
||||
- One engine object shared across all steps — no passing conn handles around
|
||||
- Works with ANY database backend, not just SQL Server
|
||||
|
||||
|
||||
Pipeline steps:
  1. Build one SQLAlchemy engine (the "pipe" to SQL Server)
|
||||
  2. Ask: which store-coverage rows were created or updated yesterday? → get MIDs
|
||||
3. Pull Coverage rows for those MIDs in chunks → Polars DataFrames
|
||||
4. Clean + type-cast everything in Polars
|
||||
5. Delete stale ClickHouse rows, insert fresh batch
|
||||
6. Verify row count matches
|
||||
"""
|
||||
|
||||
import os
|
||||
import pyarrow
|
||||
import sys
|
||||
import logging
|
||||
from datetime import date, timedelta
|
||||
|
||||
import polars as pl
|
||||
from sqlalchemy import create_engine, text
|
||||
from sqlalchemy.engine import Engine, URL
|
||||
import clickhouse_connect
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Pull settings from .env into the process environment first, so the
# os.getenv() calls in the Config section below can see them.
load_dotenv()

# ── Logging ───────────────────────────────────────────────────────────────────
# One process-wide format: "timestamp | LEVEL    | message", INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Named logger used by every step of this script.
log = logging.getLogger("etl_coverage")
|
||||
|
||||
# ── Config ────────────────────────────────────────────────────────────────────
# SQL Server credentials — read from the environment / .env. The fallbacks
# below are placeholders only; set real values in .env.

MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "CPMIndiaBusinessInsight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")


def _odbc_quote(value: str) -> str:
    """Return *value* brace-quoted for use inside an ODBC connection string.

    In an ODBC connection string ';' separates attributes, so a raw value
    containing ';' (or starting with '{') would be mis-parsed. Wrapping the
    value in braces makes it literal; a '}' inside the value is escaped by
    doubling it.
    """
    return "{" + value.replace("}", "}}") + "}"


# pyodbc connection string — built separately from the SQLAlchemy URL.
# Two layers of escaping are involved:
#   1. ODBC level: UID/PWD are brace-quoted so ';', '{' and '}' in a
#      credential cannot break the attribute list.
#   2. URL level: the finished string is handed to URL.create() via the
#      "odbc_connect" query key, so no manual urllib quoting is needed for
#      characters such as @ / + #.
# NOTE(review): "timeout" may not be a connection-string keyword recognised by
# the SQL Server ODBC driver — confirm. pyodbc's login timeout is normally
# passed as connect(timeout=...), i.e. via create_engine(connect_args=...).
_ODBC_DSN = (
    f"DRIVER={{ODBC Driver 18 for SQL Server}};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={_odbc_quote(MSSQL_USER)};"
    f"PWD={_odbc_quote(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
    "timeout=30;"
)

# Final SQLAlchemy URL — credentials live inside odbc_connect, not in the
# user/password parts of the URL (the pass-through pattern documented for
# mssql+pyodbc).
MSSQL_SA_URL = URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": _ODBC_DSN},
)

# ClickHouse connection settings (HTTP interface).
CH_HOST = os.getenv("CH_HOST", "localhost")
CH_PORT = int(os.getenv("CH_PORT", "8123"))
CH_USER = os.getenv("CH_USER", "default")
CH_PASS = os.getenv("CH_PASS", "")
CH_DB = os.getenv("CH_DB", "kelloggs")
CH_TABLE = "coverage_kpi"

BATCH_SIZE = 5000   # rows per ClickHouse insert
MID_CHUNK = 1000    # number of MIDs inlined into each SQL Server IN (...) list
PROJECT_ID = 40148  # constant written to every row's project_id column
|
||||
|
||||
|
||||
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
|
||||
def build_engine() -> Engine:
    """Create the SQLAlchemy engine for SQL Server and smoke-test it.

    The Engine is not a connection — it is the factory (and pool) that hands
    out connections. SQLAlchemy's ``mssql+pyodbc`` dialect generates the SQL
    Server flavoured SQL; pyodbc is the DBAPI that talks to the ODBC driver.
    The connection details come from the module-level ``MSSQL_SA_URL``.

    Engine options:
        fast_executemany=True
            A ``create_engine()`` flag of the mssql+pyodbc dialect: it makes
            pyodbc send ``executemany()`` batches in bulk. It must be passed
            to ``create_engine()`` itself — ``connect_args`` is forwarded to
            ``pyodbc.connect()``, which does not take this option. This
            script only reads from SQL Server, so the flag matters only if
            writes are added later.
        pool_size=3, max_overflow=2
            Keep up to 3 pooled connections and allow 2 extra under load.
            When a ``with engine.connect()`` block exits, the connection goes
            back to the pool instead of being closed.
        pool_pre_ping=True
            Test each pooled connection before handing it out.

    Returns:
        A ready-to-use Engine on which ``SELECT 1`` has already succeeded.

    Raises:
        Whatever SQLAlchemy/pyodbc raises if the server cannot be reached;
        the engine is disposed before the error propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine for SQL Server...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,     # verify connection is alive before use
        echo=False,             # True = print every SQL to console (debug)
        fast_executemany=True,  # dialect-level flag, not a pyodbc.connect() kwarg
    )

    # Smoke test — borrow one connection, run SELECT 1, return it to the pool.
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        # Don't leave a half-initialised pool behind when the server is down.
        engine.dispose()
        raise

    log.info("SQL Server engine ready.")
    return engine
|
||||
|
||||
|
||||
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
|
||||
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """Return the MIDs of T_StoreCoverage rows created or updated on a date.

    The date is supplied as the bound parameter ``:target_date``, so it is
    never concatenated into the SQL text. The UNION removes MIDs that match
    both the CreateDate and the UpdateDate branch.

    Args:
        engine: SQLAlchemy engine for SQL Server.
        target_date: Calendar date to match against CreateDate / UpdateDate.

    Returns:
        The first column of every result row, in the order returned.
    """
    query = text("""
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, CreateDate) = :target_date
        UNION
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, UpdateDate) = :target_date
    """)
    bind_params = {"target_date": target_date}

    log.info(f"Collecting MIDs for: {target_date}")

    # The connection is handed back to the pool when the block exits.
    with engine.connect() as conn:
        rows = conn.execute(query, bind_params).fetchall()

    mids = []
    for row in rows:
        mids.append(row[0])

    log.info(f"Found {len(mids):,} MIDs")
    return mids
|
||||
|
||||
|
||||
# ── Step 3: Fetch Coverage data via pl.read_database() ───────────────────────
|
||||
def fetch_coverage_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """Fetch the Coverage rows for the given MIDs as one Polars DataFrame.

    The MID list is processed in slices of ``MID_CHUNK``; each slice is
    inlined into the ``IN (...)`` list of the Coverage query and read with
    ``pl.read_database()``, which accepts the SQLAlchemy engine directly.
    Non-empty chunk frames are concatenated at the end.

    Args:
        engine: SQLAlchemy engine for SQL Server.
        mids: MIDs to fetch. Every value must be convertible with ``int()``.

    Returns:
        All fetched rows, or an empty ``pl.DataFrame()`` (no columns) when
        ``mids`` is empty or the query returns nothing.

    Raises:
        ValueError / TypeError: if a MID cannot be converted to ``int``.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # SQL mirrors the Coverage INSERT from the stored procedure.
    # {project_id} and {mid_placeholders} are filled in per chunk below via
    # str.format(); the template contains no other braces.
    coverage_sql = """
        SELECT
            {project_id} AS project_id,
            JP.MID,
            sm.StoreId AS store_id,
            JP.EmpId AS employee_id,
            CONVERT(date, JP.VisitDate) AS visit_date,
            JP.InTime AS in_time,
            JP.OutTime AS out_time,
            CASE
                WHEN JP.OutTime IS NULL OR JP.InTime IS NULL THEN NULL
                WHEN JP.OutTime < JP.InTime THEN 0
                ELSE DATEDIFF(SECOND, JP.InTime, JP.OutTime) / 60
            END AS duration_minutes,
            CASE
                WHEN (
                    SELECT TOP 1 EmpId
                    FROM OneApp_KelloggsMT.dbo.T_StoreCoverage SC
                    WHERE SC.EmpId = JP.EmpId
                    AND SC.StoreId = JP.StoreId
                    AND SC.VisitDate = JP.VisitDate
                    AND SC.ReasonId IN (0,1,3,9,10,19,20)
                ) > 0
                THEN 'Y' ELSE 'N'
            END AS is_covered,
            CASE JP.Deviation
                WHEN 0 THEN 'Planned'
                WHEN 1 THEN 'Adhoc'
                WHEN 2 THEN 'Beat Plan'
                WHEN 3 THEN 'Non Merchandised'
                WHEN 4 THEN 'Add New Store'
                WHEN 5 THEN 'Non Program'
                ELSE ''
            END AS coverage_type,
            sm.StoreTypeId AS storetype_id,
            Em.SupervisorId AS supervisor_id,
            ISNULL(JP.ReasonId, 0) AS reason_id,
            sm.CameraAllow AS camera_allow,
            CAST(
                CASE
                    WHEN sm.Latitude IS NULL OR sm.Latitude = 0 THEN 0
                    WHEN sm.Longitude IS NULL OR sm.Longitude = 0 THEN 0
                    WHEN JP.Latitude IS NULL OR JP.Latitude = 0 THEN 0
                    WHEN JP.Longitude IS NULL OR JP.Longitude = 0 THEN 0
                    ELSE SQRT(
                        POWER(69.1 * (JP.Latitude - sm.Latitude), 2) +
                        POWER(69.1 * (sm.Longitude - JP.Longitude)
                        * COS(JP.Latitude / 57.3), 2)
                    ) * 1000
                END AS FLOAT
            ) AS distance_meters,
            GETDATE() AS update_date,
            'ETL-SQLAlchemy' AS update_by
        FROM OneApp_KelloggsMT.dbo.T_StoreCoverage JP WITH (NOLOCK)
        INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
            ON JP.StoreId = sm.StoreId
        INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
            ON JP.EmpId = Em.EmpId
        WHERE JP.MID IN ({mid_placeholders})
        AND Em.UserName NOT LIKE 'test%'
    """

    total_mids = len(mids)
    frames: list[pl.DataFrame] = []

    for i in range(0, total_mids, MID_CHUNK):
        chunk = mids[i : i + MID_CHUNK]
        # The IN list is inlined as literals because its length varies.
        # int() enforces that every value really is an integer, so nothing
        # but digits (and a sign) can ever reach the SQL text.
        mid_list = ",".join(str(int(m)) for m in chunk)
        sql = coverage_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=mid_list,
        )

        log.info(
            f"Fetching chunk {i // MID_CHUNK + 1} "
            f"(MIDs {i+1}–{min(i+MID_CHUNK, total_mids)} of {total_mids})..."
        )

        # pl.read_database() takes the raw SQL string plus the engine and
        # returns the result set as a Polars DataFrame.
        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No rows returned from SQL Server.")
        return pl.DataFrame()

    # Column types are inferred per chunk, so a column that happens to be
    # all-NULL in one chunk can come back with a different dtype than in the
    # others. "vertical_relaxed" coerces such columns to a common supertype
    # instead of raising a schema error as the default strict mode would.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df
|
||||
|
||||
|
||||
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
|
||||
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """Normalise column types for the ClickHouse load and de-duplicate rows.

    Every column that is loaded is cast explicitly so the target table never
    receives a surprise type. The stages below are applied one after another,
    each as its own ``with_columns()`` call; afterwards rows are reduced to
    one per (project_id, MID).

    Args:
        df: Raw frame as produced by ``fetch_coverage_data``.

    Returns:
        The cleaned, de-duplicated frame.
    """
    log.info("Transforming...")

    covered_flag = (
        pl.when(pl.col("is_covered") == "Y")
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(0, dtype=pl.UInt8))
        .alias("is_covered")
    )

    stages = [
        # IDs — integer widths fixed; nullable ones default to 0.
        [
            pl.col("project_id").cast(pl.Int32),
            pl.col("MID").cast(pl.Int64),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("reason_id").cast(pl.Int32).fill_null(0),
        ],
        # Dates — visit_date may arrive as a datetime; force Date.
        [pl.col("visit_date").cast(pl.Date)],
        # Numerics — go through Float64 first, then down to Int32.
        [pl.col("duration_minutes").cast(pl.Float64).fill_null(0.0).cast(pl.Int32)],
        [pl.col("distance_meters").cast(pl.Float64).fill_null(0.0).round(2)],
        # Flags — 'Y'/'N' becomes 1/0; camera_allow becomes UInt8.
        [covered_flag],
        [pl.col("camera_allow").cast(pl.UInt8).fill_null(0)],
        # Strings — no NULLs are sent for these columns.
        [
            pl.col("coverage_type").fill_null(""),
            pl.col("update_by").fill_null("ETL-SQLAlchemy"),
        ],
        # Timestamps.
        [pl.col("update_date").cast(pl.Datetime)],
    ]

    for stage_exprs in stages:
        df = df.with_columns(stage_exprs)

    # Dedup — keep a single row per (project_id, MID).
    df = df.unique(subset=["project_id", "MID"])

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df
|
||||
|
||||
|
||||
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
|
||||
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """Delete the target day's rows in ClickHouse, then insert ``df``.

    The frame is converted batch by batch to a pyarrow Table with
    ``DataFrame.to_arrow()`` and sent with ``client.insert_arrow()`` — no
    pandas bridge and no row-by-row inserts.

    Delete-then-insert mirrors the stored procedure's pattern: wipe the
    day's data, reload fresh. The delete is an ``ALTER TABLE ... DELETE``
    mutation, which ClickHouse runs asynchronously by default; it is issued
    with ``mutations_sync`` so it has finished before the inserts and the
    later row-count verification run.

    Args:
        df: Transformed frame; must be non-empty and contain ``visit_date``.

    Returns:
        ``(rows_inserted, target_date)`` where ``target_date`` is the
        earliest ``visit_date`` in ``df``.

    Raises:
        ValueError: if ``df`` is empty.
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() called with an empty DataFrame")

    visit_dates = df["visit_date"]
    target_date: date = visit_dates.min()

    # NOTE(review): MIDs are selected upstream by CreateDate/UpdateDate, not
    # by VisitDate, so the batch is not guaranteed to hold a single visit
    # date. Only target_date is deleted below; other dates would be inserted
    # on top of whatever already exists. Surface that instead of assuming it
    # cannot happen.
    distinct_dates = visit_dates.n_unique()
    if distinct_dates > 1:
        log.warning(
            f"Batch spans {distinct_dates} distinct visit dates "
            f"({target_date} to {visit_dates.max()}); only {target_date} "
            f"is deleted before insert"
        )

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )

    try:
        # Delete stale rows for this date + project and wait for the
        # mutation to complete (mutations_sync=1) before inserting.
        log.info(f"Deleting existing rows: project_id={PROJECT_ID} visit_date={target_date}")
        client.command(
            f"ALTER TABLE {CH_TABLE} DELETE "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'",
            settings={"mutations_sync": 1},
        )

        # Insert in batches of BATCH_SIZE rows.
        total_rows = len(df)
        total = 0
        for start in range(0, total_rows, BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f"  Inserted {total:,} / {total_rows:,} rows")
    finally:
        # Release the client's connection resources even when a batch fails.
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date
|
||||
|
||||
|
||||
# ── Step 6: Verify ────────────────────────────────────────────────────────────
|
||||
def verify(expected: int, target_date: date) -> bool:
    """Check that ClickHouse holds the expected number of rows for a date.

    Counts the rows in ``CH_TABLE`` for ``PROJECT_ID`` and ``target_date``
    and compares the count with ``expected``. A mismatch is logged as an
    error; turning it into a process exit code is left to the caller.

    Args:
        expected: Number of rows the load step reported as inserted.
        target_date: ``visit_date`` value to count.

    Returns:
        True when the counts match, False otherwise.
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        result = client.query(
            f"SELECT count() FROM {CH_TABLE} "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'"
        )
        actual = result.result_rows[0][0]
    finally:
        # The client was opened just for this check — always release it.
        client.close()

    log.info(f"Verify — expected={expected:,}  clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False

    log.info("Verify passed.")
    return True
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
def main() -> None:
    """Run the ETL end to end for yesterday's date.

    Steps: build the engine, collect MIDs, fetch, transform, load, verify.

    Exit codes:
        0 — nothing to do (no MIDs, or an empty result set).
        2 — the row count in ClickHouse does not match what was inserted.
    A normal return means the load was verified.
    """
    target_date = date.today() - timedelta(days=1)
    log.info(f"=== Coverage KPI ETL | date={target_date} ===")

    # Build the SQLAlchemy engine once — shared across all SQL Server steps.
    engine = build_engine()
    try:
        # Collect which MIDs need processing.
        mids = collect_mids(engine, target_date)
        if not mids:
            log.info("No MIDs for yesterday. Nothing to do.")
            sys.exit(0)

        # Fetch raw data using pl.read_database().
        df_raw = fetch_coverage_data(engine, mids)
    finally:
        # Close the pooled connections on every path out of the SQL Server
        # phase: success, early exit (SystemExit) or an exception.
        engine.dispose()

    if df_raw.is_empty():
        log.warning("Empty result set. Exiting.")
        sys.exit(0)

    # Transform
    df_clean = transform(df_raw)

    # Load
    rows_inserted, inserted_date = load_to_clickhouse(df_clean)

    # Verify
    if not verify(rows_inserted, inserted_date):
        sys.exit(2)

    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user