Compare commits

...

2 Commits

Author SHA1 Message Date
Ankit Malik 2c5a051772 second commit 2026-06-03 17:25:17 +05:30
Ankit Malik 279eed0d9d second commit 2026-06-03 17:24:38 +05:30
3 changed files with 604 additions and 23 deletions
+97
View File
@@ -0,0 +1,97 @@
-- ClickHouse DDL: promotion_kpi table
-- Run ONCE before the first ETL run
-- Connect: podman exec -it clickhouse-kelloggs clickhouse-client
CREATE DATABASE IF NOT EXISTS kelloggs;

CREATE TABLE IF NOT EXISTS kelloggs.promotion_kpi
(
    -- ── Core identifiers ────────────────────────────────────────────────────
    mid                   Int64,
    project_id            Int32,
    store_id              Int64,
    employee_id           Int64,
    supervisor_id         Int64,
    -- ── Store dimension ─────────────────────────────────────────────────────
    channel_id            Int32,
    chain_id              Int64,
    storetype_id          Int32,
    -- ── Visit ───────────────────────────────────────────────────────────────
    visit_date            Date32,
    -- ── Promotion definition ────────────────────────────────────────────────
    promo_definition_id   Int32,
    promo_definition_name String,
    -- ── Promotion item (Category / SubCategory / Brand / SubBrand) ──────────
    -- LowCardinality = dictionary-encoding — ideal for columns with few
    -- distinct values (four level names, plus '' when the level is unknown)
    promotion_details     LowCardinality(String), -- 'Category' | 'SubCategory' | 'Brand' | 'SubBrand' | ''
    promotion_details_id  Int64,                  -- FK into the chosen master table
    promo_value_name      String,                 -- e.g. 'Chocos', 'Breakfast Cereals'
    -- ── Execution status ────────────────────────────────────────────────────
    is_present            UInt8,                  -- 1 = present (Y), 0 = absent (N)
    reason                String,                 -- populated only when is_present = 0
    -- ── Optional quiz ───────────────────────────────────────────────────────
    question              String,
    answer                String,
    -- ── Photo proof ─────────────────────────────────────────────────────────
    image1                String,                 -- full URL or empty string
    image2                String,
    -- ── Audit ───────────────────────────────────────────────────────────────
    update_date           DateTime DEFAULT now(),
    update_by             String,
    -- ── Data-skipping indexes ───────────────────────────────────────────────
    -- These must be declared INSIDE the column list; ClickHouse rejects INDEX
    -- clauses placed after ENGINE / ORDER BY / PARTITION BY.
    -- Bloom filter on store_id — speeds up store-level promotion reports
    INDEX idx_store store_id TYPE bloom_filter GRANULARITY 1,
    -- Bloom filter on employee_id — speeds up per-rep promotion audits
    INDEX idx_emp employee_id TYPE bloom_filter GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(visit_date)
-- Sort key: queries always filter on project_id + visit_date first
-- Adding promo_definition_id speeds up "show me all Promo X results"
ORDER BY (project_id, visit_date, mid, promo_definition_id)
SETTINGS index_granularity = 8192;
-- ── Verification queries ─────────────────────────────────────────────────────
-- 1. How many promotions loaded today?
-- SELECT count(), visit_date
-- FROM kelloggs.promotion_kpi
-- WHERE project_id = 40148
-- GROUP BY visit_date ORDER BY visit_date DESC LIMIT 10;
-- 2. Present vs absent breakdown
-- SELECT
-- visit_date,
-- countIf(is_present = 1) AS present,
-- countIf(is_present = 0) AS absent,
-- count() AS total,
-- round(present / total * 100, 1) AS present_pct
-- FROM kelloggs.promotion_kpi
-- WHERE project_id = 40148
-- GROUP BY visit_date ORDER BY visit_date DESC LIMIT 7;
-- 3. Top promotion definitions by absence
-- SELECT promo_definition_name, count() AS absent_count
-- FROM kelloggs.promotion_kpi
-- WHERE project_id = 40148
-- AND is_present = 0
-- AND visit_date >= today() - 7
-- GROUP BY promo_definition_name
-- ORDER BY absent_count DESC LIMIT 10;
-- 4. Absence reasons breakdown
-- SELECT reason, count() AS cnt
-- FROM kelloggs.promotion_kpi
-- WHERE project_id = 40148
-- AND is_present = 0
-- AND visit_date = yesterday()
-- GROUP BY reason ORDER BY cnt DESC;
+481
View File
@@ -0,0 +1,481 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "polars>=0.20.0",
# "sqlalchemy>=2.0.0",
# "pyodbc>=5.0.0",
# "clickhouse-connect>=0.7.0",
# "pyarrow>=18.0.0",
# "python-dotenv>=1.0.0",
# ]
# ///
"""
ETL Pipeline: Promotion KPI — SQL Server → Polars → ClickHouse
---------------------------------------------------------------
Run with: uv run etl_promotion_kpi.py
What is Promotion KPI?
Kellogg's runs in-store promotions — standees, special shelf displays,
"buy 2 get 1 free" offers. The salesperson visits a store and records:
- Is the promotion physically present? (Present = Y/N)
- If absent, what is the reason? (Reason)
- What category/brand does it apply to? (PromoValueName)
- Photo proof (Image1, Image2)
- Quiz-style question + answer about the promo (Question, Answer)
The correlated CASE subqueries in the SP resolve the PromoTable column
('Master_Category' / 'Master_Brand' etc.) into a human-readable name.
SQL Server does this at query time — Polars receives the final string.
Nothing special needed in the transform for those columns.
Architecture: same 6-step pattern as Coverage KPI
1. Build SQLAlchemy + pyodbc engine (URL.create + odbc_connect DSN)
2. Collect MIDs for yesterday from T_StoreCoverage
3. Fetch Promotion rows in MID chunks via pl.read_database()
4. Transform with Polars (types, nulls, flags)
5. Delete stale ClickHouse rows → insert_arrow() in batches
6. Verify row count
"""
import os
import sys
import logging
from datetime import date, timedelta
import polars as pl
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, URL
import clickhouse_connect
from dotenv import load_dotenv
load_dotenv()
# ── Logging ───────────────────────────────────────────────────────────────────
# Root logger settings: timestamp | padded level | message, INFO and above.
_log_settings = {
    "level": logging.INFO,
    "format": "%(asctime)s | %(levelname)-8s | %(message)s",
    "datefmt": "%Y-%m-%d %H:%M:%S",
}
logging.basicConfig(**_log_settings)

# Named logger used by every step of this ETL script.
log = logging.getLogger("etl_promotion")
# ── Config ────────────────────────────────────────────────────────────────────
# SQL Server connection settings, read from the environment (or .env).
MSSQL_SERVER = os.getenv("MSSQL_SERVER", "your_server")
MSSQL_DB = os.getenv("MSSQL_DB", "insight")
MSSQL_USER = os.getenv("MSSQL_USER", "sa")
MSSQL_PASS = os.getenv("MSSQL_PASS", "")


def _odbc_quote(value: str) -> str:
    """Return *value* made safe for use as an ODBC connection-string value.

    Inside an ODBC connection string ';' ends an attribute and '{' / '}'
    delimit quoted values, so a credential containing any of them (or
    leading/trailing blanks) has to be wrapped in braces with every '}'
    doubled. Plain values, and values the operator already wrapped in
    braces, are returned unchanged so existing configurations keep working.
    """
    if not value:
        return value
    if value.startswith("{") and value.endswith("}"):
        return value  # treated as already brace-quoted by the operator
    needs_quoting = any(ch in value for ch in ";{}") or value != value.strip()
    if not needs_quoting:
        return value
    return "{" + value.replace("}", "}}") + "}"


# Build the pyodbc connection string first, then hand the whole string to
# URL.create() through the "odbc_connect" query key. That keeps characters
# such as @ / + # out of URL parsing; ODBC's own special characters
# (; { }) in the credentials are handled by _odbc_quote().
_ODBC_DSN = (
    f"DRIVER={{ODBC Driver 18 for SQL Server}};"
    f"SERVER={MSSQL_SERVER};"
    f"DATABASE={MSSQL_DB};"
    f"UID={_odbc_quote(MSSQL_USER)};"
    f"PWD={_odbc_quote(MSSQL_PASS)};"
    "TrustServerCertificate=yes;"
    "timeout=30;"
)
MSSQL_SA_URL = URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": _ODBC_DSN},
)
# ClickHouse connection settings (HTTP interface), overridable via env vars.
CH_HOST = os.environ.get("CH_HOST", "localhost")
CH_PORT = int(os.environ.get("CH_PORT", "8123"))
CH_USER = os.environ.get("CH_USER", "default")
CH_PASS = os.environ.get("CH_PASS", "")
CH_DB = os.environ.get("CH_DB", "kelloggs")

