import os
import traceback
import warnings
from datetime import datetime

import clickhouse_connect
import numpy as np
import pandas as pd
import pyodbc

# =========================================================
# IGNORE WARNINGS
# =========================================================
# Silence the pandas warning whose message starts with the text below
# (the second positional argument is a regex matched against the start
# of the warning message).
warnings.filterwarnings(
    'ignore',
    'pandas only supports SQLAlchemy connectable'
)

print("\n====================================")
print("ETL Started :", datetime.now())
print("====================================")

# =========================================================
# SQL SERVER CONNECTION
# =========================================================
# Every connection setting can be overridden through an environment
# variable; the literal fallbacks keep the script's previous behavior
# when nothing is set.
# NOTE(review): credentials are still embedded here as fallbacks. They
# should be rotated and removed from source once the environment
# variables are provisioned.
SQL_CONN_STR = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    f"SERVER={os.environ.get('ETL_SQL_SERVER', '10.200.25.65')};"
    f"DATABASE={os.environ.get('ETL_SQL_DATABASE', 'CPMIndiaBusinessInsight')};"
    f"UID={os.environ.get('ETL_SQL_UID', 'bsgteam_test')};"
    f"PWD={os.environ.get('ETL_SQL_PWD', 'B$gt3@m#00512')};"
    'TrustServerCertificate=yes;'
)

# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
# Keyword arguments for the ClickHouse client (same override scheme as
# the SQL Server settings above).
CH_CONFIG = {
    'host': os.environ.get('ETL_CH_HOST', '172.188.12.194'),
    'port': int(os.environ.get('ETL_CH_PORT', '8123')),
    'username': os.environ.get('ETL_CH_USERNAME', 'default'),
    'password': os.environ.get('ETL_CH_PASSWORD', 'dipanshu_k'),
    'database': os.environ.get('ETL_CH_DATABASE', 'DaburIndia_BI')
}

# =========================================================
# TABLE DETAILS
# =========================================================
# Table name used for both the source and the target table, and the
# project id used to filter the source rows.
TABLE_NAME = 'SKU Master'
PROJECT_ID = 41654

# =========================================================
# SETTINGS
# =========================================================
# When True the target table is truncated once before the first insert.
TRUNCATE_BEFORE_LOAD = True

# Run-time flag: flipped to True after the truncate has been executed.
table_truncated = False

# =========================================================
# CLICKHOUSE DATE COLUMNS
# =========================================================
# Columns converted to datetime.date values (matched case-insensitively).
DATE_COLUMNS = [
    'visit_date'
]

# =========================================================
# CLICKHOUSE DATETIME COLUMNS
# =========================================================
# Columns converted to datetime values (matched case-insensitively).
DATETIME_COLUMNS = [
    'create_date',
    'update_date'
]
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def _clean_object_value(val):
    """Convert one cell of an object/string column to a plain Python value.

    Returns None for missing values, a stripped str for strings, an int for
    booleans / integers / whole-number floats, a float for other floats, a
    datetime for timestamps, the value itself for date-like objects (anything
    with a ``year`` attribute) and ``str(val)`` for everything else.
    """
    # Only ask pandas about missingness for scalars: for list-like cells
    # pd.isnull returns an array, whose truth value is ambiguous.
    if pd.api.types.is_scalar(val) and pd.isnull(val):
        return None

    if isinstance(val, str):
        return val.strip()

    # Booleans must be tested before integers (bool is a subclass of int).
    # np.bool_ is neither bool nor np.integer, so it is listed explicitly;
    # otherwise it would be stringified to "True"/"False" below.
    if isinstance(val, (bool, np.bool_)):
        return int(val)

    if isinstance(val, (int, np.integer)):
        return int(val)

    if isinstance(val, (float, np.floating)):
        # Go through the built-in float so is_integer() is always available,
        # regardless of which NumPy scalar type was stored in the column.
        as_float = float(val)
        return int(as_float) if as_float.is_integer() else as_float

    if isinstance(val, pd.Timestamp):
        return val.to_pydatetime()

    if isinstance(val, datetime):
        return val

    # Date-like objects (e.g. datetime.date) are passed through unchanged.
    if hasattr(val, 'year'):
        return val

    return str(val)


