From 2c5a0517722f888a17553811cc7b9d4c35e88ba7 Mon Sep 17 00:00:00 2001
From: Ankit Malik
Date: Wed, 3 Jun 2026 17:25:17 +0530
Subject: [PATCH] Add promotion_kpi DDL and date-range backfill for coverage ETL

Add the ClickHouse DDL for kelloggs.promotion_kpi and let the coverage
KPI ETL process an inclusive range of visit dates instead of only
yesterday. Calling main() with no arguments still runs yesterday only.

---
Review notes (everything between the "---" line and the first
"diff --git" line is ignored by git am):

* create_promotion_table.sql: the two bloom_filter INDEX definitions are
  now declared inside the column list. ClickHouse does not accept INDEX
  clauses after the ENGINE / ORDER BY / PARTITION BY clauses, so the
  earlier statement failed with a syntax error.
* main.py: the hard-coded 2023-03-10..2023-03-18 loop is replaced by
  optional start_date / end_date parameters that default to yesterday.
  The backfill from the earlier revision is now
  main(date(2023, 3, 10), date(2023, 3, 18)).
* main.py: a date with no MIDs or an empty result set is skipped.
  Previously sys.exit(0) inside the loop ended the whole backfill at the
  first empty day.
* main.py: the engine is built once and disposed in a finally block, so
  pooled connections are released on every exit path.
* main.py: the debug print() is removed and the "yesterday" log text now
  names the date actually being processed.
* The blob ids on the "index" lines are carried over from the earlier
  revision of this patch and were not recomputed.

 create_promotion_table.sql | 100 +++++++++++++++++++++++++++++++++++++++++++++++
 main.py                    |  91 ++++++++++++++++++++++++++++---------------
 2 files changed, 159 insertions(+), 32 deletions(-)
 create mode 100644 create_promotion_table.sql

diff --git a/create_promotion_table.sql b/create_promotion_table.sql
new file mode 100644
index 0000000..4da9156
--- /dev/null
+++ b/create_promotion_table.sql
@@ -0,0 +1,100 @@
+-- ClickHouse DDL: promotion_kpi table
+-- Run ONCE before the first ETL run
+-- Connect: podman exec -it clickhouse-kelloggs clickhouse-client
+
+CREATE DATABASE IF NOT EXISTS kelloggs;
+
+CREATE TABLE IF NOT EXISTS kelloggs.promotion_kpi
+(
+    -- ── Core identifiers ────────────────────────────────────────────────────
+    mid                   Int64,
+    project_id            Int32,
+    store_id              Int64,
+    employee_id           Int64,
+    supervisor_id         Int64,
+
+    -- ── Store dimension ─────────────────────────────────────────────────────
+    channel_id            Int32,
+    chain_id              Int64,
+    storetype_id          Int32,
+
+    -- ── Visit ───────────────────────────────────────────────────────────────
+    visit_date            Date32,
+
+    -- ── Promotion definition ────────────────────────────────────────────────
+    promo_definition_id   Int32,
+    promo_definition_name String,
+
+    -- ── Promotion item (Category / SubCategory / Brand / SubBrand) ──────────
+    -- LowCardinality = dictionary-encoding — ideal for columns with few
+    -- distinct values (only 4 possible values here)
+    promotion_details     LowCardinality(String), -- 'Category' | 'SubCategory' | 'Brand' | 'SubBrand'
+    promotion_details_id  Int64,                  -- FK into the chosen master table
+    promo_value_name      String,                 -- e.g. 'Chocos', 'Breakfast Cereals'
+
+    -- ── Execution status ────────────────────────────────────────────────────
+    is_present            UInt8,                  -- 1 = present (Y), 0 = absent (N)
+    reason                String,                 -- populated only when is_present = 0
+
+    -- ── Optional quiz ───────────────────────────────────────────────────────
+    question              String,
+    answer                String,
+
+    -- ── Photo proof ─────────────────────────────────────────────────────────
+    image1                String,                 -- full URL or empty string
+    image2                String,
+
+    -- ── Audit ───────────────────────────────────────────────────────────────
+    update_date           DateTime DEFAULT now(),
+    update_by             String,
+
+    -- ── Data-skipping indexes ───────────────────────────────────────────────
+    -- Declared inside the column list: ClickHouse rejects INDEX after the ENGINE clauses
+    -- Bloom filter on store_id — speeds up store-level promotion reports
+    INDEX idx_store store_id    TYPE bloom_filter GRANULARITY 1,
+    -- Bloom filter on employee_id — speeds up per-rep promotion audits
+    INDEX idx_emp   employee_id TYPE bloom_filter GRANULARITY 1
+)
+ENGINE = MergeTree()
+-- Sort key: queries always filter on project_id + visit_date first
+-- Adding promo_definition_id speeds up "show me all Promo X results"
+ORDER BY (project_id, visit_date, mid, promo_definition_id)
+PARTITION BY toYYYYMM(visit_date)
+SETTINGS index_granularity = 8192;
+
+
+-- ── Verification queries ─────────────────────────────────────────────────────
+
+-- 1. Rows loaded per visit date (10 most recent dates)
+--   SELECT count(), visit_date
+--   FROM kelloggs.promotion_kpi
+--   WHERE project_id = 40148
+--   GROUP BY visit_date ORDER BY visit_date DESC LIMIT 10;
+
+-- 2. Present vs absent breakdown
+--   SELECT
+--       visit_date,
+--       countIf(is_present = 1) AS present,
+--       countIf(is_present = 0) AS absent,
+--       count() AS total,
+--       round(present / total * 100, 1) AS present_pct
+--   FROM kelloggs.promotion_kpi
+--   WHERE project_id = 40148
+--   GROUP BY visit_date ORDER BY visit_date DESC LIMIT 7;
+
+-- 3. Top promotion definitions by absence
+--   SELECT promo_definition_name, count() AS absent_count
+--   FROM kelloggs.promotion_kpi
+--   WHERE project_id = 40148
+--     AND is_present = 0
+--     AND visit_date >= today() - 7
+--   GROUP BY promo_definition_name
+--   ORDER BY absent_count DESC LIMIT 10;
+
+-- 4. Absence reasons breakdown
+--   SELECT reason, count() AS cnt
+--   FROM kelloggs.promotion_kpi
+--   WHERE project_id = 40148
+--     AND is_present = 0
+--     AND visit_date = yesterday()
+--   GROUP BY reason ORDER BY cnt DESC;
diff --git a/main.py b/main.py
index 4f09cbb..392f7dd 100644
--- a/main.py
+++ b/main.py
@@ -428,37 +428,64 @@ def verify(expected: int, target_date: date) -> bool:
 # ── Main ──────────────────────────────────────────────────────────────────────
 
