Files
clickHouseData/SKU Master_Import.py
2026-05-19 16:58:54 +05:30

633 lines
18 KiB
Python

import os
import sys
import traceback
import warnings
from datetime import datetime

import clickhouse_connect
import numpy as np
import pandas as pd
import pyodbc
# ---------------------------------------------------------
# pd.read_sql is handed a raw pyodbc connection further
# down; hide the pandas warning whose message matches the
# text below so it does not clutter every run.
# ---------------------------------------------------------
warnings.filterwarnings(
    action='ignore',
    message='pandas only supports SQLAlchemy connectable',
)

# Start-of-run banner with the wall-clock start time.
print("\n====================================")
print("ETL Started :", datetime.now())
print("====================================")
# =========================================================
# SQL SERVER CONNECTION
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'SKU Master'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CLICKHOUSE DATE COLUMNS
# =========================================================
DATE_COLUMNS = [
'visit_date'
]
# =========================================================
# CLICKHOUSE DATETIME COLUMNS
# =========================================================
DATETIME_COLUMNS = [
'create_date',
'update_date'
]
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def _store_column(df, col, values):
    """Write ``values`` into ``df[col]`` without losing ``None`` nulls.

    Assigning a plain list lets pandas re-infer the dtype: ``[1, None]``
    becomes float64 ``[1.0, NaN]`` and ``[datetime, None]`` becomes
    datetime64 with ``NaT``, silently undoing the cleaning. When any value
    is ``None`` the column is stored with ``object`` dtype so the cleaned
    Python values survive; otherwise the list is assigned as-is and pandas
    chooses the dtype (same as before).
    """
    values = list(values)
    if any(value is None for value in values):
        df[col] = pd.Series(values, index=df.index, dtype=object)
    else:
        df[col] = values


def _clean_object_value(val):
    """Convert one cell of an object/string column to a plain Python value.

    Nulls -> None, strings are stripped, booleans -> 0/1, integers -> int,
    whole floats -> int (12.0 -> 12), other floats -> float, pandas
    Timestamps -> datetime, other date-like values (anything with a
    ``year`` attribute) pass through unchanged, everything else -> str.
    """
    if pd.isnull(val):
        return None
    if isinstance(val, str):
        return val.strip()
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(val, bool):
        return int(val)
    if isinstance(val, (int, np.integer)):
        return int(val)
    if isinstance(val, (float, np.floating)):
        # NaN was already mapped to None by pd.isnull above.
        return int(val) if val.is_integer() else float(val)
    if isinstance(val, pd.Timestamp):
        return val.to_pydatetime()
    if isinstance(val, datetime) or hasattr(val, 'year'):
        return val
    return str(val)


