diff --git a/main2.py b/main2.py index d7f75df..a661812 100644 --- a/main2.py +++ b/main2.py @@ -82,26 +82,35 @@ def get_dataframe( return fn(sql_engine) -def get_employee_ids( + + + +def get_empids_clickhouse_OQAD( client, - clickhouse_engine: Engine, table_name: str = "OQaD", -) -> list[int]: +) -> pl.DataFrame: if not table_exists(client, table_name): log.warning(f"Table '{table_name}' does not exist.") - return [] + return pl.DataFrame( + schema={ + "EmpId": pl.Int64, + "VisitDate": pl.Date, + } + ) query = f""" - SELECT DISTINCT employee_id + SELECT DISTINCT + employee_id AS EmpId, + toDate(visit_date) AS VisitDate FROM {table_name} """ - return ( - pl.read_database(query, clickhouse_engine) - .get_column("employee_id") - .to_list() - ) + # ClickHouse -> PyArrow -> Polars + arrow_table = client.query_arrow(query) + + return pl.from_arrow(arrow_table) + # ========================================================== # Main # ========================================================== @@ -157,7 +166,14 @@ def main(): sql_engine, run_date, ) + qf=fetch_quiz_empids() + db_df = get_empids_clickhouse_OQAD(client) + matched = qf.join( + db_df, + on=["EmpId", "VisitDate"], + how="inner", +) # ------------------------------------------------------ # Config # ------------------------------------------------------ diff --git a/mids.py b/mids.py index 25760c2..b0deff6 100644 --- a/mids.py +++ b/mids.py @@ -61,4 +61,62 @@ def MID_TABLE_COV1( return pl.read_database( query=query, connection=engine, - ) \ No newline at end of file + ) + + + + + + + + + + + +def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame: + + sql_template = f""" + WITH MID_TABLE_COV1 AS +( + SELECT EmpId, VisitDate + FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE CreateDate >= {run_date} + AND CreateDate < DATEADD(DAY,1,'{run_date}') + + UNION ALL + + SELECT EmpId, VisitDate + FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE UpdateDate >= {run_date} + AND UpdateDate < DATEADD(DAY,1, '{run_date}') +), + +QUIZ AS +( +SELECT Distinct E.EmpId as empid +, CONVERT(VARCHAR,DQ.VisitDate) AS VisitDate +FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN +OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join +OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join +OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId +where e.EmpName not like 'test%' and e.RightId in (6) +and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%' +AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE +DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) ) +) select * from quiz +""" + sql = sql_template.format( + run_date=run_date.strftime("%Y-%m-%d") + ) + + log.info(f"Fetching quiz_empids data for EMPID and Visitid") + + df = pl.read_database( + query=sql, + connection=engine + ) + + + log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server") + + return df diff --git a/src/OQAD.py b/src/OQAD.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fact.py b/src/fact.py index cafceaa..213c640 100644 --- a/src/fact.py +++ b/src/fact.py @@ -19,7 +19,7 @@ def fetch_data( mid_list = ",".join(str(mid) for mid in mids) - sql_file = Path("sql") / table_type / f"{table_name}.sql" + sql_file = Path("sql") / f"{table_type}" / f"{table_name}.sql" with open(sql_file, "r", encoding="utf-8") as f: sql_template = f.read() @@ -38,4 +38,40 @@ def fetch_data( log.info(f"Fetched {len(df):,} rows from SQL Server") + return df + + + + +def fetch_OQaD( + engine: Engine, + table_name: str, + table_type: str, + empids: list[int], + run_date: date +) -> pl.DataFrame: + + empid_list = ",".join(str(empid) for empid in empids) + + sql_file = Path("sql") / table_type / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + empid_list=empid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + + log.info(f"Fetching data for {len(empids):,} EMPIDs") + + log.info("Fetching OQaD data for run_date=%s", run_date) + + df = pl.read_database( + query=sql, + connection=engine, + ) + + log.info("Fetched %s rows", len(df)) + return df \ No newline at end of file