Files
data_pipeline/clickhouse_task/delete_task.py
T
2026-06-19 16:51:03 +05:30

185 lines
3.7 KiB
Python

from __future__ import annotations
import polars as pl
from datetime import date, datetime, timedelta
from log import log
def truncate_table(client, table_name: str) -> None:
    """Empty ``table_name`` completely; used for full-refresh tables.

    Sends ``TRUNCATE TABLE <table_name>`` through ``client.command`` and logs
    the truncation once the command has returned.

    Args:
        client: Object exposing a ``command(sql)`` method; presumably a
            ClickHouse client — confirm with callers.
        table_name: Table to empty. It is interpolated verbatim into the SQL,
            so it must come from trusted code/configuration, never user input.
    """
    statement = "TRUNCATE TABLE {}".format(table_name)
    client.command(statement)
    log.info("Truncated table %s", table_name)
def delete_rows(
    client,
    table_name: str,
    where_clause: str,
) -> None:
    """Delete the rows of ``table_name`` that match ``where_clause``.

    Issues ``ALTER TABLE <table_name> DELETE WHERE <where_clause>`` through
    ``client.command``.

    Args:
        client: Object exposing a ``command(sql)`` method; presumably a
            ClickHouse client — confirm with callers.
        table_name: Target table. Interpolated verbatim into the SQL, so it
            must come from trusted code/configuration.
        where_clause: Raw SQL boolean expression selecting the rows to delete.
            It is interpolated verbatim; callers are responsible for making
            any values embedded in it safe.

    Raises:
        ValueError: If ``where_clause`` is ``None``, empty or whitespace-only.
            Such input would otherwise produce ``... DELETE WHERE`` with no
            predicate.
    """
    predicate = (where_clause or "").strip()
    if not predicate:
        # Refuse before touching the database rather than sending a broken
        # (or, if the SQL were ever "fixed" by hand, table-wide) delete.
        raise ValueError(
            f"delete_rows requires a non-empty where_clause for {table_name}"
        )

    query = f"ALTER TABLE {table_name}\nDELETE\nWHERE {predicate}"

    # Lazy %-style args, consistent with truncate_table's logging.
    log.info("_ _ _ _ Deleting Data from ClickHouse for %s _ _ _ _", table_name)
    # NOTE(review): ClickHouse runs ALTER ... DELETE as an asynchronous
    # mutation by default (see the `mutations_sync` setting). Confirm callers
    # do not need the delete to have finished before they read or reload.
    client.command(query)
# Tables refreshed incrementally by deleting every row whose ``Mid`` is in the
# batch being reloaded.
_MID_TABLES = frozenset(
    {
        "additional_visibility",
        "Coverage",
        "Survey",
        "Promotion",
        "PaidVisibility",
        "SOS_OneApp",
        "Stock_Details",
    }
)

# Journey_Plan deletes are restricted to this project id. The value was
# hard-coded in the original logic; presumably it mirrors the old SQL
# procedure — confirm before reusing for other projects.
_JOURNEY_PLAN_PROJECT_ID = 40148

# Attendance is reloaded over a trailing window that ends on ``run_date``.
_ATTENDANCE_LOOKBACK_DAYS = 15


def _as_date(value) -> date:
    """Return *value* as a ``date``; accepts a date, a datetime or an ISO-8601 string.

    Callers format the result with ``isoformat()``, so only ``YYYY-MM-DD``
    text is ever spliced into SQL.

    Raises:
        TypeError: For any other type (including ``None``).
        ValueError: For a string that does not start with an ISO date.
    """
    if isinstance(value, datetime):  # checked first: datetime subclasses date
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        # The first 10 characters of an ISO date or datetime are YYYY-MM-DD.
        return date.fromisoformat(value.strip()[:10])
    raise TypeError(
        "expected a date, datetime or ISO-8601 string, "
        f"got {type(value).__name__}"
    )