# Target table and pipeline tuning constants.
CH_TABLE = "promotion_kpi"
BATCH_SIZE = 5000  # rows sent to ClickHouse per insert call
MID_CHUNK = 1000  # number of MIDs inlined into one SQL Server IN (...) list
PROJECT_ID = 40148
# ── Step 1: Build SQLAlchemy Engine ───────────────────────────────────────────
def build_engine() -> Engine:
    """
    Create the shared SQLAlchemy engine for SQL Server and smoke-test it.

    The engine is built from MSSQL_SA_URL (mssql+pyodbc) with a small pool
    (3 connections + 2 overflow) and pool_pre_ping enabled. A "SELECT 1"
    is executed once so that a bad DSN or bad credentials fail here rather
    than in the middle of the fetch step.

    Returns:
        A connected, ready-to-use Engine. The caller owns it and is
        expected to call engine.dispose() when finished.

    Raises:
        Whatever SQLAlchemy/pyodbc raises when the connection test fails;
        the half-built engine is disposed before the error propagates.
    """
    log.info("Building SQLAlchemy + pyodbc engine...")
    engine = create_engine(
        MSSQL_SA_URL,
        pool_size=3,
        max_overflow=2,
        pool_pre_ping=True,
        echo=False,
        # fast_executemany is an option of the mssql+pyodbc dialect and must
        # be given to create_engine(); inside connect_args it is forwarded to
        # pyodbc.connect() instead of enabling the dialect feature.
        fast_executemany=True,
    )
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception:
        # Do not leave pooled connections behind when the probe fails.
        engine.dispose()
        raise
    log.info("Engine ready.")
    return engine
