14-06-2026 2nd commit

This commit is contained in:
Ankit Malik
2026-06-15 13:52:24 +05:30
parent 166a2a1d19
commit e77941c23d
10 changed files with 2844 additions and 772 deletions
+1 -4
View File
@@ -70,8 +70,6 @@ def delete_existing_data(
"PaidVisibility", "PaidVisibility",
"SOS_OneApp", "SOS_OneApp",
"Stock_Details", "Stock_Details",
"Login",
"coverage_remarks",
} }
if table_name in mid_tables and mids: if table_name in mid_tables and mids:
@@ -113,8 +111,7 @@ def delete_existing_data(
client, client,
table_name, table_name,
f""" f"""
project_id = 40148 toDate(login_date) = toDate('{run_date}')
AND toDate(login_date) = toDate('{run_date}')
""" """
) )
+24
View File
@@ -0,0 +1,24 @@
from pathlib import Path
from datetime import datetime
import logging
# Create logs folder if it doesn't exist
Path("logs").mkdir(exist_ok=True)
# Daily log file
log_file = Path("logs") / f"etl_{datetime.now():%Y%m%d}.log"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_file, encoding="utf-8"),
],
)
# Export logger
log = logging.getLogger("etl")
+7 -52
View File
@@ -807,35 +807,6 @@ def fetch_Journey_Plan(
def fetch_coverage_remarks(engine: Engine , mids: list[int]) -> pl.DataFrame:
"""
Source:
OneApp_KelloggsMT.dbo.Master_NonWorkingReason
Target:
coverage_remarks
"""
sql = """
SELECT
40148 AS project_id,
ReasonId AS reason_id,
Reason AS reason_remarks
FROM OneApp_KelloggsMT.dbo.Master_NonWorkingReason
"""
log.info("Fetching Coverage Remarks")
df = pl.read_database(
query=sql,
connection=engine
)
log.info(
f"Fetched {len(df):,} Coverage Remark records"
)
return df
def fetch_Web_Logins( def fetch_Web_Logins(
@@ -851,56 +822,40 @@ def fetch_Web_Logins(
""" """
sql = f""" sql = f"""
SELECT DISTINCT SELECT DISTINCT
40148 AS project_id, 40148 AS project_id,
EM1.Id AS supervisor_id, EM1.Id AS supervisor_id,
EM1.EmployeeName AS supervisor_name, EM1.EmployeeName AS supervisor_name,
EM.Id AS emp_id, EM.Id AS emp_id,
EM.EmployeeName AS employee_name, EM.EmployeeName AS employee_name,
DM.DesignationName AS designation, DM.DesignationName AS designation,
CAST(AL.Date AS DATE) AS date,
CAST(AL.Date AS DATE) AS login_date, CONVERT(VARCHAR(8), AL.Date, 108) AS time,
CONVERT(VARCHAR, AL.Date, 108) AS login_time,
AL.Thread AS activity_name, AL.Thread AS activity_name,
AL.Level AS activity_type, AL.Level AS activity_type,
R.RightName AS right_name, R.RightName AS right_name,
GETDATE() AS CreateDate,
GETDATE() AS create_date, 'Pius' AS CreateBy
'Pius' AS create_by
FROM OneApp_KelloggsMT.dbo.T_User_Activity_Log AL FROM OneApp_KelloggsMT.dbo.T_User_Activity_Log AL
LEFT JOIN OneApp_KelloggsMT.dbo.AspNetUsers EM LEFT JOIN OneApp_KelloggsMT.dbo.AspNetUsers EM
ON AL.Logger = EM.UserName ON AL.Logger = EM.USERNAME
INNER JOIN OneApp_KelloggsMT.dbo.AspNetUsers EM1 INNER JOIN OneApp_KelloggsMT.dbo.AspNetUsers EM1
ON EM1.Id = EM.ManagerId ON EM1.Id = EM.ManagerId
INNER JOIN OneApp_KelloggsMT.dbo.AspNetUsers EM2 INNER JOIN OneApp_KelloggsMT.dbo.AspNetUsers EM2
ON EM2.Id = EM1.ManagerId ON EM2.Id = EM1.ManagerId
INNER JOIN OneApp_KelloggsMT.dbo.Master_Designation DM INNER JOIN OneApp_KelloggsMT.dbo.Master_Designation DM
ON EM.DesignationId = DM.DesignationId ON EM.DesignationId = DM.DesignationId
INNER JOIN OneApp_KelloggsMT.dbo.Master_City CM INNER JOIN OneApp_KelloggsMT.dbo.Master_City CM
ON EM.CityId = CM.CityId ON EM.CityId = CM.CityId
INNER JOIN OneApp_KelloggsMT.dbo.Master_State ST INNER JOIN OneApp_KelloggsMT.dbo.Master_State ST
ON ST.StateId = CM.StateId ON ST.StateId = CM.StateId
INNER JOIN OneApp_KelloggsMT.dbo.Master_Region RM INNER JOIN OneApp_KelloggsMT.dbo.Master_Region RM
ON RM.RegionId = ST.RegionId ON RM.RegionId = ST.RegionId
INNER JOIN OneApp_KelloggsMT.dbo.Right_Master R INNER JOIN OneApp_KelloggsMT.dbo.Right_Master R
ON EM.RightId = R.RightId ON EM.RightId = R.RightId
WHERE CAST(AL.Date AS DATE) = '{run_date}' WHERE CAST(AL.Date AS DATE) = '{run_date}'
""" """
log.info(f"Fetching Web Login data for {run_date}") log.info(f"Fetching Web Login data for {run_date}")
File diff suppressed because one or more lines are too long
+195 -171
View File
@@ -13,238 +13,262 @@
# /// # ///
from __future__ import annotations from __future__ import annotations
import os
# import pyarrow
import sys import sys
from datetime import date, timedelta, datetime from datetime import date, datetime, timedelta
import polars as pl import polars as pl
import yaml import yaml
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, URL
import clickhouse_connect
from log import log from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
delete_existing_data,
truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import (
MID_TABLE_COV,
MID_TABLE_COV1,
)
from clickhouse_task.create_table import *
from clickhouse_task.delete_task import *
from clickhouse_task.load_table import *
from db_con.connection import *
from mids import *
from masters.dimensions import * from masters.dimensions import *
from masters.bridge import * from masters.bridge import *
from kpi.facts import * from kpi.facts import *
# ==========================================================
# Helpers
# ==========================================================
def table_exists(
client,
table_name: str,
) -> bool:
return bool(
client.command(
f"EXISTS TABLE {table_name}"
)
)
def get_dataframe(
fn_name: str,
fetch_by: str,
sql_engine,
mids,
run_date,
):
fn = globals()[fn_name]
if fetch_by == "mids":
return fn(sql_engine, mids)
if fetch_by == "run_date":
return fn(sql_engine, run_date)
return fn(sql_engine)
# ==========================================================
# Main
# ==========================================================
def main(): def main():
log.info("=" * 80) log.info("=" * 80)
log.info("Hello from data-move Python data pipeline !") log.info("Hello from data-move Python data pipeline!")
# ------------------------------------------------------
# Run Date
# ------------------------------------------------------
if len(sys.argv) > 1: if len(sys.argv) > 1:
run_date = datetime.strptime(sys.argv[1], "%Y-%m-%d").date() run_date = datetime.strptime(
sys.argv[1],
"%Y-%m-%d",
).date()
else: else:
run_date = date.today() - timedelta(days=1) run_date = date.today() - timedelta(days=1)
log.info(f"Data-pipeline running Date is -:{run_date}") log.info(
# connecting with both db servers sql-server "Pipeline Run Date: %s",
run_date,
)
# ------------------------------------------------------
# Connections
# ------------------------------------------------------
log.info(
"Connecting to databases..."
)
log.info("connecting with both db servers sql-serveras well as clickhouse DB")
sql_engine = build_sql_server_engine() sql_engine = build_sql_server_engine()
clickhouse_engine = build_clickhouse_engine() clickhouse_engine = build_clickhouse_engine()
client=get_clickhouse_client() client = get_clickhouse_client()
log.info("Both databases connected successfully") log.info(
"Database connections established"
)
mids = MID_TABLE_COV(sql_engine, run_date) # ------------------------------------------------------
# Delete Keys
# ------------------------------------------------------
mids = MID_TABLE_COV(
sql_engine,
run_date,
)
emp_visit_df = MID_TABLE_COV1( emp_visit_df = MID_TABLE_COV1(
sql_engine, sql_engine,
run_date run_date,
) )
delete_existing_data( # ------------------------------------------------------
client=client, # Config
run_date=run_date, # ------------------------------------------------------
mids=mids,
emp_visit_df=emp_visit_df,
)
with open(
"t.yml",
"r",
) as file:
mid_list = ",".join(map(str, mids))
conditions = {
"mids": f"MID IN ({mid_list})",
"j_plan": (
f"MONTH(VisitDate) = {run_date.month} "
f"AND YEAR(VisitDate) = {run_date.year}"
),
"mapping": (
f"CAST(Z.FromDate AS DATE) <= '{run_date}' "
f"AND CAST(Z.ToDate AS DATE) >= '{run_date}'"
),
"web": (
f"CAST(login_date AS DATE) = '{run_date}'"
),
"none": None,
}
# fetching polar df from sql-server
with open("tables.yml", "r") as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
for table in config["tables"]: # ------------------------------------------------------
# Process Tables
table_name=table["name"] # ------------------------------------------------------
table_type=table["type"]
operation=table["operation"]
condition=table["condition"]
c = conditions.get(condition)
log.info("=" * 80)
log.info("TABLE=%s | TYPE=%s | OPERATION=%s",
table_name,
table_type,
operation)
fn=f"fetch_{table_name}" for table in config["tables"]:
table_name = table["name"]
operation = table["operation"]
fetch_by = table["fetch_by"] fetch_by = table["fetch_by"]
if operation == "DELETE+INSERT" : log.info("=" * 80)
if fetch_by == "mids": log.info(
df = globals()[fn](sql_engine, mids) "Processing Table: %s",
check_query=f"EXISTS TABLE {table_name}" table_name,
exists = client.command(check_query) )
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else: try:
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
load_to_clickhouse(client=client,table_name=table_name,df=df) # ------------------------------------------
# Fetch Data
# ------------------------------------------
elif fetch_by == "run_date": fn_name = f"fetch_{table_name}"
df = globals()[fn](sql_engine, run_date)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else: df = get_dataframe(
truncate_table(client , table_name ) fn_name=fn_name,
log.info(f"Truncate a ClickHouse table - {table_name}") fetch_by=fetch_by,
sql_engine=sql_engine,
mids=mids,
run_date=run_date,
)
if df.is_empty():
log.warning(
"%s returned no rows",
table_name,
)
continue
log.info(
"Fetched %s rows",
len(df),
)
# ------------------------------------------
# Create Table If Missing
# ------------------------------------------
exists = table_exists(
client,
table_name,
)
if not exists:
log.info(
"Creating table %s",
table_name,
)
create_clickhouse_table(
df=df,
table_name=table_name,
clickhouse_engine=clickhouse_engine,
)
# ------------------------------------------
# Existing Table Logic
# ------------------------------------------
else:
if operation == "DELETE+INSERT":
truncate_table(
client,
table_name,
)
load_to_clickhouse(client=client,table_name=table_name,df=df)
else: else:
df = globals()[fn](sql_engine)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else: delete_existing_data(
truncate_table(client , table_name ) client=client,
log.info(f"Truncate a ClickHouse table - {table_name}") table_name=table_name,
run_date=run_date,
mids=mids,
emp_visit_df=emp_visit_df,
)
load_to_clickhouse(client=client,table_name=table_name,df=df) # ------------------------------------------
else: # Load Data
if fetch_by == "mids": # ------------------------------------------
df = globals()[fn](sql_engine, mids)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else: load_to_clickhouse(
client=client,
table_name=table_name,
df=df,
)
delete_rows(client, table_name, c ) log.info(
"%s loaded successfully (%s rows)",
table_name,
len(df),
)
load_to_clickhouse(client=client,table_name=table_name,df=df) except Exception:
elif fetch_by == "run_date":
df = globals()[fn](sql_engine, run_date)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
delete_rows(client, table_name, c)
load_to_clickhouse(client=client,table_name=table_name,df=df)
else:
df = globals()[fn](sql_engine)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
load_to_clickhouse(client=client,table_name=table_name,df=df)
log.exception(
"Failed processing table %s",
table_name,
)
raise
log.info("=" * 80)
log.info("Pipeline Completed Successfully")
log.info("=" * 80)
if __name__ == "__main__": if __name__ == "__main__":
+29
View File
@@ -484,3 +484,32 @@ def fetch_Master_Salesterritorylayer(
return df return df
def fetch_coverage_remarks(engine: Engine ) -> pl.DataFrame:
"""
Source:
OneApp_KelloggsMT.dbo.Master_NonWorkingReason
Target:
coverage_remarks
"""
sql = """
SELECT
40148 AS project_id,
ReasonId AS reason_id,
Reason AS reason_remarks
FROM OneApp_KelloggsMT.dbo.Master_NonWorkingReason
"""
log.info("Fetching Coverage Remarks")
df = pl.read_database(
query=sql,
connection=engine
)
log.info(
f"Fetched {len(df):,} Coverage Remark records"
)
return df
+105
View File
@@ -0,0 +1,105 @@
tables:
- name: SOS_OneApp
type: FACT
operation: INSERT
fetch_by: mids
- name: OQaD
type: FACT
operation: INSERT
fetch_by: run_date
- name: Survey
type: FACT
operation: INSERT
fetch_by: mids
- name: additional_visibility
type: FACT
operation: INSERT
fetch_by: mids
- name: Coverage
type: FACT
operation: INSERT
fetch_by: mids
- name: Login
type: FACT
operation: INSERT
fetch_by: run_date
- name: Stock_Details
type: FACT
operation: INSERT
fetch_by: mids
- name: Attendance
type: FACT
operation: DELETE+INSERT
fetch_by: run_date
- name: Store_Master
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: SKU_Master
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: display_master
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: Employee_Master
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: Journey_Plan
type: FACT
operation: INSERT
fetch_by: run_date
- name: coverage_remarks
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: mapping_storevisibility
type: BRIDGE
operation: DELETE+INSERT
fetch_by: run_date
- name: Master_VisibilityReason
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: Master_VisibilityDefinition
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
- name: Web_Logins
type: FACT
operation: INSERT
fetch_by: run_date
- name: Promotion
type: FACT
operation: INSERT
fetch_by: mids
- name: PaidVisibility
type: FACT
operation: INSERT
fetch_by: mids
- name: Master_Salesterritorylayer
type: DIMENSION
operation: DELETE+INSERT
fetch_by: none
+3 -3
View File
@@ -9,7 +9,7 @@ tables:
type: FACT type: FACT
operation: INSERT operation: INSERT
fetch_by: run_date fetch_by: run_date
condition: mids condition: none
- name: Survey - name: Survey
type: FACT type: FACT
@@ -32,8 +32,8 @@ tables:
- name: Login - name: Login
type: FACT type: FACT
operation: INSERT operation: INSERT
fetch_by: mids fetch_by: run_date
condition: mids condition: none
- name: Stock_Details - name: Stock_Details
type: FACT type: FACT
-267
View File
@@ -1,267 +0,0 @@
from __future__ import annotations
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import delete_existing_data , truncate_table
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import MID_TABLE_COV, MID_TABLE_COV1
from masters.dimensions import *
from masters.bridge import *
from kpi.facts import *
# ============================================================
# Helpers
# ============================================================
def get_dataframe(
fn_name: str,
fetch_by: str,
sql_engine,
mids,
run_date,
) -> pl.DataFrame:
fn = globals()[fn_name]
if fetch_by == "mids":
return fn(sql_engine, mids)
if fetch_by == "run_date":
return fn(sql_engine, run_date)
return fn(sql_engine)
def ensure_table_exists(
client,
clickhouse_engine,
table_name: str,
df: pl.DataFrame,
) -> bool:
exists = client.command(
f"EXISTS TABLE {table_name}"
)
if not exists:
log.info(
"Creating ClickHouse table: %s",
table_name,
)
create_clickhouse_table(
df=df,
table_name=table_name,
clickhouse_engine=clickhouse_engine,
)
return bool(exists)
# ============================================================
# Main
# ============================================================
def main():
log.info("=" * 80)
log.info("Hello from data-move Python data pipeline !")
# --------------------------------------------------------
# Run Date
# --------------------------------------------------------
if len(sys.argv) > 1:
run_date = datetime.strptime(
sys.argv[1],
"%Y-%m-%d",
).date()
else:
run_date = date.today() - timedelta(days=1)
log.info(
"Pipeline Run Date : %s",
run_date,
)
# --------------------------------------------------------
# Connections
# --------------------------------------------------------
log.info(
"Connecting to SQL Server and ClickHouse"
)
sql_engine = build_sql_server_engine()
clickhouse_engine = (
build_clickhouse_engine()
)
client = get_clickhouse_client()
log.info(
"Database connections established"
)
# --------------------------------------------------------
# Collect Delete Keys
# --------------------------------------------------------
mids = MID_TABLE_COV(
sql_engine,
run_date,
)
emp_visit_df = MID_TABLE_COV1(
sql_engine,
run_date,
)
# --------------------------------------------------------
# Delete Existing Data
# --------------------------------------------------------
delete_existing_data(
client=client,
run_date=run_date,
mids=mids,
emp_visit_df=emp_visit_df,
)
# --------------------------------------------------------
# Load Config
# --------------------------------------------------------
with open(
"tables.yml",
"r",
) as file:
config = yaml.safe_load(file)
# --------------------------------------------------------
# Process Tables
# --------------------------------------------------------
for table in config["tables"]:
table_name = table["name"]
table_type = table["type"]
operation = table["operation"]
fetch_by = table["fetch_by"]
log.info("=" * 80)
log.info(
"TABLE=%s | TYPE=%s | OPERATION=%s",
table_name,
table_type,
operation,
)
try:
# --------------------------------------------
# Fetch
# --------------------------------------------
fn_name = f"fetch_{table_name}"
df = get_dataframe(
fn_name=fn_name,
fetch_by=fetch_by,
sql_engine=sql_engine,
mids=mids,
run_date=run_date,
)
if df.is_empty():
log.warning(
"%s returned no rows",
table_name,
)
continue
log.info(
"Fetched %s rows",
len(df),
)
# --------------------------------------------
# Create Table if Missing
# --------------------------------------------
exists = ensure_table_exists(
client=client,
clickhouse_engine=clickhouse_engine,
table_name=table_name,
df=df,
)
# --------------------------------------------
# Full Refresh Tables
# --------------------------------------------
if exists and operation == "DELETE+INSERT":
truncate_table(
client,
table_name,
)
log.info(
"Truncated %s",
table_name,
)
# --------------------------------------------
# Load
# --------------------------------------------
load_to_clickhouse(
client=client,
table_name=table_name,
df=df,
)
log.info(
"%s loaded successfully (%s rows)",
table_name,
len(df),
)
except Exception as e:
log.exception(
"Failed processing table %s",
table_name,
)
raise
log.info("=" * 80)
log.info("Pipeline Completed Successfully")
log.info("=" * 80)
if __name__ == "__main__":
main()
-275
View File
@@ -1,275 +0,0 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "polars>=0.20.0",
# "pyarrow>=18.0.0",
# "sqlalchemy>=2.0.0",
# "pyodbc>=5.0.0",
# "clickhouse-connect>=0.7.0",
# "clickhouse-sqlalchemy>=0.3.2",
# "pyyaml>=6.0.3",
# "python-dotenv>=1.0.0",
# ]
# ///
from __future__ import annotations
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
delete_existing_data,
truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import (
MID_TABLE_COV,
MID_TABLE_COV1,
)
from masters.dimensions import *
from masters.bridge import *
from kpi.facts import *
# ==========================================================
# Helpers
# ==========================================================
def table_exists(
client,
table_name: str,
) -> bool:
return bool(
client.command(
f"EXISTS TABLE {table_name}"
)
)
def get_dataframe(
fn_name: str,
fetch_by: str,
sql_engine,
mids,
run_date,
):
fn = globals()[fn_name]
if fetch_by == "mids":
return fn(sql_engine, mids)
if fetch_by == "run_date":
return fn(sql_engine, run_date)
return fn(sql_engine)
# ==========================================================
# Main
# ==========================================================
def main():
log.info("=" * 80)
log.info("Hello from data-move Python data pipeline!")
# ------------------------------------------------------
# Run Date
# ------------------------------------------------------
if len(sys.argv) > 1:
run_date = datetime.strptime(
sys.argv[1],
"%Y-%m-%d",
).date()
else:
run_date = date.today() - timedelta(days=1)
log.info(
"Pipeline Run Date: %s",
run_date,
)
# ------------------------------------------------------
# Connections
# ------------------------------------------------------
log.info(
"Connecting to databases..."
)
sql_engine = build_sql_server_engine()
clickhouse_engine = build_clickhouse_engine()
client = get_clickhouse_client()
log.info(
"Database connections established"
)
# ------------------------------------------------------
# Delete Keys
# ------------------------------------------------------
mids = MID_TABLE_COV(
sql_engine,
run_date,
)
emp_visit_df = MID_TABLE_COV1(
sql_engine,
run_date,
)
# ------------------------------------------------------
# Config
# ------------------------------------------------------
with open(
"tables.yml",
"r",
) as file:
config = yaml.safe_load(file)
# ------------------------------------------------------
# Process Tables
# ------------------------------------------------------
for table in config["tables"]:
table_name = table["name"]
operation = table["operation"]
fetch_by = table["fetch_by"]
log.info("=" * 80)
log.info(
"Processing Table: %s",
table_name,
)
try:
# ------------------------------------------
# Fetch Data
# ------------------------------------------
fn_name = f"fetch_{table_name}"
df = get_dataframe(
fn_name=fn_name,
fetch_by=fetch_by,
sql_engine=sql_engine,
mids=mids,
run_date=run_date,
)
if df.is_empty():
log.warning(
"%s returned no rows",
table_name,
)
continue
log.info(
"Fetched %s rows",
len(df),
)
# ------------------------------------------
# Create Table If Missing
# ------------------------------------------
exists = table_exists(
client,
table_name,
)
if not exists:
log.info(
"Creating table %s",
table_name,
)
create_clickhouse_table(
df=df,
table_name=table_name,
clickhouse_engine=clickhouse_engine,
)
# ------------------------------------------
# Existing Table Logic
# ------------------------------------------
else:
if operation == "DELETE+INSERT":
truncate_table(
client,
table_name,
)
else:
delete_existing_data(
client=client,
table_name=table_name,
run_date=run_date,
mids=mids,
emp_visit_df=emp_visit_df,
)
# ------------------------------------------
# Load Data
# ------------------------------------------
load_to_clickhouse(
client=client,
table_name=table_name,
df=df,
)
log.info(
"%s loaded successfully (%s rows)",
table_name,
len(df),
)
except Exception:
log.exception(
"Failed processing table %s",
table_name,
)
raise
log.info("=" * 80)
log.info("Pipeline Completed Successfully")
log.info("=" * 80)
if __name__ == "__main__":
main()