Files
2026-06-03 17:25:17 +05:30

468 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "polars>=0.20.0",
# "sqlalchemy>=2.0.0",
# "pyodbc>=5.0.0",
# "clickhouse-connect>=0.7.0",
# "python-dotenv>=1.0.0",
# "pyarrow>=18.0.0",
# ]
# ///
"""
ETL Pipeline: Coverage KPI — SQL Server → Polars → ClickHouse
--------------------------------------------------------------
Run with: uv run etl_coverage_kpi.py

Why SQLAlchemy instead of raw pyodbc?
- pl.read_database() speaks SQLAlchemy Engine natively
- Connection pooling built-in (reuses connections, no reconnect overhead)
- Parameterised queries use :param syntax — cleaner and SQL-injection safe
- One engine object shared across all steps — no passing conn handles around
- Works with ANY database backend, not just SQL Server

Pipeline steps:
1. Build one SQLAlchemy engine (the "pipe" to SQL Server)
2. Ask: which store-coverage rows were created or updated on the target
   date? → get their MIDs
3. Pull Coverage rows for those MIDs in chunks → Polars DataFrames
4. Clean + type-cast everything in Polars
5. Delete stale ClickHouse rows, insert fresh batch
6. Verify row count matches
"""
import os
import pyarrow
import sys
import logging
from datetime import date, timedelta
import polars as pl
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, URL
import clickhouse_connect
from dotenv import load_dotenv
load_dotenv()
# ── Logging ───────────────────────────────────────────────────────────────────
# Process-wide logging: INFO and above, one pipe-separated line per record
# (timestamp | padded level | message).
_LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
    level=logging.INFO,
)

# Named logger used by every pipeline step in this script.
log = logging.getLogger("etl_coverage")
# ── Config ────────────────────────────────────────────────────────────────────
# SQL Server credentials — read from .env, never hardcoded
# SQL Server connection settings, read from the environment. The fallbacks are
# placeholders for local development, not usable production values.
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "CPMIndiaBusinessInsight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")


def _odbc_value(value: str) -> str:
    """Return *value* quoted for safe use inside an ODBC connection string.

    In an ODBC connection string ``;`` terminates a key/value pair and a
    leading ``{`` opens a quoted value, so a raw password such as ``a;b``
    would be cut off at the semicolon. Such values are wrapped in braces and
    any ``}`` inside them is doubled. Values without those characters are
    returned unchanged, so ordinary credentials produce exactly the same
    connection string as before.
    """
    if ";" in value or value.startswith("{"):
        return "{" + value.replace("}", "}}") + "}"
    return value


# Full pyodbc connection string, built as plain text. Keeping the credentials
# here (instead of in URL username/password fields) avoids URL-encoding
# problems; _odbc_value() handles the characters that are special to ODBC
# itself.
# NOTE(review): confirm the driver honours "timeout" as a connection-string
# keyword — pyodbc's login timeout is normally the connect(timeout=...) argument.
_ODBC_DSN = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={_odbc_value(MSSQL_SERVER)};"
    f"DATABASE={_odbc_value(MSSQL_DB)};"
    f"UID={_odbc_value(MSSQL_USER)};"
    f"PWD={_odbc_value(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
    "timeout=30;"
)
# SQLAlchemy URL for the mssql+pyodbc dialect. The whole ODBC connection
# string travels in the "odbc_connect" query parameter, so the URL itself has
# no username, password or host component; URL.create() takes care of
# encoding the value.
_MSSQL_URL_QUERY = {"odbc_connect": _ODBC_DSN}
MSSQL_SA_URL = URL.create("mssql+pyodbc", query=_MSSQL_URL_QUERY)
# ClickHouse connection settings; each one can be overridden via the
# environment and otherwise falls back to the value shown.
CH_HOST = os.environ.get("CH_HOST", "localhost")
CH_PORT = int(os.environ.get("CH_PORT", "8123"))
CH_USER = os.environ.get("CH_USER", "default")
CH_PASS = os.environ.get("CH_PASS", "")
CH_DB = os.environ.get("CH_DB", "kelloggs")

# Destination table and the project whose rows this pipeline owns.
CH_TABLE = "coverage_kpi"
PROJECT_ID = 40148

