Compare commits

...

15 Commits

Author SHA1 Message Date
dipanshuk 6b5ca49733 Login_Sup Import Data 2026-05-26 16:16:31 +05:30
dipanshuk a7b200dc26 Calculation for clickhouse data 2026-05-26 14:56:02 +05:30
dipanshuk ac3e021825 Calculation for clickhouse data 2026-05-25 12:37:52 +05:30
dipanshuk caffe4f4db ddl file for sql 2026-05-25 12:37:19 +05:30
dipanshuk 990719ec23 ddl file for sql 2026-05-25 12:36:26 +05:30
dipanshuk 368def830d ddl file for sql 2026-05-25 12:35:37 +05:30
dipanshuk 352bba7476 Calculation for clickhouse data 2026-05-25 12:32:22 +05:30
dipanshuk b0f7ddcdb4 Attendance_Sup Import Data 2026-05-25 11:40:35 +05:30
dipanshuk c78b02e737 PaidVisibility_Compliance Import Data 2026-05-23 11:23:09 +05:30
dipanshuk d4b68496d5 TEST 2026-05-22 12:08:06 +05:30
Dipanshu Kumar 0f4783e1da Promotion Import 2026-05-20 16:49:56 +05:30
Dipanshu Kumar e9029b38bc PaidVisibility Import 2026-05-20 15:02:44 +05:30
Dipanshu Kumar 5383e84fe1 Mapping_StorePromotion Import 2026-05-20 13:11:22 +05:30
Dipanshu Kumar 579d59e1b0 Journey_Plan data Import 2026-05-20 12:40:47 +05:30
Dipanshu Kumar 3be8cd7259 [Web Logins] Import 2026-05-19 16:58:54 +05:30
26 changed files with 26209 additions and 5138 deletions
+605
View File
@@ -0,0 +1,605 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Attendance_Sup'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+29
View File
@@ -0,0 +1,29 @@
--I want to calculate OSA% WHERE use column skuavailability
--:- OSA% calculation =
--1. apply filter on MSL where msl=1
--2. calculate Y+N = count all the records of skuavailability
--3. calculate Y = count of all the Y in skuavailability
--4. OSA% calculation = (Y/ Y+N)*100
-- For Numinator
SELECT * FROM "DaburIndia_BI"."Stock_Details" WHERE "MSL" = '1' AND "skuavailability" = 'Y' and
"VisitDate" between '2026-04-01' and '2026-04-30';
-- For Demunitor
SELECT * FROM "DaburIndia_BI"."Stock_Details" WHERE "MSL" = '1' AND skuavailability IN ('Y','N')
"VisitDate" between '2026-04-01' and '2026-04-30';
-- OSA % FOR MonthWise
SELECT SUM(CASE WHEN MSL = '1' AND skuavailability IN ('Y','N') THEN 1 ELSE 0 END) AS Total_Y_N,
SUM(CASE WHEN MSL = '1' AND skuavailability = 'Y' THEN 1 ELSE 0 END) AS Total_Y,
ROUND(SUM(CASE WHEN MSL = '1' AND skuavailability = 'Y' THEN 1 ELSE 0 END) * 100.0/NULLIF(SUM(CASE
WHEN MSL = '1' AND skuavailability IN ('Y','N') THEN 1 ELSE 0 END),0),2) AS OSA_Percentage
FROM DaburIndia_BI.Stock_Details
where "VisitDate" between '2026-04-01' and '2026-04-01';
+14
View File
@@ -0,0 +1,14 @@
SELECT
store_id,
SUM(IF( msl = '1' AND present IN ('Y', 'N'), 1, 0)) AS Total_Y_N,
SUM(IF(msl = '1' AND present = 'Y', 1, 0)) AS Total_Y,
ROUND(
(SUM(IF(msl = '1' AND present = 'Y', 1, 0)) * 100.0) /
NULLIF(SUM(IF(msl = '1' AND present IN ('Y', 'N'), 1, 0)), 0),2
) AS Promo_Percentage
FROM DaburIndia_BI.Promotion
--WHERE visit_date >= '2026-04-01'
GROUP BY store_id
HAVING SUM(IF(msl = '1' AND present IN ('Y', 'N'), 1, 0)) > 0
ORDER BY store_id;
@@ -0,0 +1,114 @@
ALTER TABLE DaburIndia_BI.Promotion
MODIFY COLUMN MID Nullable(Int64)
COMMENT 'Unique master transaction ID (depends on unique combination of employee_id, store_id, and visit_date)';
ALTER TABLE DaburIndia_BI.Promotion
MODIFY COLUMN pk Int64
COMMENT 'Primary key identifier';
ALTER TABLE DaburIndia_BI.Promotion
MODIFY COLUMN store_id Int32
COMMENT 'Unique store identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN visit_date Date32
COMMENT 'Date of store visit';
ALTER TABLE DaburIndia_BI.Promotion
MODIFY COLUMN employee_id Int32
COMMENT 'Employee unique identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN supervisor_id Int32
COMMENT 'Supervisor unique identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN channel_id Int32
COMMENT 'Store channel identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN chain_id Int32
COMMENT 'Store chain identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN storetype_id Int32
COMMENT 'Store type identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promo_definition_id Nullable(Int32)
COMMENT 'Promotion definition identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promo_definition_name Nullable(Int32)
COMMENT 'Promotion definition identifier';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promo_definition_name Nullable(String)
COMMENT 'Promotion scheme or offer name';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promotion_deatils_id Nullable(Int32)
COMMENT 'Promotion detail unique identifier depends on the promotion_details table';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promotion_deatils Nullable(String)
COMMENT 'Promotion detail name of the table/subcategory ';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promotion_value_id Nullable(Int32)
COMMENT 'Promotion offer value/SubCategory Id';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promotion_value_name Nullable(String)
COMMENT 'Live promotion value name/SubCategory name ';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN promotion_type Nullable(String)
COMMENT 'Type of promotion';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN present Nullable(String)
COMMENT 'Promotion availability status (Y/N)';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN reason Nullable(String)
COMMENT 'Reason for non-availability';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN PromoQuestion Nullable(String)
COMMENT 'Promotion related questionnaire';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN image1 Nullable(String)
COMMENT 'First promotion image path';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN image2 Nullable(String)
COMMENT 'Second promotion image path';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN update_date DateTime64(3)
COMMENT 'Record update date';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN update_by Nullable(String)
COMMENT 'User who updated record';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN ReasonId Nullable(Int32)
COMMENT 'Reason Id for non-availability';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN msl Nullable(Int32)
COMMENT 'Must Stock List indicator 0=promotion Not exist , 1=promotion exist';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN stock Nullable(String)
COMMENT 'Stock availability status';
ALTER TABLE DaburIndia_BI_New.Promotion
MODIFY COLUMN PromoQuestionId Nullable(Int32)
COMMENT 'Promotion question identifier';
@@ -0,0 +1,16 @@
-- SOS Taget Achievement
SELECT store_id,SOSHeaderName,ChildSelfFacing,SOSHeaderFacing,IFNULL(Avg_SOSTarget, 0) AS Avg_SOSTarget,
ROUND(ChildSelfFacing * 1.0 / NULLIF(SOSHeaderFacing, 0),6) AS SOS,
IF((ChildSelfFacing * 1.0 /NULLIF(SOSHeaderFacing, 0)) >= (IFNULL(Avg_SOSTarget,0) / 100.0),
1,0) AS SOSCompliance,
LEAST(100.0,
ROUND(((ChildSelfFacing * 1.0 /NULLIF(SOSHeaderFacing, 0))/NULLIF(IFNULL(Avg_SOSTarget,0) / 100.0, 0)) * 100,2)) AS SOSAchievement_Percentage
FROM
(
SELECT store_id,SOSHeaderName,
SUM(ChildSelfFacing) AS ChildSelfFacing,SUM(SOSHeaderFacing) AS SOSHeaderFacing,
IFNULL(ROUND(AVG(SOSTarget), 2),0) AS Avg_SOSTarget FROM DaburIndia_BI.SOS_OneApp
GROUP BY store_id, SOSHeaderName) t
ORDER BY store_id,SOSHeaderName;
@@ -0,0 +1,25 @@
ALTER TABLE DaburIndia_BI_New.SOS_OneApp
MODIFY COLUMN pk Int64
COMMENT 'Primary key identifier';
ALTER TABLE DaburIndia_BI_New.SOS_OneApp
MODIFY COLUMN project_id Int32
COMMENT 'Project unique identifier';
ALTER TABLE DaburIndia_BI_New.SOS_OneApp
MODIFY COLUMN MID Nullable(Int64)
COMMENT 'Unique master transaction ID (depends on combination of employee_id, store_id, and visit_date)';
ALTER TABLE DaburIndia_BI_New.SOS_OneApp
MODIFY COLUMN store_id Int32
COMMENT 'Unique store identifier';
ALTER TABLE DaburIndia_BI_New.SOS_OneApp
MODIFY COLUMN visit_date Date32
COMMENT 'Date of store visit';
ALTER TABLE DaburIndia_BI_New.SOS_OneApp
MODIFY COLUMN storetype_id Nullable(Int32)
COMMENT 'Store type identifier';
+321 -171
View File
@@ -1,5 +1,3 @@
import pyodbc
import pandas as pd
import clickhouse_connect
@@ -7,6 +5,7 @@ import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
@@ -16,10 +15,12 @@ warnings.filterwarnings(
'pandas only supports SQLAlchemy connectable'
)
print("ETL Started :", datetime.now())
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
@@ -28,6 +29,7 @@ SQL_CONN_STR = (
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
@@ -42,11 +44,67 @@ CH_CONFIG = {
}
# =========================================================
# TABLE NAME
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Journey_Plan'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 20000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
@@ -66,39 +124,46 @@ def clean_dataframe(df):
try:
col_lower = col.lower()
print(f"\nCleaning Column : {col}")
# =====================================
# DATE COLUMNS
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if 'date' in col_lower:
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Cleaning Date Column : {col}")
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
# Remove invalid dates
df[col] = df[col].where(
(df[col].dt.year >= 1970) &
(df[col].dt.year <= 2100)
)
cleaned_dates = []
# Convert to Python Date
df[col] = df[col].apply(
lambda x:
x.date()
if pd.notnull(x)
else None
)
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
@@ -116,38 +181,27 @@ def clean_dataframe(df):
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
non_null = df[col].dropna()
print(f"Float Column : {col}")
# ---------------------------------
# Convert whole float to int
# Example:
# 3240.0 -> 3240
# ---------------------------------
if len(non_null) > 0 and (
(non_null % 1 == 0).all()
):
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
else:
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
@@ -157,6 +211,28 @@ def clean_dataframe(df):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
@@ -166,7 +242,9 @@ def clean_dataframe(df):
)
):
cleaned.append(int(val))
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
@@ -183,41 +261,26 @@ def clean_dataframe(df):
else:
# IMPORTANT FIX
# Avoid '3240.0' string issue
if val.is_integer():
cleaned.append(int(val))
else:
cleaned.append(float(val))
# STRING
elif isinstance(val, str):
cleaned.append(val.strip())
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(val, bool):
cleaned.append(int(val))
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
bool
):
cleaned.append(str(val))
cleaned.append(
int(val)
)
# OTHER
# STRING
else:
cleaned.append(str(val))
cleaned.append(
str(val).strip()
)
df[col] = cleaned
@@ -247,16 +310,12 @@ try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = pyodbc.connect(SQL_CONN_STR)
print("Connected to SQL Server")
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = clickhouse_connect.get_client(**CH_CONFIG)
print("Connected to ClickHouse")
ch_client = connect_clickhouse()
# =====================================================
# QUERY
@@ -267,125 +326,214 @@ try:
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query:")
print("\nExecuting Query")
print(query)
# =====================================================
# CHUNK SIZE
# RETRY SETTINGS
# =====================================================
chunk_size = 100000
total_rows = 0
retry_count = 0
max_retry = 5
# =====================================================
# READ DATA
# MAIN RETRY LOOP
# =====================================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
while retry_count < max_retry:
try:
print("\n================================")
print(f"Processing {len(chunk)} Rows")
print("================================")
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
# =================================================
# CLEAN DATA
# =================================================
chunk = clean_dataframe(chunk)
# =================================================
# DEBUG COLUMN TYPES
# =================================================
print("\nCOLUMN TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
try:
print("\n================================")
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
# =================================================
# SAMPLE DATA
# =================================================
print("\nSAMPLE DATA")
print(chunk.head(2))
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
# =================================================
# INSERT INTO CLICKHOUSE
# =================================================
print("\nInserting into ClickHouse...")
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
except Exception as insert_error:
total_rows += len(chunk)
print("\n================================")
print("INSERT FAILED")
print("================================")
print(
f"\nInserted Total Rows : {total_rows}"
)
print(str(insert_error))
except Exception as chunk_error:
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print("CHUNK INSERT FAILED")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(chunk_error))
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
# =============================================
# SAVE ERROR LOG
# =============================================
with open(
"clickhouse_chunk_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nTABLE : {TABLE_NAME}"
)
log.write(
f"\nERROR : {str(chunk_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print(f"TOTAL ROWS INSERTED : {total_rows}")
print("================================")
# =========================================================
@@ -402,7 +550,7 @@ except Exception as main_error:
traceback.print_exc()
with open(
"clickhouse_main_error.log",
"main_error.log",
"a",
encoding="utf-8"
) as log:
@@ -437,7 +585,7 @@ finally:
sql_conn.close()
print("\nSQL Server Connection Closed")
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
@@ -446,9 +594,11 @@ finally:
ch_client.close()
print("ClickHouse Connection Closed")
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\nETL Finished :", datetime.now())
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+722
View File
@@ -0,0 +1,722 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Login_Sup'
PROJECTId = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# =============================================
# Replace NaN
# =============================================
df = df.replace({np.nan: None})
# =============================================
# Process Columns
# =============================================
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
col_lower = col.lower()
# =====================================
# DATE COLUMNS
# =====================================
if (
'date' in col_lower
and
'time' not in col_lower
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
).dt.date
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(val)
df[col] = cleaned_dates
# =====================================
# TIME COLUMNS
# =====================================
elif 'time' in col_lower:
print(f"Time Column : {col}")
cleaned_times = []
for val in df[col]:
if pd.isnull(val):
cleaned_times.append(None)
else:
# KEEP AS STRING
cleaned_times.append(
str(val).strip()
)
df[col] = cleaned_times
# =====================================
# DATETIME COLUMNS
# =====================================
elif any(
x in col_lower
for x in [
'createdon',
'modifiedon',
'datetime',
'timestamp'
]
):
print(f"Datetime Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_datetime = []
for val in df[col]:
if pd.isnull(val):
cleaned_datetime.append(None)
else:
cleaned_datetime.append(
val.to_pydatetime()
)
df[col] = cleaned_datetime
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# BYTES
elif isinstance(
val,
(
bytes,
bytearray
)
):
cleaned.append(
val.hex()
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print(traceback.format_exc())
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print(traceback.format_exc())
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE PROJECTId = {PROJECTId}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG DATATYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# DEBUG TIME COLUMN
# =====================================
if 'logintime' in chunk.columns:
print("\nLOGINTIME SAMPLE")
print(
chunk['logintime']
.head(5)
)
print(
chunk['logintime']
.apply(type)
.value_counts()
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE BAD CHUNK
# =================================
try:
error_file = (
f"error_chunk_"
f"{TABLE_NAME}.csv"
)
chunk.to_csv(
error_file,
index=False
)
print(
f"BAD CHUNK SAVED : "
f"{error_file}"
)
except:
pass
# =================================
# SAVE LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+604
View File
@@ -0,0 +1,604 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Mapping_StorePromotion'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 20000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+604
View File
@@ -0,0 +1,604 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Mapping_StoreVisibility'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 20000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+545
View File
@@ -0,0 +1,545 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("ETL Started :", datetime.now())
# =========================================================
# SQL SERVER CONNECTION
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE NAME
# =========================================================
TABLE_NAME = 'OQaD'
PROJECT_ID = 41654
# =========================================================
# LOAD SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CLICKHOUSE DATE COLUMNS
# =========================================================
DATE_COLUMNS = [
'visit_date'
]
# =========================================================
# CLICKHOUSE DATETIME COLUMNS
# =========================================================
DATETIME_COLUMNS = [
'create_date',
'update_date'
]
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# DATE32 COLUMNS
# =====================================
if col.lower() in [
x.lower() for x in DATE_COLUMNS
]:
print(f"Date32 Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
# Remove invalid dates
df[col] = df[col].where(
(df[col].dt.year >= 1970) &
(df[col].dt.year <= 2100)
)
# Convert to datetime.date
df[col] = df[col].apply(
lambda x:
x.date()
if pd.notnull(x)
else None
)
# =====================================
# DATETIME64 COLUMNS
# =====================================
elif col.lower() in [
x.lower() for x in DATETIME_COLUMNS
]:
print(f"DateTime Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
# Remove invalid dates
df[col] = df[col].where(
(df[col].dt.year >= 1970) &
(df[col].dt.year <= 2100)
)
# Convert to datetime.datetime
df[col] = df[col].apply(
lambda x:
x.to_pydatetime()
if pd.notnull(x)
else None
)
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
non_null = df[col].dropna()
# Convert whole float to int
if len(non_null) > 0 and (
(non_null % 1 == 0).all()
):
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
else:
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(int(val))
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
if val.is_integer():
cleaned.append(int(val))
else:
cleaned.append(float(val))
# STRING
elif isinstance(val, str):
cleaned.append(val.strip())
# BOOLEAN
elif isinstance(val, bool):
cleaned.append(int(val))
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# OTHER
else:
cleaned.append(str(val))
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = pyodbc.connect(SQL_CONN_STR)
print("Connected to SQL Server")
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = clickhouse_connect.get_client(**CH_CONFIG)
print("Connected to ClickHouse")
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query:")
print(query)
# =====================================================
# CHUNK SIZE
# =====================================================
chunk_size = 100000
total_rows = 0
# =====================================================
# READ DATA
# =====================================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(f"Processing {len(chunk)} Rows")
print("================================")
# =================================================
# CLEAN DATA
# =================================================
chunk = clean_dataframe(chunk)
# =================================================
# DEBUG COLUMN TYPES
# =================================================
print("\nCOLUMN TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =================================================
# DEBUG DATE COLUMN
# =================================================
if 'visit_date' in chunk.columns:
print("\nvisit_date Sample")
print(chunk['visit_date'].head())
sample = chunk['visit_date'].dropna()
if len(sample) > 0:
print(
"visit_date datatype:",
type(sample.iloc[0])
)
# =================================================
# TRUNCATE TABLE FIRST TIME ONLY
# =================================================
if TRUNCATE_BEFORE_LOAD and not table_truncated:
print("\n================================")
print(f"TRUNCATING TABLE : {TABLE_NAME}")
print("================================")
truncate_query = f"""
TRUNCATE TABLE
{CH_CONFIG['database']}.{TABLE_NAME}
"""
ch_client.command(truncate_query)
print("TABLE TRUNCATED SUCCESSFULLY")
table_truncated = True
# =================================================
# INSERT INTO CLICKHOUSE
# =================================================
print("\nInserting into ClickHouse...")
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
total_rows += len(chunk)
print(
f"\nInserted Total Rows : {total_rows}"
)
except Exception as chunk_error:
print("\n================================")
print("CHUNK INSERT FAILED")
print("================================")
print(str(chunk_error))
traceback.print_exc()
# =============================================
# SAVE ERROR LOG
# =============================================
with open(
"clickhouse_chunk_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nTABLE : {TABLE_NAME}"
)
log.write(
f"\nERROR : {str(chunk_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print(f"TOTAL ROWS INSERTED : {total_rows}")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"clickhouse_main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL Server Connection Closed")
except:
pass
try:
ch_client.close()
print("ClickHouse Connection Closed")
except:
pass
print("\nETL Finished :", datetime.now())
+605
View File
@@ -0,0 +1,605 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'PaidVisibility_Compliance'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+604
View File
@@ -0,0 +1,604 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'PaidVisibility'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+604
View File
@@ -0,0 +1,604 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Promotion'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+632
View File
@@ -0,0 +1,632 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n====================================")
print("ETL Started :", datetime.now())
print("====================================")
# =========================================================
# SQL SERVER CONNECTION
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'SKU Master'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CLICKHOUSE DATE COLUMNS
# =========================================================
DATE_COLUMNS = [
'visit_date'
]
# =========================================================
# CLICKHOUSE DATETIME COLUMNS
# =========================================================
DATETIME_COLUMNS = [
'create_date',
'update_date'
]
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN with None
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Each Column
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# DATE COLUMNS
# =====================================
if col.lower() in [
x.lower() for x in DATE_COLUMNS
]:
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.date()
)
df[col] = cleaned_dates
# =====================================
# DATETIME COLUMNS
# =====================================
elif col.lower() in [
x.lower() for x in DATETIME_COLUMNS
]:
print(f"DateTime Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_datetime = []
for val in df[col]:
if pd.isnull(val):
cleaned_datetime.append(None)
else:
cleaned_datetime.append(
val.to_pydatetime()
)
df[col] = cleaned_datetime
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
non_null = df[col].dropna()
# ---------------------------------
# Convert whole float to int
# Example:
# 12.0 -> 12
# ---------------------------------
if len(non_null) > 0 and (
(non_null % 1 == 0).all()
):
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
else:
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
try:
# -------------------------
# NULL
# -------------------------
if pd.isnull(val):
cleaned.append(None)
# -------------------------
# STRING
# -------------------------
elif isinstance(val, str):
cleaned.append(
val.strip()
)
# -------------------------
# BOOLEAN
# -------------------------
elif isinstance(val, bool):
cleaned.append(
int(val)
)
# -------------------------
# INTEGER
# -------------------------
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# -------------------------
# FLOAT
# -------------------------
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
if val.is_integer():
cleaned.append(
int(val)
)
else:
cleaned.append(
float(val)
)
# -------------------------
# DATETIME
# -------------------------
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# -------------------------
# DATE
# -------------------------
elif hasattr(val, 'year'):
cleaned.append(val)
# -------------------------
# OTHER
# -------------------------
else:
cleaned.append(
str(val)
)
except Exception as row_error:
print(
f"Row Cleaning Error "
f"in Column {col}"
)
print(str(row_error))
cleaned.append(None)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print(traceback.format_exc())
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
print("\nConnecting SQL Server...")
sql_conn = pyodbc.connect(SQL_CONN_STR)
print("Connected to SQL Server")
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
print("\nConnecting ClickHouse...")
ch_client = clickhouse_connect.get_client(**CH_CONFIG)
print("Connected to ClickHouse")
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\n====================================")
print("Executing Query")
print("====================================")
print(query)
# =====================================================
# CHUNK SIZE
# =====================================================
chunk_size = 100000
total_rows = 0
# =====================================================
# READ DATA
# =====================================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n====================================")
print(f"Processing Rows : {len(chunk)}")
print("====================================")
# =================================================
# CLEAN DATA
# =================================================
chunk = clean_dataframe(chunk)
# =================================================
# DEBUG COLUMN TYPES
# =================================================
print("\nCOLUMN TYPES")
for col in chunk.columns:
try:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
except:
pass
# =================================================
# TRUNCATE TABLE
# =================================================
if TRUNCATE_BEFORE_LOAD and not table_truncated:
try:
print("\n====================================")
print(f"TRUNCATING : {TABLE_NAME}")
print("====================================")
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
print(truncate_query)
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
except Exception as truncate_error:
print("\nTRUNCATE FAILED")
print(str(truncate_error))
raise
# =================================================
# INSERT DATA
# =================================================
try:
print("\n====================================")
print("INSERTING DATA INTO CLICKHOUSE")
print("====================================")
ch_client.insert_df(
table=f"`{TABLE_NAME}`",
df=chunk,
database=CH_CONFIG['database']
)
total_rows += len(chunk)
print(
f"\nTOTAL INSERTED : "
f"{total_rows}"
)
except Exception as insert_error:
print("\nINSERT FAILED")
print(str(insert_error))
traceback.print_exc()
# =============================================
# SAVE ERROR LOG
# =============================================
with open(
"clickhouse_insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nTABLE : {TABLE_NAME}"
)
log.write(
f"\nERROR : {str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
except Exception as chunk_error:
print("\n====================================")
print("CHUNK PROCESS FAILED")
print("====================================")
print(str(chunk_error))
traceback.print_exc()
continue
print("\n====================================")
print("ETL COMPLETED SUCCESSFULLY")
print(f"TOTAL ROWS INSERTED : {total_rows}")
print("====================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n====================================")
print("MAIN ERROR")
print("====================================")
print(str(main_error))
traceback.print_exc()
with open(
"clickhouse_main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL Server Connection Closed")
except:
pass
try:
ch_client.close()
print("ClickHouse Connection Closed")
except:
pass
print("\n====================================")
print("ETL Finished :", datetime.now())
print("====================================")
+605
View File
@@ -0,0 +1,605 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'SOS_OneApp'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+604
View File
@@ -0,0 +1,604 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Sales'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 10000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+605
View File
@@ -0,0 +1,605 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
import time
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n================================================")
print("ETL STARTED :", datetime.now())
print("================================================")
# =========================================================
# SQL SERVER CONNECTION STRING
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
'Connection Timeout=60;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Stock_Details'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CHUNK SIZE
# =========================================================
chunk_size = 25000
# =========================================================
# CONNECT SQL SERVER
# =========================================================
def connect_sql():
try:
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
print("Connected SQL Server")
return conn
except Exception as e:
print("SQL CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CONNECT CLICKHOUSE
# =========================================================
def connect_clickhouse():
try:
client = clickhouse_connect.get_client(
**CH_CONFIG
)
print("Connected ClickHouse")
return client
except Exception as e:
print("CLICKHOUSE CONNECTION FAILED")
print(str(e))
raise
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Column Wise
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# AUTO DETECT DATE / TIME COLUMNS
# =====================================
if (
'date' in col.lower()
or 'time' in col.lower()
):
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.to_pydatetime()
)
df[col] = cleaned_dates
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
# NULL
if pd.isnull(val):
cleaned.append(None)
# DATETIME
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# INTEGER
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# FLOAT
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
cleaned.append(
float(val)
)
# BOOLEAN
elif isinstance(
val,
bool
):
cleaned.append(
int(val)
)
# STRING
else:
cleaned.append(
str(val).strip()
)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
sql_conn = connect_sql()
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
ch_client = connect_clickhouse()
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\nExecuting Query")
print(query)
# =====================================================
# RETRY SETTINGS
# =====================================================
retry_count = 0
max_retry = 5
# =====================================================
# MAIN RETRY LOOP
# =====================================================
while retry_count < max_retry:
try:
# =============================================
# READ SQL DATA
# =============================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n================================")
print(
f"Processing Rows : "
f"{len(chunk)}"
)
print("================================")
# =====================================
# CLEAN DATA
# =====================================
chunk = clean_dataframe(chunk)
# =====================================
# DEBUG COLUMN TYPES
# =====================================
print("\nCOLUMN DATATYPES")
print(chunk.dtypes)
print("\nCOLUMN SAMPLE TYPES")
for col in chunk.columns:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
# =====================================
# TRUNCATE TABLE FIRST TIME ONLY
# =====================================
if (
TRUNCATE_BEFORE_LOAD
and
not table_truncated
):
print("\n================================")
print(
f"TRUNCATING TABLE : "
f"{TABLE_NAME}"
)
print("================================")
# IMPORTANT FIX
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
# =====================================
# INSERT INTO CLICKHOUSE
# =====================================
print(
"\nINSERTING INTO CLICKHOUSE..."
)
ch_client.insert_df(
table=TABLE_NAME,
df=chunk,
database=CH_CONFIG['database']
)
print(
f"INSERTED : "
f"{len(chunk)} ROWS"
)
except Exception as insert_error:
print("\n================================")
print("INSERT FAILED")
print("================================")
print(str(insert_error))
traceback.print_exc()
# =================================
# SAVE ERROR LOG
# =================================
with open(
"insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : "
f"{datetime.now()}"
)
log.write(
f"\nTABLE : "
f"{TABLE_NAME}"
)
log.write(
f"\nERROR : "
f"{str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
# =============================================
# SUCCESS
# =============================================
break
# =================================================
# SQL CONNECTION FAILURE
# =================================================
except pyodbc.OperationalError as op_error:
retry_count += 1
print("\n================================")
print(
f"SQL CONNECTION LOST "
f"- RETRY {retry_count}"
)
print("================================")
print(str(op_error))
time.sleep(10)
try:
sql_conn.close()
except:
pass
# RECONNECT SQL
sql_conn = connect_sql()
# =================================================
# OTHER ERROR
# =================================================
except Exception as loop_error:
print("\n================================")
print("MAIN LOOP ERROR")
print("================================")
print(str(loop_error))
traceback.print_exc()
break
print("\n================================")
print("ETL COMPLETED SUCCESSFULLY")
print("================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n================================")
print("MAIN ERROR")
print("================================")
print(str(main_error))
traceback.print_exc()
with open(
"main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL SERVER CONNECTION CLOSED")
except:
pass
try:
ch_client.close()
print("CLICKHOUSE CONNECTION CLOSED")
except:
pass
print("\n================================================")
print("ETL FINISHED :", datetime.now())
print("================================================")
+632
View File
@@ -0,0 +1,632 @@
import pyodbc
import pandas as pd
import clickhouse_connect
import numpy as np
from datetime import datetime
import traceback
import warnings
# =========================================================
# IGNORE WARNINGS
# =========================================================
warnings.filterwarnings(
'ignore',
'pandas only supports SQLAlchemy connectable'
)
print("\n====================================")
print("ETL Started :", datetime.now())
print("====================================")
# =========================================================
# SQL SERVER CONNECTION
# =========================================================
SQL_CONN_STR = (
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=10.200.25.65;'
'DATABASE=CPMIndiaBusinessInsight;'
'UID=bsgteam_test;'
'PWD=B$gt3@m#00512;'
'TrustServerCertificate=yes;'
)
# =========================================================
# CLICKHOUSE CONFIG
# =========================================================
CH_CONFIG = {
'host': '172.188.12.194',
'port': 8123,
'username': 'default',
'password': 'dipanshu_k',
'database': 'DaburIndia_BI'
}
# =========================================================
# TABLE DETAILS
# =========================================================
TABLE_NAME = 'Web Logins'
PROJECT_ID = 41654
# =========================================================
# SETTINGS
# =========================================================
TRUNCATE_BEFORE_LOAD = True
table_truncated = False
# =========================================================
# CLICKHOUSE DATE COLUMNS
# =========================================================
DATE_COLUMNS = [
'visit_date'
]
# =========================================================
# CLICKHOUSE DATETIME COLUMNS
# =========================================================
DATETIME_COLUMNS = [
'create_date',
'update_date'
]
# =========================================================
# CLEAN DATAFRAME
# =========================================================
def clean_dataframe(df):
try:
# ---------------------------------------------
# Replace NaN with None
# ---------------------------------------------
df = df.replace({np.nan: None})
# ---------------------------------------------
# Process Each Column
# ---------------------------------------------
for col in df.columns:
try:
print(f"\nCleaning Column : {col}")
# =====================================
# DATE COLUMNS
# =====================================
if col.lower() in [
x.lower() for x in DATE_COLUMNS
]:
print(f"Date Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_dates = []
for val in df[col]:
if pd.isnull(val):
cleaned_dates.append(None)
else:
cleaned_dates.append(
val.date()
)
df[col] = cleaned_dates
# =====================================
# DATETIME COLUMNS
# =====================================
elif col.lower() in [
x.lower() for x in DATETIME_COLUMNS
]:
print(f"DateTime Column : {col}")
df[col] = pd.to_datetime(
df[col],
errors='coerce'
)
cleaned_datetime = []
for val in df[col]:
if pd.isnull(val):
cleaned_datetime.append(None)
else:
cleaned_datetime.append(
val.to_pydatetime()
)
df[col] = cleaned_datetime
# =====================================
# INTEGER COLUMNS
# =====================================
elif pd.api.types.is_integer_dtype(df[col]):
print(f"Integer Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
# =====================================
# FLOAT COLUMNS
# =====================================
elif pd.api.types.is_float_dtype(df[col]):
print(f"Float Column : {col}")
df[col] = pd.to_numeric(
df[col],
errors='coerce'
)
non_null = df[col].dropna()
# ---------------------------------
# Convert whole float to int
# Example:
# 12.0 -> 12
# ---------------------------------
if len(non_null) > 0 and (
(non_null % 1 == 0).all()
):
df[col] = df[col].apply(
lambda x:
int(x)
if pd.notnull(x)
else None
)
else:
df[col] = df[col].apply(
lambda x:
float(x)
if pd.notnull(x)
else None
)
# =====================================
# OBJECT / STRING COLUMNS
# =====================================
else:
print(f"String/Object Column : {col}")
cleaned = []
for val in df[col]:
try:
# -------------------------
# NULL
# -------------------------
if pd.isnull(val):
cleaned.append(None)
# -------------------------
# STRING
# -------------------------
elif isinstance(val, str):
cleaned.append(
val.strip()
)
# -------------------------
# BOOLEAN
# -------------------------
elif isinstance(val, bool):
cleaned.append(
int(val)
)
# -------------------------
# INTEGER
# -------------------------
elif isinstance(
val,
(
int,
np.integer
)
):
cleaned.append(
int(val)
)
# -------------------------
# FLOAT
# -------------------------
elif isinstance(
val,
(
float,
np.floating
)
):
if np.isnan(val):
cleaned.append(None)
else:
if val.is_integer():
cleaned.append(
int(val)
)
else:
cleaned.append(
float(val)
)
# -------------------------
# DATETIME
# -------------------------
elif isinstance(
val,
(
datetime,
pd.Timestamp
)
):
if isinstance(
val,
pd.Timestamp
):
cleaned.append(
val.to_pydatetime()
)
else:
cleaned.append(val)
# -------------------------
# DATE
# -------------------------
elif hasattr(val, 'year'):
cleaned.append(val)
# -------------------------
# OTHER
# -------------------------
else:
cleaned.append(
str(val)
)
except Exception as row_error:
print(
f"Row Cleaning Error "
f"in Column {col}"
)
print(str(row_error))
cleaned.append(None)
df[col] = cleaned
except Exception as col_error:
print("\n================================")
print(f"COLUMN FAILED : {col}")
print(str(col_error))
print("================================")
return df
except Exception as clean_error:
print("\n================================")
print("DATA CLEAN FAILED")
print(str(clean_error))
print(traceback.format_exc())
print("================================")
return df
# =========================================================
# MAIN PROCESS
# =========================================================
try:
# =====================================================
# CONNECT SQL SERVER
# =====================================================
print("\nConnecting SQL Server...")
sql_conn = pyodbc.connect(SQL_CONN_STR)
print("Connected to SQL Server")
# =====================================================
# CONNECT CLICKHOUSE
# =====================================================
print("\nConnecting ClickHouse...")
ch_client = clickhouse_connect.get_client(**CH_CONFIG)
print("Connected to ClickHouse")
# =====================================================
# QUERY
# =====================================================
query = f"""
SELECT *
FROM dbo.[{TABLE_NAME}]
WHERE Project_Id = {PROJECT_ID}
"""
print("\n====================================")
print("Executing Query")
print("====================================")
print(query)
# =====================================================
# CHUNK SIZE
# =====================================================
chunk_size = 100000
total_rows = 0
# =====================================================
# READ DATA
# =====================================================
for chunk in pd.read_sql(
query,
sql_conn,
chunksize=chunk_size
):
try:
print("\n====================================")
print(f"Processing Rows : {len(chunk)}")
print("====================================")
# =================================================
# CLEAN DATA
# =================================================
chunk = clean_dataframe(chunk)
# =================================================
# DEBUG COLUMN TYPES
# =================================================
print("\nCOLUMN TYPES")
for col in chunk.columns:
try:
sample = chunk[col].dropna()
if len(sample) > 0:
print(
col,
type(sample.iloc[0]),
sample.iloc[0]
)
except:
pass
# =================================================
# TRUNCATE TABLE
# =================================================
if TRUNCATE_BEFORE_LOAD and not table_truncated:
try:
print("\n====================================")
print(f"TRUNCATING : {TABLE_NAME}")
print("====================================")
truncate_query = f"""
TRUNCATE TABLE
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
"""
print(truncate_query)
ch_client.command(
truncate_query
)
print(
"TABLE TRUNCATED SUCCESSFULLY"
)
table_truncated = True
except Exception as truncate_error:
print("\nTRUNCATE FAILED")
print(str(truncate_error))
raise
# =================================================
# INSERT DATA
# =================================================
try:
print("\n====================================")
print("INSERTING DATA INTO CLICKHOUSE")
print("====================================")
ch_client.insert_df(
table=f"`{TABLE_NAME}`",
df=chunk,
database=CH_CONFIG['database']
)
total_rows += len(chunk)
print(
f"\nTOTAL INSERTED : "
f"{total_rows}"
)
except Exception as insert_error:
print("\nINSERT FAILED")
print(str(insert_error))
traceback.print_exc()
# =============================================
# SAVE ERROR LOG
# =============================================
with open(
"clickhouse_insert_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nTABLE : {TABLE_NAME}"
)
log.write(
f"\nERROR : {str(insert_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
continue
except Exception as chunk_error:
print("\n====================================")
print("CHUNK PROCESS FAILED")
print("====================================")
print(str(chunk_error))
traceback.print_exc()
continue
print("\n====================================")
print("ETL COMPLETED SUCCESSFULLY")
print(f"TOTAL ROWS INSERTED : {total_rows}")
print("====================================")
# =========================================================
# MAIN ERROR
# =========================================================
except Exception as main_error:
print("\n====================================")
print("MAIN ERROR")
print("====================================")
print(str(main_error))
traceback.print_exc()
with open(
"clickhouse_main_error.log",
"a",
encoding="utf-8"
) as log:
log.write(
"\n\n================================"
)
log.write(
f"\nTIME : {datetime.now()}"
)
log.write(
f"\nERROR : {str(main_error)}"
)
log.write(
f"\nTRACEBACK :\n"
f"{traceback.format_exc()}"
)
log.write(
"\n================================"
)
# =========================================================
# CLOSE CONNECTIONS
# =========================================================
finally:
try:
sql_conn.close()
print("\nSQL Server Connection Closed")
except:
pass
try:
ch_client.close()
print("ClickHouse Connection Closed")
except:
pass
print("\n====================================")
print("ETL Finished :", datetime.now())
print("====================================")
File diff suppressed because it is too large Load Diff
+99
View File
@@ -90,3 +90,102 @@ Traceback (most recent call last):
NameError: name 'PROJECT_ID' is not defined. Did you mean: 'PROJECTID'?
================================
================================
TIME : 2026-05-19 15:19:40.194819
ERROR : ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: An existing connection was forcibly closed by the remote host.\r\n (10054) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10054)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\PaidVisibility_Import.py", line 345, in <module>
for chunk in pd.read_sql(
~~~~~~~~~~~^
query,
^^^^^^
sql_conn,
^^^^^^^^^
chunksize=chunk_size
^^^^^^^^^^^^^^^^^^^^
):
^
File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\pandas\io\sql.py", line 2730, in _query_iterator
data = cursor.fetchmany(chunksize)
pyodbc.OperationalError: ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: An existing connection was forcibly closed by the remote host.\r\n (10054) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10054)')
================================
================================
TIME : 2026-05-19 15:26:07.910371
ERROR : ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]Named Pipes Provider: Could not open a connection to SQL Server [5]. (5) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]A network-related or instance-specific error has occurred while establishing a connection to SQL Server. Server is not found or not accessible. Check if instance name is correct and if SQL Server is configured to allow remote connections. For more information see SQL Server Books Online. (5)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\OQaD Import.py", line 310, in <module>
sql_conn = pyodbc.connect(SQL_CONN_STR)
pyodbc.OperationalError: ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]Named Pipes Provider: Could not open a connection to SQL Server [5]. (5) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]A network-related or instance-specific error has occurred while establishing a connection to SQL Server. Server is not found or not accessible. Check if instance name is correct and if SQL Server is configured to allow remote connections. For more information see SQL Server Books Online. (5)')
================================
================================
TIME : 2026-05-19 16:01:03.630328
ERROR : ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.\r\n (10060) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10060)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\PaidVisibility_Import.py", line 343, in <module>
for chunk in pd.read_sql(
~~~~~~~~~~~^
query,
^^^^^^
sql_conn,
^^^^^^^^^
chunksize=chunk_size
^^^^^^^^^^^^^^^^^^^^
):
^
File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\pandas\io\sql.py", line 2730, in _query_iterator
data = cursor.fetchmany(chunksize)
pyodbc.OperationalError: ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.\r\n (10060) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10060)')
================================
================================
TIME : 2026-05-19 16:36:09.557213
ERROR : ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.\r\n (10060) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10060)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\PaidVisibility_Compliance Import.py", line 371, in <module>
for chunk in pd.read_sql(
~~~~~~~~~~~^
query,
^^^^^^
sql_conn,
^^^^^^^^^
chunksize=chunk_size
^^^^^^^^^^^^^^^^^^^^
):
^
File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\pandas\io\sql.py", line 2730, in _query_iterator
data = cursor.fetchmany(chunksize)
pyodbc.OperationalError: ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.\r\n (10060) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10060)')
================================
================================
TIME : 2026-05-19 17:16:08.366780
ERROR : ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: An existing connection was forcibly closed by the remote host.\r\n (10054) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10054)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\Sales_Import.py", line 411, in <module>
for chunk in pd.read_sql(
~~~~~~~~~~~^
query,
^^^^^^
sql_conn,
^^^^^^^^^
chunksize=chunk_size
^^^^^^^^^^^^^^^^^^^^
):
^
File "C:\Users\dipanshuk\AppData\Local\Python\pythoncore-3.14-64\Lib\site-packages\pandas\io\sql.py", line 2730, in _query_iterator
data = cursor.fetchmany(chunksize)
pyodbc.OperationalError: ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: An existing connection was forcibly closed by the remote host.\r\n (10054) (SQLGetData); [08S01] [Microsoft][ODBC Driver 17 for SQL Server]Communication link failure (10054)')
================================
File diff suppressed because it is too large Load Diff
+17587
View File
File diff suppressed because it is too large Load Diff
+33
View File
@@ -0,0 +1,33 @@
================================
TIME : 2026-05-23 11:02:49.254453
ERROR : ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Timeout error [258]. (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Unable to complete login process due to delay in login response (258)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\PaidVisibility_Compliance Import.py", line 313, in <module>
sql_conn = connect_sql()
File "d:\Python Code\PaidVisibility_Compliance Import.py", line 70, in connect_sql
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
pyodbc.OperationalError: ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Timeout error [258]. (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Unable to complete login process due to delay in login response (258)')
================================
================================
TIME : 2026-05-23 16:38:12.551282
ERROR : ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Timeout error [258]. (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Unable to complete login process due to delay in login response (258)')
TRACEBACK :
Traceback (most recent call last):
File "d:\Python Code\Attendance_Sup_Import.py", line 313, in <module>
sql_conn = connect_sql()
File "d:\Python Code\Attendance_Sup_Import.py", line 70, in connect_sql
conn = pyodbc.connect(
SQL_CONN_STR,
autocommit=True
)
pyodbc.OperationalError: ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Timeout error [258]. (258) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Invalid connection string attribute (0); [08001] [Microsoft][ODBC Driver 17 for SQL Server]Unable to complete login process due to delay in login response (258)')
================================
Binary file not shown.
Binary file not shown.