"""Copy one project's rows of a SQL Server table into ClickHouse.

Reads ``dbo.<TABLE_NAME>`` filtered on ``Project_Id = PROJECT_ID`` from SQL
Server in chunks of ``CHUNK_SIZE`` rows, normalizes each chunk's values
(see ``clean_dataframe``) and inserts it into the same-named table of the
ClickHouse database in ``CH_CONFIG``. A chunk that fails is logged to
``CHUNK_ERROR_LOG`` and skipped; a fatal error is logged to
``MAIN_ERROR_LOG``. The process exits with status 1 if anything failed.
"""

import os
import sys
import traceback
import warnings
from datetime import datetime

import numpy as np
import pandas as pd

# ---------------------------------------------------
# CONNECTION SETTINGS
# ---------------------------------------------------
# SECURITY: credentials should not be committed to source control. Every
# setting below can be overridden from the environment; the literal fallbacks
# only keep the script working unchanged. TODO: remove the fallbacks once the
# ETL_* variables are configured wherever this job runs.
SQL_CONN_STR = os.environ.get(
    'ETL_SQL_CONN_STR',
    (
        'DRIVER={ODBC Driver 17 for SQL Server};'
        'SERVER=10.200.25.65;'
        'DATABASE=CPMIndiaBusinessInsight;'
        'UID=bsgteam_test;'
        'PWD=B$gt3@m#00512;'
        'TrustServerCertificate=yes;'
    ),
)

CH_CONFIG = {
    'host': os.environ.get('ETL_CH_HOST', '172.188.12.194'),
    'port': int(os.environ.get('ETL_CH_PORT', '8123')),
    'username': os.environ.get('ETL_CH_USER', 'default'),
    'password': os.environ.get('ETL_CH_PASSWORD', 'dipanshu_k'),
    'database': os.environ.get('ETL_CH_DATABASE', 'DaburIndia_BI'),
}

# ---------------------------------------------------
# TABLE DETAILS
# ---------------------------------------------------
TABLE_NAME = 'Employee_Master'
PROJECT_ID = 41654
CHUNK_SIZE = 100000

CHUNK_ERROR_LOG = "clickhouse_chunk_error.log"
MAIN_ERROR_LOG = "clickhouse_main_error.log"

# pandas.api.types.infer_dtype results that mean "this column holds numbers or
# booleans, not dates", whatever its name says.
_NON_DATE_KINDS = frozenset(
    {'integer', 'floating', 'mixed-integer-float', 'decimal', 'boolean'}
)


# ---------------------------------------------------
# CLEAN DATAFRAME
# ---------------------------------------------------
def _is_date_column_name(col):
    """Return True if the column name contains "date" (case-insensitive)."""
    return 'date' in str(col).lower()


def _clean_date_column(name, series):
    """Convert *series* to ``datetime.date`` / None values.

    Values that fail to parse, or whose year is outside 1970..2100, become
    None. Returns None (meaning "not really a date column; treat it as a
    regular column") when the values are numeric/boolean, or when the column
    has non-null values but not a single one parses as a date. That happens
    for names that only contain the substring, e.g. ``Candidate_Name`` or
    ``Updated_By``.
    """
    if pd.api.types.infer_dtype(series, skipna=True) in _NON_DATE_KINDS:
        # pd.to_datetime reads numbers as nanoseconds since the epoch, which
        # would turn every value into 1970-01-01.
        print(f"Skipping Date Parsing For Non-Date Column : {name}")
        return None

    print(f"Cleaning Date Column : {name}")
    parsed = pd.to_datetime(series, errors='coerce')

    if series.notna().any() and parsed.isna().all():
        print(f"No Parseable Dates In Column : {name} (kept as values)")
        return None

    in_range = (parsed.dt.year >= 1970) & (parsed.dt.year <= 2100)
    parsed = parsed.where(in_range)
    return parsed.apply(lambda ts: ts.date() if pd.notnull(ts) else None)


def _normalize_value(val):
    """Map one non-date cell to a value ClickHouse's insert_df accepts.

    None/NaN/NaT -> None; str unchanged; bool and integers -> int (booleans
    become 0/1); everything else (floats, datetimes, Decimal, ...) -> str.
    """
    if pd.isnull(val):
        return None
    if isinstance(val, str):
        return val
    # bool is a subclass of int, so Python booleans always took the integer
    # path; np.bool_ is included so both boolean types map the same way.
    if isinstance(val, (bool, np.bool_, int, np.integer)):
        return int(val)
    # NOTE(review): read_sql returns nullable integer columns as float64, so
    # their values are sent as e.g. "123.0" -- confirm against the target schema.
    return str(val)


def clean_dataframe(df):
    """Normalize a chunk read from SQL Server before inserting it into ClickHouse.

    * NaN values are replaced with None.
    * Columns whose name contains "date" are converted to ``datetime.date``.
      Unparseable or out-of-range (outside 1970..2100) values become None.
      Columns that merely contain the substring (no value parses) or that are
      numeric keep their values and are normalized as regular columns.
    * All other columns are normalized value by value (see ``_normalize_value``).

    Cleaning is best-effort: a column that raises is reported and left as-is,
    and if the initial NaN replacement fails the input is returned unchanged.

    Args:
        df: a DataFrame chunk as returned by ``pandas.read_sql``.

    Returns:
        The cleaned DataFrame.
    """
    try:
        df = df.replace({np.nan: None})
    except Exception as clean_error:
        print("\n===================================")
        print("DATA CLEAN FAILED")
        print(str(clean_error))
        print("===================================")
        return df

    for col in df.columns:
        try:
            if _is_date_column_name(col):
                dates = _clean_date_column(col, df[col])
                if dates is not None:
                    df[col] = dates
                    continue
            df[col] = [_normalize_value(val) for val in df[col]]
        except Exception as col_error:
            print("\n===================================")
            print(f"COLUMN FAILED : {col}")
            print(str(col_error))
            print("===================================")

    return df