def clean_dataframe(df, date_columns=None, datetime_columns=None):
    """Normalise a chunk read from SQL Server into ClickHouse-friendly values.

    NaN becomes None. Configured date/datetime columns are parsed into
    ``datetime.date`` / ``datetime.datetime``. Integer columns become ints,
    and float columns whose non-null values are all whole become ints. Other
    columns are cleaned value by value.

    Args:
        df: the chunk to clean; it is not modified (a cleaned copy is returned).
        date_columns: column names (case-insensitive) to convert to
            ``datetime.date``. Defaults to the module-level ``DATE_COLUMNS``.
        datetime_columns: column names (case-insensitive) to convert to
            ``datetime.datetime``. Defaults to ``DATETIME_COLUMNS``.

    Returns:
        The cleaned DataFrame. Errors are printed, not raised. A cell that
        fails to clean becomes None. A column that fails is left in its
        pre-cleaning state. If the whole pass fails, the frame is returned
        as far as it got.
    """
    try:
        # Fall back to the module-level lists so existing callers
        # (clean_dataframe(chunk)) behave exactly as before.
        if date_columns is None:
            date_columns = DATE_COLUMNS
        if datetime_columns is None:
            datetime_columns = DATETIME_COLUMNS
        # Lower-case the configured names once instead of once per column.
        date_names = {str(name).lower() for name in date_columns}
        datetime_names = {str(name).lower() for name in datetime_columns}

        # Replace NaN with None (returns a copy; the caller's frame is untouched).
        df = df.replace({np.nan: None})

        for col in df.columns:
            try:
                print(f"\nCleaning Column : {col}")
                col_key = str(col).lower()

                if col_key in date_names:
                    print(f"Date Column : {col}")
                    parsed = pd.to_datetime(df[col], errors='coerce')
                    _store_column(
                        df, col,
                        [None if pd.isnull(v) else v.date() for v in parsed],
                    )

                elif col_key in datetime_names:
                    print(f"DateTime Column : {col}")
                    parsed = pd.to_datetime(df[col], errors='coerce')
                    _store_column(
                        df, col,
                        [None if pd.isnull(v) else v.to_pydatetime()
                         for v in parsed],
                    )

                elif pd.api.types.is_integer_dtype(df[col]):
                    print(f"Integer Column : {col}")
                    numbers = pd.to_numeric(df[col], errors='coerce')
                    _store_column(
                        df, col,
                        [int(v) if pd.notnull(v) else None for v in numbers],
                    )

                elif pd.api.types.is_float_dtype(df[col]):
                    print(f"Float Column : {col}")
                    numbers = pd.to_numeric(df[col], errors='coerce')
                    non_null = numbers.dropna()
                    # Whole-valued columns (e.g. 12.0 -> 12) are sent as ints.
                    whole = len(non_null) > 0 and bool((non_null % 1 == 0).all())
                    convert = int if whole else float
                    _store_column(
                        df, col,
                        [convert(v) if pd.notnull(v) else None for v in numbers],
                    )

                else:
                    print(f"String/Object Column : {col}")
                    cleaned = []
                    for val in df[col]:
                        try:
                            cleaned.append(_clean_object_value(val))
                        except Exception as row_error:
                            # Best effort: a bad cell becomes NULL.
                            print(f"Row Cleaning Error in Column {col}")
                            print(str(row_error))
                            cleaned.append(None)
                    _store_column(df, col, cleaned)

            except Exception as col_error:
                # Best effort: keep going with the remaining columns.
                print("\n================================")
                print(f"COLUMN FAILED : {col}")
                print(str(col_error))
                print(traceback.format_exc())
                print("================================")
        return df
    except Exception as clean_error:
        print("\n================================")
        print("DATA CLEAN FAILED")
        print(str(clean_error))
        print(traceback.format_exc())
        print("================================")
        return df
# =========================================================
# MAIN PROCESS
# =========================================================
def _append_error_log(path, error, table=None):
    """Append one timestamped error entry, with traceback, to the file ``path``.

    Call only from inside an ``except`` block: ``traceback.format_exc()``
    reports the exception currently being handled. If the log file cannot
    be written, that problem is printed instead, so the original error is
    not replaced by an ``OSError``.
    """
    parts = [
        "\n\n================================",
        f"\nTIME : {datetime.now()}",
    ]
    if table is not None:
        parts.append(f"\nTABLE : {table}")
    parts.append(f"\nERROR : {str(error)}")
    parts.append(f"\nTRACEBACK :\n{traceback.format_exc()}")
    parts.append("\n================================")
    try:
        with open(path, "a", encoding="utf-8") as log:
            log.write("".join(parts))
    except OSError as log_error:
        print(f"\nCOULD NOT WRITE LOG {path} : {log_error}")


def _print_column_types(df):
    """Debug aid: print the type and value of each column's first non-null cell."""
    print("\nCOLUMN TYPES")
    for col in df.columns:
        try:
            sample = df[col].dropna()
            if len(sample) > 0:
                print(col, type(sample.iloc[0]), sample.iloc[0])
        except Exception as type_error:
            # Diagnostic output only -- never let it fail the load.
            print(f"Could not inspect column {col} : {type_error}")


def _truncate_target(client):
    """Run TRUNCATE TABLE on the ClickHouse target table; errors propagate."""
    print("\n====================================")
    print(f"TRUNCATING : {TABLE_NAME}")
    print("====================================")
    truncate_query = f"""
    TRUNCATE TABLE
    `{CH_CONFIG['database']}`.`{TABLE_NAME}`
    """
    print(truncate_query)
    client.command(truncate_query)
    print("TABLE TRUNCATED SUCCESSFULLY")