# Batch sizes.
MID_CHUNK = 1_000  # MIDs per SQL Server IN (...) list
BATCH_SIZE = 5_000  # rows per ClickHouse insert
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
def build_engine() -> Engine:
    """Create the SQL Server engine and prove it can connect.

    The engine is built from ``MSSQL_SA_URL`` (mssql+pyodbc) with:

    - ``pool_size=3`` / ``max_overflow=2``: up to 3 pooled connections plus
      2 temporary extras.
    - ``pool_pre_ping=True``: a pooled connection is checked before use.
    - ``fast_executemany=True``: pyodbc's batched ``executemany`` mode. This
      is an option of the mssql+pyodbc dialect, so it is passed to
      ``create_engine()`` itself. Putting it in ``connect_args`` would hand
      it to ``pyodbc.connect()`` as an unknown connection keyword and the
      feature would not be enabled. This pipeline only reads from SQL
      Server, so the flag matters only if writes are added later.

    A ``SELECT 1`` smoke test is run on one connection before returning so
    that bad credentials or an unreachable server fail here rather than in a
    later step.

    Returns:
        The ready-to-use engine. The caller owns it and should call
        ``dispose()`` when finished.

    Raises:
        Whatever SQLAlchemy/pyodbc raises when the smoke test cannot connect;
        the half-built engine is disposed before the error propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine for SQL Server...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,  # verify connection is alive before use
        echo=False,  # True = print every SQL statement (debugging)
        fast_executemany=True,  # dialect-level pyodbc bulk-send optimisation
    )
    try:
        # Smoke test — borrow one connection, run SELECT 1, hand it back.
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        # Do not leave pooled resources behind when the engine is unusable.
        engine.dispose()
        raise
    log.info("SQL Server engine ready.")
    return engine
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """Return the MIDs of coverage rows created or updated on *target_date*.

    Two SELECTs over ``T_StoreCoverage`` — one matching ``CreateDate``, one
    matching ``UpdateDate`` — are combined with ``UNION``, so a MID that
    satisfies both conditions appears once. The date is supplied as the bound
    parameter ``:target_date`` rather than being formatted into the SQL text.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        target_date: Calendar day to match against both date columns.

    Returns:
        First column of every result row, in the order the server returns
        them.
    """
    query = text(
        """
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, CreateDate) = :target_date
        UNION
        SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
        WHERE CONVERT(date, UpdateDate) = :target_date
        """
    )
    bind_params = {"target_date": target_date}

    log.info(f"Collecting MIDs for: {target_date}")
    # The connection goes back to the pool when the with-block exits.
    with engine.connect() as connection:
        fetched_rows = connection.execute(query, bind_params).fetchall()

    mids = []
    for record in fetched_rows:
        mids.append(record[0])

    log.info(f"Found {len(mids):,} MIDs")
    return mids
# ── Step 3: Fetch Coverage data via pl.read_database() ───────────────────────
def fetch_coverage_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """Fetch the coverage rows for *mids* from SQL Server as one DataFrame.

    The MID list is split into chunks of ``MID_CHUNK`` and each chunk is
    fetched with a single ``pl.read_database()`` call. The non-empty chunk
    frames are then concatenated.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        mids: MIDs to fetch. Every entry must be convertible with ``int()``.

    Returns:
        All fetched rows, or an empty, column-less ``pl.DataFrame()`` when
        *mids* is empty or no chunk returned any row.

    Raises:
        TypeError / ValueError: if an entry of *mids* is not an integer value.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # The query mirrors the Coverage INSERT of the stored procedure.
    # {project_id} and {mid_placeholders} are filled in per chunk with
    # str.format(), so the SQL text itself must not contain other braces.
    # NOTE(review): distance_meters is computed with 69.1 (which looks like
    # miles per degree) and then multiplied by 1000 — confirm the resulting
    # unit really is metres.
    coverage_sql = """
        SELECT
            {project_id} AS project_id,
            JP.MID,
            sm.StoreId AS store_id,
            JP.EmpId AS employee_id,
            CONVERT(date, JP.VisitDate) AS visit_date,
            JP.InTime AS in_time,
            JP.OutTime AS out_time,
            CASE
                WHEN JP.OutTime IS NULL OR JP.InTime IS NULL THEN NULL
                WHEN JP.OutTime < JP.InTime THEN 0
                ELSE DATEDIFF(SECOND, JP.InTime, JP.OutTime) / 60
            END AS duration_minutes,
            CASE
                WHEN (
                    SELECT TOP 1 EmpId
                    FROM OneApp_KelloggsMT.dbo.T_StoreCoverage SC
                    WHERE SC.EmpId = JP.EmpId
                      AND SC.StoreId = JP.StoreId
                      AND SC.VisitDate = JP.VisitDate
                      AND SC.ReasonId IN (0,1,3,9,10,19,20)
                ) > 0
                THEN 'Y' ELSE 'N'
            END AS is_covered,
            CASE JP.Deviation
                WHEN 0 THEN 'Planned'
                WHEN 1 THEN 'Adhoc'
                WHEN 2 THEN 'Beat Plan'
                WHEN 3 THEN 'Non Merchandised'
                WHEN 4 THEN 'Add New Store'
                WHEN 5 THEN 'Non Program'
                ELSE ''
            END AS coverage_type,
            sm.StoreTypeId AS storetype_id,
            Em.SupervisorId AS supervisor_id,
            ISNULL(JP.ReasonId, 0) AS reason_id,
            sm.CameraAllow AS camera_allow,
            CAST(
                CASE
                    WHEN sm.Latitude IS NULL OR sm.Latitude = 0 THEN 0
                    WHEN sm.Longitude IS NULL OR sm.Longitude = 0 THEN 0
                    WHEN JP.Latitude IS NULL OR JP.Latitude = 0 THEN 0
                    WHEN JP.Longitude IS NULL OR JP.Longitude = 0 THEN 0
                    ELSE SQRT(
                        POWER(69.1 * (JP.Latitude - sm.Latitude), 2) +
                        POWER(69.1 * (sm.Longitude - JP.Longitude)
                              * COS(JP.Latitude / 57.3), 2)
                    ) * 1000
                END AS FLOAT
            ) AS distance_meters,
            GETDATE() AS update_date,
            'ETL-SQLAlchemy' AS update_by
        FROM OneApp_KelloggsMT.dbo.T_StoreCoverage JP WITH (NOLOCK)
        INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
            ON JP.StoreId = sm.StoreId
        INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
            ON JP.EmpId = Em.EmpId
        WHERE JP.MID IN ({mid_placeholders})
          AND Em.UserName NOT LIKE 'test%'
    """

    total_mids = len(mids)
    frames: list[pl.DataFrame] = []
    chunk_starts = range(0, total_mids, MID_CHUNK)
    for chunk_no, start in enumerate(chunk_starts, start=1):
        chunk = mids[start : start + MID_CHUNK]

        # The IN list is rendered as literals. int() enforces the "integers
        # only" assumption that makes this safe: anything that is not an
        # integer value raises instead of being spliced into the SQL text.
        mid_list = ",".join(str(int(m)) for m in chunk)
        sql = coverage_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=mid_list,
        )

        last = min(start + MID_CHUNK, total_mids)
        log.info(
            f"Fetching chunk {chunk_no} "
            f"(MIDs {start + 1}-{last} of {total_mids})..."
        )

        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No rows returned from SQL Server.")
        return pl.DataFrame()

    # Dtypes are inferred separately for every chunk, so the same column can
    # come back with different dtypes (e.g. all-NULL in one chunk). The
    # relaxed mode casts to a common supertype instead of raising.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """Normalise column dtypes, fill nulls and de-duplicate the coverage rows.

    Every expression below rewrites exactly one column and reads only that
    same column, so they are all applied in a single ``with_columns`` call.

    Args:
        df: Raw frame holding the columns referenced below (project_id, MID,
            store_id, employee_id, supervisor_id, storetype_id, reason_id,
            visit_date, duration_minutes, distance_meters, is_covered,
            camera_allow, coverage_type, update_by, update_date).

    Returns:
        Frame with explicit dtypes and at most one row per
        (project_id, MID) pair.
    """
    log.info("Transforming...")

    # Identifier columns: fixed-width integers; the three optional ones
    # default to 0 when missing.
    id_columns = [
        pl.col("project_id").cast(pl.Int32),
        pl.col("MID").cast(pl.Int64),
        pl.col("store_id").cast(pl.Int64),
        pl.col("employee_id").cast(pl.Int64),
        pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
        pl.col("storetype_id").cast(pl.Int32).fill_null(0),
        pl.col("reason_id").cast(pl.Int32).fill_null(0),
    ]

    # visit_date becomes a plain Date even if it arrived as a datetime.
    visit_day = pl.col("visit_date").cast(pl.Date)

    # Numerics pass through Float64 first; duration then ends up as Int32,
    # distance stays a float rounded to 2 decimals. Missing values become 0.
    duration = (
        pl.col("duration_minutes").cast(pl.Float64).fill_null(0.0).cast(pl.Int32)
    )
    distance = pl.col("distance_meters").cast(pl.Float64).fill_null(0.0).round(2)

    # Flags as UInt8: 'Y' maps to 1, anything else (including null) to 0.
    covered_flag = (
        pl.when(pl.col("is_covered") == "Y")
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(0, dtype=pl.UInt8))
        .alias("is_covered")
    )
    camera_flag = pl.col("camera_allow").cast(pl.UInt8).fill_null(0)

    # Text columns never carry nulls downstream.
    text_columns = [
        pl.col("coverage_type").fill_null(""),
        pl.col("update_by").fill_null("ETL-SQLAlchemy"),
    ]

    loaded_at = pl.col("update_date").cast(pl.Datetime)

    typed = df.with_columns(
        *id_columns,
        visit_day,
        duration,
        distance,
        covered_flag,
        camera_flag,
        *text_columns,
        loaded_at,
    )

    # Keep one row per (project_id, MID).
    df = typed.unique(subset=["project_id", "MID"])

    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """Replace one day's rows in ClickHouse with the contents of *df*.

    Steps:
      1. Take the earliest ``visit_date`` in *df* as the target date.
      2. Delete the existing rows for ``PROJECT_ID`` on that date and wait
         for the delete to finish.
      3. Insert *df* in slices of ``BATCH_SIZE`` rows via ``insert_arrow()``.

    ``ALTER TABLE ... DELETE`` is a mutation that ClickHouse runs
    asynchronously by default. ``mutations_sync=1`` makes the command block
    until the delete has been applied, so the following insert and a later
    row count are not racing against it.

    Args:
        df: Transformed rows; must contain at least one row and a
            ``visit_date`` column.

    Returns:
        ``(rows_inserted, target_date)``.

    Raises:
        ValueError: if *df* has no rows (there is no date to replace).
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() requires at least one row")

    target_date: date = df["visit_date"].min()

    # NOTE(review): only target_date is cleared (and later verified), yet
    # every row of df is inserted. If a batch can legitimately span several
    # visit dates, the delete/verify logic needs to cover all of them.
    distinct_dates = df["visit_date"].n_unique()
    if distinct_dates > 1:
        log.warning(
            f"Batch spans {distinct_dates} visit dates; only {target_date} "
            "is cleared before insert"
        )

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = clickhouse_connect.get_client(
        host=CH_HOST,
        port=CH_PORT,
        username=CH_USER,
        password=CH_PASS,
        database=CH_DB,
    )
    try:
        # Delete stale rows for this date + project.
        log.info(
            f"Deleting existing rows: project_id={PROJECT_ID} visit_date={target_date}"
        )
        client.command(
            f"ALTER TABLE {CH_TABLE} DELETE "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'",
            settings={"mutations_sync": 1},
        )

        # Insert slice by slice as Arrow tables — no pandas, no per-row work.
        total = 0
        for start in range(0, len(df), BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f" Inserted {total:,} / {len(df):,} rows")
    finally:
        # Release the client's connections even when a step above fails.
        client.close()

    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date
