# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "polars>=0.20.0",
#     "pyarrow>=18.0.0",
#     "sqlalchemy>=2.0.0",
#     "pyodbc>=5.0.0",
#     "clickhouse-connect>=0.7.0",
#     "clickhouse-sqlalchemy>=0.3.2",
#     "pyyaml>=6.0.3",
#     "python-dotenv>=1.0.0",
# ]
# ///
from __future__ import annotations

from time import sleep
import sys
from datetime import date, datetime, timedelta
from pathlib import Path

import polars as pl
import yaml

from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
    delete_existing_data,
    truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
    build_sql_server_engine,
    build_clickhouse_engine,
    get_clickhouse_client,
)
from mids import (
    MID_TABLE_COV,
    MID_TABLE_COV1,
)
from src.bridge import *
from src.fact import *
from src.dim import *

# Number of times elt() is attempted for one run date before giving up.
_MAX_ATTEMPTS = 3
# Pause between two attempts for the same run date, in seconds.
_RETRY_DELAY_SECONDS = 5
# `fetch_by` values for which a table-specific `fetch_<table_name>` function
# is looked up by name instead of calling the generic `fetch_data`.
_CUSTOM_FETCH_MODES = ("mids", "run_date", "reason_id")


# ==========================================================
# Helpers
# ==========================================================
def get_dates_from_yaml(filename: str) -> tuple[date, date, str]:
    """Read the pipeline date window and flag from a YAML file.

    The file must contain a ``pipeline`` mapping with ``start_date``,
    ``end_date`` (both ISO ``YYYY-MM-DD``) and ``flag`` keys.

    Returns:
        ``(start_date, end_date, flag)`` where the dates are ``date``
        objects and ``flag`` is the value converted with ``str``.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if one of the expected keys is missing.
        ValueError: if a date is not in ISO format.
    """
    with open(filename, "r") as file:
        data = yaml.safe_load(file)

    pipeline = data["pipeline"]
    # str() first: PyYAML may already have parsed the value into a date.
    start_date = date.fromisoformat(str(pipeline["start_date"]))
    end_date = date.fromisoformat(str(pipeline["end_date"]))
    flag = str(pipeline["flag"])
    return start_date, end_date, flag


def write_table_to_yaml(
    data: dict,
    run_date: date,
    filename: str | None = None,
):
    """Write table data to a YAML file.

    When ``filename`` is omitted the file is named
    ``elt_pipeline_<run_date>.yml``. Key order of ``data`` is preserved.
    """
    if filename is None:
        filename = f"elt_pipeline_{run_date}.yml"

    with open(filename, "w") as file:
        yaml.dump(
            data,
            file,
            default_flow_style=False,
            sort_keys=False,
        )
    print(f"Table written to {filename}")


def table_exists(
    client,
    table_name: str,
) -> bool:
    """Return whether ``EXISTS TABLE <table_name>`` is truthy on ``client``.

    NOTE(review): ``table_name`` is interpolated into the statement as-is;
    here it comes from the local ``y.yml`` config, not from user input.
    """
    return bool(client.command(f"EXISTS TABLE {table_name}"))


def _dump_yaml_report(path: str, payload: list) -> None:
    """Overwrite ``path`` with ``payload`` serialised as block-style YAML."""
    with open(path, "w") as f:
        yaml.dump(
            payload,
            f,
            default_flow_style=False,
            sort_keys=False,
        )


def _fetch_table(sql_engine, table_name, table_type, fetch_by, mids, run_date):
    """Fetch one table from SQL Server.

    For the ``fetch_by`` values in ``_CUSTOM_FETCH_MODES`` a module-level
    function called ``fetch_<table_name>`` is resolved by name (these are
    expected to arrive through the star imports); otherwise the generic
    ``fetch_data`` is used.

    Raises:
        KeyError: if no ``fetch_<table_name>`` function is in scope.
    """
    if fetch_by not in _CUSTOM_FETCH_MODES:
        return fetch_data(sql_engine, table_name, table_type)

    fn_name = f"fetch_{table_name}"
    try:
        fn = globals()[fn_name]
    except KeyError:
        # Same exception type as before, but say what is actually missing.
        raise KeyError(
            f"No fetch function named {fn_name!r} is defined for "
            f"table {table_name!r} (fetch_by={fetch_by!r})"
        ) from None
    return fn(sql_engine, table_name, table_type, mids, run_date)


