"""ETL job: copy one project's rows from a SQL Server table into ClickHouse.

The script reads ``dbo.[TABLE_NAME]`` filtered by ``PROJECT_ID`` in chunks of
``chunk_size`` rows, normalises every chunk with :func:`clean_dataframe` and
inserts it into the ClickHouse table of the same name.  When
``TRUNCATE_BEFORE_LOAD`` is true the target table is truncated once, right
before the first insert.

Run it as a script.  The process exits with status 0 only when every chunk
was loaded; any failed chunk, exhausted retry budget or unexpected error
yields status 1.
"""

import os
import sys
import time
import traceback
import warnings
from datetime import datetime

import clickhouse_connect
import numpy as np
import pandas as pd
import pyodbc

# =========================================================
# IGNORE WARNINGS
# =========================================================
# pandas warns when read_sql is given a raw DBAPI connection (pyodbc)
# instead of an SQLAlchemy connectable; that usage is intentional here.
warnings.filterwarnings(
    'ignore',
    'pandas only supports SQLAlchemy connectable'
)

# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
# SECURITY: credentials must not live in source control.  The password can be
# supplied through the ETL_SQL_PASSWORD environment variable; the literal
# below is kept only as a backward-compatible fallback and should be removed
# (and the password rotated) once the environment variable is deployed.
SQL_CONN_STR = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=10.200.25.65;'
    'DATABASE=CPMIndiaBusinessInsight;'
    'UID=bsgteam_test;'
    f"PWD={os.environ.get('ETL_SQL_PASSWORD', 'B$gt3@m#00512')};"
    'TrustServerCertificate=yes;'
    'Connection Timeout=60;'
)

# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
# SECURITY: same remark as above; override with ETL_CH_PASSWORD.
CH_CONFIG = {
    'host': '172.188.12.194',
    'port': 8123,
    'username': 'default',
    'password': os.environ.get('ETL_CH_PASSWORD', 'dipanshu_k'),
    'database': 'DaburIndia_BI',
}

# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'SOS_OneApp'
PROJECT_ID = 41654

# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
# Set to True once the target table has been truncated during this run.
table_truncated = False

# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000


# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
    """Open and return a pyodbc connection built from ``SQL_CONN_STR``.

    The connection is opened with ``autocommit=True``.

    Raises:
        Exception: whatever ``pyodbc.connect`` raised, after printing it.
    """
    try:
        conn = pyodbc.connect(SQL_CONN_STR, autocommit=True)
        print("Connected SQL Server")
        return conn
    except Exception as e:
        print("SQL CONNECTION FAILED")
        print(str(e))
        raise


# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
    """Create and return a clickhouse_connect client from ``CH_CONFIG``.

    Raises:
        Exception: whatever ``clickhouse_connect.get_client`` raised, after
            printing it.
    """
    try:
        client = clickhouse_connect.get_client(**CH_CONFIG)
        print("Connected ClickHouse")
        return client
    except Exception as e:
        print("CLICKHOUSE CONNECTION FAILED")
        print(str(e))
        raise


# =========================================================
# CLEAN DATAFRAME
# =========================================================
def _clean_value(val):
    """Convert one cell of an object column to a plain Python value.

    Nulls become ``None``, pandas timestamps become ``datetime``, booleans
    and integers become ``int``, floats become ``float`` and anything else
    is converted with ``str(...).strip()``.
    """
    if pd.isnull(val):
        return None
    if isinstance(val, pd.Timestamp):
        return val.to_pydatetime()
    if isinstance(val, datetime):
        return val
    # bool is a subclass of int, so it has to be tested before the integer
    # branch to be reachable; np.bool_ is handled the same way instead of
    # falling through to the string branch.
    if isinstance(val, (bool, np.bool_)):
        return int(val)
    if isinstance(val, (int, np.integer)):
        return int(val)
    # NaN was already caught by the null check above.
    if isinstance(val, (float, np.floating)):
        return float(val)
    return str(val).strip()


