first commit

This commit is contained in:
Dipanshu Kumar
2026-05-18 16:26:29 +05:30
commit 981995396a
8 changed files with 4405 additions and 0 deletions
+376
View File
@@ -0,0 +1,376 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
# ---------------------------------------------------
# Ignore Warning
# ---------------------------------------------------
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("ETL Started :", datetime.now())
# ---------------------------------------------------
# SQL SERVER CONNECTION
# ---------------------------------------------------
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
)
# ---------------------------------------------------
# CLICKHOUSE CONFIG
# ---------------------------------------------------
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# ---------------------------------------------------
# TABLE DETAILS
# ---------------------------------------------------
TABLE_NAME = 'Employee_Master'
PROJECT_ID = 41654
# ---------------------------------------------------
# CLEAN DATAFRAME
# ---------------------------------------------------
def clean_dataframe(df):
try:
# Replace NaN
df = df.replace({np.nan: None})
for col in df.columns:
try:
# ---------------------------------------------------
# HANDLE DATE COLUMNS
# ---------------------------------------------------
if 'date' in col.lower():
print(f"Cleaning Date Column : {col}")
# Convert to datetime
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
# Remove invalid dates
df[col] = df[col].where(
(df[col].dt.year >= 1970) &
(df[col].dt.year <= 2100)
)
# Convert to Python Date
df[col] = df[col].apply(
lambda x:
x.date()
if pd.notnull(x)
else None
)
# ---------------------------------------------------
# FORCE OBJECT COLUMNS TO STRING
# ---------------------------------------------------
else:
cleaned_col = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned_col.append(None)
# STRING
elif isinstance(val, str):
cleaned_col.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned_col.append(int(val))
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
cleaned_col.append(str(val))
# BOOLEAN
elif isinstance(val, bool):
cleaned_col.append(str(val))
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
cleaned_col.append(str(val))
# OTHER
else:
cleaned_col.append(str(val))
df[col] = cleaned_col
except Exception as col_error:
print("\n===================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("===================================")
return df
except Exception as clean_error:
print("\n===================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("===================================")
return df
try:
# ---------------------------------------------------
# CONNECT SQL SERVER
# ---------------------------------------------------
sql_conn = pyodbc.connect(SQL_CONN_STR)
print("Connected to SQL Server")
# ---------------------------------------------------
# CONNECT CLICKHOUSE
# ---------------------------------------------------
ch_client = clickhouse_connect.get_client(**CH_CONFIG)
print("Connected to ClickHouse")
# ---------------------------------------------------
# QUERY
# ---------------------------------------------------
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query:")
print(query)
# ---------------------------------------------------
# CHUNK SIZE
# ---------------------------------------------------
chunk_size = 100000
total_rows = 0
# ---------------------------------------------------
# READ DATA
# ---------------------------------------------------
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n===================================")
print(f"Processing {len(chunk)} Rows")
print("===================================")
# ---------------------------------------------------
# CLEAN DATA
# ---------------------------------------------------
chunk = clean_dataframe(chunk)
# ---------------------------------------------------
# FINAL DEBUG
# ---------------------------------------------------
print("\nFINAL COLUMN TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# ---------------------------------------------------
# SAMPLE DATA
# ---------------------------------------------------
print("\nSAMPLE DATA")
print(chunk.head(2))
# ---------------------------------------------------
# INSERT INTO CLICKHOUSE
# ---------------------------------------------------
print("\nInserting into ClickHouse...")
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
total_rows += len(chunk)
print(f"\nInserted Total Rows : {total_rows}")
except Exception as chunk_error:
print("\n===================================")
print("CHUNK INSERT FAILED")
print("===================================")
print(str(chunk_error))
traceback.print_exc()
# ---------------------------------------------------
# SAVE ERROR LOG
# ---------------------------------------------------
with open(
"clickhouse_chunk_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n====================================="
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nTABLE : {TABLE_NAME}"
)
log.write(
f"\nERROR : {str(chunk_error)}"
)
log.write(
f"\nTRACEBACK :\n{traceback.format_exc()}"
)
log.write(
"\n====================================="
)
continue
print("\n===================================")
print("ETL COMPLETED SUCCESSFULLY")
print(f"TOTAL ROWS INSERTED : {total_rows}")
print("===================================")
except Exception as main_error:
print("\n===================================")
print("MAIN ERROR")
print("===================================")
print(str(main_error))
traceback.print_exc()
with open(
"clickhouse_main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n====================================="
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n{traceback.format_exc()}"
)
log.write(
"\n====================================="
)
finally:
# ---------------------------------------------------
# CLOSE SQL SERVER
# ---------------------------------------------------
try:
sql_conn.close()
print("\nSQL Server Connection Closed")
except:
pass
# ---------------------------------------------------
# CLOSE CLICKHOUSE
# ---------------------------------------------------
try:
ch_client.close()
print("ClickHouse Connection Closed")
except:
pass
print("\nETL Finished :", datetime.now())