# ── Step 2: Collect MIDs ──────────────────────────────────────────────────────
def collect_mids(engine: Engine, target_date: date) -> list[int]:
    """
    Return the MIDs of T_StoreCoverage rows created or updated on target_date.

    Two SELECTs are combined with UNION: one matches on CreateDate, the
    other on UpdateDate, so a MID that qualifies through both comes back
    once. The date is passed as the bound parameter :target_date.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        target_date: Calendar day to match against CreateDate / UpdateDate.

    Returns:
        The first column of every result row, as a list.
    """
    log.info(f"Collecting MIDs for: {target_date}")
    query = text("""
SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
WHERE CONVERT(date, CreateDate) = :target_date
UNION
SELECT MID FROM OneApp_KelloggsMT.dbo.T_StoreCoverage
WHERE CONVERT(date, UpdateDate) = :target_date
""")
    bind_values = {"target_date": target_date}
    with engine.connect() as connection:
        # fetchall() materialises the rows before the connection is released.
        rows = connection.execute(query, bind_values).fetchall()
    mids = [record[0] for record in rows]
    log.info(f"Found {len(mids):,} MIDs")
    return mids
# ── Step 3: Fetch Promotion data ──────────────────────────────────────────────
def fetch_promotion_data(engine: Engine, mids: list[int]) -> pl.DataFrame:
    """
    Fetch the Promotion KPI rows for the given MIDs from SQL Server.

    What the query does:
      1. Correlated CASE subqueries (promo_value_name)
         The name of the promoted item is looked up in whichever master
         table ts.PromoTable points to. When PromoTable matches none of the
         four known tables the CASE has no ELSE, so the column is NULL.
      2. LEFT JOINs (images, reason, question)
         These are optional, so their columns can be NULL; the SQL wraps
         them in ISNULL(...) / CASE so they come back as empty strings.
      3. Column aliases are snake_case to match the ClickHouse table.

    MID chunking: the MIDs are inlined as integer literals into an
    IN (...) list, MID_CHUNK values at a time, which keeps each statement
    to a bounded size. One pl.read_database() call is issued per chunk
    against the shared engine and the resulting frames are concatenated.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        mids: MIDs to fetch; every value must be convertible with int().

    Returns:
        One DataFrame with all fetched rows, or an empty pl.DataFrame()
        (no columns) when mids is empty or no chunk returned any row.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # SELECT taken from the SP's Promotion INSERT — columns renamed to
    # snake_case for ClickHouse convention. {project_id} and
    # {mid_placeholders} are filled in per chunk with str.format().
    promotion_sql = """
