PaidVisibility Import
This commit is contained in:
+318
-259
@@ -5,6 +5,7 @@ import numpy as np
|
||||
from datetime import datetime
|
||||
import traceback
|
||||
import warnings
|
||||
import time
|
||||
|
||||
# =========================================================
|
||||
# IGNORE WARNINGS
|
||||
@@ -14,10 +15,12 @@ warnings.filterwarnings(
|
||||
'pandas only supports SQLAlchemy connectable'
|
||||
)
|
||||
|
||||
print("ETL Started :", datetime.now())
|
||||
print("\n================================================")
|
||||
print("ETL STARTED :", datetime.now())
|
||||
print("================================================")
|
||||
|
||||
# =========================================================
|
||||
# SQL SERVER CONNECTION
|
||||
# SQL SERVER CONNECTION STRING
|
||||
# =========================================================
|
||||
SQL_CONN_STR = (
|
||||
'DRIVER={ODBC Driver 17 for SQL Server};'
|
||||
@@ -26,6 +29,7 @@ SQL_CONN_STR = (
|
||||
'UID=bsgteam_test;'
|
||||
'PWD=B$gt3@m#00512;'
|
||||
'TrustServerCertificate=yes;'
|
||||
'Connection Timeout=60;'
|
||||
)
|
||||
|
||||
# =========================================================
|
||||
@@ -40,31 +44,66 @@ CH_CONFIG = {
|
||||
}
|
||||
|
||||
# =========================================================
|
||||
# TABLE NAME
|
||||
# TABLE DETAILS
|
||||
# =========================================================
|
||||
TABLE_NAME = 'PaidVisibility'
|
||||
PROJECT_ID = 41654
|
||||
|
||||
# =========================================================
|
||||
# LOAD SETTINGS
|
||||
# SETTINGS
|
||||
# =========================================================
|
||||
TRUNCATE_BEFORE_LOAD = True
|
||||
table_truncated = False
|
||||
|
||||
# =========================================================
|
||||
# CLICKHOUSE DATE COLUMNS
|
||||
# CHUNK SIZE
|
||||
# =========================================================
|
||||
DATE_COLUMNS = [
|
||||
'visit_date'
|
||||
]
|
||||
chunk_size = 25000
|
||||
|
||||
# =========================================================
|
||||
# CLICKHOUSE DATETIME COLUMNS
|
||||
# CONNECT SQL SERVER
|
||||
# =========================================================
|
||||
DATETIME_COLUMNS = [
|
||||
'create_date',
|
||||
'update_date'
|
||||
]
|
||||
def connect_sql():
|
||||
|
||||
try:
|
||||
|
||||
conn = pyodbc.connect(
|
||||
SQL_CONN_STR,
|
||||
autocommit=True
|
||||
)
|
||||
|
||||
print("Connected SQL Server")
|
||||
|
||||
return conn
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("SQL CONNECTION FAILED")
|
||||
print(str(e))
|
||||
|
||||
raise
|
||||
|
||||
# =========================================================
|
||||
# CONNECT CLICKHOUSE
|
||||
# =========================================================
|
||||
def connect_clickhouse():
|
||||
|
||||
try:
|
||||
|
||||
client = clickhouse_connect.get_client(
|
||||
**CH_CONFIG
|
||||
)
|
||||
|
||||
print("Connected ClickHouse")
|
||||
|
||||
return client
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("CLICKHOUSE CONNECTION FAILED")
|
||||
print(str(e))
|
||||
|
||||
raise
|
||||
|
||||
# =========================================================
|
||||
# CLEAN DATAFRAME
|
||||
@@ -88,60 +127,35 @@ def clean_dataframe(df):
|
||||
print(f"\nCleaning Column : {col}")
|
||||
|
||||
# =====================================
|
||||
# DATE32 COLUMNS
|
||||
# AUTO DETECT DATE / TIME COLUMNS
|
||||
# =====================================
|
||||
if col.lower() in [
|
||||
x.lower() for x in DATE_COLUMNS
|
||||
]:
|
||||
if (
|
||||
'date' in col.lower()
|
||||
or 'time' in col.lower()
|
||||
):
|
||||
|
||||
print(f"Date32 Column : {col}")
|
||||
print(f"Date Column : {col}")
|
||||
|
||||
df[col] = pd.to_datetime(
|
||||
df[col],
|
||||
errors='coerce'
|
||||
)
|
||||
|
||||
# Remove invalid dates
|
||||
df[col] = df[col].where(
|
||||
(df[col].dt.year >= 1970) &
|
||||
(df[col].dt.year <= 2100)
|
||||
)
|
||||
cleaned_dates = []
|
||||
|
||||
# Convert to datetime.date
|
||||
df[col] = df[col].apply(
|
||||
lambda x:
|
||||
x.date()
|
||||
if pd.notnull(x)
|
||||
else None
|
||||
)
|
||||
for val in df[col]:
|
||||
|
||||
# =====================================
|
||||
# DATETIME64 COLUMNS
|
||||
# =====================================
|
||||
elif col.lower() in [
|
||||
x.lower() for x in DATETIME_COLUMNS
|
||||
]:
|
||||
if pd.isnull(val):
|
||||
|
||||
print(f"DateTime Column : {col}")
|
||||
cleaned_dates.append(None)
|
||||
|
||||
df[col] = pd.to_datetime(
|
||||
df[col],
|
||||
errors='coerce'
|
||||
)
|
||||
else:
|
||||
|
||||
# Remove invalid dates
|
||||
df[col] = df[col].where(
|
||||
(df[col].dt.year >= 1970) &
|
||||
(df[col].dt.year <= 2100)
|
||||
)
|
||||
cleaned_dates.append(
|
||||
val.to_pydatetime()
|
||||
)
|
||||
|
||||
# Convert to datetime.datetime
|
||||
df[col] = df[col].apply(
|
||||
lambda x:
|
||||
x.to_pydatetime()
|
||||
if pd.notnull(x)
|
||||
else None
|
||||
)
|
||||
df[col] = cleaned_dates
|
||||
|
||||
# =====================================
|
||||
# INTEGER COLUMNS
|
||||
@@ -169,28 +183,17 @@ def clean_dataframe(df):
|
||||
|
||||
print(f"Float Column : {col}")
|
||||
|
||||
non_null = df[col].dropna()
|
||||
df[col] = pd.to_numeric(
|
||||
df[col],
|
||||
errors='coerce'
|
||||
)
|
||||
|
||||
# Convert whole float to int
|
||||
if len(non_null) > 0 and (
|
||||
(non_null % 1 == 0).all()
|
||||
):
|
||||
|
||||
df[col] = df[col].apply(
|
||||
lambda x:
|
||||
int(x)
|
||||
if pd.notnull(x)
|
||||
else None
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
df[col] = df[col].apply(
|
||||
lambda x:
|
||||
float(x)
|
||||
if pd.notnull(x)
|
||||
else None
|
||||
)
|
||||
df[col] = df[col].apply(
|
||||
lambda x:
|
||||
float(x)
|
||||
if pd.notnull(x)
|
||||
else None
|
||||
)
|
||||
|
||||
# =====================================
|
||||
# OBJECT / STRING COLUMNS
|
||||
@@ -208,50 +211,6 @@ def clean_dataframe(df):
|
||||
|
||||
cleaned.append(None)
|
||||
|
||||
# INTEGER
|
||||
elif isinstance(
|
||||
val,
|
||||
(
|
||||
int,
|
||||
np.integer
|
||||
)
|
||||
):
|
||||
|
||||
cleaned.append(int(val))
|
||||
|
||||
# FLOAT
|
||||
elif isinstance(
|
||||
val,
|
||||
(
|
||||
float,
|
||||
np.floating
|
||||
)
|
||||
):
|
||||
|
||||
if np.isnan(val):
|
||||
|
||||
cleaned.append(None)
|
||||
|
||||
else:
|
||||
|
||||
if val.is_integer():
|
||||
|
||||
cleaned.append(int(val))
|
||||
|
||||
else:
|
||||
|
||||
cleaned.append(float(val))
|
||||
|
||||
# STRING
|
||||
elif isinstance(val, str):
|
||||
|
||||
cleaned.append(val.strip())
|
||||
|
||||
# BOOLEAN
|
||||
elif isinstance(val, bool):
|
||||
|
||||
cleaned.append(int(val))
|
||||
|
||||
# DATETIME
|
||||
elif isinstance(
|
||||
val,
|
||||
@@ -274,10 +233,54 @@ def clean_dataframe(df):
|
||||
|
||||
cleaned.append(val)
|
||||
|
||||
# OTHER
|
||||
# INTEGER
|
||||
elif isinstance(
|
||||
val,
|
||||
(
|
||||
int,
|
||||
np.integer
|
||||
)
|
||||
):
|
||||
|
||||
cleaned.append(
|
||||
int(val)
|
||||
)
|
||||
|
||||
# FLOAT
|
||||
elif isinstance(
|
||||
val,
|
||||
(
|
||||
float,
|
||||
np.floating
|
||||
)
|
||||
):
|
||||
|
||||
if np.isnan(val):
|
||||
|
||||
cleaned.append(None)
|
||||
|
||||
else:
|
||||
|
||||
cleaned.append(
|
||||
float(val)
|
||||
)
|
||||
|
||||
# BOOLEAN
|
||||
elif isinstance(
|
||||
val,
|
||||
bool
|
||||
):
|
||||
|
||||
cleaned.append(
|
||||
int(val)
|
||||
)
|
||||
|
||||
# STRING
|
||||
else:
|
||||
|
||||
cleaned.append(str(val))
|
||||
cleaned.append(
|
||||
str(val).strip()
|
||||
)
|
||||
|
||||
df[col] = cleaned
|
||||
|
||||
@@ -307,16 +310,12 @@ try:
|
||||
# =====================================================
|
||||
# CONNECT SQL SERVER
|
||||
# =====================================================
|
||||
sql_conn = pyodbc.connect(SQL_CONN_STR)
|
||||
|
||||
print("Connected to SQL Server")
|
||||
sql_conn = connect_sql()
|
||||
|
||||
# =====================================================
|
||||
# CONNECT CLICKHOUSE
|
||||
# =====================================================
|
||||
ch_client = clickhouse_connect.get_client(**CH_CONFIG)
|
||||
|
||||
print("Connected to ClickHouse")
|
||||
ch_client = connect_clickhouse()
|
||||
|
||||
# =====================================================
|
||||
# QUERY
|
||||
@@ -327,156 +326,214 @@ try:
|
||||
WHERE Project_Id = {PROJECT_ID}
|
||||
"""
|
||||
|
||||
print("\nExecuting Query:")
|
||||
print("\nExecuting Query")
|
||||
print(query)
|
||||
|
||||
# =====================================================
|
||||
# CHUNK SIZE
|
||||
# RETRY SETTINGS
|
||||
# =====================================================
|
||||
chunk_size = 100000
|
||||
|
||||
total_rows = 0
|
||||
retry_count = 0
|
||||
max_retry = 5
|
||||
|
||||
# =====================================================
|
||||
# READ DATA
|
||||
# MAIN RETRY LOOP
|
||||
# =====================================================
|
||||
for chunk in pd.read_sql(
|
||||
query,
|
||||
sql_conn,
|
||||
chunksize=chunk_size
|
||||
):
|
||||
while retry_count < max_retry:
|
||||
|
||||
try:
|
||||
|
||||
# =============================================
|
||||
# READ SQL DATA
|
||||
# =============================================
|
||||
for chunk in pd.read_sql(
|
||||
query,
|
||||
sql_conn,
|
||||
chunksize=chunk_size
|
||||
):
|
||||
|
||||
try:
|
||||
|
||||
print("\n================================")
|
||||
print(
|
||||
f"Processing Rows : "
|
||||
f"{len(chunk)}"
|
||||
)
|
||||
print("================================")
|
||||
|
||||
# =====================================
|
||||
# CLEAN DATA
|
||||
# =====================================
|
||||
chunk = clean_dataframe(chunk)
|
||||
|
||||
# =====================================
|
||||
# DEBUG COLUMN TYPES
|
||||
# =====================================
|
||||
print("\nCOLUMN DATATYPES")
|
||||
print(chunk.dtypes)
|
||||
|
||||
print("\nCOLUMN SAMPLE TYPES")
|
||||
|
||||
for col in chunk.columns:
|
||||
|
||||
sample = chunk[col].dropna()
|
||||
|
||||
if len(sample) > 0:
|
||||
|
||||
print(
|
||||
col,
|
||||
type(sample.iloc[0]),
|
||||
sample.iloc[0]
|
||||
)
|
||||
|
||||
# =====================================
|
||||
# TRUNCATE TABLE FIRST TIME ONLY
|
||||
# =====================================
|
||||
if (
|
||||
TRUNCATE_BEFORE_LOAD
|
||||
and
|
||||
not table_truncated
|
||||
):
|
||||
|
||||
print("\n================================")
|
||||
print(
|
||||
f"TRUNCATING TABLE : "
|
||||
f"{TABLE_NAME}"
|
||||
)
|
||||
print("================================")
|
||||
|
||||
# IMPORTANT FIX
|
||||
truncate_query = f"""
|
||||
TRUNCATE TABLE
|
||||
`{CH_CONFIG['database']}`.`{TABLE_NAME}`
|
||||
"""
|
||||
|
||||
ch_client.command(
|
||||
truncate_query
|
||||
)
|
||||
|
||||
print(
|
||||
"TABLE TRUNCATED SUCCESSFULLY"
|
||||
)
|
||||
|
||||
table_truncated = True
|
||||
|
||||
# =====================================
|
||||
# INSERT INTO CLICKHOUSE
|
||||
# =====================================
|
||||
print(
|
||||
"\nINSERTING INTO CLICKHOUSE..."
|
||||
)
|
||||
|
||||
ch_client.insert_df(
|
||||
table=TABLE_NAME,
|
||||
df=chunk,
|
||||
database=CH_CONFIG['database']
|
||||
)
|
||||
|
||||
print(
|
||||
f"INSERTED : "
|
||||
f"{len(chunk)} ROWS"
|
||||
)
|
||||
|
||||
except Exception as insert_error:
|
||||
|
||||
print("\n================================")
|
||||
print("INSERT FAILED")
|
||||
print("================================")
|
||||
|
||||
print(str(insert_error))
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
# =================================
|
||||
# SAVE ERROR LOG
|
||||
# =================================
|
||||
with open(
|
||||
"insert_error.log",
|
||||
"a",
|
||||
encoding="utf-8"
|
||||
) as log:
|
||||
|
||||
log.write(
|
||||
"\n\n================================"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nTIME : "
|
||||
f"{datetime.now()}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nTABLE : "
|
||||
f"{TABLE_NAME}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nERROR : "
|
||||
f"{str(insert_error)}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nTRACEBACK :\n"
|
||||
f"{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
"\n================================"
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
# =============================================
|
||||
# SUCCESS
|
||||
# =============================================
|
||||
break
|
||||
|
||||
# =================================================
|
||||
# SQL CONNECTION FAILURE
|
||||
# =================================================
|
||||
except pyodbc.OperationalError as op_error:
|
||||
|
||||
retry_count += 1
|
||||
|
||||
print("\n================================")
|
||||
print(f"Processing {len(chunk)} Rows")
|
||||
print("================================")
|
||||
|
||||
# =================================================
|
||||
# CLEAN DATA
|
||||
# =================================================
|
||||
chunk = clean_dataframe(chunk)
|
||||
|
||||
# =================================================
|
||||
# DEBUG COLUMN TYPES
|
||||
# =================================================
|
||||
print("\nCOLUMN TYPES")
|
||||
|
||||
for col in chunk.columns:
|
||||
|
||||
sample = chunk[col].dropna()
|
||||
|
||||
if len(sample) > 0:
|
||||
|
||||
print(
|
||||
col,
|
||||
type(sample.iloc[0]),
|
||||
sample.iloc[0]
|
||||
)
|
||||
|
||||
# =================================================
|
||||
# DEBUG DATE COLUMN
|
||||
# =================================================
|
||||
if 'visit_date' in chunk.columns:
|
||||
|
||||
print("\nvisit_date Sample")
|
||||
print(chunk['visit_date'].head())
|
||||
|
||||
sample = chunk['visit_date'].dropna()
|
||||
|
||||
if len(sample) > 0:
|
||||
|
||||
print(
|
||||
"visit_date datatype:",
|
||||
type(sample.iloc[0])
|
||||
)
|
||||
|
||||
# =================================================
|
||||
# TRUNCATE TABLE FIRST TIME ONLY
|
||||
# =================================================
|
||||
if TRUNCATE_BEFORE_LOAD and not table_truncated:
|
||||
|
||||
print("\n================================")
|
||||
print(f"TRUNCATING TABLE : {TABLE_NAME}")
|
||||
print("================================")
|
||||
|
||||
truncate_query = f"""
|
||||
TRUNCATE TABLE
|
||||
{CH_CONFIG['database']}.{TABLE_NAME}
|
||||
"""
|
||||
|
||||
ch_client.command(truncate_query)
|
||||
|
||||
print("TABLE TRUNCATED SUCCESSFULLY")
|
||||
|
||||
table_truncated = True
|
||||
|
||||
# =================================================
|
||||
# INSERT INTO CLICKHOUSE
|
||||
# =================================================
|
||||
print("\nInserting into ClickHouse...")
|
||||
|
||||
ch_client.insert_df(
|
||||
table=TABLE_NAME,
|
||||
df=chunk,
|
||||
database=CH_CONFIG['database']
|
||||
)
|
||||
|
||||
total_rows += len(chunk)
|
||||
|
||||
print(
|
||||
f"\nInserted Total Rows : {total_rows}"
|
||||
f"SQL CONNECTION LOST "
|
||||
f"- RETRY {retry_count}"
|
||||
)
|
||||
|
||||
except Exception as chunk_error:
|
||||
|
||||
print("\n================================")
|
||||
print("CHUNK INSERT FAILED")
|
||||
print("================================")
|
||||
|
||||
print(str(chunk_error))
|
||||
print(str(op_error))
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
try:
|
||||
|
||||
sql_conn.close()
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
# RECONNECT SQL
|
||||
sql_conn = connect_sql()
|
||||
|
||||
# =================================================
|
||||
# OTHER ERROR
|
||||
# =================================================
|
||||
except Exception as loop_error:
|
||||
|
||||
print("\n================================")
|
||||
print("MAIN LOOP ERROR")
|
||||
print("================================")
|
||||
|
||||
print(str(loop_error))
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
# =============================================
|
||||
# SAVE ERROR LOG
|
||||
# =============================================
|
||||
with open(
|
||||
"clickhouse_chunk_error.log",
|
||||
"a",
|
||||
encoding="utf-8"
|
||||
) as log:
|
||||
|
||||
log.write(
|
||||
"\n\n================================"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nTIME : {datetime.now()}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nTABLE : {TABLE_NAME}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nERROR : {str(chunk_error)}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
f"\nTRACEBACK :\n"
|
||||
f"{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
log.write(
|
||||
"\n================================"
|
||||
)
|
||||
|
||||
continue
|
||||
break
|
||||
|
||||
print("\n================================")
|
||||
print("ETL COMPLETED SUCCESSFULLY")
|
||||
print(f"TOTAL ROWS INSERTED : {total_rows}")
|
||||
print("================================")
|
||||
|
||||
# =========================================================
|
||||
@@ -493,7 +550,7 @@ except Exception as main_error:
|
||||
traceback.print_exc()
|
||||
|
||||
with open(
|
||||
"clickhouse_main_error.log",
|
||||
"main_error.log",
|
||||
"a",
|
||||
encoding="utf-8"
|
||||
) as log:
|
||||
@@ -528,7 +585,7 @@ finally:
|
||||
|
||||
sql_conn.close()
|
||||
|
||||
print("\nSQL Server Connection Closed")
|
||||
print("\nSQL SERVER CONNECTION CLOSED")
|
||||
|
||||
except:
|
||||
pass
|
||||
@@ -537,9 +594,11 @@ finally:
|
||||
|
||||
ch_client.close()
|
||||
|
||||
print("ClickHouse Connection Closed")
|
||||
print("CLICKHOUSE CONNECTION CLOSED")
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
print("\nETL Finished :", datetime.now())
|
||||
print("\n================================================")
|
||||
print("ETL FINISHED :", datetime.now())
|
||||
print("================================================")
|
||||
|
||||
Reference in New Issue
Block a user