# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "polars>=0.20.0",
#     "pyarrow>=18.0.0",
#     "sqlalchemy>=2.0.0",
#     "pyodbc>=5.0.0",
#     "clickhouse-connect>=0.7.0",
#     "clickhouse-sqlalchemy>=0.3.2",
#     "pyyaml>=6.0.3",
#     "python-dotenv>=1.0.0",
# ]
# ///
"""Daily pipeline that copies tables from SQL Server into ClickHouse.

Usage: run the script with an optional ``YYYY-MM-DD`` argument; without one
the run date defaults to yesterday. The tables to process are read from
``y.yml`` in the current working directory.
"""

from __future__ import annotations

import sys
from datetime import date, datetime, timedelta

import polars as pl
import yaml

from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
    delete_existing_data,
    truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
    build_sql_server_engine,
    build_clickhouse_engine,
    get_clickhouse_client,
)
from mids import (
    MID_TABLE_COV,
    MID_TABLE_COV1,
)

# NOTE: the star imports are kept on purpose. `get_dataframe` resolves
# functions by name through `globals()`, and `fetch_quiz_empids`,
# `fetch_OQaD` and `fetch_data` used in `main` are not imported explicitly,
# so they presumably come from one of these modules -- TODO confirm and
# replace with explicit imports.
from masters.dimensions import *
from masters.bridge import *
from src.fact import *


# ==========================================================
# Helpers
# ==========================================================

def table_exists(
    client,
    table_name: str,
) -> bool:
    """Return whether ``table_name`` exists according to ClickHouse.

    Args:
        client: Object exposing ``command(sql)``; in ``main`` this is the
            value returned by ``get_clickhouse_client()``.
        table_name: Table name, interpolated verbatim into the statement,
            so it must come from a trusted source.

    Returns:
        The truthiness of the ``EXISTS TABLE`` command result.
    """
    return bool(client.command(f"EXISTS TABLE {table_name}"))


def get_dataframe(
    fn_name: str,
    fetch_by: str,
    sql_engine,
    mids,
    run_date,
):
    """Call the module-level function named ``fn_name`` and return its result.

    The function is looked up in this module's globals (which include the
    names pulled in by the star imports) and is called with arguments chosen
    by ``fetch_by``:

    * ``"mids"``     -> ``fn(sql_engine, mids)``
    * ``"run_date"`` -> ``fn(sql_engine, run_date)``
    * anything else  -> ``fn(sql_engine)``

    Raises:
        KeyError: If no global named ``fn_name`` exists.
    """
    fn = globals()[fn_name]

    if fetch_by == "mids":
        return fn(sql_engine, mids)

    if fetch_by == "run_date":
        return fn(sql_engine, run_date)

    return fn(sql_engine)


def get_empids_clickhouse_OQAD(
    client,
    table_name: str = "OQaD",
) -> pl.DataFrame:
    """Fetch the distinct (empid, visitdate) pairs stored in ClickHouse.

    Args:
        client: ClickHouse client exposing ``command`` and ``query_arrow``.
        table_name: Table to read; interpolated verbatim into the query.

    Returns:
        A frame with columns ``empid`` and ``visitdate``. When the table does
        not exist, a warning is logged and an empty frame with schema
        ``empid: Int64, visitdate: Date`` is returned.
    """
    if not table_exists(client, table_name):
        log.warning("Table '%s' does not exist.", table_name)
        return pl.DataFrame(
            schema={
                "empid": pl.Int64,
                "visitdate": pl.Date,
            }
        )

    query = f"""
        SELECT DISTINCT
            employee_id AS empid,
            visit_date AS visitdate
        FROM {table_name}
    """

    # ClickHouse -> PyArrow -> Polars
    arrow_table = client.query_arrow(query)
    return pl.from_arrow(arrow_table)


# ==========================================================
# Main
# ==========================================================

