From 379ead3d8eb1b6743a1cb5e78765f35d3cbc2563 Mon Sep 17 00:00:00 2001 From: Dipanshu Kumar Date: Tue, 19 May 2026 13:29:43 +0530 Subject: [PATCH] Master_VisibilityReason Import --- Master_VisibilityReason_Import.py | 462 ++++++++++++++++++++++++++++++ 1 file changed, 462 insertions(+) create mode 100644 Master_VisibilityReason_Import.py diff --git a/Master_VisibilityReason_Import.py b/Master_VisibilityReason_Import.py new file mode 100644 index 0000000..3e82dc4 --- /dev/null +++ b/Master_VisibilityReason_Import.py @@ -0,0 +1,462 @@ +import pyodbc +import pandas as pd +import clickhouse_connect +import numpy as np +from datetime import datetime +import traceback +import warnings + +# ========================================================= +# IGNORE WARNINGS +# ========================================================= +warnings.filterwarnings( + 'ignore', + 'pandas only supports SQLAlchemy connectable' +) + +print("ETL Started :", datetime.now()) + +# ========================================================= +# SQL SERVER CONNECTION +# ========================================================= +SQL_CONN_STR = ( + 'DRIVER={ODBC Driver 17 for SQL Server};' + 'SERVER=10.200.25.65;' + 'DATABASE=CPMIndiaBusinessInsight;' + 'UID=bsgteam_test;' + 'PWD=B$gt3@m#00512;' + 'TrustServerCertificate=yes;' +) + +# ========================================================= +# CLICKHOUSE CONFIG +# ========================================================= +CH_CONFIG = { + 'host': '172.188.12.194', + 'port': 8123, + 'username': 'default', + 'password': 'dipanshu_k', + 'database': 'DaburIndia_BI' +} + +# ========================================================= +# TABLE NAME +# ========================================================= +TABLE_NAME = 'Master_VisibilityReason' +PROJECTId = 41654 + +# ========================================================= +# CLEAN DATAFRAME +# ========================================================= +def clean_dataframe(df): + + try: + + # --------------------------------------------- + # Replace NaN with None + # --------------------------------------------- + df = df.replace({np.nan: None}) + + # --------------------------------------------- + # Process Column Wise + # --------------------------------------------- + for col in df.columns: + + try: + + print(f"\nCleaning Column : {col}") + + # ===================================== + # DATETIME COLUMNS + # ===================================== + if pd.api.types.is_datetime64_any_dtype(df[col]): + + print(f"Datetime Column Detected : {col}") + + df[col] = pd.to_datetime( + df[col], + errors='coerce' + ) + + # Remove invalid dates + df[col] = df[col].where( + (df[col].dt.year >= 1970) & + (df[col].dt.year <= 2100) + ) + + # IMPORTANT FIX + # Convert to Python datetime + df[col] = df[col].apply( + lambda x: + x.to_pydatetime() + if pd.notnull(x) + else None + ) + + # ===================================== + # INTEGER COLUMNS + # ===================================== + elif pd.api.types.is_integer_dtype(df[col]): + + print(f"Integer Column Detected : {col}") + + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) + + df[col] = df[col].apply( + lambda x: + int(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # FLOAT COLUMNS + # ===================================== + elif pd.api.types.is_float_dtype(df[col]): + + print(f"Float Column Detected : {col}") + + non_null = df[col].dropna() + + # --------------------------------- + # Convert whole float to int + # Example: + # 3240.0 -> 3240 + # --------------------------------- + if len(non_null) > 0 and ( + (non_null % 1 == 0).all() + ): + + df[col] = df[col].apply( + lambda x: + int(x) + if pd.notnull(x) + else None + ) + + else: + + df[col] = df[col].apply( + lambda x: + float(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # OBJECT / STRING COLUMNS + # ===================================== + else: + + print(f"String/Object Column Detected : {col}") + + cleaned = [] + + for val in df[col]: + + # NULL + if pd.isnull(val): + + cleaned.append(None) + + # INTEGER + elif isinstance( + val, + ( + int, + np.integer + ) + ): + + cleaned.append(int(val)) + + # FLOAT + elif isinstance( + val, + ( + float, + np.floating + ) + ): + + if np.isnan(val): + + cleaned.append(None) + + else: + + # Avoid 3240.0 issue + if val.is_integer(): + + cleaned.append(int(val)) + + else: + + cleaned.append(float(val)) + + # STRING + elif isinstance(val, str): + + cleaned.append(val.strip()) + + # BOOLEAN + elif isinstance(val, bool): + + cleaned.append(int(val)) + + # DATETIME + elif isinstance( + val, + ( + datetime, + pd.Timestamp + ) + ): + + cleaned.append( + val.to_pydatetime() + if isinstance(val, pd.Timestamp) + else val + ) + + # OTHER + else: + + cleaned.append(str(val)) + + df[col] = cleaned + + except Exception as col_error: + + print("\n================================") + print(f"COLUMN FAILED : {col}") + print(str(col_error)) + print("================================") + + return df + + except Exception as clean_error: + + print("\n================================") + print("DATA CLEAN FAILED") + print(str(clean_error)) + print("================================") + + return df + +# ========================================================= +# MAIN PROCESS +# ========================================================= +try: + + # ===================================================== + # CONNECT SQL SERVER + # ===================================================== + sql_conn = pyodbc.connect(SQL_CONN_STR) + + print("Connected to SQL Server") + + # ===================================================== + # CONNECT CLICKHOUSE + # ===================================================== + ch_client = clickhouse_connect.get_client(**CH_CONFIG) + + print("Connected to ClickHouse") + + # ===================================================== + # QUERY + # ===================================================== + query = f""" + SELECT * + FROM dbo.[{TABLE_NAME}] + WHERE PROJECTId = {PROJECTId} + """ + + print("\nExecuting Query:") + print(query) + + # ===================================================== + # CHUNK SIZE + # ===================================================== + chunk_size = 100000 + + total_rows = 0 + + # ===================================================== + # READ DATA + # ===================================================== + for chunk in pd.read_sql( + query, + sql_conn, + chunksize=chunk_size + ): + + try: + + print("\n================================") + print(f"Processing {len(chunk)} Rows") + print("================================") + + # ================================================= + # CLEAN DATA + # ================================================= + chunk = clean_dataframe(chunk) + + # ================================================= + # DEBUG COLUMN TYPES + # ================================================= + print("\nCOLUMN TYPES") + + for col in chunk.columns: + + sample = chunk[col].dropna() + + if len(sample) > 0: + + print( + col, + type(sample.iloc[0]), + sample.iloc[0] + ) + + # ================================================= + # SAMPLE DATA + # ================================================= + print("\nSAMPLE DATA") + print(chunk.head(2)) + + # ================================================= + # INSERT INTO CLICKHOUSE + # ================================================= + print("\nInserting into ClickHouse...") + + ch_client.insert_df( + table=TABLE_NAME, + df=chunk, + database=CH_CONFIG['database'] + ) + + total_rows += len(chunk) + + print( + f"\nInserted Total Rows : {total_rows}" + ) + + except Exception as chunk_error: + + print("\n================================") + print("CHUNK INSERT FAILED") + print("================================") + + print(str(chunk_error)) + + traceback.print_exc() + + # ============================================= + # SAVE ERROR LOG + # ============================================= + with open( + "clickhouse_chunk_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : {datetime.now()}" + ) + + log.write( + f"\nTABLE : {TABLE_NAME}" + ) + + log.write( + f"\nERROR : {str(chunk_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + + continue + + print("\n================================") + print("ETL COMPLETED SUCCESSFULLY") + print(f"TOTAL ROWS INSERTED : {total_rows}") + print("================================") + +# ========================================================= +# MAIN ERROR +# ========================================================= +except Exception as main_error: + + print("\n================================") + print("MAIN ERROR") + print("================================") + + print(str(main_error)) + + traceback.print_exc() + + with open( + "clickhouse_main_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : {datetime.now()}" + ) + + log.write( + f"\nERROR : {str(main_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + +# ========================================================= +# CLOSE CONNECTIONS +# ========================================================= +finally: + + try: + + sql_conn.close() + + print("\nSQL Server Connection Closed") + + except: + pass + + try: + + ch_client.close() + + print("ClickHouse Connection Closed") + + except: + pass + +print("\nETL Finished :", datetime.now()) \ No newline at end of file