first commit

This commit is contained in:
Ankit Malik
2026-06-22 16:03:00 +05:30
commit e218aafc26
46 changed files with 5416 additions and 0 deletions
+54
View File
@@ -0,0 +1,54 @@
import polars as pl
from sqlalchemy import text
from log import *
def create_clickhouse_table(
df: pl.DataFrame,
table_name: str,
clickhouse_engine
):
type_mapping = {
pl.Int8: "Nullable(Int8)",
pl.Int16: "Nullable(Int16)",
pl.Int32: "Nullable(Int32)",
pl.Int64: "Nullable(Int64)",
pl.UInt8: "Nullable(UInt8)",
pl.UInt16: "Nullable(UInt16)",
pl.UInt32: "Nullable(UInt32)",
pl.UInt64: "Nullable(UInt64)",
pl.Float32: "Nullable(Float32)",
pl.Float64: "Nullable(Float64)",
pl.Boolean: "Nullable(Bool)",
pl.String: "Nullable(String)",
pl.Date: "Nullable(Date)",
pl.Datetime: "Nullable(DateTime)",
}
columns = []
for col_name, dtype in df.schema.items():
clickhouse_type = type_mapping.get(
dtype,
"Nullable(String)"
)
columns.append(
f"`{col_name}` {clickhouse_type}"
)
create_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name}
(
{', '.join(columns)}
)
ENGINE = MergeTree()
ORDER BY tuple()
"""
with clickhouse_engine.begin() as conn:
conn.execute(text(create_sql))
log.info(f"Table ready: {table_name}")
+185
View File
@@ -0,0 +1,185 @@
from __future__ import annotations
import polars as pl
from datetime import date, datetime, timedelta
from log import log
def truncate_table(
client,
table_name: str,
) -> None:
"""
Full refresh tables.
"""
client.command(
f"TRUNCATE TABLE {table_name}"
)
log.info(
"Truncated table %s",
table_name,
)
def delete_rows(
client,
table_name: str,
where_clause: str,
) -> None:
"""
Generic ClickHouse delete.
"""
query = f"""
ALTER TABLE {table_name}
DELETE
WHERE {where_clause}
"""
log.info(f"_ _ _ _ Deleting Data from ClickHouse for {table_name} _ _ _ _")
client.command(query)
def delete_existing_data(
client,
table_name: str,
run_date,
mids: list[int],
emp_visit_df: pl.DataFrame,
) -> None:
"""
Incremental delete logic.
Matches the old SQL procedure.
"""
# --------------------------------------------------
# MID based tables
# --------------------------------------------------
mid_tables = {
"additional_visibility",
"Coverage",
"Survey",
"Promotion",
"PaidVisibility",
"SOS_OneApp",
"Stock_Details",
}
if table_name in mid_tables and mids :
mids_str = ",".join(str(mid) for mid in mids)
delete_rows(
client,
table_name,
f"Mid IN ({mids_str})",
)
return
# --------------------------------------------------
# Journey Plan
# --------------------------------------------------
if table_name == "Journey_Plan":
delete_rows(
client,
table_name,
f"""
project_id = 40148
AND toMonth(visit_date) = toMonth(toDate('2026-06-18'))
AND toYear(visit_date) = toYear(toDate('2026-06-18'))
""",
)
return
# --------------------------------------------------
# Logins
# --------------------------------------------------
if table_name == "Login":
delete_rows(
client,
table_name,
f"""
toDate(login_date) = toDate('{run_date}')
"""
)
# --------------------------------------------------
# Web Logins
# --------------------------------------------------
if table_name == "Web_Logins":
delete_rows(
client,
table_name,
f"""
toDate(date)
= toDate('{run_date}')
""",
)
return
# --------------------------------------------------
# Attendance
# --------------------------------------------------
if table_name == "Attendance":
delete_rows(
client,
table_name,
f"""
toDate(visit_date) BETWEEN toDate('{run_date - timedelta(days=15) }') AND toDate('{run_date}')
=
""",
)
return
# --------------------------------------------------
# OQaD
# --------------------------------------------------
if (
table_name == "OQaD"
and not emp_visit_df.is_empty()
):
conditions = [
(
f"(employee_id={row['EmpId']} "
f"AND toDate(visit_date)="
f"toDate('{row['VisitDate']}'))"
)
for row in emp_visit_df.iter_rows(
named=True
)
]
delete_rows(
client,
table_name,
" OR ".join(conditions),
)
return
log.info(
"No delete logic required for %s",
table_name,
)
+32
View File
@@ -0,0 +1,32 @@
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine
import os
import clickhouse_connect
import polars as pd
import pyarrow
from log import log
def load_to_clickhouse(
client: Client,
table_name: str,
df: pl.DataFrame,
) -> None:
"""
Load a Polars DataFrame into ClickHouse using Arrow.
"""
if df.is_empty():
log.warning(f"{table_name}: DataFrame is empty. Skipping.")
return
arrow_table = df.to_arrow()
client.insert_arrow(
table=table_name,
arrow_table=arrow_table,
)
log.info(
f"{table_name}: inserted {len(df):,} rows into ClickHouse"
)