"""Fetch helpers that render SQL templates and load the results into polars.

Most functions read a template from ``src/sql/<table_type>/<table_name>.sql``,
fill it with ``str.format`` and run it through ``pl.read_database``.
"""

from datetime import date, timedelta
from pathlib import Path

import polars as pl
from sqlalchemy import Engine

from db_con.connection import (
    build_sql_server_engine,
    build_clickhouse_engine,
    get_clickhouse_client,
)
from log import log


# Template for the OQAD (empid, visitdate) lookup. Rendered with str.format,
# so it must stay a plain string (not an f-string) and must not contain any
# braces other than the {run_date} placeholder.
#
# Every {run_date} is wrapped in single quotes: an unquoted 2024-01-15 is
# parsed by SQL Server as integer subtraction, not as a date literal.
#
# NOTE(review): in the final IN (...) subquery the unqualified ``VisitDate``
# appears to resolve to alias ``A`` (the innermost scope), which would make
# ``CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate)`` always true. It was
# probably meant to be ``DQ.VisitDate`` — confirm before changing, since the
# fix alters the result set.
_QUIZ_EMPIDS_SQL = """
WITH MID_TABLE_COV1 AS (
    SELECT EmpId, VisitDate
    FROM OneApp_KelloggsMT.dbo.T_OQAD
    WHERE CreateDate >= '{run_date}'
      AND CreateDate < DATEADD(DAY,1,'{run_date}')
    UNION ALL
    SELECT EmpId, VisitDate
    FROM OneApp_KelloggsMT.dbo.T_OQAD
    WHERE UpdateDate >= '{run_date}'
      AND UpdateDate < DATEADD(DAY,1, '{run_date}')
),
QUIZ AS (
    SELECT Distinct E.EmpId as empid, CONVERT(date,DQ.VisitDate) AS visitdate
    FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
    INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
        ON DQ.EmpId = E.EmpId
    inner join OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
        on DQ.QuestionId= qu.QuestionId
    inner join OneApp_KelloggsMT.dbo.Master_OQAD_Category qc
        on qu.QuestionCategoryId= qc.QuestionCategoryId
    where e.EmpName not like 'test%'
      and e.RightId in (6)
      and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'')
      AND E.EmpName NOT LIKE '%TEST%'
      AND DQ.EmpId IN (
          SELECT EmpId
          FROM MID_TABLE_COV1 A
          WHERE DQ.EmpId=A.EmpId
            AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate)
      )
)
select * from quiz
"""


def _render_sql(table_name: str, table_type: str, **params: str) -> str:
    """Read ``src/sql/<table_type lower-cased>/<table_name>.sql`` and format it.

    Args:
        table_name: Base name of the ``.sql`` file (without extension).
        table_type: Sub-directory under ``src/sql``; lower-cased before use.
        **params: Values substituted into the template via ``str.format``.

    Returns:
        The rendered SQL text.

    Raises:
        OSError: If the template file cannot be read.
        KeyError: If the template references a placeholder not in ``params``.
    """
    sql_file = Path("src") / "sql" / table_type.lower() / f"{table_name}.sql"
    return sql_file.read_text(encoding="utf-8").format(**params)


def _fetch_mid_scoped(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
    *,
    log_count_banner: bool = False,
    log_date_banner: bool = True,
) -> pl.DataFrame:
    """Shared implementation of the MID-filtered fetch functions.

    Renders the template with ``mid_list`` (comma-joined MIDs) and
    ``run_date`` (``YYYY-MM-DD``) and executes it on ``engine``.

    Args:
        engine: SQLAlchemy engine the query runs on.
        table_name: Base name of the SQL template file.
        table_type: Template sub-directory (lower-cased).
        mids: MIDs substituted into the template as ``mid_list``.
        run_date: Date substituted into the template as ``run_date``.
        log_count_banner: Emit the " Start Fetching data for these N MIDs "
            message before querying.
        log_date_banner: Emit the "Start Fetching ... based on this date"
            message before querying.

    Returns:
        The query result, or an empty ``pl.DataFrame`` when ``mids`` is empty.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    # The two banners differ per public function; flags keep each caller's
    # original log output intact.
    if log_count_banner:
        log.info(f" Start Fetching data for these {len(mids)} MIDs ")
    if log_date_banner:
        log.info(
            f"Start Fetching data for these {len(mids)} MIDs "
            f"or based on this date {run_date}"
        )

    sql = _render_sql(
        table_name,
        table_type,
        mid_list=",".join(str(mid) for mid in mids),
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=sql, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df


def fetch_SOS_OneApp(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(
        engine, table_name, table_type, mids, run_date,
        log_count_banner=True,
    )


def fetch_additional_visibility(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(
        engine, table_name, table_type, mids, run_date,
        log_count_banner=True,
    )


def _table_exists(client, table_name: str) -> bool:
    """Return the truthiness of ClickHouse ``EXISTS TABLE <table_name>``."""
    return bool(client.command(f"EXISTS TABLE {table_name}"))


def _fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
    """Fetch distinct ``(empid, visitdate)`` pairs from T_OQAD for ``run_date``.

    Uses ``_QUIZ_EMPIDS_SQL``, which selects rows whose CreateDate or
    UpdateDate falls on ``run_date``.
    """
    sql = _QUIZ_EMPIDS_SQL.format(run_date=run_date.strftime("%Y-%m-%d"))

    log.info("Fetching quiz_empids data for EMPID and Visitid")
    df = pl.read_database(query=sql, connection=engine)
    log.info(
        f"Fetched {len(df):,} total empid and visitdate fetched for OQAD "
        f"from SQL Server"
    )
    return df


def _get_empids_clickhouse_OQAD(
    client,
    table_name: str = "OQaD",
) -> pl.DataFrame:
    """Return distinct ``(empid, visitdate)`` pairs stored in ClickHouse.

    When the table does not exist, a warning is logged and an empty frame
    with columns ``empid`` (Int64) and ``visitdate`` (Date) is returned.
    """
    if not _table_exists(client, table_name):
        log.warning(f"Table '{table_name}' does not exist.")
        return pl.DataFrame(
            schema={
                "empid": pl.Int64,
                "visitdate": pl.Date,
            }
        )

    query = f"""
        SELECT DISTINCT
            employee_id AS empid,
            visit_date AS visitdate
        FROM {table_name}
    """
    # ClickHouse -> PyArrow -> Polars
    return pl.from_arrow(client.query_arrow(query))


def _fetch_oqad_rows(
    engine: Engine,
    table_name: str,
    empids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Render ``src/sql/fact/<table_name>.sql`` for ``empids`` and run it.

    The template receives ``empid_list`` (comma-joined ids) and ``run_date``
    (``YYYY-MM-DD``). The directory is fixed to ``fact``.
    """
    sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
    log.info(f"Exists: {sql_file.exists()}")
    log.info(f"Path: {sql_file.resolve()}")

    sql = sql_file.read_text(encoding="utf-8").format(
        empid_list=",".join(str(empid) for empid in empids),
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(empids):,} EMPIDs")
    log.info("Fetching OQaD data for run_date=%s", run_date)
    df = pl.read_database(query=sql, connection=engine)
    log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
    return df