SELECT
sc.MID AS mid,
{project_id} AS project_id,
sc.StoreId AS store_id,
Em.EmpId AS employee_id,
CONVERT(date, sc.VisitDate) AS visit_date,
Em.SupervisorId AS supervisor_id,
sm.ChannelId AS channel_id,
sm.ChainId AS chain_id,
sm.StoreTypeId AS storetype_id,
msd.PromoDefinitionId AS promo_definition_id,
msd.PromoDefinitionName AS promo_definition_name,
-- Resolve which dimension level the promo targets
-- (Category / SubCategory / Brand / SubBrand)
CASE
WHEN ISNULL(ts.PromoTable,'') = 'Master_Category' THEN 'Category'
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory' THEN 'SubCategory'
WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand' THEN 'Brand'
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand' THEN 'SubBrand'
ELSE ''
END AS promotion_details,
ts.PromoValue AS promotion_details_id,
-- Resolve the actual name of that dimension item
-- SQL Server evaluates exactly ONE of these subqueries per row
CASE
WHEN ISNULL(ts.PromoTable,'') = 'Master_Category'
THEN (SELECT a.CategoryName
FROM OneApp_KelloggsMT.dbo.Master_Category a
WHERE a.CategoryId = ts.PromoValue)
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubCategory'
THEN (SELECT a.SubCategoryName
FROM OneApp_KelloggsMT.dbo.Master_SubCategory a
WHERE a.SubCategoryId = ts.PromoValue)
WHEN ISNULL(ts.PromoTable,'') = 'Master_Brand'
THEN (SELECT a.BrandName
FROM OneApp_KelloggsMT.dbo.Master_Brand a
WHERE a.BrandId = ts.PromoValue)
WHEN ISNULL(ts.PromoTable,'') = 'Master_SubBrand'
THEN (SELECT a.SubBrandName
FROM OneApp_KelloggsMT.dbo.Master_SubBrand a
WHERE a.SubBrandId = ts.PromoValue)
END AS promo_value_name,
-- Present flag: 0 → 'N', anything else → 'Y'
CASE WHEN ts.Present = 0 THEN 'N' ELSE 'Y' END AS is_present,
-- Reason only populated when promo is ABSENT
CASE
WHEN ts.Present = 1 THEN ''
ELSE ISNULL(mnp.PromoReason, '')
END AS reason,
ISNULL(mpq.PromoQuestionName, '') AS question,
ISNULL(tpq.PromoAnswerName, '') AS answer,
-- Image URLs — empty string when no image
CASE
WHEN ISNULL(SHI.PromoImage1,'') = '' THEN ''
ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
+ SHI.PromoImage1
END AS image1,
CASE
WHEN ISNULL(SHI.PromoImage2,'') = '' THEN ''
ELSE 'https://kimt1.parinaam.in/Upload/PromotionImages/'
+ SHI.PromoImage2
END AS image2,
GETDATE() AS update_date,
'ETL-SQLAlchemy' AS update_by
FROM OneApp_KelloggsMT.dbo.T_Promotion ts
INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
ON ts.mid = sc.mid
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
ON sc.StoreId = sm.StoreId
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
ON sc.EmpId = Em.EmpId
INNER JOIN OneApp_KelloggsMT.dbo.Master_PromotionDefinition msd
ON msd.PromoDefinitionId = ts.PromoDefinitionId
LEFT JOIN OneApp_KelloggsMT.dbo.T_PromotionImages SHI
ON ts.PId = SHI.PId
LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionReason mnp
ON ts.PromoReasonId = mnp.PromoReasonId
LEFT JOIN OneApp_KelloggsMT.dbo.t_promotionquestion tpq
ON ts.PId = tpq.PId
LEFT JOIN OneApp_KelloggsMT.dbo.Master_PromotionQuestion mpq
ON tpq.PromoQuestionId = mpq.PromoQuestionId
WHERE Em.EmpName NOT LIKE 'test%'
AND sc.MID IN ({mid_placeholders})
"""

    total_mids = len(mids)
    frames: list[pl.DataFrame] = []
    for chunk_no, start in enumerate(range(0, total_mids, MID_CHUNK), start=1):
        chunk = mids[start : start + MID_CHUNK]
        # The values are inlined into the SQL text, so force each one through
        # int(): anything that is not an integer raises here instead of
        # reaching SQL Server as arbitrary text.
        mid_list = ",".join(str(int(m)) for m in chunk)
        sql = promotion_sql.format(
            project_id=PROJECT_ID,
            mid_placeholders=mid_list,
        )
        log.info(
            f"Fetching chunk {chunk_no} "
            f"(MIDs {start + 1}-{start + len(chunk)} of {total_mids})..."
        )
        df_chunk = pl.read_database(query=sql, connection=engine)
        if not df_chunk.is_empty():
            frames.append(df_chunk)

    if not frames:
        log.warning("No promotion rows returned.")
        return pl.DataFrame()

    # "vertical_relaxed" lets chunks whose inferred dtypes differ (for
    # example a column that is entirely NULL in one chunk) be combined by
    # casting to a common supertype instead of raising a schema error.
    df = pl.concat(frames, how="vertical_relaxed")
    log.info(f"Fetched {len(df):,} total rows from SQL Server")
    return df
# ── Step 4: Transform with Polars ─────────────────────────────────────────────
def transform(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cast, null-fill and de-duplicate the raw Promotion frame for ClickHouse.

    Each .with_columns() call handles one group of columns:
      - IDs           cast to Int64 / Int32; nullable ones get fill_null(0)
      - visit_date    cast to pl.Date
      - is_present    'Y' becomes 1, anything else (including NULL) becomes 0,
                      stored as UInt8 to match the table column
      - strings       NULL replaced by '' (update_by by 'ETL-SQLAlchemy')
      - update_date   cast to pl.Datetime

    Finally the rows are de-duplicated on
    (project_id, mid, promo_definition_id, promotion_details_id). The first
    row of each key in fetch order is kept and fetch order is preserved, so
    the same input frame always yields the same output frame.
    NOTE(review): rows that share this key but differ in other columns
    (for example several question/answer rows for one promotion) are also
    collapsed to one row by this step — confirm that is intended.

    Args:
        df: Frame produced by the fetch step; must contain every column
            referenced below.

    Returns:
        The cleaned, de-duplicated DataFrame.
    """
    log.info("Transforming Promotion data...")
    df = (
        df
        # ── IDs ──────────────────────────────────────────────────────────
        .with_columns([
            pl.col("mid").cast(pl.Int64),
            pl.col("project_id").cast(pl.Int32),
            pl.col("store_id").cast(pl.Int64),
            pl.col("employee_id").cast(pl.Int64),
            pl.col("supervisor_id").cast(pl.Int64).fill_null(0),
            pl.col("channel_id").cast(pl.Int32).fill_null(0),
            pl.col("chain_id").cast(pl.Int64).fill_null(0),
            pl.col("storetype_id").cast(pl.Int32).fill_null(0),
            pl.col("promo_definition_id").cast(pl.Int32).fill_null(0),
            # promotion_details_id = PromoValue (the FK into the chosen master)
            pl.col("promotion_details_id").cast(pl.Int64).fill_null(0),
        ])
        # ── Date ─────────────────────────────────────────────────────────
        .with_columns(
            pl.col("visit_date").cast(pl.Date)
        )
        # ── Present flag: 'Y'/'N' → UInt8 (1/0) ──────────────────────────
        .with_columns(
            pl.when(pl.col("is_present") == "Y")
            .then(pl.lit(1, dtype=pl.UInt8))
            .otherwise(pl.lit(0, dtype=pl.UInt8))
            .alias("is_present")
        )
        # ── Strings: NULL → empty string ─────────────────────────────────
        # The target String columns are not Nullable, so NULLs are replaced.
        .with_columns([
            pl.col("promo_definition_name").fill_null(""),
            pl.col("promotion_details").fill_null(""),
            pl.col("promo_value_name").fill_null(""),
            pl.col("reason").fill_null(""),
            pl.col("question").fill_null(""),
            pl.col("answer").fill_null(""),
            pl.col("image1").fill_null(""),
            pl.col("image2").fill_null(""),
            pl.col("update_by").fill_null("ETL-SQLAlchemy"),
        ])
        # ── Timestamp ────────────────────────────────────────────────────
        .with_columns(
            pl.col("update_date").cast(pl.Datetime)
        )
        # ── Dedup ────────────────────────────────────────────────────────
        # keep="first" + maintain_order=True make the surviving row and the
        # output order reproducible; the defaults pick an arbitrary row.
        .unique(
            subset=["project_id", "mid", "promo_definition_id", "promotion_details_id"],
            keep="first",
            maintain_order=True,
        )
    )
    log.info(f"After transform: {len(df):,} rows | schema: {df.schema}")
    return df
