# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "polars>=0.20.0",
#     "sqlalchemy>=2.0.0",
#     "pyodbc>=5.0.0",
#     "pyyaml>=6.0.3",
#     "clickhouse-connect>=0.7.0",
#     "clickhouse-sqlalchemy>=0.3.2",
#     "python-dotenv>=1.0.0",
#     "pyarrow>=18.0.0"
# ]
# ///
"""Entry point of the data-move pipeline.

Connects to SQL Server and ClickHouse, reads the table list from
``tables.yml`` and calls the matching ``fetch_<table name>`` function for
every configured table.
"""

import logging
import os
import sys
from datetime import date, datetime, timedelta

import clickhouse_connect
import polars as pl
import pyarrow
import yaml
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

# NOTE(review): the wildcard imports are kept because the ``fetch_*`` functions
# and the connection builders used below are presumably provided by them.
# Their relative order is unchanged so name shadowing stays as it was.
from log import log
from clickhouse_task.create_table import create_clickhouse_table, check
from db_con.connection import *
from mids import *

# FACT tables whose fetch function takes the run date instead of the mids.
_DATE_SCOPED_FACT_TABLES = frozenset({"Attendance", "Journey_Plan", "Web_Logins"})


def _parse_run_date(argv):
    """Return the date to process.

    Args:
        argv: Command-line arguments, program name first (``sys.argv``).

    Returns:
        The date parsed from ``argv[1]`` (``YYYY-MM-DD``) when it is given,
        otherwise yesterday's date.

    Raises:
        ValueError: If ``argv[1]`` does not match ``%Y-%m-%d``.
    """
    if len(argv) > 1:
        return datetime.strptime(argv[1], "%Y-%m-%d").date()
    return date.today() - timedelta(days=1)


def _fetch_table(table_name, table_type, sql_engine, run_date, mids):
    """Call ``fetch_<table_name>`` with the arguments its table type needs.

    Args:
        table_name: Value of the ``name`` key of the table entry.
        table_type: Value of the ``type`` key of the table entry.
        sql_engine: Engine returned by ``build_sql_server_engine()``.
        run_date: Date being processed.
        mids: Result of ``collect_mids(sql_engine, run_date)``.

    Returns:
        Whatever the fetch function returns.

    Raises:
        KeyError: If no ``fetch_<table_name>`` exists in this module's globals.
    """
    fetcher_name = f"fetch_{table_name}"
    try:
        # Fetch functions are resolved by name from the module namespace.
        fetcher = globals()[fetcher_name]
    except KeyError:
        raise KeyError(
            f"no fetch function {fetcher_name!r} for table {table_name!r}"
        ) from None

    if table_type == "FACT":
        if table_name in _DATE_SCOPED_FACT_TABLES:
            return fetcher(sql_engine, run_date)
        return fetcher(sql_engine, mids)
    if table_type == "BRIDGE":
        return fetcher(sql_engine, run_date)
    return fetcher(sql_engine)


def main():
    """Run the pipeline for the date given on the command line, or yesterday."""
    log.info("Hello from data-move Python data pipeline !")
    check()

    run_date = _parse_run_date(sys.argv)
    # Diagnostic output kept as in the original script.
    print(run_date)
    print(type(run_date))

    # Connect to both database servers: SQL Server and ClickHouse.
    log.info("connecting with both db servers sql-serveras well as clickhouse DB")
    sql_engine = build_sql_server_engine()
    # The ClickHouse handles are not used further in this file; the calls are
    # kept because they may establish or validate the connections.
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Both databases connected successfully")

    mids = collect_mids(sql_engine, run_date)

    # The table list drives which fetch functions are called.
    with open("tables.yml", "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    for table in config["tables"]:
        # Three separate assignments: joining them with commas does not bind
        # plain strings to these names.
        table_name = table["name"]
        table_type = table["type"]
        operation = table["operation"]
        # Pre-formatted so the message does not depend on the logger's
        # placeholder style.
        log.info(f"{table_name} {operation}")

        # NOTE(review): the fetched frame is not consumed anywhere in the code
        # visible here - confirm where it should be written.
        df = _fetch_table(table_name, table_type, sql_engine, run_date, mids)


if __name__ == "__main__":
    main()