def clean_dataframe(df):
    """Normalize a DataFrame chunk column by column before it is inserted.

    Handling per column, in priority order:
      * name listed in DATE_COLUMNS (case-insensitive): parsed with
        pd.to_datetime(errors='coerce') and reduced to datetime.date / None;
      * name listed in DATETIME_COLUMNS (case-insensitive): parsed the same
        way and converted to datetime / None;
      * integer dtype: values converted with int();
      * float dtype: converted to int when every non-null value is whole
        (12.0 -> 12), otherwise to float;
      * anything else: cleaned value by value (see _clean_object_value).

    A failure in one column is reported on stdout and the remaining columns
    are still processed; a failure outside the column loop is reported and
    the frame is returned in whatever state it reached.

    NOTE: cleaned values are assigned back as plain lists / Series, so pandas
    may re-infer the column dtype on assignment.

    Args:
        df: pandas DataFrame to clean. The caller's object is not modified;
            the work is done on the frame returned by ``df.replace``.

    Returns:
        The cleaned DataFrame.
    """
    try:
        # ---------------------------------------------
        # Replace NaN with None
        # ---------------------------------------------
        df = df.replace({np.nan: None})

        # Lower-cased lookup sets, built once instead of once per column.
        date_names = {str(name).lower() for name in DATE_COLUMNS}
        datetime_names = {str(name).lower() for name in DATETIME_COLUMNS}

        # ---------------------------------------------
        # Process Each Column
        # ---------------------------------------------
        for col in df.columns:
            try:
                print(f"\nCleaning Column : {col}")

                # str() keeps non-string column labels from failing here.
                col_key = str(col).lower()

                # =====================================
                # DATE COLUMNS
                # =====================================
                if col_key in date_names:
                    print(f"Date Column : {col}")

                    parsed = pd.to_datetime(df[col], errors='coerce')
                    df[col] = [
                        None if pd.isnull(ts) else ts.date()
                        for ts in parsed
                    ]

                # =====================================
                # DATETIME COLUMNS
                # =====================================
                elif col_key in datetime_names:
                    print(f"DateTime Column : {col}")

                    parsed = pd.to_datetime(df[col], errors='coerce')
                    df[col] = [
                        None if pd.isnull(ts) else ts.to_pydatetime()
                        for ts in parsed
                    ]

                # =====================================
                # INTEGER COLUMNS
                # =====================================
                elif pd.api.types.is_integer_dtype(df[col]):
                    print(f"Integer Column : {col}")

                    numeric = pd.to_numeric(df[col], errors='coerce')
                    df[col] = numeric.apply(
                        lambda x: int(x) if pd.notnull(x) else None
                    )

                # =====================================
                # FLOAT COLUMNS
                # =====================================
                elif pd.api.types.is_float_dtype(df[col]):
                    print(f"Float Column : {col}")

                    numeric = pd.to_numeric(df[col], errors='coerce')
                    non_null = numeric.dropna()

                    # ---------------------------------
                    # Convert whole float to int
                    # Example:
                    # 12.0 -> 12
                    # ---------------------------------
                    if len(non_null) > 0 and (non_null % 1 == 0).all():
                        df[col] = numeric.apply(
                            lambda x: int(x) if pd.notnull(x) else None
                        )
                    else:
                        df[col] = numeric.apply(
                            lambda x: float(x) if pd.notnull(x) else None
                        )

                # =====================================
                # OBJECT / STRING COLUMNS
                # =====================================
                else:
                    print(f"String/Object Column : {col}")

                    cleaned = []
                    for val in df[col]:
                        try:
                            cleaned.append(_clean_object_value(val))
                        except Exception as row_error:
                            # Best effort: a value that cannot be cleaned
                            # becomes None instead of failing the column.
                            print(f"Row Cleaning Error in Column {col}")
                            print(str(row_error))
                            cleaned.append(None)

                    df[col] = cleaned

            except Exception as col_error:
                # The column is left as it was; the other columns still run.
                print("\n================================")
                print(f"COLUMN FAILED : {col}")
                print(str(col_error))
                print(traceback.format_exc())
                print("================================")

        return df

    except Exception as clean_error:
        print("\n================================")
        print("DATA CLEAN FAILED")
        print(str(clean_error))
        print(traceback.format_exc())
        print("================================")

        return df
# =========================================================
# MAIN PROCESS
# =========================================================
def _append_error_log(log_path, error, table=None):
    """Append one error entry (time, optional table, message, traceback).

    Must be called from inside an ``except`` block so that
    traceback.format_exc() still refers to the error being handled.
    Writing the log is best effort: a failure to write is reported on
    stdout and does not raise.
    """
    try:
        with open(log_path, "a", encoding="utf-8") as log:
            log.write("\n\n================================")
            log.write(f"\nTIME : {datetime.now()}")
            if table is not None:
                log.write(f"\nTABLE : {table}")
            log.write(f"\nERROR : {str(error)}")
            log.write(f"\nTRACEBACK :\n{traceback.format_exc()}")
            log.write("\n================================")
    except OSError as log_error:
        print(f"\nCOULD NOT WRITE {log_path} : {log_error}")


# Bound up front so the cleanup in ``finally`` can tell which connections
# were actually opened, even when connecting itself fails.
sql_conn = None
ch_client = None

