import traceback
import warnings
from datetime import datetime

import clickhouse_connect
import numpy as np
import pandas as pd
import pyodbc

# =========================================================
# IGNORE WARNINGS
# =========================================================
# Silence only the pandas message about non-SQLAlchemy connections;
# every other warning stays visible.
warnings.filterwarnings(
    'ignore',
    'pandas only supports SQLAlchemy connectable'
)

print("ETL Started :", datetime.now())

# =========================================================
# SQL SERVER CONNECTION
# =========================================================
# NOTE(review): credentials are hard-coded in source. Rotate them and load
# them from the environment or a secret store instead of committing them.
SQL_CONN_STR = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=10.200.25.65;'
    'DATABASE=CPMIndiaBusinessInsight;'
    'UID=bsgteam_test;'
    'PWD=B$gt3@m#00512;'
    'TrustServerCertificate=yes;'
)

# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
# NOTE(review): same concern as above - the password is stored in plain text.
CH_CONFIG = {
    'host': '172.188.12.194',
    'port': 8123,
    'username': 'default',
    'password': 'dipanshu_k',
    'database': 'DaburIndia_BI'
}

# =========================================================
# TABLE NAME
# =========================================================
TABLE_NAME = 'Contact Conversion'
PROJECT_ID = 41654


# =========================================================
# CLEAN DATAFRAME
# =========================================================
def _none_safe_series(values, index):
    """Build a Series from *values* without letting None decay into NaN.

    pandas infers float64 for a list such as ``[3240, None]``: the ints turn
    back into floats and None turns into NaN.  Forcing object dtype whenever
    a None is present keeps the Python values exactly as they were cleaned.
    Lists without None go through normal dtype inference.
    """
    if any(item is None for item in values):
        return pd.Series(values, index=index, dtype=object)
    return pd.Series(values, index=index)


def _clean_value(val):
    """Normalise one cell of an object/string column to a plain Python value."""
    if pd.isnull(val):
        return None

    # bool must be tested before int: bool is a subclass of int, and
    # np.bool_ would otherwise fall through to str() and become 'True'.
    if isinstance(val, (bool, np.bool_)):
        return int(val)

    if isinstance(val, (int, np.integer)):
        return int(val)

    if isinstance(val, (float, np.floating)):
        # Whole floats become ints so 3240.0 is not rendered as '3240.0'.
        # NaN never reaches this point (handled by the null check above).
        as_float = float(val)
        return int(as_float) if as_float.is_integer() else as_float

    if isinstance(val, str):
        return val.strip()

    # datetime / Timestamp and every other type are stored as text.
    return str(val)


def _clean_column(name, series):
    """Return the cleaned replacement for one column; raises on failure."""
    name_lower = name.lower()

    # =====================================
    # DATE COLUMNS
    # =====================================
    # NOTE(review): this is a substring match, so names such as 'Updated_By'
    # also contain 'date' and are coerced here (non-date values become None).
    # Confirm against the real column list.
    if 'date' in name_lower:
        print(f"Cleaning Date Column : {name}")

        parsed = pd.to_datetime(series, errors='coerce')

        # Remove invalid dates (outside 1970-2100)
        years = parsed.dt.year
        parsed = parsed.where((years >= 1970) & (years <= 2100))

        # Convert to Python date objects
        return _none_safe_series(
            [ts.date() if pd.notnull(ts) else None for ts in parsed],
            series.index,
        )

    # =====================================
    # INTEGER COLUMNS
    # =====================================
    if pd.api.types.is_integer_dtype(series):
        numeric = pd.to_numeric(series, errors='coerce')
        return _none_safe_series(
            [int(v) if pd.notnull(v) else None for v in numeric],
            series.index,
        )

    # =====================================
    # FLOAT COLUMNS
    # =====================================
    if pd.api.types.is_float_dtype(series):
        non_null = series.dropna()

        # Convert whole floats to int only when every value is whole.
        # Example: 3240.0 -> 3240
        all_whole = len(non_null) > 0 and bool((non_null % 1 == 0).all())
        convert = int if all_whole else float

        return _none_safe_series(
            [convert(v) if pd.notnull(v) else None for v in series],
            series.index,
        )

    # =====================================
    # OBJECT / STRING COLUMNS (and any other dtype)
    # =====================================
    return _none_safe_series(
        [_clean_value(v) for v in series],
        series.index,
    )


def clean_dataframe(df):
    """Normalise a DataFrame chunk into plain Python values.

    Processing is column-wise:

    * columns whose name contains ``date`` are parsed as datetimes; values
      that fail to parse or fall outside 1970-2100 become None, the rest
      become ``datetime.date`` objects;
    * integer columns become Python ints;
    * float columns become ints when every non-null value is whole,
      otherwise floats;
    * every other column is cleaned value by value: strings are stripped,
      booleans and whole floats become ints, remaining types become text.

    Nulls are represented as None.  Columns containing None are stored with
    object dtype so pandas cannot convert None back to NaN or ints back to
    floats.

    Args:
        df: DataFrame to clean.  It is not modified.

    Returns:
        The cleaned DataFrame.  The function is best-effort and never raises
        for ``Exception`` subclasses: a column that fails is reported on
        stdout and left as it was, and if the initial NaN replacement fails
        the input frame is returned unchanged.
    """
    try:
        # ---------------------------------------------
        # Replace NaN
        # ---------------------------------------------
        df = df.replace({np.nan: None})

        # ---------------------------------------------
        # Process Column Wise
        # ---------------------------------------------
        for col in df.columns:
            try:
                # Assign only on success so a failing column is never left
                # half-converted.
                df[col] = _clean_column(col, df[col])

            except Exception as col_error:
                print("\n================================")
                print(f"COLUMN FAILED : {col}")
                print(str(col_error))
                print("================================")

        return df

    except Exception as clean_error:
        print("\n================================")
        print("DATA CLEAN FAILED")
        print(str(clean_error))
        print("================================")

        return df