# ==========================================================
# Main
# ==========================================================
def elt(run_date: date):
    """Copy every table listed in ``y.yml`` from SQL Server to ClickHouse.

    For each entry under ``tables`` the data is fetched, the ClickHouse
    table is created when it does not exist, otherwise existing data is
    removed (whole-table truncate for operation ``DELETE+INSERT``,
    ``delete_existing_data`` for anything else), and the fetched rows are
    loaded. Tables that return no rows are skipped with a warning.

    Raises:
        Exception: any error raised while processing a table is logged
            with its traceback and re-raised, aborting the run.
    """
    log.info("=" * 80)
    log.info("Hello from data-move Python data pipeline!")

    # ------------------------------------------------------
    # Run Date
    # ------------------------------------------------------
    log.info("Pipeline Run Date: %s", run_date)

    # ------------------------------------------------------
    # Connections
    # ------------------------------------------------------
    # NOTE(review): these handles are never explicitly released, and elt()
    # is called once per date and per retry. Confirm whether the builders
    # pool/reuse connections or whether a dispose/close is needed here.
    log.info("Connecting to databases...")
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Database connections established")

    # ------------------------------------------------------
    # mids Keys
    # ------------------------------------------------------
    mids = MID_TABLE_COV(sql_engine, run_date)
    emp_visit_df = MID_TABLE_COV1(sql_engine, run_date)

    # ------------------------------------------------------
    # Config
    # ------------------------------------------------------
    with open("y.yml", "r") as file:
        config = yaml.safe_load(file)

    # ------------------------------------------------------
    # Process Tables
    # ------------------------------------------------------
    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info("=" * 80)
        log.info(
            f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}"
        )

        try:
            # ------------------------------------------
            # Fetch Data
            # ------------------------------------------
            log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
            df = _fetch_table(
                sql_engine,
                table_name,
                table_type,
                fetch_by,
                mids,
                run_date,
            )
            log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")

            if df.is_empty():
                log.warning("%s returned no rows", table_name)
                continue

            log.info("Fetched %s rows", len(df))

            # ------------------------------------------
            # Create Table If Missing
            # ------------------------------------------
            if not table_exists(client, table_name):
                log.info("Creating table %s", table_name)
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )
            # ------------------------------------------
            # Existing Table Logic
            # ------------------------------------------
            elif operation == "DELETE+INSERT":
                truncate_table(client, table_name)
            else:
                delete_existing_data(
                    client=client,
                    table_name=table_name,
                    run_date=run_date,
                    mids=mids,
                    emp_visit_df=emp_visit_df,
                )

            # ------------------------------------------
            # Load Data
            # ------------------------------------------
            log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
            load_to_clickhouse(
                client=client,
                table_name=table_name,
                df=df,
            )
            log.info(
                "%s loaded successfully (%s rows)",
                table_name,
                len(df),
            )
        except Exception:
            log.exception("Failed processing table %s", table_name)
            raise

    log.info("=" * 80)
    log.info("Pipeline Completed Successfully")
    log.info("=" * 80)


def main():
    """Run ``elt`` for every date in the selected window, with retries.

    Date window selection, in priority order:

    1. ``elt_pipeline_custom_dates.yml`` when its flag equals ``"Y"``.
    2. The first command-line argument (``YYYY-MM-DD``).
    3. Yesterday.

    Each date is attempted up to ``_MAX_ATTEMPTS`` times with a pause of
    ``_RETRY_DELAY_SECONDS`` between attempts; if the last attempt fails
    the exception propagates. The success and failure report files are
    written in every case, including when the run aborts, so the failure
    history is not lost.
    """
    # Make sure the status file exists with its default skeleton.
    config_file = Path("Pipeline_config.yml")
    if not config_file.exists():
        default_config = {
            "pipeline": {
                "run_date": None,
                "status": None,
                "error_message": None,
            }
        }
        with open(config_file, "w") as f:
            yaml.safe_dump(default_config, f)

    p_start_date, p_end_date, flag = get_dates_from_yaml(
        "elt_pipeline_custom_dates.yml"
    )

    if flag == "Y":
        start_date = p_start_date
        end_date = p_end_date
    elif len(sys.argv) > 1:
        start_date = datetime.strptime(sys.argv[1], "%Y-%m-%d").date()
        # NOTE(review): this processes the given date AND the following day,
        # whereas the default branch below processes a single day. Kept
        # as-is; confirm that the two-day window is intended.
        end_date = start_date + timedelta(days=1)
    else:
        start_date = date.today() - timedelta(days=1)
        end_date = start_date

    log.info("Pipeline Start Date: %s", start_date)

    failed_dates = []
    successful_dates = []
    filename_successful = "successful_Pipeline_dates_config.yml"
    filename_failed = "failed_Pipeline_dates_config.yml"

    try:
        run_date = start_date
        while run_date <= end_date:
            for attempt in range(_MAX_ATTEMPTS):
                try:
                    elt(run_date)
                except Exception as e:
                    failed_dates.append({
                        'pipeline_trigeered_on_date': str(date.today()),
                        'failed_run_date': run_date,
                        "attempt": attempt,
                    })
                    # Out of attempts: let the error abort the whole run.
                    if attempt == _MAX_ATTEMPTS - 1:
                        raise
                    log.warning(
                        f"Pipeline failed. Retry {attempt + 1}/{_MAX_ATTEMPTS}. Error: {e}"
                    )
                    sleep(_RETRY_DELAY_SECONDS)
                else:
                    successful_dates.append({
                        'pipeline_trigeered_on_date': str(date.today()),
                        'last_successful_run_date': run_date,
                    })
                    log.info(
                        f"Pipeline completed successfully. "
                        f"pipeline_trigeered_on_date={date.today()} "
                        f"last_successful_run_date={run_date}"
                    )
                    break
            run_date = run_date + timedelta(days=1)
    finally:
        # Persist the outcome even when the final retry re-raised.
        _dump_yaml_report(filename_successful, successful_dates)

        if not failed_dates:
            # Placeholder entry so the failure report is never an empty list.
            failed_dates.append({
                'pipeline_trigeered_on_date': str(date.today()),
                'failed_run_date': "none",
                "attempt": "none",
            })
        _dump_yaml_report(filename_failed, failed_dates)


if __name__ == "__main__":
    main()