# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "polars>=0.20.0",
#     "pyarrow>=18.0.0",
#     "sqlalchemy>=2.0.0",
#     "pyodbc>=5.0.0",
#     "clickhouse-connect>=0.7.0",
#     "clickhouse-sqlalchemy>=0.3.2",
#     "pyyaml>=6.0.3",
#     "python-dotenv>=1.0.0",
# ]
# ///
"""Copy the tables listed in ``t.yml`` from SQL Server into ClickHouse.

Usage: pass an optional run date (``YYYY-MM-DD``) as the first command-line
argument; when omitted, yesterday's date is used. When executed as a script
the pipeline is retried on failure and the outcome is recorded in
``Pipeline_config.yml``.
"""

from __future__ import annotations

from time import sleep
import sys
from datetime import date, datetime, timedelta

import polars as pl
import yaml

from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
    delete_existing_data,
    truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
    build_sql_server_engine,
    build_clickhouse_engine,
    get_clickhouse_client,
)
from mids import (
    MID_TABLE_COV,
    MID_TABLE_COV1,
)
# The star imports are load-bearing: the per-table ``fetch_<table>`` functions
# (and ``fetch_data``) are resolved by name from this module's globals.
from src.bridge import *
from src.fact import *
from src.dim import *


# ==========================================================
# Constants
# ==========================================================

# Table list driving the pipeline (name / operation / fetch_by / type).
TABLES_CONFIG_PATH = "t.yml"

# State file updated after every attempt of the pipeline.
PIPELINE_CONFIG_PATH = "Pipeline_config.yml"

# Total number of times main() is attempted before giving up.
MAX_ATTEMPTS = 3

# Pause between two attempts, in seconds.
RETRY_DELAY_SECONDS = 5

# ``fetch_by`` values for which a dedicated ``fetch_<table_name>`` function is
# used instead of the generic ``fetch_data``.
_DEDICATED_FETCH_MODES = ("mids", "run_date", "reason_id")


# ==========================================================
# Helpers
# ==========================================================

def table_exists(
    client,
    table_name: str,
) -> bool:
    """Return True when ``EXISTS TABLE <table_name>`` reports a truthy value.

    ``table_name`` is interpolated into the statement unescaped; it comes
    from the local table configuration file, not from user input.
    """
    return bool(client.command(f"EXISTS TABLE {table_name}"))


def _resolve_run_date(argv: list[str]) -> date:
    """Return the run date from ``argv[1]`` (``YYYY-MM-DD``) or yesterday.

    Raises:
        ValueError: if ``argv[1]`` is present but not a ``YYYY-MM-DD`` date.
    """
    if len(argv) > 1:
        return datetime.strptime(argv[1], "%Y-%m-%d").date()
    return date.today() - timedelta(days=1)


def _fetch_source_frame(
    sql_engine,
    table_name: str,
    table_type,
    fetch_by,
    mids,
    run_date: date,
):
    """Fetch the source rows for one configured table.

    When ``fetch_by`` is one of ``_DEDICATED_FETCH_MODES`` the function named
    ``fetch_<table_name>`` is looked up in this module's globals and called
    with the key set and run date; otherwise the generic ``fetch_data`` is
    used.

    Raises:
        KeyError: if a dedicated fetch function is required but not defined.
    """
    if fetch_by not in _DEDICATED_FETCH_MODES:
        return fetch_data(sql_engine, table_name, table_type)

    fn_name = f"fetch_{table_name}"
    try:
        fetcher = globals()[fn_name]
    except KeyError:
        # Same exception type as before, but with an actionable message.
        raise KeyError(
            f"table '{table_name}' uses fetch_by={fetch_by!r} but no function "
            f"named '{fn_name}' is available"
        ) from None
    return fetcher(sql_engine, table_name, table_type, mids, run_date)


def _update_pipeline_config(key: str, value: str) -> None:
    """Set ``pipeline.<key>`` to ``value`` in ``Pipeline_config.yml``.

    The file is read, modified in memory and written back in full. Key order
    is preserved (``sort_keys=False``).
    """
    with open(PIPELINE_CONFIG_PATH, "r", encoding="utf-8") as handle:
        state = yaml.safe_load(handle)

    state["pipeline"][key] = value

    with open(PIPELINE_CONFIG_PATH, "w", encoding="utf-8") as handle:
        yaml.safe_dump(state, handle, sort_keys=False)


# ==========================================================
# Main
# ==========================================================