# Bound before the try so the cleanup in `finally` can test them even when
# a connection attempt fails.
sql_conn = None
ch_client = None
total_rows = 0
failed_chunks = 0
etl_failed = False

try:
    # =====================================================
    # CONNECT SQL SERVER
    # =====================================================
    print("\nConnecting SQL Server...")
    sql_conn = pyodbc.connect(SQL_CONN_STR)
    print("Connected to SQL Server")

    # =====================================================
    # CONNECT CLICKHOUSE
    # =====================================================
    print("\nConnecting ClickHouse...")
    ch_client = clickhouse_connect.get_client(**CH_CONFIG)
    print("Connected to ClickHouse")

    # =====================================================
    # QUERY
    # =====================================================
    query = f"""
    SELECT *
    FROM dbo.[{TABLE_NAME}]
    WHERE Project_Id = {PROJECT_ID}
    """
    print("\n====================================")
    print("Executing Query")
    print("====================================")
    print(query)

    # =====================================================
    # READ, CLEAN AND LOAD IN CHUNKS
    # =====================================================
    chunk_size = 100000
    for chunk in pd.read_sql(query, sql_conn, chunksize=chunk_size):
        try:
            print("\n====================================")
            print(f"Processing Rows : {len(chunk)}")
            print("====================================")
            chunk = clean_dataframe(chunk)
            _print_column_types(chunk)
        except Exception as chunk_error:
            failed_chunks += 1
            print("\n====================================")
            print("CHUNK PROCESS FAILED")
            print("====================================")
            print(str(chunk_error))
            traceback.print_exc()
            continue

        # Truncate lazily, right before the first insert. Kept OUTSIDE the
        # per-chunk handler: a failed TRUNCATE must abort the run (reaching
        # MAIN ERROR) instead of being swallowed, which previously skipped
        # the chunk and retried the truncate on the next one.
        # NOTE: if the query returns no rows the loop body never runs and
        # the target table is left untouched.
        if TRUNCATE_BEFORE_LOAD and not table_truncated:
            try:
                _truncate_target(ch_client)
            except Exception as truncate_error:
                print("\nTRUNCATE FAILED")
                print(str(truncate_error))
                raise
            table_truncated = True

        try:
            print("\n====================================")
            print("INSERTING DATA INTO CLICKHOUSE")
            print("====================================")
            ch_client.insert_df(
                table=f"`{TABLE_NAME}`",
                df=chunk,
                database=CH_CONFIG['database']
            )
        except Exception as insert_error:
            # Best effort: record the failure and move on to the next chunk.
            failed_chunks += 1
            print("\nINSERT FAILED")
            print(str(insert_error))
            traceback.print_exc()
            _append_error_log(
                "clickhouse_insert_error.log",
                insert_error,
                table=TABLE_NAME
            )
            continue

        total_rows += len(chunk)
        print(f"\nTOTAL INSERTED : {total_rows}")

    # Only claim success when every chunk was actually loaded.
    print("\n====================================")
    if failed_chunks:
        etl_failed = True
        print(f"ETL COMPLETED WITH ERRORS : {failed_chunks} CHUNK(S) FAILED")
    else:
        print("ETL COMPLETED SUCCESSFULLY")
    print(f"TOTAL ROWS INSERTED : {total_rows}")
    print("====================================")

# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
    etl_failed = True
    print("\n====================================")
    print("MAIN ERROR")
    print("====================================")
    print(str(main_error))
    traceback.print_exc()
    _append_error_log("clickhouse_main_error.log", main_error)

# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
    if sql_conn is not None:
        try:
            sql_conn.close()
            print("\nSQL Server Connection Closed")
        except Exception as close_error:
            print(f"\nSQL Server close failed : {close_error}")
    if ch_client is not None:
        try:
            ch_client.close()
            print("ClickHouse Connection Closed")
        except Exception as close_error:
            print(f"ClickHouse close failed : {close_error}")
    print("\n====================================")
    print("ETL Finished :", datetime.now())
    print("====================================")

# A non-zero exit status lets whatever launches this script detect a failed
# or partial load. Skipped when the file is imported rather than executed.
if etl_failed and __name__ == "__main__":
    sys.exit(1)