Files
2026-06-15 13:52:24 +05:30

188 lines
3.5 KiB
Python

from __future__ import annotations
import polars as pl
from log import log
def truncate_table(
    client,
    table_name: str,
) -> None:
    """
    Empty a table ahead of a full refresh.

    Sends a ``TRUNCATE TABLE`` statement through ``client.command`` and
    then logs the table name at INFO level.

    Args:
        client: Object exposing ``command(sql)``.
            NOTE(review): presumably a ClickHouse client - confirm
            against the caller.
        table_name: Name of the table to truncate. It is interpolated
            verbatim into the statement, so it must be a trusted
            identifier.
    """
    statement = f"TRUNCATE TABLE {table_name}"
    client.command(statement)
    log.info("Truncated table %s", table_name)
def delete_rows(
    client,
    table_name: str,
    where_clause: str,
) -> None:
    """
    Delete the rows of a table that satisfy a condition.

    Builds an ``ALTER TABLE ... DELETE WHERE ...`` statement, logs the
    target table at INFO level, and then submits the statement through
    ``client.command``.

    Args:
        client: Object exposing ``command(sql)``.
            NOTE(review): presumably a ClickHouse client - confirm
            against the caller.
        table_name: Table to delete from; interpolated verbatim.
        where_clause: SQL predicate selecting the rows to remove;
            interpolated verbatim, so callers are responsible for
            building it from trusted values.
    """
    # Assemble the statement one line at a time so its text does not
    # depend on how this function body is indented.
    statement = "\n".join(
        (
            "",
            f"ALTER TABLE {table_name}",
            "DELETE",
            f"WHERE {where_clause}",
            "",
        )
    )
    log.info("Deleting from %s", table_name)
    client.command(statement)
def delete_existing_data(
    client,
    table_name: str,
    run_date,
    mids: list[int],
    emp_visit_df: pl.DataFrame,
) -> None:
    """
    Remove the rows an incremental load is about to replace.

    Intended to match the old SQL procedure. Exactly one rule is applied
    per call, chosen by ``table_name``:

    * MID-keyed tables (``additional_visibility``, ``Coverage``,
      ``Survey``, ``Promotion``, ``PaidVisibility``, ``SOS_OneApp``,
      ``Stock_Details``): delete rows whose ``MID`` is in ``mids``.
      Skipped when ``mids`` is empty.
    * ``Journey_Plan``: delete rows whose ``visit_date`` falls in the
      month and year of ``run_date``.
    * ``Login``, ``Web_Logins``, ``Attendance``: delete rows whose date
      column equals ``run_date``.
    * ``OQaD``: delete rows matching any (``EmpId``, ``VisitDate``) pair
      in ``emp_visit_df``. Skipped when the frame is empty.

    When no rule applies (unknown table, or a rule was skipped for lack
    of input) nothing is deleted and an INFO message says so.

    Args:
        client: Passed through unchanged to ``delete_rows``.
        table_name: Target table; compared case-sensitively against the
            names listed above.
        run_date: Object providing ``.month`` and ``.year`` whose
            ``str()`` form is placed inside ``toDate('...')``.
            NOTE(review): presumably a ``datetime.date`` - confirm; a
            ``datetime`` would render with a time component.
        mids: MID values for the MID-keyed tables.
        emp_visit_df: Frame with ``EmpId`` and ``VisitDate`` columns,
            read only for ``OQaD``.
    """
    # --------------------------------------------------
    # MID based tables
    # --------------------------------------------------
    mid_tables = {
        "additional_visibility",
        "Coverage",
        "Survey",
        "Promotion",
        "PaidVisibility",
        "SOS_OneApp",
        "Stock_Details",
    }
    if table_name in mid_tables and mids:
        mids_str = ",".join(map(str, mids))
        delete_rows(
            client,
            table_name,
            f"MID IN ({mids_str})",
        )
        return

    # --------------------------------------------------
    # Journey Plan
    # --------------------------------------------------
    if table_name == "Journey_Plan":
        delete_rows(
            client,
            table_name,
            "\n"
            f"toMonth(visit_date) = {run_date.month}\n"
            f"AND toYear(visit_date) = {run_date.year}\n",
        )
        return

    # --------------------------------------------------
    # Logins
    # --------------------------------------------------
    if table_name == "Login":
        delete_rows(
            client,
            table_name,
            f"\ntoDate(login_date) = toDate('{run_date}')\n",
        )
        # Stop here like every other branch; without this return the
        # function would fall through and log "No delete logic
        # required" right after issuing the delete.
        return

    # --------------------------------------------------
    # Web Logins
    # --------------------------------------------------
    if table_name == "Web_Logins":
        delete_rows(
            client,
            table_name,
            "\n"
            "toDate(date)\n"
            f"= toDate('{run_date}')\n",
        )
        return

    # --------------------------------------------------
    # Attendance
    # --------------------------------------------------
    if table_name == "Attendance":
        delete_rows(
            client,
            table_name,
            "\n"
            "toDate(attendance_date)\n"
            f"= toDate('{run_date}')\n",
        )
        return

    # --------------------------------------------------
    # OQaD
    # --------------------------------------------------
    if table_name == "OQaD" and not emp_visit_df.is_empty():
        # One parenthesised predicate per (employee, visit date) row,
        # OR-ed together into a single delete.
        conditions = [
            f"(employee_id={row['EmpId']} "
            f"AND toDate(visit_date)="
            f"toDate('{row['VisitDate']}'))"
            for row in emp_visit_df.iter_rows(named=True)
        ]
        delete_rows(
            client,
            table_name,
            " OR ".join(conditions),
        )
        return

    log.info(
        "No delete logic required for %s",
        table_name,
    )