# ── Step 6: Verify ────────────────────────────────────────────────────────────
def verify(expected: int, target_date: date) -> bool:
    """Check that ClickHouse holds *expected* rows for *target_date*.

    Counts the rows of ``CH_TABLE`` for ``PROJECT_ID`` on *target_date* and
    compares the count with *expected*. A mismatch is logged as an error; it
    is up to the caller to turn a ``False`` result into an exit code.

    Args:
        expected: Number of rows the load step reported as inserted.
        target_date: Visit date whose rows are counted.

    Returns:
        True when the counts are equal, False otherwise.
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST,
        port=CH_PORT,
        username=CH_USER,
        password=CH_PASS,
        database=CH_DB,
    )
    try:
        result = client.query(
            f"SELECT count() FROM {CH_TABLE} "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'"
        )
        actual = result.result_rows[0][0]
    finally:
        # The client is only needed for this one query; always release it.
        client.close()

    log.info(f"Verify — expected={expected:,} clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False
    log.info("Verify passed.")
    return True
# ── Main ──────────────────────────────────────────────────────────────────────
def _run_for_date(engine: Engine, target_date: date) -> bool:
    """Run collect → fetch → transform → load → verify for one date.

    Returns:
        False only when the ClickHouse row count does not match what was
        inserted. A date with no MIDs or no rows is logged and counts as
        success, so one empty day does not stop the remaining dates.
    """
    log.info(f"=== Coverage KPI ETL | date={target_date} ===")

    # Collect which MIDs need processing.
    mids = collect_mids(engine, target_date)
    if not mids:
        log.info(f"No MIDs for {target_date}. Nothing to do.")
        return True

    # Fetch raw data using pl.read_database().
    df_raw = fetch_coverage_data(engine, mids)
    if df_raw.is_empty():
        log.warning(f"Empty result set for {target_date}. Skipping.")
        return True

    df_clean = transform(df_raw)
    rows_inserted, inserted_date = load_to_clickhouse(df_clean)
    if not verify(rows_inserted, inserted_date):
        return False

    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
    return True


def main() -> None:
    """Backfill the coverage KPI for 2023-03-10 through 2023-03-18.

    One engine is built up front, shared by every date and disposed at the
    end, including on error or early exit. The process exits with status 2
    as soon as a date fails verification, so a scheduler can tell a data
    problem from a crash.
    """
    # TODO: switch back to the scheduled default once the backfill is done:
    #   target_dates = [date.today() - timedelta(days=1)]
    backfill_start = date(2023, 3, 10)
    target_dates = [backfill_start + timedelta(days=offset) for offset in range(9)]

    engine = build_engine()
    try:
        for target_date in target_dates:
            if not _run_for_date(engine, target_date):
                sys.exit(2)
    finally:
        # Close all pooled connections, whatever happened above.
        engine.dispose()


if __name__ == "__main__":
    main()