diff --git a/PaidVisibility_Compliance Import.py b/PaidVisibility_Compliance Import.py index 38b55f6..216f4ac 100644 --- a/PaidVisibility_Compliance Import.py +++ b/PaidVisibility_Compliance Import.py @@ -46,7 +46,7 @@ CH_CONFIG = { # ========================================================= # TABLE DETAILS # ========================================================= -TABLE_NAME = 'Sales' +TABLE_NAME = 'PaidVisibility_Compliance' PROJECT_ID = 41654 # ========================================================= @@ -58,7 +58,7 @@ table_truncated = False # ========================================================= # CHUNK SIZE # ========================================================= -chunk_size = 20000 +chunk_size = 25000 # ========================================================= # CONNECT SQL SERVER @@ -602,3 +602,4 @@ finally: print("\n================================================") print("ETL FINISHED :", datetime.now()) print("================================================") + diff --git a/SOS_OneApp_Import.py b/SOS_OneApp_Import.py index 6d522bd..a304f3c 100644 --- a/SOS_OneApp_Import.py +++ b/SOS_OneApp_Import.py @@ -602,3 +602,4 @@ finally: print("\n================================================") print("ETL FINISHED :", datetime.now()) print("================================================") + diff --git a/Stock_Details_Import.py b/Stock_Details_Import.py new file mode 100644 index 0000000..c3bb24f --- /dev/null +++ b/Stock_Details_Import.py @@ -0,0 +1,605 @@ +import pyodbc +import pandas as pd +import clickhouse_connect +import numpy as np +from datetime import datetime +import traceback +import warnings +import time + +# ========================================================= +# IGNORE WARNINGS +# ========================================================= +warnings.filterwarnings( + 'ignore', + 'pandas only supports SQLAlchemy connectable' +) + +print("\n================================================") +print("ETL STARTED :", datetime.now()) +print("================================================") + +# ========================================================= +# SQL SERVER CONNECTION STRING +# ========================================================= +SQL_CONN_STR = ( + 'DRIVER={ODBC Driver 17 for SQL Server};' + 'SERVER=10.200.25.65;' + 'DATABASE=CPMIndiaBusinessInsight;' + 'UID=bsgteam_test;' + 'PWD=B$gt3@m#00512;' + 'TrustServerCertificate=yes;' + 'Connection Timeout=60;' +) + +# ========================================================= +# CLICKHOUSE CONFIG +# ========================================================= +CH_CONFIG = { + 'host': '172.188.12.194', + 'port': 8123, + 'username': 'default', + 'password': 'dipanshu_k', + 'database': 'DaburIndia_BI' +} + +# ========================================================= +# TABLE DETAILS +# ========================================================= +TABLE_NAME = 'Stock_Details' +PROJECT_ID = 41654 + +# ========================================================= +# SETTINGS +# ========================================================= +TRUNCATE_BEFORE_LOAD = True +table_truncated = False + +# ========================================================= +# CHUNK SIZE +# ========================================================= +chunk_size = 25000 + +# ========================================================= +# CONNECT SQL SERVER +# ========================================================= +def connect_sql(): + + try: + + conn = pyodbc.connect( + SQL_CONN_STR, + autocommit=True + ) + + print("Connected SQL Server") + + return conn + + except Exception as e: + + print("SQL CONNECTION FAILED") + print(str(e)) + + raise + +# ========================================================= +# CONNECT CLICKHOUSE +# ========================================================= +def connect_clickhouse(): + + try: + + client = clickhouse_connect.get_client( + **CH_CONFIG + ) + + print("Connected ClickHouse") + + return client + + except Exception as e: + + print("CLICKHOUSE CONNECTION FAILED") + print(str(e)) + + raise + +# ========================================================= +# CLEAN DATAFRAME +# ========================================================= +def clean_dataframe(df): + + try: + + # --------------------------------------------- + # Replace NaN + # --------------------------------------------- + df = df.replace({np.nan: None}) + + # --------------------------------------------- + # Process Column Wise + # --------------------------------------------- + for col in df.columns: + + try: + + print(f"\nCleaning Column : {col}") + + # ===================================== + # AUTO DETECT DATE / TIME COLUMNS + # ===================================== + if ( + 'date' in col.lower() + or 'time' in col.lower() + ): + + print(f"Date Column : {col}") + + df[col] = pd.to_datetime( + df[col], + errors='coerce' + ) + + cleaned_dates = [] + + for val in df[col]: + + if pd.isnull(val): + + cleaned_dates.append(None) + + else: + + cleaned_dates.append( + val.to_pydatetime() + ) + + df[col] = cleaned_dates + + # ===================================== + # INTEGER COLUMNS + # ===================================== + elif pd.api.types.is_integer_dtype(df[col]): + + print(f"Integer Column : {col}") + + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) + + df[col] = df[col].apply( + lambda x: + int(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # FLOAT COLUMNS + # ===================================== + elif pd.api.types.is_float_dtype(df[col]): + + print(f"Float Column : {col}") + + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) + + df[col] = df[col].apply( + lambda x: + float(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # OBJECT / STRING COLUMNS + # ===================================== + else: + + print(f"String/Object Column : {col}") + + cleaned = [] + + for val in df[col]: + + # NULL + if pd.isnull(val): + + cleaned.append(None) + + # DATETIME + elif isinstance( + val, + ( + datetime, + pd.Timestamp + ) + ): + + if isinstance( + val, + pd.Timestamp + ): + + cleaned.append( + val.to_pydatetime() + ) + + else: + + cleaned.append(val) + + # INTEGER + elif isinstance( + val, + ( + int, + np.integer + ) + ): + + cleaned.append( + int(val) + ) + + # FLOAT + elif isinstance( + val, + ( + float, + np.floating + ) + ): + + if np.isnan(val): + + cleaned.append(None) + + else: + + cleaned.append( + float(val) + ) + + # BOOLEAN + elif isinstance( + val, + bool + ): + + cleaned.append( + int(val) + ) + + # STRING + else: + + cleaned.append( + str(val).strip() + ) + + df[col] = cleaned + + except Exception as col_error: + + print("\n================================") + print(f"COLUMN FAILED : {col}") + print(str(col_error)) + print("================================") + + return df + + except Exception as clean_error: + + print("\n================================") + print("DATA CLEAN FAILED") + print(str(clean_error)) + print("================================") + + return df + +# ========================================================= +# MAIN PROCESS +# ========================================================= +try: + + # ===================================================== + # CONNECT SQL SERVER + # ===================================================== + sql_conn = connect_sql() + + # ===================================================== + # CONNECT CLICKHOUSE + # ===================================================== + ch_client = connect_clickhouse() + + # ===================================================== + # QUERY + # ===================================================== + query = f""" + SELECT * + FROM dbo.[{TABLE_NAME}] + WHERE Project_Id = {PROJECT_ID} + """ + + print("\nExecuting Query") + print(query) + + # ===================================================== + # RETRY SETTINGS + # ===================================================== + retry_count = 0 + max_retry = 5 + + # ===================================================== + # MAIN RETRY LOOP + # ===================================================== + while retry_count < max_retry: + + try: + + # ============================================= + # READ SQL DATA + # ============================================= + for chunk in pd.read_sql( + query, + sql_conn, + chunksize=chunk_size + ): + + try: + + print("\n================================") + print( + f"Processing Rows : " + f"{len(chunk)}" + ) + print("================================") + + # ===================================== + # CLEAN DATA + # ===================================== + chunk = clean_dataframe(chunk) + + # ===================================== + # DEBUG COLUMN TYPES + # ===================================== + print("\nCOLUMN DATATYPES") + print(chunk.dtypes) + + print("\nCOLUMN SAMPLE TYPES") + + for col in chunk.columns: + + sample = chunk[col].dropna() + + if len(sample) > 0: + + print( + col, + type(sample.iloc[0]), + sample.iloc[0] + ) + + # ===================================== + # TRUNCATE TABLE FIRST TIME ONLY + # ===================================== + if ( + TRUNCATE_BEFORE_LOAD + and + not table_truncated + ): + + print("\n================================") + print( + f"TRUNCATING TABLE : " + f"{TABLE_NAME}" + ) + print("================================") + + # IMPORTANT FIX + truncate_query = f""" + TRUNCATE TABLE + `{CH_CONFIG['database']}`.`{TABLE_NAME}` + """ + + ch_client.command( + truncate_query + ) + + print( + "TABLE TRUNCATED SUCCESSFULLY" + ) + + table_truncated = True + + # ===================================== + # INSERT INTO CLICKHOUSE + # ===================================== + print( + "\nINSERTING INTO CLICKHOUSE..." + ) + + ch_client.insert_df( + table=TABLE_NAME, + df=chunk, + database=CH_CONFIG['database'] + ) + + print( + f"INSERTED : " + f"{len(chunk)} ROWS" + ) + + except Exception as insert_error: + + print("\n================================") + print("INSERT FAILED") + print("================================") + + print(str(insert_error)) + + traceback.print_exc() + + # ================================= + # SAVE ERROR LOG + # ================================= + with open( + "insert_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : " + f"{datetime.now()}" + ) + + log.write( + f"\nTABLE : " + f"{TABLE_NAME}" + ) + + log.write( + f"\nERROR : " + f"{str(insert_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + + continue + + # ============================================= + # SUCCESS + # ============================================= + break + + # ================================================= + # SQL CONNECTION FAILURE + # ================================================= + except pyodbc.OperationalError as op_error: + + retry_count += 1 + + print("\n================================") + print( + f"SQL CONNECTION LOST " + f"- RETRY {retry_count}" + ) + print("================================") + + print(str(op_error)) + + time.sleep(10) + + try: + + sql_conn.close() + + except: + pass + + # RECONNECT SQL + sql_conn = connect_sql() + + # ================================================= + # OTHER ERROR + # ================================================= + except Exception as loop_error: + + print("\n================================") + print("MAIN LOOP ERROR") + print("================================") + + print(str(loop_error)) + + traceback.print_exc() + + break + + print("\n================================") + print("ETL COMPLETED SUCCESSFULLY") + print("================================") + +# ========================================================= +# MAIN ERROR +# ========================================================= +except Exception as main_error: + + print("\n================================") + print("MAIN ERROR") + print("================================") + + print(str(main_error)) + + traceback.print_exc() + + with open( + "main_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : {datetime.now()}" + ) + + log.write( + f"\nERROR : {str(main_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + +# ========================================================= +# CLOSE CONNECTIONS +# ========================================================= +finally: + + try: + + sql_conn.close() + + print("\nSQL SERVER CONNECTION CLOSED") + + except: + pass + + try: + + ch_client.close() + + print("CLICKHOUSE CONNECTION CLOSED") + + except: + pass + +print("\n================================================") +print("ETL FINISHED :", datetime.now()) +print("================================================") + diff --git a/main_error.log b/main_error.log new file mode 100644 index 0000000..3996de5 --- /dev/null +++ b/main_error.log @@ -0,0 +1,17 @@ + + +================================ +TIME : 2026-05-23 11:02:49.254453 +ERROR : ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Timeout error [258]. (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Unable to complete login process due to delay in login response (258)') +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\PaidVisibility_Compliance Import.py", line 313, in + sql_conn = connect_sql() + File "d:\Python Code\PaidVisibility_Compliance Import.py", line 70, in connect_sql + conn = pyodbc.connect( + SQL_CONN_STR, + autocommit=True + ) +pyodbc.OperationalError: ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Timeout error [258]. (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Unable to complete login process due to delay in login response (258)') + +================================ \ No newline at end of file