Files
data_pipeline/test.py
T
2026-06-15 12:44:13 +05:30

267 lines
6.0 KiB
Python

from __future__ import annotations
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import delete_existing_data , truncate_table
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import MID_TABLE_COV, MID_TABLE_COV1
from masters.dimensions import *
from masters.bridge import *
from kpi.facts import *
# ============================================================
# Helpers
# ============================================================
def get_dataframe(
    fn_name: str,
    fetch_by: str,
    sql_engine,
    mids,
    run_date,
) -> pl.DataFrame:
    """Resolve a fetch function by name and call it with the matching arguments.

    The function is looked up in this module's global namespace, so it must
    be defined in, or imported into, this module.

    Args:
        fn_name: Name of the module-level fetch function to call.
        fetch_by: Selects the extra argument handed to the fetch function:
            ``"mids"`` passes ``mids``, ``"run_date"`` passes ``run_date``,
            and any other value calls the function with ``sql_engine`` only.
        sql_engine: Always passed as the first positional argument.
        mids: Forwarded only when ``fetch_by == "mids"``.
        run_date: Forwarded only when ``fetch_by == "run_date"``.

    Returns:
        Whatever the resolved fetch function returns (annotated as a polars
        DataFrame).

    Raises:
        KeyError: If no global named ``fn_name`` exists.
    """
    try:
        fn = globals()[fn_name]
    except KeyError:
        # Re-raise with context: the bare KeyError only carries the name,
        # which is hard to act on when it shows up in pipeline logs.
        raise KeyError(
            f"No fetch function named {fn_name!r} is defined or imported "
            "in this module"
        ) from None

    if fetch_by == "mids":
        return fn(sql_engine, mids)
    if fetch_by == "run_date":
        return fn(sql_engine, run_date)
    # Any other fetch_by value means the function needs the engine only.
    return fn(sql_engine)
def ensure_table_exists(
    client,
    clickhouse_engine,
    table_name: str,
    df: pl.DataFrame,
) -> bool:
    """Create the ClickHouse table from ``df`` when it is not already there.

    Args:
        client: Object whose ``command`` method runs the ``EXISTS TABLE``
            statement.
        clickhouse_engine: Forwarded to ``create_clickhouse_table``.
        table_name: Name of the table to check and, if needed, create.
        df: Forwarded to ``create_clickhouse_table`` when the table is missing.

    Returns:
        ``True`` if the table existed before this call, ``False`` if it was
        missing (in which case creation was requested).
    """
    already_present = bool(client.command(f"EXISTS TABLE {table_name}"))
    if already_present:
        return True

    log.info("Creating ClickHouse table: %s", table_name)
    create_clickhouse_table(
        df=df,
        table_name=table_name,
        clickhouse_engine=clickhouse_engine,
    )
    return False
# ============================================================
# Main
# ============================================================
def main():
    """Run the pipeline end to end for a single run date.

    Steps, in order:
      1. Resolve the run date (first CLI argument, else yesterday).
      2. Build the SQL Server engine, ClickHouse engine and ClickHouse client.
      3. Collect the delete keys via ``MID_TABLE_COV`` / ``MID_TABLE_COV1``
         and call ``delete_existing_data`` with them.
      4. Read ``tables.yml`` and process every entry under ``tables``.

    Raises:
        ValueError: If the CLI run date is not in ``YYYY-MM-DD`` format.
        Exception: Any failure while processing a table is logged and
            re-raised, which aborts the remaining tables.
    """
    log.info("=" * 80)
    log.info("Hello from data-move Python data pipeline !")

    # --------------------------------------------------------
    # Run Date
    # --------------------------------------------------------
    run_date = _resolve_run_date(sys.argv)
    log.info("Pipeline Run Date : %s", run_date)

    # --------------------------------------------------------
    # Connections
    # --------------------------------------------------------
    log.info("Connecting to SQL Server and ClickHouse")
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Database connections established")

    # --------------------------------------------------------
    # Collect Delete Keys
    # --------------------------------------------------------
    mids = MID_TABLE_COV(sql_engine, run_date)
    emp_visit_df = MID_TABLE_COV1(sql_engine, run_date)

    # --------------------------------------------------------
    # Delete Existing Data
    # --------------------------------------------------------
    delete_existing_data(
        client=client,
        run_date=run_date,
        mids=mids,
        emp_visit_df=emp_visit_df,
    )

    # --------------------------------------------------------
    # Load Config
    # --------------------------------------------------------
    # Explicit encoding so the config parses the same way regardless of the
    # host's locale default.
    with open("tables.yml", "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    # --------------------------------------------------------
    # Process Tables
    # --------------------------------------------------------
    for table in config["tables"]:
        _process_table(
            table,
            sql_engine=sql_engine,
            clickhouse_engine=clickhouse_engine,
            client=client,
            mids=mids,
            run_date=run_date,
        )

    log.info("=" * 80)
    log.info("Pipeline Completed Successfully")
    log.info("=" * 80)


def _resolve_run_date(argv):
    """Return the run date from ``argv[1]`` (``YYYY-MM-DD``), else yesterday."""
    if len(argv) > 1:
        return datetime.strptime(argv[1], "%Y-%m-%d").date()
    return date.today() - timedelta(days=1)


def _process_table(
    table,
    *,
    sql_engine,
    clickhouse_engine,
    client,
    mids,
    run_date,
):
    """Fetch, create-if-missing, optionally truncate, and load one config entry.

    ``table`` is one mapping from the ``tables`` list in ``tables.yml`` and
    must provide the keys ``name``, ``type``, ``operation`` and ``fetch_by``.
    Any exception raised during processing is logged with its traceback and
    re-raised.
    """
    table_name = table["name"]
    table_type = table["type"]
    operation = table["operation"]
    fetch_by = table["fetch_by"]

    log.info("=" * 80)
    log.info(
        "TABLE=%s | TYPE=%s | OPERATION=%s",
        table_name,
        table_type,
        operation,
    )

    try:
        # Fetch: the fetch function is resolved by naming convention.
        df = get_dataframe(
            fn_name=f"fetch_{table_name}",
            fetch_by=fetch_by,
            sql_engine=sql_engine,
            mids=mids,
            run_date=run_date,
        )
        if df.is_empty():
            # Nothing to load; the target table is left untouched.
            log.warning("%s returned no rows", table_name)
            return
        log.info("Fetched %s rows", len(df))

        # Create table if missing.
        exists = ensure_table_exists(
            client=client,
            clickhouse_engine=clickhouse_engine,
            table_name=table_name,
            df=df,
        )

        # Full-refresh tables: a table that was just created is already
        # empty, so only pre-existing tables are truncated.
        if exists and operation == "DELETE+INSERT":
            truncate_table(client, table_name)
            log.info("Truncated %s", table_name)

        # Load.
        load_to_clickhouse(
            client=client,
            table_name=table_name,
            df=df,
        )
        log.info(
            "%s loaded successfully (%s rows)",
            table_name,
            len(df),
        )
    except Exception:
        log.exception("Failed processing table %s", table_name)
        raise
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()