# ── Step 5: Load to ClickHouse ────────────────────────────────────────────────
def load_to_clickhouse(df: pl.DataFrame) -> tuple[int, date]:
    """
    Replace one day of Promotion rows in ClickHouse with the given frame.

    Steps:
      1. target_date = the smallest visit_date in df.
      2. ALTER TABLE ... DELETE for PROJECT_ID + target_date. The mutation is
         run with mutations_sync=1 so the call returns only after the delete
         has been applied; otherwise the delete is asynchronous and the
         insert and the later row-count check could race it.
      3. Insert df in slices of BATCH_SIZE rows via insert_arrow().

    Only target_date is deleted. If df contains other visit dates as well,
    their previously loaded rows are NOT removed; a warning is logged so the
    situation is visible.

    Args:
        df: Transformed frame; must contain a non-null "visit_date" value.

    Returns:
        (rows inserted, target_date).

    Raises:
        ValueError: if df has no rows or no non-null visit_date.
    """
    if df.is_empty():
        raise ValueError("load_to_clickhouse() received an empty DataFrame")
    target_date: date = df["visit_date"].min()
    if target_date is None:
        raise ValueError("load_to_clickhouse(): visit_date is NULL for every row")

    distinct_dates = df["visit_date"].n_unique()
    if distinct_dates > 1:
        log.warning(
            f"Frame spans {distinct_dates} distinct visit_date values; "
            f"only {target_date} is deleted before the insert."
        )

    log.info(f"Connecting to ClickHouse {CH_HOST}:{CH_PORT} db={CH_DB}...")
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        log.info(f"Deleting: project_id={PROJECT_ID} visit_date={target_date}")
        client.command(
            f"ALTER TABLE {CH_TABLE} DELETE "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'",
            # Wait for the mutation to finish before inserting.
            settings={"mutations_sync": 1},
        )
        total = 0
        for start in range(0, len(df), BATCH_SIZE):
            batch = df.slice(start, BATCH_SIZE)
            client.insert_arrow(CH_TABLE, batch.to_arrow())
            total += len(batch)
            log.info(f" Inserted {total:,} / {len(df):,} rows")
    finally:
        # Release the client even when the delete or an insert fails.
        client.close()
    log.info(f"ClickHouse load complete: {total:,} rows")
    return total, target_date
