from __future__ import annotations

from datetime import timedelta

import polars as pl

from log import log

# Tables whose incremental delete is keyed on the `Mid` column.
_MID_TABLES = frozenset(
    {
        "additional_visibility",
        "Coverage",
        "Survey",
        "Promotion",
        "PaidVisibility",
        "SOS_OneApp",
        "Stock_Details",
    }
)

# NOTE(review): the original Attendance lookback value was lost in the source
# (the expression read `run_date - ` with nothing after the minus sign).
# One day is a conservative placeholder -- confirm against the old SQL
# procedure before relying on it.
_ATTENDANCE_LOOKBACK_DAYS = 1


def truncate_table(
    client,
    table_name: str,
) -> None:
    """
    Full refresh tables.

    Issues ``TRUNCATE TABLE <table_name>`` through ``client.command`` and
    logs the table name afterwards.

    ``table_name`` is interpolated into the statement verbatim, so it must
    come from trusted configuration, never from user input.
    """
    client.command(f"TRUNCATE TABLE {table_name}")

    log.info(
        "Truncated table %s",
        table_name,
    )


def delete_rows(
    client,
    table_name: str,
    where_clause: str,
) -> None:
    """
    Generic ClickHouse delete.

    Builds ``ALTER TABLE <table_name> DELETE WHERE <where_clause>`` and sends
    it through ``client.command``.

    Both arguments are interpolated verbatim; callers are responsible for
    passing a trusted table name and a well-formed, trusted predicate.

    NOTE(review): ClickHouse runs ``ALTER TABLE ... DELETE`` as a mutation;
    whether the rows are gone when this call returns presumably depends on
    server/session settings -- confirm if callers insert right after.
    """
    query = f"""
        ALTER TABLE {table_name}
        DELETE WHERE {where_clause}
    """

    # Lazy %-style args, consistent with the other log calls in this module.
    log.info(
        "_ _ _ _ Deleting Data from ClickHouse for %s _ _ _ _",
        table_name,
    )

    client.command(query)


def delete_existing_data(
    client,
    table_name: str,
    run_date,
    mids: list[int],
    emp_visit_df: pl.DataFrame,
) -> None:
    """
    Incremental delete logic.

    Matches the old SQL procedure.

    Picks the delete predicate from ``table_name``:

    * MID based tables (see ``_MID_TABLES``): rows whose ``Mid`` is in
      ``mids``; skipped when ``mids`` is empty.
    * ``Journey_Plan``: rows in the month and year of ``run_date``.
    * ``Login`` / ``Web_Logins``: rows dated ``run_date``.
    * ``Attendance``: rows from ``_ATTENDANCE_LOOKBACK_DAYS`` before
      ``run_date`` through ``run_date`` inclusive.
    * ``OQaD``: rows matching any (EmpId, VisitDate) pair in
      ``emp_visit_df``; skipped when the frame is empty.

    Any other table, or a skipped case above, only logs that no delete
    logic was required.

    ``run_date`` must expose ``.month`` and ``.year``, support subtracting a
    ``timedelta``, and format to a string ClickHouse ``toDate`` accepts
    (presumably a ``datetime.date`` -- confirm against the caller).
    """

    # --------------------------------------------------
    # MID based tables
    # --------------------------------------------------
    if table_name in _MID_TABLES and mids:
        mids_str = ",".join(str(mid) for mid in mids)

        delete_rows(
            client,
            table_name,
            f"Mid IN ({mids_str})",
        )
        return

    # --------------------------------------------------
    # Journey Plan
    # --------------------------------------------------
    if table_name == "Journey_Plan":
        delete_rows(
            client,
            table_name,
            f"""
            toMonth(visit_date) = {run_date.month}
            AND toYear(visit_date) = {run_date.year}
            """,
        )
        return

    # --------------------------------------------------
    # Logins
    # --------------------------------------------------
    if table_name == "Login":
        delete_rows(
            client,
            table_name,
            f"""
            toDate(login_date) = toDate('{run_date}')
            """,
        )
        # Return here like every other branch; without it the function fell
        # through and logged "No delete logic required" after deleting.
        return

    # --------------------------------------------------
    # Web Logins
    # --------------------------------------------------
    if table_name == "Web_Logins":
        delete_rows(
            client,
            table_name,
            f"""
            toDate(date) = toDate('{run_date}')
            """,
        )
        return

    # --------------------------------------------------
    # Attendance
    # --------------------------------------------------
    if table_name == "Attendance":
        # Inclusive window ending on run_date; see the note on
        # _ATTENDANCE_LOOKBACK_DAYS about the window size.
        window_start = run_date - timedelta(days=_ATTENDANCE_LOOKBACK_DAYS)

        delete_rows(
            client,
            table_name,
            f"""
            toDate(visit_date)
            BETWEEN toDate('{window_start}')
            AND toDate('{run_date}')
            """,
        )
        return

    # --------------------------------------------------
    # OQaD
    # --------------------------------------------------
    if table_name == "OQaD" and not emp_visit_df.is_empty():
        # One parenthesised predicate per (employee, visit date) row,
        # OR-ed together into a single delete.
        conditions = [
            (
                f"(employee_id={row['EmpId']} "
                f"AND toDate(visit_date)="
                f"toDate('{row['VisitDate']}'))"
            )
            for row in emp_visit_df.iter_rows(named=True)
        ]

        delete_rows(
            client,
            table_name,
            " OR ".join(conditions),
        )
        return

    log.info(
        "No delete logic required for %s",
        table_name,
    )