def clean_dataframe(df):
    """Normalise a chunk column by column before it is inserted.

    Processing per column:

    * name contains ``date`` or ``time`` (case-insensitive): parsed with
      ``pd.to_datetime(errors='coerce')``, then mapped to ``datetime`` /
      ``None``;
    * integer dtype: coerced with ``pd.to_numeric`` and mapped to ``int``;
    * float dtype: coerced with ``pd.to_numeric`` and mapped to ``float``;
    * anything else: every cell goes through ``_clean_value``.

    A failure in one column is printed and that column is left as it was;
    the remaining columns are still processed.

    Args:
        df: the DataFrame chunk to clean.

    Returns:
        The cleaned DataFrame.  If cleaning fails as a whole, the frame in
        its current state is returned after the error is printed.
    """
    try:
        df = df.replace({np.nan: None})

        for col in df.columns:
            try:
                print(f"\nCleaning Column : {col}")
                col_name = col.lower()

                # NOTE(review): the name heuristic is a plain substring match,
                # so names such as "Updated_By" also match "date" - confirm
                # against the real schema.
                if 'date' in col_name or 'time' in col_name:
                    print(f"Date Column : {col}")
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                    df[col] = [
                        None if pd.isnull(val) else val.to_pydatetime()
                        for val in df[col]
                    ]

                elif pd.api.types.is_integer_dtype(df[col]):
                    print(f"Integer Column : {col}")
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    df[col] = df[col].apply(
                        lambda x: int(x) if pd.notnull(x) else None
                    )

                elif pd.api.types.is_float_dtype(df[col]):
                    print(f"Float Column : {col}")
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    df[col] = df[col].apply(
                        lambda x: float(x) if pd.notnull(x) else None
                    )

                else:
                    print(f"String/Object Column : {col}")
                    df[col] = [_clean_value(val) for val in df[col]]

            except Exception as col_error:
                print("\n================================")
                print(f"COLUMN FAILED : {col}")
                print(str(col_error))
                print("================================")

        return df

    except Exception as clean_error:
        print("\n================================")
        print("DATA CLEAN FAILED")
        print(str(clean_error))
        print("================================")
        return df


# =========================================================
# LOAD HELPERS
# =========================================================
def _append_error_log(path, error, table=None):
    """Append a timestamped error record with the current traceback to *path*.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` sees the active exception.
    """
    record = [
        "\n\n================================",
        f"\nTIME : {datetime.now()}",
    ]
    if table is not None:
        record.append(f"\nTABLE : {table}")
    record += [
        f"\nERROR : {str(error)}",
        f"\nTRACEBACK :\n{traceback.format_exc()}",
        "\n================================",
    ]
    with open(path, "a", encoding="utf-8") as log:
        log.writelines(record)


def _print_column_types(chunk):
    """Print each column's dtype and the type/value of its first non-null cell."""
    print("\nCOLUMN DATATYPES")
    print(chunk.dtypes)

    print("\nCOLUMN SAMPLE TYPES")
    for col in chunk.columns:
        sample = chunk[col].dropna()
        if len(sample) > 0:
            print(col, type(sample.iloc[0]), sample.iloc[0])


def _truncate_target(ch_client):
    """Truncate the ClickHouse target table."""
    print("\n================================")
    print(f"TRUNCATING TABLE : {TABLE_NAME}")
    print("================================")

    ch_client.command(
        f"TRUNCATE TABLE `{CH_CONFIG['database']}`.`{TABLE_NAME}`"
    )
    print("TABLE TRUNCATED SUCCESSFULLY")


def _copy_all_chunks(sql_conn, ch_client, query):
    """Stream *query* from SQL Server into ClickHouse chunk by chunk.

    A chunk that fails to clean/insert is logged to ``insert_error.log`` and
    skipped so the remaining chunks are still loaded.

    Returns:
        The number of chunks that failed.

    Raises:
        pyodbc.OperationalError: if reading from SQL Server fails; the
            caller is expected to reconnect and retry.
    """
    global table_truncated

    failed_chunks = 0

    for chunk in pd.read_sql(query, sql_conn, chunksize=chunk_size):
        try:
            print("\n================================")
            print(f"Processing Rows : {len(chunk)}")
            print("================================")

            chunk = clean_dataframe(chunk)
            _print_column_types(chunk)

            # Truncate lazily, right before the first insert, so the target
            # is not wiped when the source returns no rows at all.
            if TRUNCATE_BEFORE_LOAD and not table_truncated:
                _truncate_target(ch_client)
                table_truncated = True

            print("\nINSERTING INTO CLICKHOUSE...")
            ch_client.insert_df(
                table=TABLE_NAME,
                df=chunk,
                database=CH_CONFIG['database']
            )
            print(f"INSERTED : {len(chunk)} ROWS")

        except Exception as insert_error:
            failed_chunks += 1

            print("\n================================")
            print("INSERT FAILED")
            print("================================")
            print(str(insert_error))
            traceback.print_exc()

            _append_error_log(
                "insert_error.log", insert_error, table=TABLE_NAME
            )

    return failed_chunks