-def main() -> None:
-    target_date = date.today() - timedelta(days=1)
-    log.info(f"=== Coverage KPI ETL | date={target_date} ===")
-
-    # Build the SQLAlchemy engine once — shared across all steps
-    engine = build_engine()
-
-    # Collect which MIDs need processing
-    mids = collect_mids(engine, target_date)
-    if not mids:
-        log.info("No MIDs for yesterday. Nothing to do.")
-        sys.exit(0)
-
-    # Fetch raw data using pl.read_database()
-    df_raw = fetch_coverage_data(engine, mids)
-    engine.dispose() # return all pooled connections to OS cleanly
-
-    if df_raw.is_empty():
-        log.warning("Empty result set. Exiting.")
-        sys.exit(0)
-
-    # Transform
-    df_clean = transform(df_raw)
-
-    # Load
-    rows_inserted, inserted_date = load_to_clickhouse(df_clean)
-
-    # Verify
-    if not verify(rows_inserted, inserted_date):
-        sys.exit(2)
-
-    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
+def _run_for_date(engine, target_date: date) -> bool:
+    """Run collect → fetch → transform → load → verify for one visit date.
+
+    Returns False only when post-load verification fails. A date with no
+    MIDs or an empty result set is logged and skipped (returns True) so that
+    a multi-day backfill carries on with the remaining dates.
+    """
+    log.info(f"=== Coverage KPI ETL | date={target_date} ===")
+
+    # Collect which MIDs need processing
+    mids = collect_mids(engine, target_date)
+    if not mids:
+        log.info(f"No MIDs for {target_date}. Nothing to do.")
+        return True
+
+    # Fetch raw data using pl.read_database()
+    df_raw = fetch_coverage_data(engine, mids)
+    if df_raw.is_empty():
+        log.warning(f"Empty result set for {target_date}. Skipping.")
+        return True
+
+    # Transform
+    df_clean = transform(df_raw)
+
+    # Load
+    rows_inserted, inserted_date = load_to_clickhouse(df_clean)
+
+    # Verify
+    if not verify(rows_inserted, inserted_date):
+        return False
+
+    log.info(f"=== Done. {rows_inserted:,} rows loaded for {inserted_date} ===")
+    return True
+
+
+def main(start_date: "date | None" = None, end_date: "date | None" = None) -> None:
+    """Run the coverage KPI ETL for one day or an inclusive range of days.
+
+    With no arguments the run covers yesterday only (the nightly default).
+    Pass start_date (and optionally end_date) to backfill, e.g.
+    main(date(2023, 3, 10), date(2023, 3, 18)).
+
+    Exits with status 2 as soon as verification fails for a date.
+    """
+    if start_date is None:
+        start_date = date.today() - timedelta(days=1)
+    if end_date is None:
+        end_date = start_date
+    if end_date < start_date:
+        raise ValueError(f"end_date {end_date} is before start_date {start_date}")
+
+    # Build the SQLAlchemy engine once — shared across all dates and steps
+    engine = build_engine()
+    try:
+        for offset in range((end_date - start_date).days + 1):
+            if not _run_for_date(engine, start_date + timedelta(days=offset)):
+                sys.exit(2)
+    finally:
+        engine.dispose()  # return all pooled connections to OS cleanly
 
 
 if __name__ == "__main__":