diff --git a/Mapping_StoreVisibility_Import.py b/Mapping_StoreVisibility_Import.py new file mode 100644 index 0000000..bd160c7 --- /dev/null +++ b/Mapping_StoreVisibility_Import.py @@ -0,0 +1,604 @@ +import pyodbc +import pandas as pd +import clickhouse_connect +import numpy as np +from datetime import datetime +import traceback +import warnings +import time + +# ========================================================= +# IGNORE WARNINGS +# ========================================================= +warnings.filterwarnings( + 'ignore', + 'pandas only supports SQLAlchemy connectable' +) + +print("\n================================================") +print("ETL STARTED :", datetime.now()) +print("================================================") + +# ========================================================= +# SQL SERVER CONNECTION STRING +# ========================================================= +SQL_CONN_STR = ( + 'DRIVER={ODBC Driver 17 for SQL Server};' + 'SERVER=10.200.25.65;' + 'DATABASE=CPMIndiaBusinessInsight;' + 'UID=bsgteam_test;' + 'PWD=B$gt3@m#00512;' + 'TrustServerCertificate=yes;' + 'Connection Timeout=60;' +) + +# ========================================================= +# CLICKHOUSE CONFIG +# ========================================================= +CH_CONFIG = { + 'host': '172.188.12.194', + 'port': 8123, + 'username': 'default', + 'password': 'dipanshu_k', + 'database': 'DaburIndia_BI' +} + +# ========================================================= +# TABLE DETAILS +# ========================================================= +TABLE_NAME = 'Mapping_StoreVisibility' +PROJECT_ID = 41654 + +# ========================================================= +# SETTINGS +# ========================================================= +TRUNCATE_BEFORE_LOAD = True +table_truncated = False + +# ========================================================= +# CHUNK SIZE +# ========================================================= +chunk_size = 20000 + +# ========================================================= +# CONNECT SQL SERVER +# ========================================================= +def connect_sql(): + + try: + + conn = pyodbc.connect( + SQL_CONN_STR, + autocommit=True + ) + + print("Connected SQL Server") + + return conn + + except Exception as e: + + print("SQL CONNECTION FAILED") + print(str(e)) + + raise + +# ========================================================= +# CONNECT CLICKHOUSE +# ========================================================= +def connect_clickhouse(): + + try: + + client = clickhouse_connect.get_client( + **CH_CONFIG + ) + + print("Connected ClickHouse") + + return client + + except Exception as e: + + print("CLICKHOUSE CONNECTION FAILED") + print(str(e)) + + raise + +# ========================================================= +# CLEAN DATAFRAME +# ========================================================= +def clean_dataframe(df): + + try: + + # --------------------------------------------- + # Replace NaN + # --------------------------------------------- + df = df.replace({np.nan: None}) + + # --------------------------------------------- + # Process Column Wise + # --------------------------------------------- + for col in df.columns: + + try: + + print(f"\nCleaning Column : {col}") + + # ===================================== + # AUTO DETECT DATE / TIME COLUMNS + # ===================================== + if ( + 'date' in col.lower() + or 'time' in col.lower() + ): + + print(f"Date Column : {col}") + + df[col] = pd.to_datetime( + df[col], + errors='coerce' + ) + + cleaned_dates = [] + + for val in df[col]: + + if pd.isnull(val): + + cleaned_dates.append(None) + + else: + + cleaned_dates.append( + val.to_pydatetime() + ) + + df[col] = cleaned_dates + + # ===================================== + # INTEGER COLUMNS + # ===================================== + elif pd.api.types.is_integer_dtype(df[col]): + + print(f"Integer Column : {col}") + + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) + + df[col] = df[col].apply( + lambda x: + int(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # FLOAT COLUMNS + # ===================================== + elif pd.api.types.is_float_dtype(df[col]): + + print(f"Float Column : {col}") + + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) + + df[col] = df[col].apply( + lambda x: + float(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # OBJECT / STRING COLUMNS + # ===================================== + else: + + print(f"String/Object Column : {col}") + + cleaned = [] + + for val in df[col]: + + # NULL + if pd.isnull(val): + + cleaned.append(None) + + # DATETIME + elif isinstance( + val, + ( + datetime, + pd.Timestamp + ) + ): + + if isinstance( + val, + pd.Timestamp + ): + + cleaned.append( + val.to_pydatetime() + ) + + else: + + cleaned.append(val) + + # INTEGER + elif isinstance( + val, + ( + int, + np.integer + ) + ): + + cleaned.append( + int(val) + ) + + # FLOAT + elif isinstance( + val, + ( + float, + np.floating + ) + ): + + if np.isnan(val): + + cleaned.append(None) + + else: + + cleaned.append( + float(val) + ) + + # BOOLEAN + elif isinstance( + val, + bool + ): + + cleaned.append( + int(val) + ) + + # STRING + else: + + cleaned.append( + str(val).strip() + ) + + df[col] = cleaned + + except Exception as col_error: + + print("\n================================") + print(f"COLUMN FAILED : {col}") + print(str(col_error)) + print("================================") + + return df + + except Exception as clean_error: + + print("\n================================") + print("DATA CLEAN FAILED") + print(str(clean_error)) + print("================================") + + return df + +# ========================================================= +# MAIN PROCESS +# ========================================================= +try: + + # ===================================================== + # CONNECT SQL SERVER + # ===================================================== + sql_conn = connect_sql() + + # ===================================================== + # CONNECT CLICKHOUSE + # ===================================================== + ch_client = connect_clickhouse() + + # ===================================================== + # QUERY + # ===================================================== + query = f""" + SELECT * + FROM dbo.[{TABLE_NAME}] + WHERE Project_Id = {PROJECT_ID} + """ + + print("\nExecuting Query") + print(query) + + # ===================================================== + # RETRY SETTINGS + # ===================================================== + retry_count = 0 + max_retry = 5 + + # ===================================================== + # MAIN RETRY LOOP + # ===================================================== + while retry_count < max_retry: + + try: + + # ============================================= + # READ SQL DATA + # ============================================= + for chunk in pd.read_sql( + query, + sql_conn, + chunksize=chunk_size + ): + + try: + + print("\n================================") + print( + f"Processing Rows : " + f"{len(chunk)}" + ) + print("================================") + + # ===================================== + # CLEAN DATA + # ===================================== + chunk = clean_dataframe(chunk) + + # ===================================== + # DEBUG COLUMN TYPES + # ===================================== + print("\nCOLUMN DATATYPES") + print(chunk.dtypes) + + print("\nCOLUMN SAMPLE TYPES") + + for col in chunk.columns: + + sample = chunk[col].dropna() + + if len(sample) > 0: + + print( + col, + type(sample.iloc[0]), + sample.iloc[0] + ) + + # ===================================== + # TRUNCATE TABLE FIRST TIME ONLY + # ===================================== + if ( + TRUNCATE_BEFORE_LOAD + and + not table_truncated + ): + + print("\n================================") + print( + f"TRUNCATING TABLE : " + f"{TABLE_NAME}" + ) + print("================================") + + # IMPORTANT FIX + truncate_query = f""" + TRUNCATE TABLE + `{CH_CONFIG['database']}`.`{TABLE_NAME}` + """ + + ch_client.command( + truncate_query + ) + + print( + "TABLE TRUNCATED SUCCESSFULLY" + ) + + table_truncated = True + + # ===================================== + # INSERT INTO CLICKHOUSE + # ===================================== + print( + "\nINSERTING INTO CLICKHOUSE..." + ) + + ch_client.insert_df( + table=TABLE_NAME, + df=chunk, + database=CH_CONFIG['database'] + ) + + print( + f"INSERTED : " + f"{len(chunk)} ROWS" + ) + + except Exception as insert_error: + + print("\n================================") + print("INSERT FAILED") + print("================================") + + print(str(insert_error)) + + traceback.print_exc() + + # ================================= + # SAVE ERROR LOG + # ================================= + with open( + "insert_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : " + f"{datetime.now()}" + ) + + log.write( + f"\nTABLE : " + f"{TABLE_NAME}" + ) + + log.write( + f"\nERROR : " + f"{str(insert_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + + continue + + # ============================================= + # SUCCESS + # ============================================= + break + + # ================================================= + # SQL CONNECTION FAILURE + # ================================================= + except pyodbc.OperationalError as op_error: + + retry_count += 1 + + print("\n================================") + print( + f"SQL CONNECTION LOST " + f"- RETRY {retry_count}" + ) + print("================================") + + print(str(op_error)) + + time.sleep(10) + + try: + + sql_conn.close() + + except: + pass + + # RECONNECT SQL + sql_conn = connect_sql() + + # ================================================= + # OTHER ERROR + # ================================================= + except Exception as loop_error: + + print("\n================================") + print("MAIN LOOP ERROR") + print("================================") + + print(str(loop_error)) + + traceback.print_exc() + + break + + print("\n================================") + print("ETL COMPLETED SUCCESSFULLY") + print("================================") + +# ========================================================= +# MAIN ERROR +# ========================================================= +except Exception as main_error: + + print("\n================================") + print("MAIN ERROR") + print("================================") + + print(str(main_error)) + + traceback.print_exc() + + with open( + "main_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : {datetime.now()}" + ) + + log.write( + f"\nERROR : {str(main_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + +# ========================================================= +# CLOSE CONNECTIONS +# ========================================================= +finally: + + try: + + sql_conn.close() + + print("\nSQL SERVER CONNECTION CLOSED") + + except: + pass + + try: + + ch_client.close() + + print("CLICKHOUSE CONNECTION CLOSED") + + except: + pass + +print("\n================================================") +print("ETL FINISHED :", datetime.now()) +print("================================================") diff --git a/PaidVisibility_Import.py b/PaidVisibility_Import.py index e3a21af..21e20b0 100644 --- a/PaidVisibility_Import.py +++ b/PaidVisibility_Import.py @@ -5,6 +5,7 @@ import numpy as np from datetime import datetime import traceback import warnings +import time # ========================================================= # IGNORE WARNINGS @@ -14,10 +15,12 @@ warnings.filterwarnings( 'pandas only supports SQLAlchemy connectable' ) -print("ETL Started :", datetime.now()) +print("\n================================================") +print("ETL STARTED :", datetime.now()) +print("================================================") # ========================================================= -# SQL SERVER CONNECTION +# SQL SERVER CONNECTION STRING # ========================================================= SQL_CONN_STR = ( 'DRIVER={ODBC Driver 17 for SQL Server};' @@ -26,6 +29,7 @@ SQL_CONN_STR = ( 'UID=bsgteam_test;' 'PWD=B$gt3@m#00512;' 'TrustServerCertificate=yes;' + 'Connection Timeout=60;' ) # ========================================================= @@ -40,31 +44,66 @@ CH_CONFIG = { } # ========================================================= -# TABLE NAME +# TABLE DETAILS # ========================================================= TABLE_NAME = 'PaidVisibility' PROJECT_ID = 41654 # ========================================================= -# LOAD SETTINGS +# SETTINGS # ========================================================= TRUNCATE_BEFORE_LOAD = True table_truncated = False # ========================================================= -# CLICKHOUSE DATE COLUMNS +# CHUNK SIZE # ========================================================= -DATE_COLUMNS = [ - 'visit_date' -] +chunk_size = 25000 # ========================================================= -# CLICKHOUSE DATETIME COLUMNS +# CONNECT SQL SERVER # ========================================================= -DATETIME_COLUMNS = [ - 'create_date', - 'update_date' -] +def connect_sql(): + + try: + + conn = pyodbc.connect( + SQL_CONN_STR, + autocommit=True + ) + + print("Connected SQL Server") + + return conn + + except Exception as e: + + print("SQL CONNECTION FAILED") + print(str(e)) + + raise + +# ========================================================= +# CONNECT CLICKHOUSE +# ========================================================= +def connect_clickhouse(): + + try: + + client = clickhouse_connect.get_client( + **CH_CONFIG + ) + + print("Connected ClickHouse") + + return client + + except Exception as e: + + print("CLICKHOUSE CONNECTION FAILED") + print(str(e)) + + raise # ========================================================= # CLEAN DATAFRAME @@ -88,60 +127,35 @@ def clean_dataframe(df): print(f"\nCleaning Column : {col}") # ===================================== - # DATE32 COLUMNS + # AUTO DETECT DATE / TIME COLUMNS # ===================================== - if col.lower() in [ - x.lower() for x in DATE_COLUMNS - ]: + if ( + 'date' in col.lower() + or 'time' in col.lower() + ): - print(f"Date32 Column : {col}") + print(f"Date Column : {col}") df[col] = pd.to_datetime( df[col], errors='coerce' ) - # Remove invalid dates - df[col] = df[col].where( - (df[col].dt.year >= 1970) & - (df[col].dt.year <= 2100) - ) + cleaned_dates = [] - # Convert to datetime.date - df[col] = df[col].apply( - lambda x: - x.date() - if pd.notnull(x) - else None - ) + for val in df[col]: - # ===================================== - # DATETIME64 COLUMNS - # ===================================== - elif col.lower() in [ - x.lower() for x in DATETIME_COLUMNS - ]: + if pd.isnull(val): - print(f"DateTime Column : {col}") + cleaned_dates.append(None) - df[col] = pd.to_datetime( - df[col], - errors='coerce' - ) + else: - # Remove invalid dates - df[col] = df[col].where( - (df[col].dt.year >= 1970) & - (df[col].dt.year <= 2100) - ) + cleaned_dates.append( + val.to_pydatetime() + ) - # Convert to datetime.datetime - df[col] = df[col].apply( - lambda x: - x.to_pydatetime() - if pd.notnull(x) - else None - ) + df[col] = cleaned_dates # ===================================== # INTEGER COLUMNS @@ -169,28 +183,17 @@ def clean_dataframe(df): print(f"Float Column : {col}") - non_null = df[col].dropna() + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) - # Convert whole float to int - if len(non_null) > 0 and ( - (non_null % 1 == 0).all() - ): - - df[col] = df[col].apply( - lambda x: - int(x) - if pd.notnull(x) - else None - ) - - else: - - df[col] = df[col].apply( - lambda x: - float(x) - if pd.notnull(x) - else None - ) + df[col] = df[col].apply( + lambda x: + float(x) + if pd.notnull(x) + else None + ) # ===================================== # OBJECT / STRING COLUMNS @@ -208,50 +211,6 @@ def clean_dataframe(df): cleaned.append(None) - # INTEGER - elif isinstance( - val, - ( - int, - np.integer - ) - ): - - cleaned.append(int(val)) - - # FLOAT - elif isinstance( - val, - ( - float, - np.floating - ) - ): - - if np.isnan(val): - - cleaned.append(None) - - else: - - if val.is_integer(): - - cleaned.append(int(val)) - - else: - - cleaned.append(float(val)) - - # STRING - elif isinstance(val, str): - - cleaned.append(val.strip()) - - # BOOLEAN - elif isinstance(val, bool): - - cleaned.append(int(val)) - # DATETIME elif isinstance( val, @@ -274,10 +233,54 @@ def clean_dataframe(df): cleaned.append(val) - # OTHER + # INTEGER + elif isinstance( + val, + ( + int, + np.integer + ) + ): + + cleaned.append( + int(val) + ) + + # FLOAT + elif isinstance( + val, + ( + float, + np.floating + ) + ): + + if np.isnan(val): + + cleaned.append(None) + + else: + + cleaned.append( + float(val) + ) + + # BOOLEAN + elif isinstance( + val, + bool + ): + + cleaned.append( + int(val) + ) + + # STRING else: - cleaned.append(str(val)) + cleaned.append( + str(val).strip() + ) df[col] = cleaned @@ -307,16 +310,12 @@ try: # ===================================================== # CONNECT SQL SERVER # ===================================================== - sql_conn = pyodbc.connect(SQL_CONN_STR) - - print("Connected to SQL Server") + sql_conn = connect_sql() # ===================================================== # CONNECT CLICKHOUSE # ===================================================== - ch_client = clickhouse_connect.get_client(**CH_CONFIG) - - print("Connected to ClickHouse") + ch_client = connect_clickhouse() # ===================================================== # QUERY @@ -327,156 +326,214 @@ try: WHERE Project_Id = {PROJECT_ID} """ - print("\nExecuting Query:") + print("\nExecuting Query") print(query) # ===================================================== - # CHUNK SIZE + # RETRY SETTINGS # ===================================================== - chunk_size = 100000 - - total_rows = 0 + retry_count = 0 + max_retry = 5 # ===================================================== - # READ DATA + # MAIN RETRY LOOP # ===================================================== - for chunk in pd.read_sql( - query, - sql_conn, - chunksize=chunk_size - ): + while retry_count < max_retry: try: + # ============================================= + # READ SQL DATA + # ============================================= + for chunk in pd.read_sql( + query, + sql_conn, + chunksize=chunk_size + ): + + try: + + print("\n================================") + print( + f"Processing Rows : " + f"{len(chunk)}" + ) + print("================================") + + # ===================================== + # CLEAN DATA + # ===================================== + chunk = clean_dataframe(chunk) + + # ===================================== + # DEBUG COLUMN TYPES + # ===================================== + print("\nCOLUMN DATATYPES") + print(chunk.dtypes) + + print("\nCOLUMN SAMPLE TYPES") + + for col in chunk.columns: + + sample = chunk[col].dropna() + + if len(sample) > 0: + + print( + col, + type(sample.iloc[0]), + sample.iloc[0] + ) + + # ===================================== + # TRUNCATE TABLE FIRST TIME ONLY + # ===================================== + if ( + TRUNCATE_BEFORE_LOAD + and + not table_truncated + ): + + print("\n================================") + print( + f"TRUNCATING TABLE : " + f"{TABLE_NAME}" + ) + print("================================") + + # IMPORTANT FIX + truncate_query = f""" + TRUNCATE TABLE + `{CH_CONFIG['database']}`.`{TABLE_NAME}` + """ + + ch_client.command( + truncate_query + ) + + print( + "TABLE TRUNCATED SUCCESSFULLY" + ) + + table_truncated = True + + # ===================================== + # INSERT INTO CLICKHOUSE + # ===================================== + print( + "\nINSERTING INTO CLICKHOUSE..." + ) + + ch_client.insert_df( + table=TABLE_NAME, + df=chunk, + database=CH_CONFIG['database'] + ) + + print( + f"INSERTED : " + f"{len(chunk)} ROWS" + ) + + except Exception as insert_error: + + print("\n================================") + print("INSERT FAILED") + print("================================") + + print(str(insert_error)) + + traceback.print_exc() + + # ================================= + # SAVE ERROR LOG + # ================================= + with open( + "insert_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : " + f"{datetime.now()}" + ) + + log.write( + f"\nTABLE : " + f"{TABLE_NAME}" + ) + + log.write( + f"\nERROR : " + f"{str(insert_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + + continue + + # ============================================= + # SUCCESS + # ============================================= + break + + # ================================================= + # SQL CONNECTION FAILURE + # ================================================= + except pyodbc.OperationalError as op_error: + + retry_count += 1 + print("\n================================") - print(f"Processing {len(chunk)} Rows") - print("================================") - - # ================================================= - # CLEAN DATA - # ================================================= - chunk = clean_dataframe(chunk) - - # ================================================= - # DEBUG COLUMN TYPES - # ================================================= - print("\nCOLUMN TYPES") - - for col in chunk.columns: - - sample = chunk[col].dropna() - - if len(sample) > 0: - - print( - col, - type(sample.iloc[0]), - sample.iloc[0] - ) - - # ================================================= - # DEBUG DATE COLUMN - # ================================================= - if 'visit_date' in chunk.columns: - - print("\nvisit_date Sample") - print(chunk['visit_date'].head()) - - sample = chunk['visit_date'].dropna() - - if len(sample) > 0: - - print( - "visit_date datatype:", - type(sample.iloc[0]) - ) - - # ================================================= - # TRUNCATE TABLE FIRST TIME ONLY - # ================================================= - if TRUNCATE_BEFORE_LOAD and not table_truncated: - - print("\n================================") - print(f"TRUNCATING TABLE : {TABLE_NAME}") - print("================================") - - truncate_query = f""" - TRUNCATE TABLE - {CH_CONFIG['database']}.{TABLE_NAME} - """ - - ch_client.command(truncate_query) - - print("TABLE TRUNCATED SUCCESSFULLY") - - table_truncated = True - - # ================================================= - # INSERT INTO CLICKHOUSE - # ================================================= - print("\nInserting into ClickHouse...") - - ch_client.insert_df( - table=TABLE_NAME, - df=chunk, - database=CH_CONFIG['database'] - ) - - total_rows += len(chunk) - print( - f"\nInserted Total Rows : {total_rows}" + f"SQL CONNECTION LOST " + f"- RETRY {retry_count}" ) - - except Exception as chunk_error: - - print("\n================================") - print("CHUNK INSERT FAILED") print("================================") - print(str(chunk_error)) + print(str(op_error)) + + time.sleep(10) + + try: + + sql_conn.close() + + except: + pass + + # RECONNECT SQL + sql_conn = connect_sql() + + # ================================================= + # OTHER ERROR + # ================================================= + except Exception as loop_error: + + print("\n================================") + print("MAIN LOOP ERROR") + print("================================") + + print(str(loop_error)) traceback.print_exc() - # ============================================= - # SAVE ERROR LOG - # ============================================= - with open( - "clickhouse_chunk_error.log", - "a", - encoding="utf-8" - ) as log: - - log.write( - "\n\n================================" - ) - - log.write( - f"\nTIME : {datetime.now()}" - ) - - log.write( - f"\nTABLE : {TABLE_NAME}" - ) - - log.write( - f"\nERROR : {str(chunk_error)}" - ) - - log.write( - f"\nTRACEBACK :\n" - f"{traceback.format_exc()}" - ) - - log.write( - "\n================================" - ) - - continue + break print("\n================================") print("ETL COMPLETED SUCCESSFULLY") - print(f"TOTAL ROWS INSERTED : {total_rows}") print("================================") # ========================================================= @@ -493,7 +550,7 @@ except Exception as main_error: traceback.print_exc() with open( - "clickhouse_main_error.log", + "main_error.log", "a", encoding="utf-8" ) as log: @@ -528,7 +585,7 @@ finally: sql_conn.close() - print("\nSQL Server Connection Closed") + print("\nSQL SERVER CONNECTION CLOSED") except: pass @@ -537,9 +594,11 @@ finally: ch_client.close() - print("ClickHouse Connection Closed") + print("CLICKHOUSE CONNECTION CLOSED") except: pass -print("\nETL Finished :", datetime.now()) \ No newline at end of file +print("\n================================================") +print("ETL FINISHED :", datetime.now()) +print("================================================") diff --git a/insert_error.log b/insert_error.log index b8409b0..e49a596 100644 --- a/insert_error.log +++ b/insert_error.log @@ -13245,4 +13245,279 @@ Traceback (most recent call last): raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts}{err_url}') from ex clickhouse_connect.driver.exceptions.OperationalError: Error HTTPConnectionPool(host='172.188.12.194', port=8123): Max retries exceeded with url: /?date_time_input_format=best_effort&cast_string_to_dynamic_use_inference=1&session_id=1cd3ec5f-e757-424a-b309-e62e27c757e0&cancel_http_readonly_queries_on_client_close=1&wait_end_of_query=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=120000&database=DaburIndia_BI&client_protocol_version=54405&query_id=f9e2d350-eb1c-4dc0-b2cf-c3f4def4df83 (Caused by NewConnectionError("HTTPConnection(host='172.188.12.194', port=8123): Failed to establish a new connection: [WinError 10065] A socket operation was attempted to an unreachable host")) executing HTTP request attempt 1 (http://172.188.12.194:8123) +================================ + +================================ +TIME : 2026-05-20 13:42:49.425356 +TABLE : PaidVisibility +ERROR : Invalid None value in non-Nullable column, column name: `update_by` +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\PaidVisibility_Import.py", line 427, in + ch_client.insert_df( + ~~~~~~~~~~~~~~~~~~~^ + table=TABLE_NAME, + ^^^^^^^^^^^^^^^^^ + df=chunk, + ^^^^^^^^^ + database=CH_CONFIG['database'] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ) + ^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 1013, in insert_df + return self.insert(table, + ~~~~~~~~~~~^^^^^^^ + df, + ^^^ + ...<5 lines>... + transport_settings=transport_settings, + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + context=context) + ^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 978, in insert + return self.data_insert(context) + ~~~~~~~~~~~~~~~~^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 347, in data_insert + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 569, in _raw_request + error_handler(response) + ~~~~~~~~~~~~~^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 331, in error_handler + raise ex + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\transform.py", line 114, in chunk_gen + col_type.write_column(data, output, context) + ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 216, in write_column + self.write_column_data(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 231, in write_column_data + self._write_column_binary(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\string.py", line 51, in _write_column_binary + handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx) + ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\errors.py", line 17, in handle_error + raise DataError(msg) +clickhouse_connect.driver.exceptions.DataError: Invalid None value in non-Nullable column, column name: `update_by` + +================================ + +================================ +TIME : 2026-05-20 13:42:52.853981 +TABLE : PaidVisibility +ERROR : Invalid None value in non-Nullable column, column name: `update_by` +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\PaidVisibility_Import.py", line 427, in + ch_client.insert_df( + ~~~~~~~~~~~~~~~~~~~^ + table=TABLE_NAME, + ^^^^^^^^^^^^^^^^^ + df=chunk, + ^^^^^^^^^ + database=CH_CONFIG['database'] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ) + ^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 1013, in insert_df + return self.insert(table, + ~~~~~~~~~~~^^^^^^^ + df, + ^^^ + ...<5 lines>... + transport_settings=transport_settings, + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + context=context) + ^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 978, in insert + return self.data_insert(context) + ~~~~~~~~~~~~~~~~^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 347, in data_insert + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 569, in _raw_request + error_handler(response) + ~~~~~~~~~~~~~^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 331, in error_handler + raise ex + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\transform.py", line 114, in chunk_gen + col_type.write_column(data, output, context) + ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 216, in write_column + self.write_column_data(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 231, in write_column_data + self._write_column_binary(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\string.py", line 51, in _write_column_binary + handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx) + ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\errors.py", line 17, in handle_error + raise DataError(msg) +clickhouse_connect.driver.exceptions.DataError: Invalid None value in non-Nullable column, column name: `update_by` + +================================ + +================================ +TIME : 2026-05-20 13:42:55.690001 +TABLE : PaidVisibility +ERROR : Invalid None value in non-Nullable column, column name: `update_by` +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\PaidVisibility_Import.py", line 427, in + ch_client.insert_df( + ~~~~~~~~~~~~~~~~~~~^ + table=TABLE_NAME, + ^^^^^^^^^^^^^^^^^ + df=chunk, + ^^^^^^^^^ + database=CH_CONFIG['database'] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ) + ^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 1013, in insert_df + return self.insert(table, + ~~~~~~~~~~~^^^^^^^ + df, + ^^^ + ...<5 lines>... + transport_settings=transport_settings, + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + context=context) + ^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 978, in insert + return self.data_insert(context) + ~~~~~~~~~~~~~~~~^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 347, in data_insert + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 569, in _raw_request + error_handler(response) + ~~~~~~~~~~~~~^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 331, in error_handler + raise ex + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\transform.py", line 114, in chunk_gen + col_type.write_column(data, output, context) + ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 216, in write_column + self.write_column_data(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 231, in write_column_data + self._write_column_binary(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\string.py", line 51, in _write_column_binary + handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx) + ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\errors.py", line 17, in handle_error + raise DataError(msg) +clickhouse_connect.driver.exceptions.DataError: Invalid None value in non-Nullable column, column name: `update_by` + +================================ + +================================ +TIME : 2026-05-20 13:42:58.818786 +TABLE : PaidVisibility +ERROR : Invalid None value in non-Nullable column, column name: `update_by` +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\PaidVisibility_Import.py", line 427, in + ch_client.insert_df( + ~~~~~~~~~~~~~~~~~~~^ + table=TABLE_NAME, + ^^^^^^^^^^^^^^^^^ + df=chunk, + ^^^^^^^^^ + database=CH_CONFIG['database'] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ) + ^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 1013, in insert_df + return self.insert(table, + ~~~~~~~~~~~^^^^^^^ + df, + ^^^ + ...<5 lines>... + transport_settings=transport_settings, + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + context=context) + ^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 978, in insert + return self.data_insert(context) + ~~~~~~~~~~~~~~~~^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 347, in data_insert + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 569, in _raw_request + error_handler(response) + ~~~~~~~~~~~~~^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 331, in error_handler + raise ex + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\transform.py", line 114, in chunk_gen + col_type.write_column(data, output, context) + ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 216, in write_column + self.write_column_data(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 231, in write_column_data + self._write_column_binary(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\string.py", line 51, in _write_column_binary + handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx) + ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\errors.py", line 17, in handle_error + raise DataError(msg) +clickhouse_connect.driver.exceptions.DataError: Invalid None value in non-Nullable column, column name: `update_by` + +================================ + +================================ +TIME : 2026-05-20 13:43:02.036478 +TABLE : PaidVisibility +ERROR : Invalid None value in non-Nullable column, column name: `update_by` +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\PaidVisibility_Import.py", line 427, in + ch_client.insert_df( + ~~~~~~~~~~~~~~~~~~~^ + table=TABLE_NAME, + ^^^^^^^^^^^^^^^^^ + df=chunk, + ^^^^^^^^^ + database=CH_CONFIG['database'] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ) + ^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 1013, in insert_df + return self.insert(table, + ~~~~~~~~~~~^^^^^^^ + df, + ^^^ + ...<5 lines>... + transport_settings=transport_settings, + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + context=context) + ^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 978, in insert + return self.data_insert(context) + ~~~~~~~~~~~~~~~~^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 347, in data_insert + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 569, in _raw_request + error_handler(response) + ~~~~~~~~~~~~~^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 331, in error_handler + raise ex + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\transform.py", line 114, in chunk_gen + col_type.write_column(data, output, context) + ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 216, in write_column + self.write_column_data(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 231, in write_column_data + self._write_column_binary(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\string.py", line 51, in _write_column_binary + handle_error(data_conv.write_str_col(column, self.nullable, encoding, dest), ctx) + ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\errors.py", line 17, in handle_error + raise DataError(msg) +clickhouse_connect.driver.exceptions.DataError: Invalid None value in non-Nullable column, column name: `update_by` + ================================ \ No newline at end of file