Files
data_pipeline/main2.py
T
2026-06-22 13:22:16 +05:30

295 lines
7.4 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = [
# "polars>=0.20.0",
# "pyarrow>=18.0.0",
# "sqlalchemy>=2.0.0",
# "pyodbc>=5.0.0",
# "clickhouse-connect>=0.7.0",
# "clickhouse-sqlalchemy>=0.3.2",
# "pyyaml>=6.0.3",
# "python-dotenv>=1.0.0",
# ]
# ///
from __future__ import annotations
from time import sleep
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
delete_existing_data,
truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import (
MID_TABLE_COV,
MID_TABLE_COV1,
)
from src.bridge import *
from src.fact import *
from src.dim import *
# ==========================================================
# Helpers
# ==========================================================
def table_exists(
    client,
    table_name: str,
) -> bool:
    """Report whether ``table_name`` exists according to ``client``.

    Sends ``EXISTS TABLE <table_name>`` through ``client.command`` and
    returns the truthiness of whatever that call replies with.

    Args:
        client: Object exposing a ``command(query)`` method.
        table_name: Table identifier, interpolated into the query verbatim.

    Returns:
        ``True`` when the reply is truthy, ``False`` otherwise.
    """
    query = f"EXISTS TABLE {table_name}"
    reply = client.command(query)
    return True if reply else False
# ==========================================================
# Main
# ==========================================================
# ``fetch_by`` values that are served by a table-specific ``fetch_<table_name>``
# function; every other value goes through the generic ``fetch_data``.
_KEYED_FETCH_MODES = frozenset({"mids", "run_date", "reason_id"})


def _resolve_run_date(argv: list[str]) -> date:
    """Return ``argv[1]`` parsed as ``YYYY-MM-DD``, or yesterday if absent."""
    if len(argv) > 1:
        return datetime.strptime(argv[1], "%Y-%m-%d").date()
    return date.today() - timedelta(days=1)


def _fetch_source_frame(sql_engine, table_name, table_type, fetch_by, mids, run_date):
    """Fetch the source rows for one configured table.

    Raises:
        KeyError: ``fetch_by`` requires ``fetch_<table_name>`` but no such
            name exists in this module's globals.
    """
    if fetch_by not in _KEYED_FETCH_MODES:
        # NOTE(review): ``fetch_data`` is not defined in this file; it is
        # presumably supplied by one of the ``src.*`` star imports.
        return fetch_data(sql_engine, table_name, table_type)

    fn_name = f"fetch_{table_name}"
    try:
        fetch_fn = globals()[fn_name]
    except KeyError:
        # Same exception type as before, but with enough context to act on.
        raise KeyError(
            f"No fetch function {fn_name!r} found for table {table_name!r} "
            f"(fetch_by={fetch_by!r})"
        ) from None
    return fetch_fn(sql_engine, table_name, table_type, mids, run_date)


def main():
    """Run the pipeline once for a single run date.

    Steps, in order:
      1. Resolve the run date from ``sys.argv[1]`` (``YYYY-MM-DD``) or
         default to yesterday.
      2. Build the SQL Server engine, the ClickHouse engine and the
         ClickHouse client.
      3. Call ``MID_TABLE_COV`` and ``MID_TABLE_COV1`` with the SQL engine
         and run date; their results are passed on to the fetch and delete
         steps.
      4. Read ``t.yml`` and process every entry under ``tables``: fetch,
         create the target table if missing, otherwise clear existing data
         according to ``operation``, then load.

    Raises:
        Exception: Any error raised while processing a table is logged with
            its traceback and re-raised, which stops the run.
    """
    log.info("=" * 80)
    log.info("Hello from data-move Python data pipeline!")

    # ------------------------------------------------------
    # Run Date
    # ------------------------------------------------------
    run_date = _resolve_run_date(sys.argv)
    log.info("Pipeline Run Date: %s", run_date)

    # ------------------------------------------------------
    # Connections
    # ------------------------------------------------------
    log.info("Connecting to databases...")
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Database connections established")

    # ------------------------------------------------------
    # mids Keys
    # ------------------------------------------------------
    mids = MID_TABLE_COV(sql_engine, run_date)
    emp_visit_df = MID_TABLE_COV1(sql_engine, run_date)

    # ------------------------------------------------------
    # Config
    # ------------------------------------------------------
    # Explicit encoding so the file reads the same on every platform.
    with open("t.yml", "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    # ------------------------------------------------------
    # Process Tables
    # ------------------------------------------------------
    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info("=" * 80)
        log.info(
            "Processing Table: %s | Table type is -: %s | Based on -%s and operation is used -%s ",
            table_name,
            table_type,
            fetch_by,
            operation,
        )

        try:
            # ------------------------------------------
            # Fetch Data
            # ------------------------------------------
            log.info(
                "Fetching Data from sql server for table-: %s ..............",
                table_name,
            )
            df = _fetch_source_frame(
                sql_engine,
                table_name,
                table_type,
                fetch_by,
                mids,
                run_date,
            )
            log.info(
                "Fetched total row -: %s from sql server for table-:%s ...........!!!",
                len(df),
                table_name,
            )

            if df.is_empty():
                # Nothing to load; existing target data is left untouched.
                log.warning("%s returned no rows", table_name)
                continue

            log.info("Fetched %s rows", len(df))

            # ------------------------------------------
            # Create Table If Missing / Existing Table Logic
            # ------------------------------------------
            if not table_exists(client, table_name):
                log.info("Creating table %s", table_name)
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )
            elif operation == "DELETE+INSERT":
                truncate_table(client, table_name)
            elif operation == "ONLY_INSERT":
                # NOTE(review): an existing ONLY_INSERT table is never loaded
                # again -- the fetched rows are discarded. Confirm this
                # "load once" behaviour is intended rather than "append
                # without deleting" (which would need no ``continue``).
                log.info(
                    "%s already exists and operation is ONLY_INSERT; skipping load",
                    table_name,
                )
                continue
            else:
                delete_existing_data(
                    client=client,
                    table_name=table_name,
                    run_date=run_date,
                    mids=mids,
                    emp_visit_df=emp_visit_df,
                )

            # ------------------------------------------
            # Load Data
            # ------------------------------------------
            log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
            load_to_clickhouse(
                client=client,
                table_name=table_name,
                df=df,
            )
            log.info(
                "%s loaded successfully (%s rows)",
                table_name,
                len(df),
            )
        except Exception:
            # Log with traceback, then abort the run; the caller decides
            # whether to retry.
            log.exception("Failed processing table %s", table_name)
            raise

    log.info("=" * 80)
    log.info("Pipeline Completed Successfully")
    log.info("=" * 80)
def _stamp_pipeline_config(key, value, path="Pipeline_config.yml"):
    """Set ``pipeline.<key>`` to ``value`` in the YAML file at ``path``.

    The file is read in full, updated in memory and written back with the
    original key order preserved (``sort_keys=False``).
    """
    with open(path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    config["pipeline"][key] = value
    with open(path, "w", encoding="utf-8") as f:
        yaml.safe_dump(config, f, sort_keys=False)


if __name__ == "__main__":
    # Run the pipeline up to ``max_attempts`` times, pausing 5 seconds
    # between failed attempts; the last failure is re-raised.
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            main()
        except Exception as e:
            # Record the failed attempt. A bookkeeping error here must not
            # hide the pipeline error or cancel the remaining retries.
            try:
                _stamp_pipeline_config("run_date", str(date.today()))
            except Exception:
                log.exception("Could not record run_date in Pipeline_config.yml")
            if attempt == max_attempts - 1:
                raise
            log.warning(
                "Pipeline failed. Retry %s/%s. Error: %s",
                attempt + 1,
                max_attempts,
                e,
            )
            sleep(5)
        else:
            # Kept outside the ``try`` so that a failure while writing the
            # config does not re-run a pipeline that already succeeded.
            today = date.today()
            _stamp_pipeline_config("last_successful_run_date", str(today))
            log.info(
                "Pipeline completed successfully. last_successful_run_date=%s",
                today,
            )
            break