try:
    # =====================================================
    # CONNECT SQL SERVER
    # =====================================================
    print("\nConnecting SQL Server...")
    sql_conn = pyodbc.connect(SQL_CONN_STR)
    print("Connected to SQL Server")

    # =====================================================
    # CONNECT CLICKHOUSE
    # =====================================================
    print("\nConnecting ClickHouse...")
    ch_client = clickhouse_connect.get_client(**CH_CONFIG)
    print("Connected to ClickHouse")

    # =====================================================
    # QUERY
    # =====================================================
    query = f"""
    SELECT *
    FROM dbo.[{TABLE_NAME}]
    WHERE Project_Id = {PROJECT_ID}
    """

    print("\n====================================")
    print("Executing Query")
    print("====================================")
    print(query)

    # =====================================================
    # CHUNK SIZE
    # =====================================================
    chunk_size = 100000
    total_rows = 0
    # Chunks that could not be prepared or inserted; used for the final status.
    failed_chunks = 0

    # =====================================================
    # READ DATA
    # =====================================================
    for chunk in pd.read_sql(query, sql_conn, chunksize=chunk_size):

        try:
            print("\n====================================")
            print(f"Processing Rows : {len(chunk)}")
            print("====================================")

            # =================================================
            # CLEAN DATA
            # =================================================
            chunk = clean_dataframe(chunk)

            # =================================================
            # DEBUG COLUMN TYPES
            # =================================================
            print("\nCOLUMN TYPES")
            for col in chunk.columns:
                try:
                    sample = chunk[col].dropna()
                    if len(sample) > 0:
                        print(col, type(sample.iloc[0]), sample.iloc[0])
                except Exception as debug_error:
                    # Debug output only: never let it fail the chunk.
                    print(col, "TYPE PREVIEW FAILED :", debug_error)

        except Exception as chunk_error:
            print("\n====================================")
            print("CHUNK PROCESS FAILED")
            print("====================================")
            print(str(chunk_error))
            traceback.print_exc()
            failed_chunks += 1
            continue

        # =================================================
        # TRUNCATE TABLE
        # =================================================
        # Done once, just before the first insert. This sits outside the
        # per-chunk error handling on purpose: if the truncate fails the
        # whole run is aborted (handled by MAIN ERROR below) instead of
        # skipping this chunk and retrying on the next one, which could
        # leave the target table partially loaded.
        if TRUNCATE_BEFORE_LOAD and not table_truncated:
            try:
                print("\n====================================")
                print(f"TRUNCATING : {TABLE_NAME}")
                print("====================================")

                truncate_query = f"""
                TRUNCATE TABLE `{CH_CONFIG['database']}`.`{TABLE_NAME}`
                """
                print(truncate_query)

                ch_client.command(truncate_query)
                print("TABLE TRUNCATED SUCCESSFULLY")
                table_truncated = True

            except Exception as truncate_error:
                print("\nTRUNCATE FAILED")
                print(str(truncate_error))
                raise

        # =================================================
        # INSERT DATA
        # =================================================
        try:
            print("\n====================================")
            print("INSERTING DATA INTO CLICKHOUSE")
            print("====================================")

            ch_client.insert_df(
                table=f"`{TABLE_NAME}`",
                df=chunk,
                database=CH_CONFIG['database']
            )

            total_rows += len(chunk)
            print(f"\nTOTAL INSERTED : {total_rows}")

        except Exception as insert_error:
            print("\nINSERT FAILED")
            print(str(insert_error))
            traceback.print_exc()
            failed_chunks += 1

            # =============================================
            # SAVE ERROR LOG
            # =============================================
            _append_error_log(
                "clickhouse_insert_error.log",
                insert_error,
                table=TABLE_NAME
            )
            continue

    print("\n====================================")
    # Only claim success when every chunk made it into ClickHouse.
    if failed_chunks == 0:
        print("ETL COMPLETED SUCCESSFULLY")
    else:
        print(f"ETL COMPLETED WITH ERRORS : {failed_chunks} CHUNK(S) FAILED")
    print(f"TOTAL ROWS INSERTED : {total_rows}")
    print("====================================")

# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
    print("\n====================================")
    print("MAIN ERROR")
    print("====================================")
    print(str(main_error))
    traceback.print_exc()

    _append_error_log("clickhouse_main_error.log", main_error)

# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
    if sql_conn is not None:
        try:
            sql_conn.close()
            print("\nSQL Server Connection Closed")
        except Exception as close_error:
            print(f"\nSQL Server Connection Close Failed : {close_error}")

    if ch_client is not None:
        try:
            ch_client.close()
            print("ClickHouse Connection Closed")
        except Exception as close_error:
            print(f"ClickHouse Connection Close Failed : {close_error}")

    print("\n====================================")
    print("ETL Finished :", datetime.now())
    print("====================================")