# =========================================================
# MAIN PROCESS
# =========================================================
def _append_error_log(path, error, table=None):
    """Append one error record to the log file at *path*.

    The record holds the current time, the table name (only when *table* is
    given), the error message and the traceback.  Call this from inside an
    ``except`` block: the traceback text comes from the exception currently
    being handled.
    """
    record = [
        "\n\n================================",
        f"\nTIME : {datetime.now()}",
    ]
    if table is not None:
        record.append(f"\nTABLE : {table}")
    record.append(f"\nERROR : {str(error)}")
    record.append(f"\nTRACEBACK :\n{traceback.format_exc()}")
    record.append("\n================================")

    with open(path, "a", encoding="utf-8") as log:
        log.write("".join(record))


# Bound up front so the cleanup in ``finally`` can tell "never connected"
# apart from "connected" instead of hitting a NameError.
sql_conn = None
ch_client = None

try:
    # =====================================================
    # CONNECT SQL SERVER
    # =====================================================
    sql_conn = pyodbc.connect(SQL_CONN_STR)
    print("Connected to SQL Server")

    # =====================================================
    # CONNECT CLICKHOUSE
    # =====================================================
    ch_client = clickhouse_connect.get_client(**CH_CONFIG)
    print("Connected to ClickHouse")

    # =====================================================
    # QUERY
    # =====================================================
    # TABLE_NAME and PROJECT_ID are module constants, not external input.
    query = f"""
    SELECT *
    FROM dbo.[{TABLE_NAME}]
    WHERE Project_Id = {PROJECT_ID}
    """

    print("\nExecuting Query:")
    print(query)

    # =====================================================
    # CHUNK SIZE
    # =====================================================
    chunk_size = 100000
    total_rows = 0
    failed_chunks = 0
    failed_rows = 0

    # =====================================================
    # READ DATA
    # =====================================================
    for chunk in pd.read_sql(
        query,
        sql_conn,
        chunksize=chunk_size
    ):
        try:
            print("\n================================")
            print(f"Processing {len(chunk)} Rows")
            print("================================")

            # =================================================
            # CLEAN DATA
            # =================================================
            chunk = clean_dataframe(chunk)

            # =================================================
            # DEBUG COLUMN TYPES
            # =================================================
            print("\nCOLUMN TYPES")
            for col in chunk.columns:
                sample = chunk[col].dropna()
                if len(sample) > 0:
                    print(
                        col,
                        type(sample.iloc[0]),
                        sample.iloc[0]
                    )

            # =================================================
            # SAMPLE DATA
            # =================================================
            print("\nSAMPLE DATA")
            print(chunk.head(2))

            # =================================================
            # INSERT INTO CLICKHOUSE
            # =================================================
            print("\nInserting into ClickHouse...")
            ch_client.insert_df(
                table=TABLE_NAME,
                df=chunk,
                database=CH_CONFIG['database']
            )

            total_rows += len(chunk)
            print(
                f"\nInserted Total Rows : {total_rows}"
            )

        except Exception as chunk_error:
            # A failed chunk is logged and skipped so later chunks still
            # load; it is counted so the final summary cannot claim success.
            failed_chunks += 1
            failed_rows += len(chunk)

            print("\n================================")
            print("CHUNK INSERT FAILED")
            print("================================")
            print(str(chunk_error))
            traceback.print_exc()

            # =============================================
            # SAVE ERROR LOG
            # =============================================
            _append_error_log(
                "clickhouse_chunk_error.log",
                chunk_error,
                table=TABLE_NAME
            )

    print("\n================================")
    if failed_chunks:
        print("ETL COMPLETED WITH ERRORS")
        print(
            f"FAILED CHUNKS : {failed_chunks} "
            f"({failed_rows} rows not inserted)"
        )
    else:
        print("ETL COMPLETED SUCCESSFULLY")
    print(f"TOTAL ROWS INSERTED : {total_rows}")
    print("================================")

# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
    print("\n================================")
    print("MAIN ERROR")
    print("================================")
    print(str(main_error))
    traceback.print_exc()

    _append_error_log("clickhouse_main_error.log", main_error)

# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
    # Close only what was actually opened.  Close failures are reported and
    # do not stop the other connection from being closed.
    if sql_conn is not None:
        try:
            sql_conn.close()
            print("\nSQL Server Connection Closed")
        except Exception as close_error:
            print(f"\nSQL Server Connection Close Failed : {close_error}")

    if ch_client is not None:
        try:
            ch_client.close()
            print("ClickHouse Connection Closed")
        except Exception as close_error:
            print(f"ClickHouse Connection Close Failed : {close_error}")

print("\nETL Finished :", datetime.now())