def _delete_condition(
    table_name: str,
    run_date,
    mids: list[int],
    emp_visit_df: pl.DataFrame | None,
) -> str | None:
    """Build the DELETE WHERE clause for ``table_name``; ``None`` means "delete nothing"."""
    if table_name in _MID_TABLES:
        if not mids:
            return None
        # int() rejects anything that is not a plain integer id, so no stray
        # SQL can reach the IN list. dict.fromkeys de-duplicates in order.
        unique_mids = dict.fromkeys(int(mid) for mid in mids)
        return f"Mid IN ({','.join(str(mid) for mid in unique_mids)})"

    if table_name == "OQaD":
        if emp_visit_df is None or emp_visit_df.is_empty():
            return None
        # One predicate per distinct (employee, visit day) pair.
        pairs = dict.fromkeys(
            (int(row["EmpId"]), _as_date(row["VisitDate"]))
            for row in emp_visit_df.iter_rows(named=True)
        )
        return " OR ".join(
            f"(employee_id={emp_id} "
            f"AND toDate(visit_date)=toDate('{visit_day.isoformat()}'))"
            for emp_id, visit_day in pairs
        )

    if table_name == "Journey_Plan":
        day = _as_date(run_date).isoformat()
        # Whole calendar month containing run_date.
        return (
            f"project_id = {_JOURNEY_PLAN_PROJECT_ID} "
            f"AND toMonth(visit_date) = toMonth(toDate('{day}')) "
            f"AND toYear(visit_date) = toYear(toDate('{day}'))"
        )

    if table_name == "Login":
        day = _as_date(run_date).isoformat()
        return f"toDate(login_date) = toDate('{day}')"

    if table_name == "Web_Logins":
        day = _as_date(run_date).isoformat()
        return f"toDate(date) = toDate('{day}')"

    if table_name == "Attendance":
        end = _as_date(run_date)
        start = end - timedelta(days=_ATTENDANCE_LOOKBACK_DAYS)
        # BETWEEN is inclusive on both ends.
        return (
            f"toDate(visit_date) BETWEEN toDate('{start.isoformat()}') "
            f"AND toDate('{end.isoformat()}')"
        )

    return None


def delete_existing_data(
    client,
    table_name: str,
    run_date,
    mids: list[int],
    emp_visit_df: pl.DataFrame,
) -> None:
    """Delete the rows of ``table_name`` that the incremental load will reload.

    Incremental delete logic; the original note says it matches the old SQL
    procedure. The rules per table are:

    * Mid-keyed tables (``_MID_TABLES``): rows whose ``Mid`` is in ``mids``.
    * ``Journey_Plan``: rows of project 40148 in the calendar month of
      ``run_date``.
    * ``Login`` / ``Web_Logins``: rows dated ``run_date``.
    * ``Attendance``: rows dated from ``run_date - 15 days`` through
      ``run_date`` inclusive.
    * ``OQaD``: rows matching each (``EmpId``, ``VisitDate``) pair in
      ``emp_visit_df``.

    Any other table is logged and left untouched. So is a Mid table with no
    ``mids``, and ``OQaD`` when ``emp_visit_df`` is empty or ``None``.

    Args:
        client: Object exposing ``command(sql)``; passed to ``delete_rows``.
        table_name: Table being refreshed.
        run_date: ``date``, ``datetime`` or ISO-8601 string. It is only read
            for the date-based tables.
        mids: Integer ids for the Mid-keyed tables.
        emp_visit_df: Frame with ``EmpId`` and ``VisitDate`` columns; only
            read for ``OQaD``.

    Raises:
        TypeError, ValueError: If ``run_date``, a mid, an ``EmpId`` or a
            ``VisitDate`` cannot be coerced to an int/date. The check runs
            before anything is deleted.
    """
    condition = _delete_condition(table_name, run_date, mids, emp_visit_df)
    if condition is None:
        log.info(
            "No delete logic required for %s",
            table_name,
        )
        return
    delete_rows(client, table_name, condition)