4th commit

This commit is contained in:
Ankit Malik
2026-06-12 19:05:32 +05:30
parent 8aaae1e27d
commit 418a8847b2
6 changed files with 340 additions and 89 deletions
+101 -49
View File
@@ -19,7 +19,7 @@ from __future__ import annotations
import os
# import pyarrow
import sys
from datetime import date, timedelta
from datetime import date, timedelta, datetime
import polars as pl
import yaml
@@ -72,7 +72,28 @@ def main():
log.info("Both databases connected successfully")
mids=collect_mids(sql_engine , run_date)
mid_list = ",".join(map(str, mids))
conditions = {
"mids": f"MID IN ({mid_list})",
"j_plan": (
f"MONTH(VisitDate) = {run_date.month} "
f"AND YEAR(VisitDate) = {run_date.year}"
),
"mapping": (
f"CAST(Z.FromDate AS DATE) <= '{run_date}' "
f"AND CAST(Z.ToDate AS DATE) >= '{run_date}'"
),
"web": (
f"CAST(login_date AS DATE) = '{run_date}'"
),
"none": None,
}
# fetching polar df from sql-server
@@ -88,69 +109,100 @@ def main():
table_name=table["name"]
table_type=table["type"]
operation=table["operation"]
condition=table["condition"]
c = conditions.get(condition)
log.info("=" * 80)
log.info("TABLE=%s | TYPE=%s | OPERATION=%s",
table_name,
table_type,
operation)
fn=f"fetch_{table_name}"
list=["Attendance", "Journey_Plan", "Web_Logins"]
if table_type =="FACT" :
if table_name in list :
df = globals()[fn](sql_engine, run_date)
else:
df = globals()[fn](sql_engine, mids)
elif table_type =="BRIDGE" :
df = globals()[fn](sql_engine, run_date)
else:
df = globals()[fn](sql_engine)
# Step 2
fn=f"fetch_{table_name}"
fetch_by = table["fetch_by"]
if operation == "DELETE+INSERT" :
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
if fetch_by == "mids":
df = globals()[fn](sql_engine, mids)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
load_to_clickhouse(
client=client,
table_name=table_name,
df=df,
)
else:
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
load_to_clickhouse(client=client,table_name=table_name,df=df)
elif fetch_by == "run_date":
df = globals()[fn](sql_engine, run_date)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
load_to_clickhouse(client=client,table_name=table_name,df=df)
else:
df = globals()[fn](sql_engine)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
load_to_clickhouse(client=client,table_name=table_name,df=df)
else:
print("table is fact ")
if fetch_by == "mids":
df = globals()[fn](sql_engine, mids)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
delete_rows(client, table_name, c )
load_to_clickhouse(client=client,table_name=table_name,df=df)
elif fetch_by == "run_date":
df = globals()[fn](sql_engine, run_date)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
delete_rows(client, table_name, c)
load_to_clickhouse(client=client,table_name=table_name,df=df)
else:
df = globals()[fn](sql_engine)
check_query=f"EXISTS TABLE {table_name}"
exists = client.command(check_query)
if exists == 0 :
create_clickhouse_table(df, table_name, clickhouse_engine)
else:
truncate_table(client , table_name )
log.info(f"Truncate a ClickHouse table - {table_name}")
load_to_clickhouse(client=client,table_name=table_name,df=df)
#fetch table details
if __name__ == "__main__":
main()
main()