def fetch_OQaD(
    sql_engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Fetch OQaD rows for employees already present in ClickHouse.

    Steps:
      1. Query SQL Server for ``(empid, visitdate)`` pairs touched on
         ``run_date``.
      2. Read the distinct ``(empid, visitdate)`` pairs from the ClickHouse
         ``OQaD`` table.
      3. Inner-join the two and run the ``fact/<table_name>.sql`` template
         for the matched employee ids.

    Args:
        sql_engine: SQLAlchemy engine for the SQL Server queries.
        table_name: Base name of the SQL template under ``src/sql/fact``.
        table_type: Accepted for signature parity; not used here.
        mids: Accepted for signature parity; not used here.
        run_date: Date used in both SQL Server queries.

    Returns:
        The rows returned by the rendered template.
    """
    client = get_clickhouse_client()

    source_pairs = _fetch_quiz_empids(sql_engine, run_date)
    stored_pairs = _get_empids_clickhouse_OQAD(client)

    matched = source_pairs.join(
        stored_pairs,
        on=["empid", "visitdate"],
        how="inner",
    )

    if matched.is_empty():
        # Placeholder id keeps ``empid_list`` non-empty when nothing matched
        # (presumably so the template's filter stays valid SQL — confirm
        # against the .sql file).
        empids = [0]
        log.warning(
            "%s Matched df in OQaD returned no rows",
            table_name,
        )
    else:
        empids = matched["empid"].to_list()
        log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")

    df = _fetch_oqad_rows(
        engine=sql_engine,
        table_name=table_name,
        empids=empids,
        run_date=run_date,
    )
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df


def fetch_Coverage(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(engine, table_name, table_type, mids, run_date)


def fetch_Survey(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(engine, table_name, table_type, mids, run_date)


def fetch_Login(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for ``run_date`` only.

    Unlike the other fetchers there is no empty-MID early return and the
    template is rendered with ``run_date`` alone; ``mids`` is used only for
    the log messages.
    """
    log.info(
        f"Start Fetching data for these {len(mids)} MIDs "
        f"or based on this date {run_date}"
    )

    sql = _render_sql(
        table_name,
        table_type,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=sql, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df


def fetch_Stock_Details(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(engine, table_name, table_type, mids, run_date)


def fetch_Attendance(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date | None = None,
    days_back: int = 15,
) -> pl.DataFrame:
    """Fetch attendance source data for a date window ending at ``run_date``.

    The template is rendered with ``start_date`` and ``run_date`` (both
    ``YYYY-MM-DD``), where ``start_date = run_date - days_back`` days.

    Args:
        engine: SQLAlchemy engine the query runs on.
        table_name: Base name of the SQL template file.
        table_type: Template sub-directory (lower-cased).
        mids: Accepted for signature parity; not used here.
        run_date: End of the window; defaults to yesterday when ``None``.
        days_back: Window length in days (default 15).

    Returns:
        The query result. The result must contain an ``employee_id`` column,
        which is read for the summary log line.
    """
    if run_date is None:
        run_date = date.today() - timedelta(days=1)

    start_date = run_date - timedelta(days=days_back)

    sql = _render_sql(
        table_name,
        table_type,
        start_date=start_date.strftime("%Y-%m-%d"),
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching Attendance data from {start_date} to {run_date}")
    df = pl.read_database(query=sql, connection=engine)
    log.info(
        f"Fetched {len(df):,} attendance rows "
        f"for {df['employee_id'].n_unique():,} employees"
    )
    return df


def fetch_Journey_Plan(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(
        engine, table_name, table_type, mids, run_date,
        log_count_banner=True,
    )


def fetch_PaidVisibility(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(
        engine, table_name, table_type, mids, run_date,
        log_count_banner=True,
    )


def fetch_Web_Logins(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    """
    return _fetch_mid_scoped(
        engine, table_name, table_type, mids, run_date,
        log_count_banner=True,
    )


def fetch_Promotion(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date,
) -> pl.DataFrame:
    """Run the ``table_name`` template for the given MIDs and run date.

    Returns an empty DataFrame (after a warning) when ``mids`` is empty.
    This fetcher emits no "Start Fetching" banner.
    """
    return _fetch_mid_scoped(
        engine, table_name, table_type, mids, run_date,
        log_date_banner=False,
    )