# =========================================================
# MAIN PROCESS
# =========================================================
def run_etl():
    """Run the full load and report whether it succeeded.

    Connection losses while reading (``pyodbc.OperationalError``) are retried
    up to five times, each after a 10 second pause and a reconnect.  Because
    a retry re-runs the query from the start, the target table is truncated
    again on retry when ``TRUNCATE_BEFORE_LOAD`` is set, so rows inserted
    before the failure are not duplicated.

    Returns:
        ``True`` when every chunk was inserted, ``False`` otherwise.
    """
    global table_truncated

    print("\n================================================")
    print("ETL STARTED :", datetime.now())
    print("================================================")

    # Pre-bind so the cleanup in ``finally`` is safe when connecting fails.
    sql_conn = None
    ch_client = None
    succeeded = False

    try:
        sql_conn = connect_sql()
        ch_client = connect_clickhouse()

        query = f"""
            SELECT *
            FROM dbo.[{TABLE_NAME}]
            WHERE Project_Id = {PROJECT_ID}
        """
        print("\nExecuting Query")
        print(query)

        retry_count = 0
        max_retry = 5
        # Stays None unless a full pass over the result set completed.
        failed_chunks = None

        while retry_count < max_retry:
            try:
                failed_chunks = _copy_all_chunks(sql_conn, ch_client, query)
                break

            except pyodbc.OperationalError as op_error:
                retry_count += 1

                print("\n================================")
                print(f"SQL CONNECTION LOST - RETRY {retry_count}")
                print("================================")
                print(str(op_error))

                time.sleep(10)

                # Best effort: the broken connection may refuse to close.
                try:
                    sql_conn.close()
                except Exception as close_error:
                    print(f"Ignoring error while closing SQL connection : "
                          f"{close_error}")

                sql_conn = connect_sql()

                if TRUNCATE_BEFORE_LOAD:
                    # The retry reloads everything, so start from an empty
                    # table again instead of appending duplicates.
                    table_truncated = False
                else:
                    print("WARNING : retry re-reads the full result set; "
                          "rows inserted before the failure may be "
                          "duplicated")

            except Exception as loop_error:
                print("\n================================")
                print("MAIN LOOP ERROR")
                print("================================")
                print(str(loop_error))
                traceback.print_exc()
                break
        else:
            # Loop ended without ``break``: every retry was used up.
            print("\n================================")
            print(f"SQL RETRY LIMIT REACHED : {max_retry}")
            print("================================")

        print("\n================================")
        if failed_chunks is None:
            print("ETL FAILED : LOAD DID NOT COMPLETE")
        elif failed_chunks:
            print(f"ETL COMPLETED WITH ERRORS : "
                  f"{failed_chunks} CHUNK(S) FAILED")
        else:
            print("ETL COMPLETED SUCCESSFULLY")
            succeeded = True
        print("================================")

    except Exception as main_error:
        print("\n================================")
        print("MAIN ERROR")
        print("================================")
        print(str(main_error))
        traceback.print_exc()

        _append_error_log("main_error.log", main_error)

    finally:
        if sql_conn is not None:
            try:
                sql_conn.close()
                print("\nSQL SERVER CONNECTION CLOSED")
            except Exception as close_error:
                print(f"\nSQL SERVER CLOSE FAILED : {close_error}")

        if ch_client is not None:
            try:
                ch_client.close()
                print("CLICKHOUSE CONNECTION CLOSED")
            except Exception as close_error:
                print(f"CLICKHOUSE CLOSE FAILED : {close_error}")

    print("\n================================================")
    print("ETL FINISHED :", datetime.now())
    print("================================================")

    return succeeded


if __name__ == "__main__":
    # Non-zero exit status lets schedulers detect a failed load.
    sys.exit(0 if run_etl() else 1)