Files
data_pipeline/src/bridge.py
T
2026-06-19 18:15:27 +05:30

110 lines
3.1 KiB
Python

from pathlib import Path
import polars as pl
from sqlalchemy import Engine
from datetime import date , timedelta
from log import log
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
def fetch_mapping_storevisibility(
    sql_engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Fetch a dimension table from SQL Server, filtered by ClickHouse store IDs.

    The distinct ``StoreId`` values active in the ClickHouse
    ``mapping_storevisibility`` table are collected first, then substituted
    (together with the run date) into the SQL template
    ``src/sql/dim/<table_name>.sql``, which is executed against ``sql_engine``.

    Args:
        sql_engine: SQLAlchemy engine the rendered SQL is executed on.
        table_name: Base name of the SQL template file under ``src/sql/dim``.
        table_type: Forwarded to the fetch helper; not otherwise used here.
        mids: Not used by this function; kept for interface compatibility.
        run_date: Reference date. It is shifted forward by one day before use.

    Returns:
        The query result as a Polars DataFrame.
    """
    # NOTE(review): run_date is shifted by one day here and the ClickHouse
    # filter below adds one more day, so the store lookup uses run_date + 2
    # relative to the caller's value. Preserved as-is; confirm it is intended.
    run_date = run_date + timedelta(days=1)
    client = get_clickhouse_client()

    def table_exists(
        client,
        table_name: str,
    ) -> bool:
        """Return the truthiness of ClickHouse's ``EXISTS TABLE`` result."""
        return bool(client.command(f"EXISTS TABLE {table_name}"))

    def get_store_ids_mapping_storevisibility(
        client,
        run_date: date,
        table_name: str = "mapping_storevisibility",
    ) -> list[int]:
        """Return distinct StoreId values whose date range covers the filter date.

        Returns ``[0]`` as a placeholder when ``table_name`` does not exist.
        """
        if not table_exists(client, table_name):
            log.warning(f"Table '{table_name}' does not exist. During collecting store_ids")
            return [0]

        active_on = run_date + timedelta(days=1)
        # Query the same table that was just checked for existence.
        query = f"""
            SELECT DISTINCT StoreId
            FROM {table_name}
            WHERE toDate(Fromdate) <= toDate('{active_on}')
            AND toDate(Todate) >= toDate('{active_on}')
            AND project_Id = '40148'
        """
        # ClickHouse -> PyArrow -> Polars
        arrow_table = client.query_arrow(query)
        stores = pl.from_arrow(arrow_table)
        # The query projects only StoreId, so that is the column to read.
        return stores["StoreId"].to_list()

    def fetch_data(
        engine: Engine,
        table_name: str,
        table_type: str,
        run_date: date,
        store_id: list[int]
    ) -> pl.DataFrame:
        """Render ``src/sql/dim/<table_name>.sql`` and run it on ``engine``."""
        log.info("Fetching data from sql server for Master table......")
        # NOTE(review): an empty store_id list renders as an empty string;
        # verify the SQL template tolerates that.
        store_id_list = ",".join(str(sid) for sid in store_id)
        sql_file = Path("src") / "sql" / "dim" / f"{table_name}.sql"
        sql_template = sql_file.read_text(encoding="utf-8")
        sql = sql_template.format(
            store_id_list=store_id_list,
            run_date=run_date.strftime("%Y-%m-%d"),
        )
        log.info("Fetching in progress .... ")
        df = pl.read_database(
            query=sql,
            connection=engine,
        )
        log.info(f"Fetched {len(df):,} rows from SQL Server")
        return df

    # Use the helper's default table so the existence check and the query
    # target the same table (mapping_storevisibility).
    store_id = get_store_ids_mapping_storevisibility(client, run_date)
    return fetch_data(
        engine=sql_engine,
        table_name=table_name,
        table_type=table_type,
        run_date=run_date,
        store_id=store_id,
    )