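# SQL Server -> ClickHouse ETL script.
#
# Reads the rows of one project from a SQL Server table in chunks, cleans each
# chunk with pandas, and inserts it into the ClickHouse table of the same name.
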
import os
import traceback
import warnings
from datetime import datetime

import clickhouse_connect
import numpy as np
import pandas as pd
import pyodbc

# =========================================================
# IGNORE WARNINGS
# =========================================================
# Silence the pandas warning whose message starts with the text below; this
# script hands a raw pyodbc connection to pd.read_sql.
warnings.filterwarnings(
    'ignore',
    'pandas only supports SQLAlchemy connectable'
)

print("ETL Started :", datetime.now())

# =========================================================
# SQL SERVER CONNECTION
# =========================================================
# Server and credentials can be overridden through the environment
# (ETL_SQL_SERVER / ETL_SQL_USER / ETL_SQL_PASSWORD). The literals are kept
# only as fallbacks so existing runs behave exactly as before.
# TODO(security): move the fallback credentials out of source control and
# rotate them.
SQL_CONN_STR = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    f"SERVER={os.environ.get('ETL_SQL_SERVER', '10.200.25.65')};"
    'DATABASE=CPMIndiaBusinessInsight;'
    f"UID={os.environ.get('ETL_SQL_USER', 'bsgteam_test')};"
    f"PWD={os.environ.get('ETL_SQL_PASSWORD', 'B$gt3@m#00512')};"
    'TrustServerCertificate=yes;'
)

# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
# Passed as keyword arguments to clickhouse_connect.get_client().
# Host and credentials can be overridden through ETL_CH_HOST / ETL_CH_USER /
# ETL_CH_PASSWORD; the literals are fallbacks (see the TODO above).
CH_CONFIG = {
    'host': os.environ.get('ETL_CH_HOST', '172.188.12.194'),
    'port': 8123,
    'username': os.environ.get('ETL_CH_USER', 'default'),
    'password': os.environ.get('ETL_CH_PASSWORD', 'dipanshu_k'),
    'database': 'DaburIndia_BI',
}

# =========================================================
# TABLE NAME
# =========================================================
# Used verbatim both inside `dbo.[...]` in the SQL Server query and as the
# ClickHouse insert target, so it must not carry surrounding whitespace
# (a leading space would name a different table).
# NOTE(review): assumes the real table name has no leading space - confirm.
TABLE_NAME = 'additional_visibility'

# Value compared against the Project_Id column in the extraction query.
PROJECT_ID = 41654

# =========================================================
# CLEAN DATAFRAME
# =========================================================
def _clean_scalar(val):
    """Convert one cell of a generic column to a plain Python value.

    Missing values become None, booleans and integers become int, floats
    become int when they hold a whole number (so 3240.0 is not later rendered
    as '3240.0') and float otherwise, strings are stripped, and anything else
    (datetimes included) is converted with str().
    """
    if pd.isnull(val):
        return None

    # bool must be checked before int: bool is a subclass of int, so an
    # int-first check would make this branch unreachable, and np.bool_ would
    # fall through to str() and become 'True' / 'False'.
    if isinstance(val, (bool, np.bool_)):
        return int(val)

    if isinstance(val, (int, np.integer)):
        return int(val)

    if isinstance(val, (float, np.floating)):
        # NaN never reaches this point; it is caught by the isnull check.
        as_float = float(val)
        return int(as_float) if as_float.is_integer() else as_float

    if isinstance(val, str):
        return val.strip()

    return str(val)


def _clean_date_column(series):
    """Parse *series* as dates and return datetime.date objects or None.

    Values that cannot be parsed, or whose year is outside 1970-2100, become
    None.
    """
    parsed = pd.to_datetime(series, errors='coerce')
    years = parsed.dt.year
    parsed = parsed.where((years >= 1970) & (years <= 2100))
    return parsed.apply(lambda ts: ts.date() if pd.notnull(ts) else None)


def clean_dataframe(df):
    """Normalise every column of *df* to plain Python values.

    Columns are handled one by one, chosen by name and dtype:

    * name contains "date" (case-insensitive): parsed to datetime.date;
      unparseable or out-of-range (year < 1970 or > 2100) values become None.
    * integer dtype: values converted with int().
    * float dtype: converted to int when every non-null value is whole,
      otherwise to float.
    * anything else: cleaned value by value and stored with object dtype so
      that Python ints and None survive (a bare list assignment would let
      pandas re-infer float64 and turn 3240 / None back into 3240.0 / NaN).

    A failure in one column is printed and that column is left as it was;
    the remaining columns are still processed.

    NOTE(review): the "date" test is a substring match, so a column such as
    "updated_by" or "validated" would also be parsed as dates and its
    non-date values replaced by None - confirm the source schema has no such
    columns.

    Args:
        df: the DataFrame to clean.

    Returns:
        The cleaned DataFrame. If the initial NaN replacement itself fails,
        the error is printed and the input DataFrame is returned unchanged.
    """
    try:
        # ---------------------------------------------
        # Replace NaN
        # ---------------------------------------------
        df = df.replace({np.nan: None})

        # ---------------------------------------------
        # Process Column Wise
        # ---------------------------------------------
        for col in df.columns:
            try:
                if 'date' in col.lower():
                    print(f"Cleaning Date Column : {col}")
                    df[col] = _clean_date_column(df[col])

                elif pd.api.types.is_integer_dtype(df[col]):
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    df[col] = df[col].apply(
                        lambda x: int(x) if pd.notnull(x) else None
                    )

                elif pd.api.types.is_float_dtype(df[col]):
                    non_null = df[col].dropna()

                    # Whole-number floats (3240.0) are stored as ints (3240).
                    if len(non_null) > 0 and (non_null % 1 == 0).all():
                        df[col] = df[col].apply(
                            lambda x: int(x) if pd.notnull(x) else None
                        )
                    else:
                        df[col] = df[col].apply(
                            lambda x: float(x) if pd.notnull(x) else None
                        )

                else:
                    cleaned = [_clean_scalar(val) for val in df[col]]

                    # Explicit object dtype: keeps ints as ints and None as
                    # None instead of letting pandas infer float64 / NaN.
                    df[col] = pd.Series(
                        cleaned,
                        index=df.index,
                        dtype=object
                    )

            except Exception as col_error:
                # Best effort: report the column and carry on with the rest.
                print("\n================================")
                print(f"COLUMN FAILED : {col}")
                print(str(col_error))
                print("================================")

        return df

    except Exception as clean_error:
        print("\n================================")
        print("DATA CLEAN FAILED")
        print(str(clean_error))
        print("================================")

        return df

# =========================================================
# MAIN PROCESS
# =========================================================
def _append_error_log(path, error, table=None):
    """Append one error record to the log file at *path*.

    The record holds the current time, the table name (when given), the error
    message and the traceback of the exception currently being handled, so
    this must be called from inside an ``except`` block.
    """
    record = [
        "\n\n================================",
        f"\nTIME : {datetime.now()}",
    ]
    if table is not None:
        record.append(f"\nTABLE : {table}")
    record.append(f"\nERROR : {str(error)}")
    record.append(f"\nTRACEBACK :\n{traceback.format_exc()}")
    record.append("\n================================")

    with open(path, "a", encoding="utf-8") as log:
        log.write("".join(record))


# Bound up front so the ``finally`` clause can tell "never opened" apart from
# "opened" without relying on a NameError being swallowed.
sql_conn = None
ch_client = None

try:
    # =====================================================
    # CONNECT SQL SERVER
    # =====================================================
    sql_conn = pyodbc.connect(SQL_CONN_STR)
    print("Connected to SQL Server")

    # =====================================================
    # CONNECT CLICKHOUSE
    # =====================================================
    ch_client = clickhouse_connect.get_client(**CH_CONFIG)
    print("Connected to ClickHouse")

    # =====================================================
    # QUERY
    # =====================================================
    # Both interpolated values are module constants, not external input.
    query = f"""
    SELECT *
    FROM dbo.[{TABLE_NAME}]
    WHERE Project_Id = {PROJECT_ID}
    """

    print("\nExecuting Query:")
    print(query)

    # =====================================================
    # CHUNK SIZE
    # =====================================================
    chunk_size = 100000

    total_rows = 0
    failed_chunks = 0

    # =====================================================
    # READ DATA
    # =====================================================
    for chunk in pd.read_sql(query, sql_conn, chunksize=chunk_size):
        try:
            print("\n================================")
            print(f"Processing {len(chunk)} Rows")
            print("================================")

            # =================================================
            # CLEAN DATA
            # =================================================
            chunk = clean_dataframe(chunk)

            # =================================================
            # DEBUG COLUMN TYPES
            # =================================================
            # Show the Python type and value of the first non-null cell of
            # each column.
            print("\nCOLUMN TYPES")

            for col in chunk.columns:
                sample = chunk[col].dropna()

                if len(sample) > 0:
                    print(col, type(sample.iloc[0]), sample.iloc[0])

            # =================================================
            # SAMPLE DATA
            # =================================================
            print("\nSAMPLE DATA")
            print(chunk.head(2))

            # =================================================
            # INSERT INTO CLICKHOUSE
            # =================================================
            print("\nInserting into ClickHouse...")

            ch_client.insert_df(
                table=TABLE_NAME,
                df=chunk,
                database=CH_CONFIG['database']
            )

            total_rows += len(chunk)
            print(f"\nInserted Total Rows : {total_rows}")

        except Exception as chunk_error:
            # A failed chunk is logged and skipped so later chunks still
            # load; it is counted so the final summary does not claim
            # success.
            failed_chunks += 1

            print("\n================================")
            print("CHUNK INSERT FAILED")
            print("================================")
            print(str(chunk_error))
            traceback.print_exc()

            # =============================================
            # SAVE ERROR LOG
            # =============================================
            _append_error_log(
                "clickhouse_chunk_error.log",
                chunk_error,
                table=TABLE_NAME
            )

    print("\n================================")
    if failed_chunks:
        print(f"ETL COMPLETED WITH {failed_chunks} FAILED CHUNK(S)")
    else:
        print("ETL COMPLETED SUCCESSFULLY")
    print(f"TOTAL ROWS INSERTED : {total_rows}")
    print("================================")

# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
    print("\n================================")
    print("MAIN ERROR")
    print("================================")
    print(str(main_error))
    traceback.print_exc()

    _append_error_log("clickhouse_main_error.log", main_error)

# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
    # Only close what was actually opened, and report (rather than silently
    # swallow) a failure to close.
    if sql_conn is not None:
        try:
            sql_conn.close()
            print("\nSQL Server Connection Closed")
        except Exception as close_error:
            print(f"\nSQL Server Connection Close Failed : {close_error}")

    if ch_client is not None:
        try:
            ch_client.close()
            print("ClickHouse Connection Closed")
        except Exception as close_error:
            print(f"ClickHouse Connection Close Failed : {close_error}")

    print("\nETL Finished :", datetime.now())