# ── Step 6: Verify ────────────────────────────────────────────────────────────
def verify(expected: int, target_date: date) -> bool:
    """
    Check that ClickHouse holds exactly the expected number of rows.

    Counts the rows of CH_TABLE for PROJECT_ID and target_date and compares
    the count with `expected`.

    Args:
        expected: Number of rows the load step reported as inserted.
        target_date: visit_date the rows were loaded for.

    Returns:
        True when the count matches, False otherwise (a mismatch is logged
        as an error; main() turns False into exit code 2).
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT,
        username=CH_USER, password=CH_PASS,
        database=CH_DB,
    )
    try:
        result = client.query(
            f"SELECT count() FROM {CH_TABLE} "
            f"WHERE project_id = {PROJECT_ID} "
            f"AND visit_date = '{target_date}'"
        )
        actual = result.result_rows[0][0]
    finally:
        # The client is only needed for this one query; always release it.
        client.close()
    log.info(f"Verify — expected={expected:,} clickhouse={actual:,}")
    if actual != expected:
        log.error(f"MISMATCH: sent {expected:,}, ClickHouse has {actual:,}")
        return False
    log.info("Verify passed.")
    return True
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """
    Run the Promotion KPI ETL for yesterday.

    Sequence: build engine, collect MIDs, fetch, dispose engine, transform,
    load, verify. Exits with status 0 when there are no MIDs or no fetched
    rows, and with status 2 when verify() reports a row-count mismatch.
    """
    one_day = timedelta(days=1)
    target_date = date.today() - one_day
    log.info(f"=== Promotion KPI ETL | date={target_date} ===")

    engine = build_engine()
    mids = collect_mids(engine, target_date)
    if not mids:
        log.info("No MIDs for yesterday. Nothing to do.")
        sys.exit(0)

    df_raw = fetch_promotion_data(engine, mids)
    # SQL Server is not needed past this point; release the pool.
    engine.dispose()
    if df_raw.is_empty():
        log.warning("Empty result set. Exiting.")
        sys.exit(0)

    df_clean = transform(df_raw)
    rows_inserted, inserted_date = load_to_clickhouse(df_clean)

    landed_ok = verify(rows_inserted, inserted_date)
    if not landed_ok:
        sys.exit(2)
    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")


if __name__ == "__main__":
    main()
+26 -23
View File
@@ -428,37 +428,40 @@ def verify(expected: int, target_date: date) -> bool:
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
target_date = date.today() - timedelta(days=1)
log.info(f"=== Coverage KPI ETL | date={target_date} ===")
#target_date = date.today() - timedelta(days=1)
for i in range(1, 10):
target_date = date(2023, 3, 9) + timedelta(days=i)
print(target_date)
log.info(f"=== Coverage KPI ETL | date={target_date} ===")
# Build the SQLAlchemy engine once — shared across all steps
engine = build_engine()
# Build the SQLAlchemy engine once — shared across all steps
engine = build_engine()
# Collect which MIDs need processing
mids = collect_mids(engine, target_date)
if not mids:
log.info("No MIDs for yesterday. Nothing to do.")
sys.exit(0)
# Collect which MIDs need processing
mids = collect_mids(engine, target_date)
if not mids:
log.info("No MIDs for yesterday. Nothing to do.")
sys.exit(0)
# Fetch raw data using pl.read_database()
df_raw = fetch_coverage_data(engine, mids)
engine.dispose() # return all pooled connections to OS cleanly
# Fetch raw data using pl.read_database()
df_raw = fetch_coverage_data(engine, mids)
engine.dispose() # return all pooled connections to OS cleanly
if df_raw.is_empty():
log.warning("Empty result set. Exiting.")
sys.exit(0)
if df_raw.is_empty():
log.warning("Empty result set. Exiting.")
sys.exit(0)
# Transform
df_clean = transform(df_raw)
# Transform
df_clean = transform(df_raw)
# Load
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
# Load
rows_inserted, inserted_date = load_to_clickhouse(df_clean)
# Verify
if not verify(rows_inserted, inserted_date):
sys.exit(2)
# Verify
if not verify(rows_inserted, inserted_date):
sys.exit(2)
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
if __name__ == "__main__":