diff --git a/Master_PromotionDefinition_Import.py b/Master_PromotionDefinition_Import.py new file mode 100644 index 0000000..8685a86 --- /dev/null +++ b/Master_PromotionDefinition_Import.py @@ -0,0 +1,462 @@ +import pyodbc +import pandas as pd +import clickhouse_connect +import numpy as np +from datetime import datetime +import traceback +import warnings + +# ========================================================= +# IGNORE WARNINGS +# ========================================================= +warnings.filterwarnings( + 'ignore', + 'pandas only supports SQLAlchemy connectable' +) + +print("ETL Started :", datetime.now()) + +# ========================================================= +# SQL SERVER CONNECTION +# ========================================================= +SQL_CONN_STR = ( + 'DRIVER={ODBC Driver 17 for SQL Server};' + 'SERVER=10.200.25.65;' + 'DATABASE=CPMIndiaBusinessInsight;' + 'UID=bsgteam_test;' + 'PWD=B$gt3@m#00512;' + 'TrustServerCertificate=yes;' +) + +# ========================================================= +# CLICKHOUSE CONFIG +# ========================================================= +CH_CONFIG = { + 'host': '172.188.12.194', + 'port': 8123, + 'username': 'default', + 'password': 'dipanshu_k', + 'database': 'DaburIndia_BI' +} + +# ========================================================= +# TABLE NAME +# ========================================================= +TABLE_NAME = 'Master_PromotionDefinition' +PROJECT_ID = 41654 + +# ========================================================= +# CLEAN DATAFRAME +# ========================================================= +def clean_dataframe(df): + + try: + + # --------------------------------------------- + # Replace NaN with None + # --------------------------------------------- + df = df.replace({np.nan: None}) + + # --------------------------------------------- + # Process Column Wise + # --------------------------------------------- + for col in df.columns: + + try: + + print(f"\nCleaning Column : {col}") + + # ===================================== + # DATETIME COLUMNS + # ===================================== + if pd.api.types.is_datetime64_any_dtype(df[col]): + + print(f"Datetime Column Detected : {col}") + + df[col] = pd.to_datetime( + df[col], + errors='coerce' + ) + + # Remove invalid dates + df[col] = df[col].where( + (df[col].dt.year >= 1970) & + (df[col].dt.year <= 2100) + ) + + # IMPORTANT FIX + # Convert to Python datetime + df[col] = df[col].apply( + lambda x: + x.to_pydatetime() + if pd.notnull(x) + else None + ) + + # ===================================== + # INTEGER COLUMNS + # ===================================== + elif pd.api.types.is_integer_dtype(df[col]): + + print(f"Integer Column Detected : {col}") + + df[col] = pd.to_numeric( + df[col], + errors='coerce' + ) + + df[col] = df[col].apply( + lambda x: + int(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # FLOAT COLUMNS + # ===================================== + elif pd.api.types.is_float_dtype(df[col]): + + print(f"Float Column Detected : {col}") + + non_null = df[col].dropna() + + # --------------------------------- + # Convert whole float to int + # Example: + # 3240.0 -> 3240 + # --------------------------------- + if len(non_null) > 0 and ( + (non_null % 1 == 0).all() + ): + + df[col] = df[col].apply( + lambda x: + int(x) + if pd.notnull(x) + else None + ) + + else: + + df[col] = df[col].apply( + lambda x: + float(x) + if pd.notnull(x) + else None + ) + + # ===================================== + # OBJECT / STRING COLUMNS + # ===================================== + else: + + print(f"String/Object Column Detected : {col}") + + cleaned = [] + + for val in df[col]: + + # NULL + if pd.isnull(val): + + cleaned.append(None) + + # INTEGER + elif isinstance( + val, + ( + int, + np.integer + ) + ): + + cleaned.append(int(val)) + + # FLOAT + elif isinstance( + val, + ( + float, + np.floating + ) + ): + + if np.isnan(val): + + cleaned.append(None) + + else: + + # Avoid 3240.0 issue + if val.is_integer(): + + cleaned.append(int(val)) + + else: + + cleaned.append(float(val)) + + # STRING + elif isinstance(val, str): + + cleaned.append(val.strip()) + + # BOOLEAN + elif isinstance(val, bool): + + cleaned.append(int(val)) + + # DATETIME + elif isinstance( + val, + ( + datetime, + pd.Timestamp + ) + ): + + cleaned.append( + val.to_pydatetime() + if isinstance(val, pd.Timestamp) + else val + ) + + # OTHER + else: + + cleaned.append(str(val)) + + df[col] = cleaned + + except Exception as col_error: + + print("\n================================") + print(f"COLUMN FAILED : {col}") + print(str(col_error)) + print("================================") + + return df + + except Exception as clean_error: + + print("\n================================") + print("DATA CLEAN FAILED") + print(str(clean_error)) + print("================================") + + return df + +# ========================================================= +# MAIN PROCESS +# ========================================================= +try: + + # ===================================================== + # CONNECT SQL SERVER + # ===================================================== + sql_conn = pyodbc.connect(SQL_CONN_STR) + + print("Connected to SQL Server") + + # ===================================================== + # CONNECT CLICKHOUSE + # ===================================================== + ch_client = clickhouse_connect.get_client(**CH_CONFIG) + + print("Connected to ClickHouse") + + # ===================================================== + # QUERY + # ===================================================== + query = f""" + SELECT * + FROM dbo.[{TABLE_NAME}] + WHERE Project_Id = {PROJECT_ID} + """ + + print("\nExecuting Query:") + print(query) + + # ===================================================== + # CHUNK SIZE + # ===================================================== + chunk_size = 100000 + + total_rows = 0 + + # ===================================================== + # READ DATA + # ===================================================== + for chunk in pd.read_sql( + query, + sql_conn, + chunksize=chunk_size + ): + + try: + + print("\n================================") + print(f"Processing {len(chunk)} Rows") + print("================================") + + # ================================================= + # CLEAN DATA + # ================================================= + chunk = clean_dataframe(chunk) + + # ================================================= + # DEBUG COLUMN TYPES + # ================================================= + print("\nCOLUMN TYPES") + + for col in chunk.columns: + + sample = chunk[col].dropna() + + if len(sample) > 0: + + print( + col, + type(sample.iloc[0]), + sample.iloc[0] + ) + + # ================================================= + # SAMPLE DATA + # ================================================= + print("\nSAMPLE DATA") + print(chunk.head(2)) + + # ================================================= + # INSERT INTO CLICKHOUSE + # ================================================= + print("\nInserting into ClickHouse...") + + ch_client.insert_df( + table=TABLE_NAME, + df=chunk, + database=CH_CONFIG['database'] + ) + + total_rows += len(chunk) + + print( + f"\nInserted Total Rows : {total_rows}" + ) + + except Exception as chunk_error: + + print("\n================================") + print("CHUNK INSERT FAILED") + print("================================") + + print(str(chunk_error)) + + traceback.print_exc() + + # ============================================= + # SAVE ERROR LOG + # ============================================= + with open( + "clickhouse_chunk_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : {datetime.now()}" + ) + + log.write( + f"\nTABLE : {TABLE_NAME}" + ) + + log.write( + f"\nERROR : {str(chunk_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + + continue + + print("\n================================") + print("ETL COMPLETED SUCCESSFULLY") + print(f"TOTAL ROWS INSERTED : {total_rows}") + print("================================") + +# ========================================================= +# MAIN ERROR +# ========================================================= +except Exception as main_error: + + print("\n================================") + print("MAIN ERROR") + print("================================") + + print(str(main_error)) + + traceback.print_exc() + + with open( + "clickhouse_main_error.log", + "a", + encoding="utf-8" + ) as log: + + log.write( + "\n\n================================" + ) + + log.write( + f"\nTIME : {datetime.now()}" + ) + + log.write( + f"\nERROR : {str(main_error)}" + ) + + log.write( + f"\nTRACEBACK :\n" + f"{traceback.format_exc()}" + ) + + log.write( + "\n================================" + ) + +# ========================================================= +# CLOSE CONNECTIONS +# ========================================================= +finally: + + try: + + sql_conn.close() + + print("\nSQL Server Connection Closed") + + except: + pass + + try: + + ch_client.close() + + print("ClickHouse Connection Closed") + + except: + pass + +print("\nETL Finished :", datetime.now()) \ No newline at end of file diff --git a/clickhouse_chunk_error.log b/clickhouse_chunk_error.log index f797440..953ccde 100644 --- a/clickhouse_chunk_error.log +++ b/clickhouse_chunk_error.log @@ -1868,4 +1868,57 @@ Traceback (most recent call last): raise DataError(msg) clickhouse_connect.driver.exceptions.DataError: Invalid None value in non-Nullable column, column name: `update_by` +================================ + +================================ +TIME : 2026-05-19 12:59:38.730742 +TABLE : Master_PromotionDefinition +ERROR : 'datetime.date' object has no attribute 'timestamp' +TRACEBACK : +Traceback (most recent call last): + File "d:\Python Code\Master_PromotionDefinition_Import.py", line 328, in + ch_client.insert_df( + ~~~~~~~~~~~~~~~~~~~^ + table=TABLE_NAME, + ^^^^^^^^^^^^^^^^^ + df=chunk, + ^^^^^^^^^ + database=CH_CONFIG['database'] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + ) + ^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 1013, in insert_df + return self.insert(table, + ~~~~~~~~~~~^^^^^^^ + df, + ^^^ + ...<5 lines>... + transport_settings=transport_settings, + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + context=context) + ^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\client.py", line 978, in insert + return self.data_insert(context) + ~~~~~~~~~~~~~~~~^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 347, in data_insert + response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 569, in _raw_request + error_handler(response) + ~~~~~~~~~~~~~^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\httpclient.py", line 331, in error_handler + raise ex + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\driver\transform.py", line 114, in chunk_gen + col_type.write_column(data, output, context) + ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 216, in write_column + self.write_column_data(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\base.py", line 231, in write_column_data + self._write_column_binary(column, dest, ctx) + ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ + File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\clickhouse_connect\datatypes\temporal.py", line 303, in _write_column_binary + column = [((int(x.timestamp()) * 1000000 + x.microsecond) * prec) // 1000000 if x else 0 + ^^^^^^^^^^^ +AttributeError: 'datetime.date' object has no attribute 'timestamp'. Did you mean: 'fromtimestamp'? + ================================ \ No newline at end of file