def main() -> None:
    """Run one full pass of the pipeline over every configured table.

    For each entry in ``t.yml`` the source rows are fetched, the ClickHouse
    table is created if it does not exist (or cleaned according to the
    entry's ``operation`` if it does), and the rows are loaded. Any failure
    is logged with the table name and re-raised, aborting the pass.
    """
    log.info("=" * 80)
    log.info("Hello from data-move Python data pipeline!")

    # ------------------------------------------------------
    # Run Date
    # ------------------------------------------------------
    run_date = _resolve_run_date(sys.argv)
    log.info("Pipeline Run Date: %s", run_date)

    # ------------------------------------------------------
    # Connections
    # ------------------------------------------------------
    # NOTE(review): none of these handles is closed/disposed here, and main()
    # may be invoked up to MAX_ATTEMPTS times per process - confirm whether
    # the builders return objects that need explicit cleanup.
    log.info("Connecting to databases...")
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Database connections established")

    # ------------------------------------------------------
    # mids Keys
    # ------------------------------------------------------
    mids = MID_TABLE_COV(sql_engine, run_date)
    emp_visit_df = MID_TABLE_COV1(sql_engine, run_date)

    # ------------------------------------------------------
    # Config
    # ------------------------------------------------------
    with open(TABLES_CONFIG_PATH, "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    # ------------------------------------------------------
    # Process Tables
    # ------------------------------------------------------
    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info("=" * 80)
        log.info(
            "Processing Table: %s | Table type is -: %s | Based on -%s "
            "and operation is used -%s ",
            table_name,
            table_type,
            fetch_by,
            operation,
        )

        try:
            # ------------------------------------------
            # Fetch Data
            # ------------------------------------------
            log.info(
                "Fetching Data from sql server for table-: %s ..............",
                table_name,
            )
            df = _fetch_source_frame(
                sql_engine,
                table_name,
                table_type,
                fetch_by,
                mids,
                run_date,
            )
            row_count = len(df)
            log.info(
                "Fetched total row -: %s from sql server for table-:%s "
                "...........!!!",
                row_count,
                table_name,
            )

            # Nothing to load: leave the target table untouched.
            if df.is_empty():
                log.warning("%s returned no rows", table_name)
                continue

            log.info("Fetched %s rows", row_count)

            if not table_exists(client, table_name):
                # ------------------------------------------
                # Create Table If Missing
                # ------------------------------------------
                log.info("Creating table %s", table_name)
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )
            # ------------------------------------------
            # Existing Table Logic
            # ------------------------------------------
            elif operation == "DELETE+INSERT":
                truncate_table(client, table_name)
            elif operation == "ONLY_INSERT":
                # NOTE(review): an existing ONLY_INSERT table is skipped
                # entirely, so rows are only ever inserted on the run that
                # creates it. Confirm this is intended rather than "append
                # without deleting".
                continue
            else:
                delete_existing_data(
                    client=client,
                    table_name=table_name,
                    run_date=run_date,
                    mids=mids,
                    emp_visit_df=emp_visit_df,
                )

            # ------------------------------------------
            # Load Data
            # ------------------------------------------
            log.info(
                "_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _"
            )
            load_to_clickhouse(
                client=client,
                table_name=table_name,
                df=df,
            )
            log.info(
                "%s loaded successfully (%s rows)",
                table_name,
                row_count,
            )

        except Exception:
            # Top-level boundary for one table: record which table failed,
            # then let the caller's retry logic decide what to do.
            log.exception("Failed processing table %s", table_name)
            raise

    log.info("=" * 80)
    log.info("Pipeline Completed Successfully")
    log.info("=" * 80)


def _run_with_retries() -> None:
    """Run ``main()`` up to ``MAX_ATTEMPTS`` times, recording the outcome.

    On success ``pipeline.last_successful_run_date`` is set to today's date.
    On every failed attempt ``pipeline.run_date`` is set to today's date; the
    exception of the final attempt is re-raised.
    """
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            main()
        except Exception as exc:
            # NOTE(review): the failure path stores *today* under
            # ``pipeline.run_date`` - kept as-is; confirm the intended value.
            try:
                _update_pipeline_config("run_date", str(date.today()))
            except Exception:
                # Bookkeeping must not hide the pipeline error or stop the
                # remaining retries.
                log.exception(
                    "Could not record the failed run in %s",
                    PIPELINE_CONFIG_PATH,
                )

            if attempt == MAX_ATTEMPTS:
                raise

            log.warning(
                "Pipeline failed. Retry %s/%s. Error: %s",
                attempt,
                MAX_ATTEMPTS,
                exc,
            )
            sleep(RETRY_DELAY_SECONDS)
        else:
            # Outside the retried ``try``: a problem writing the state file
            # must not trigger a re-run of a pipeline that already succeeded.
            today = date.today()
            _update_pipeline_config("last_successful_run_date", str(today))
            log.info(
                "Pipeline completed successfully. last_successful_run_date=%s",
                today,
            )
            break


if __name__ == "__main__":
    _run_with_retries()