# ---------------------------------------------------
# ETL HELPERS
# ---------------------------------------------------
def _write_error_log(path, error, table=None):
    """Append a timestamped record of *error*, with its traceback, to *path*."""
    trace = ''.join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )
    parts = [
        "\n\n=====================================",
        f"\nTIME : {datetime.now()}",
    ]
    if table is not None:
        parts.append(f"\nTABLE : {table}")
    parts += [
        f"\nERROR : {error}",
        f"\nTRACEBACK :\n{trace}",
        "\n=====================================",
    ]
    with open(path, "a", encoding="utf-8") as log:
        log.write(''.join(parts))


def _close_quietly(resource, closed_message, label):
    """Close *resource* if it was opened; report (but do not raise) a close failure."""
    if resource is None:
        return
    try:
        resource.close()
        print(closed_message)
    except Exception as close_error:
        print(f"Failed to close {label} connection: {close_error}")


def _load_chunk(ch_client, chunk):
    """Clean *chunk*, print diagnostics, and insert it into ClickHouse."""
    print("\n===================================")
    print(f"Processing {len(chunk)} Rows")
    print("===================================")

    chunk = clean_dataframe(chunk)

    print("\nFINAL COLUMN TYPES")
    for col in chunk.columns:
        sample = chunk[col].dropna()
        if len(sample) > 0:
            print(col, type(sample.iloc[0]), sample.iloc[0])

    print("\nSAMPLE DATA")
    print(chunk.head(2))

    print("\nInserting into ClickHouse...")
    ch_client.insert_df(
        table=TABLE_NAME,
        df=chunk,
        database=CH_CONFIG['database'],
    )


# ---------------------------------------------------
# MAIN
# ---------------------------------------------------
def main():
    """Run the ETL once.

    Returns:
        0 if every chunk was inserted, 1 if any chunk failed or a fatal error
        occurred.
    """
    # The database drivers are only needed here. Importing them lazily lets the
    # transformation helpers above be imported (e.g. by tests) without them.
    import clickhouse_connect
    import pyodbc

    warnings.filterwarnings(
        'ignore',
        'pandas only supports SQLAlchemy connectable'
    )
    print("ETL Started :", datetime.now())

    sql_conn = None
    ch_client = None
    total_rows = 0
    failed_chunks = 0
    failed_rows = 0
    fatal = False

    try:
        sql_conn = pyodbc.connect(SQL_CONN_STR)
        print("Connected to SQL Server")

        ch_client = clickhouse_connect.get_client(**CH_CONFIG)
        print("Connected to ClickHouse")

        # Project_Id is bound as a parameter. The table name cannot be, so it
        # is bracket-quoted with any ']' doubled (T-SQL identifier quoting).
        quoted_table = TABLE_NAME.replace(']', ']]')
        query = (
            f"SELECT * FROM dbo.[{quoted_table}] "
            "WHERE Project_Id = ?"
        )
        params = [PROJECT_ID]

        print("\nExecuting Query:")
        print(query)
        print("Params:", params)

        # NOTE(review): rows are appended on every run. Re-running without
        # clearing this project's rows in ClickHouse duplicates them unless the
        # target table engine deduplicates -- confirm.
        for chunk in pd.read_sql(
            query, sql_conn, params=params, chunksize=CHUNK_SIZE
        ):
            if chunk.empty:
                continue
            try:
                _load_chunk(ch_client, chunk)
            except Exception as chunk_error:
                failed_chunks += 1
                failed_rows += len(chunk)
                print("\n===================================")
                print("CHUNK INSERT FAILED")
                print("===================================")
                print(str(chunk_error))
                traceback.print_exc()
                _write_error_log(CHUNK_ERROR_LOG, chunk_error, table=TABLE_NAME)
            else:
                total_rows += len(chunk)
                print(f"\nInserted Total Rows : {total_rows}")

        print("\n===================================")
        if failed_chunks:
            print("ETL COMPLETED WITH ERRORS")
            print(f"TOTAL ROWS INSERTED : {total_rows}")
            print(
                f"FAILED CHUNKS : {failed_chunks} "
                f"({failed_rows} rows) - see {CHUNK_ERROR_LOG}"
            )
        else:
            print("ETL COMPLETED SUCCESSFULLY")
            print(f"TOTAL ROWS INSERTED : {total_rows}")
        print("===================================")

    except Exception as main_error:
        fatal = True
        print("\n===================================")
        print("MAIN ERROR")
        print("===================================")
        print(str(main_error))
        traceback.print_exc()
        _write_error_log(MAIN_ERROR_LOG, main_error)

    finally:
        _close_quietly(sql_conn, "\nSQL Server Connection Closed", "SQL Server")
        _close_quietly(ch_client, "ClickHouse Connection Closed", "ClickHouse")

    print("\nETL Finished :", datetime.now())
    return 1 if (fatal or failed_chunks) else 0


if __name__ == "__main__":
    sys.exit(main())