def main() -> None:
    """Run the pipeline for one run date.

    Steps, in order: resolve the run date, open the database connections,
    fetch the key sets used for fetching and deleting, read ``y.yml``, then
    for every configured table fetch the data, create the ClickHouse table
    if it is missing (otherwise clear the existing data) and load the rows.

    Raises:
        ValueError: If ``sys.argv[1]`` is given but is not ``YYYY-MM-DD``.
        Exception: Any error raised while processing a table is logged with
            its traceback and re-raised, which aborts the remaining tables.
    """
    log.info("=" * 80)
    log.info("Hello from data-move Python data pipeline!")

    # ------------------------------------------------------
    # Run Date
    # ------------------------------------------------------
    if len(sys.argv) > 1:
        run_date = datetime.strptime(sys.argv[1], "%Y-%m-%d").date()
    else:
        # No argument: process yesterday.
        run_date = date.today() - timedelta(days=1)

    log.info("Pipeline Run Date: %s", run_date)

    # ------------------------------------------------------
    # Connections
    # ------------------------------------------------------
    log.info("Connecting to databases...")

    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()

    log.info("Database connections established")

    # ------------------------------------------------------
    # mids Keys
    # ------------------------------------------------------
    mids = MID_TABLE_COV(sql_engine, run_date)
    emp_visit_df = MID_TABLE_COV1(sql_engine, run_date)

    # Keep only the (empid, visitdate) pairs present both in the source quiz
    # data and in the ClickHouse OQaD table.
    quiz_df = fetch_quiz_empids(sql_engine, run_date)
    db_df = get_empids_clickhouse_OQAD(client)

    matched = quiz_df.join(
        db_df,
        on=["empid", "visitdate"],
        how="inner",
    )
    # NOTE(review): an empid matching on several visit dates appears once
    # per match in this list -- confirm fetch_OQaD tolerates duplicates.
    empids = matched["empid"].to_list()

    log.info(
        "Fetched %s matched empids for OQAD",
        f"{len(empids):,}",
    )

    # ------------------------------------------------------
    # Config
    # ------------------------------------------------------
    # Explicit encoding so parsing does not depend on the platform default.
    with open("y.yml", "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    # ------------------------------------------------------
    # Process Tables
    # ------------------------------------------------------
    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        # Not used below; the lookup is kept so an entry missing the key
        # still fails with KeyError as before.
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info("=" * 80)
        log.info("Processing Table: %s", table_name)

        try:
            # ------------------------------------------
            # Fetch Data
            # ------------------------------------------
            if table_name == "OQaD":
                df = fetch_OQaD(
                    engine=sql_engine,
                    table_name=table_name,
                    table_type=table_type,
                    empids=empids,
                    run_date=run_date,
                )
            else:
                df = fetch_data(
                    engine=sql_engine,
                    table_name=table_name,
                    table_type=table_type,
                    mids=mids,
                    run_date=run_date,
                )

            # Nothing fetched: leave the ClickHouse table untouched.
            if df.is_empty():
                log.warning("%s returned no rows", table_name)
                continue

            log.info("Fetched %s rows", len(df))

            # ------------------------------------------
            # Create Table If Missing
            # ------------------------------------------
            if not table_exists(client, table_name):
                log.info("Creating table %s", table_name)
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )

            # ------------------------------------------
            # Existing Table Logic
            # ------------------------------------------
            # NOTE(review): "DELETE+INSERT" truncates the whole table while
            # every other operation does a targeted delete -- confirm this
            # mapping is intended.
            elif operation == "DELETE+INSERT":
                truncate_table(client, table_name)
            else:
                delete_existing_data(
                    client=client,
                    table_name=table_name,
                    run_date=run_date,
                    mids=mids,
                    emp_visit_df=emp_visit_df,
                )

            # ------------------------------------------
            # Load Data
            # ------------------------------------------
            load_to_clickhouse(
                client=client,
                table_name=table_name,
                df=df,
            )

            log.info(
                "%s loaded successfully (%s rows)",
                table_name,
                len(df),
            )

        except Exception:
            # Record which table failed, then abort the run.
            log.exception("Failed processing table %s", table_name)
            raise

    log.info("=" * 80)
    log.info("Pipeline Completed Successfully")
    log.info("=" * 80)


if __name__ == "__main__":
    main()