diff --git a/clickhouse_task/check_table.py b/clickhouse_task/check_table.py new file mode 100644 index 0000000..9772235 --- /dev/null +++ b/clickhouse_task/check_table.py @@ -0,0 +1,15 @@ + +import polars as pl +from clickhouse_connect.driver import * +from log import log + +exists = client.command( + "EXISTS TABLE Attendance" +) + +print(exists + + +exists = client.command( + "EXISTS TABLE Attendance" + ) \ No newline at end of file diff --git a/clickhouse_task/load_table.py b/clickhouse_task/load_table.py index e678a8a..c56ea6b 100644 --- a/clickhouse_task/load_table.py +++ b/clickhouse_task/load_table.py @@ -1,5 +1,8 @@ -import polars as pl -from clickhouse_connect.driver import Client +from sqlalchemy import create_engine, text +from sqlalchemy.engine import URL, Engine +import os +import clickhouse_connect +import polars as pd from log import log diff --git a/logs/etl_20260612.log b/logs/etl_20260612.log index e1e7e7c..f9963e8 100644 --- a/logs/etl_20260612.log +++ b/logs/etl_20260612.log @@ -837,3 +837,148 @@ 2026-06-12 15:20:47 | INFO | TABLE=Master_Salesterritorylayer | TYPE=DIMENSION | OPERATION=INSERT 2026-06-12 15:20:47 | INFO | Fetching Master Sales Territory Layer data 2026-06-12 15:20:47 | INFO | Fetched 33 Master Sales Territory Layer records +2026-06-12 15:41:09 | INFO | ================================================================================ +2026-06-12 15:41:09 | INFO | Hello from data-move Python data pipeline ! +2026-06-12 15:41:09 | INFO | Data-pipeline running Date is -:2026-06-11 +2026-06-12 15:41:09 | INFO | connecting with both db servers sql-serveras well as clickhouse DB +2026-06-12 15:41:10 | INFO | +2026-06-12 15:41:12 | INFO | +2026-06-12 15:41:12 | INFO | Both databases connected successfully +2026-06-12 15:41:12 | INFO | Collecting MIDs for: 2026-06-11 +2026-06-12 15:41:13 | INFO | Found 818 MIDs +2026-06-12 15:41:13 | INFO | ================================================================================ +2026-06-12 15:41:13 | INFO | TABLE=SOS_OneApp | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:13 | INFO | Fetching data for 818 MIDs +2026-06-12 15:41:13 | INFO | Fetched 3,677 rows from SQL Server +2026-06-12 15:41:13 | INFO | ================================================================================ +2026-06-12 15:41:13 | INFO | TABLE=OQaD | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:13 | INFO | Fetching OQaD data for 818 MIDs +2026-06-12 15:41:16 | INFO | Fetched 464 rows +2026-06-12 15:41:16 | INFO | ================================================================================ +2026-06-12 15:41:16 | INFO | TABLE=Survey | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:16 | INFO | Fetching Survey data for 818 MIDs +2026-06-12 15:41:17 | INFO | Fetched 142 Survey rows +2026-06-12 15:41:17 | INFO | ================================================================================ +2026-06-12 15:41:17 | INFO | TABLE=additional_visibility | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:17 | INFO | Fetching Additional Visibility data for 818 MIDs +2026-06-12 15:41:18 | INFO | Fetched 1,922 Additional Visibility rows +2026-06-12 15:41:18 | INFO | ================================================================================ +2026-06-12 15:41:18 | INFO | TABLE=Coverage | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:18 | INFO | Fetching coverage data for 818 MIDs +2026-06-12 15:41:19 | INFO | Fetched 761 rows from SQL Server +2026-06-12 15:41:19 | INFO | ================================================================================ +2026-06-12 15:41:19 | INFO | TABLE=Login | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:19 | INFO | Fetching Login data for yesterday +2026-06-12 15:41:19 | INFO | Fetched 479 Login rows +2026-06-12 15:41:19 | INFO | ================================================================================ +2026-06-12 15:41:19 | INFO | TABLE=Stock_Details | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:41:19 | INFO | Fetching Stock Details data for 818 MIDs +2026-06-12 15:41:25 | INFO | Fetched 41,628 Stock Details rows +2026-06-12 15:41:25 | INFO | ================================================================================ +2026-06-12 15:41:25 | INFO | TABLE=Attendance | TYPE=FACT | OPERATION=DELETE+INSERT +2026-06-12 15:41:25 | INFO | Fetching Attendance data from 2026-05-27 to 2026-06-11 +2026-06-12 15:41:25 | INFO | Fetched 11,918 attendance rows for 592 employees +2026-06-12 15:44:38 | INFO | ================================================================================ +2026-06-12 15:44:38 | INFO | Hello from data-move Python data pipeline ! +2026-06-12 15:44:38 | INFO | Data-pipeline running Date is -:2026-06-11 +2026-06-12 15:44:38 | INFO | connecting with both db servers sql-serveras well as clickhouse DB +2026-06-12 15:44:40 | INFO | +2026-06-12 15:44:42 | INFO | +2026-06-12 15:44:42 | INFO | Both databases connected successfully +2026-06-12 15:44:42 | INFO | Collecting MIDs for: 2026-06-11 +2026-06-12 15:44:42 | INFO | Found 818 MIDs +2026-06-12 15:44:42 | INFO | ================================================================================ +2026-06-12 15:44:42 | INFO | TABLE=SOS_OneApp | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:42 | INFO | Fetching data for 818 MIDs +2026-06-12 15:44:43 | INFO | Fetched 3,677 rows from SQL Server +2026-06-12 15:44:43 | INFO | ================================================================================ +2026-06-12 15:44:43 | INFO | TABLE=OQaD | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:43 | INFO | Fetching OQaD data for 818 MIDs +2026-06-12 15:44:46 | INFO | Fetched 464 rows +2026-06-12 15:44:46 | INFO | ================================================================================ +2026-06-12 15:44:46 | INFO | TABLE=Survey | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:46 | INFO | Fetching Survey data for 818 MIDs +2026-06-12 15:44:46 | INFO | Fetched 142 Survey rows +2026-06-12 15:44:46 | INFO | ================================================================================ +2026-06-12 15:44:46 | INFO | TABLE=additional_visibility | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:46 | INFO | Fetching Additional Visibility data for 818 MIDs +2026-06-12 15:44:47 | INFO | Fetched 1,922 Additional Visibility rows +2026-06-12 15:44:47 | INFO | ================================================================================ +2026-06-12 15:44:47 | INFO | TABLE=Coverage | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:47 | INFO | Fetching coverage data for 818 MIDs +2026-06-12 15:44:47 | INFO | Fetched 761 rows from SQL Server +2026-06-12 15:44:47 | INFO | ================================================================================ +2026-06-12 15:44:47 | INFO | TABLE=Login | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:47 | INFO | Fetching Login data for yesterday +2026-06-12 15:44:48 | INFO | Fetched 479 Login rows +2026-06-12 15:44:48 | INFO | ================================================================================ +2026-06-12 15:44:48 | INFO | TABLE=Stock_Details | TYPE=FACT | OPERATION=INSERT +2026-06-12 15:44:48 | INFO | Fetching Stock Details data for 818 MIDs +2026-06-12 15:44:50 | INFO | Fetched 41,628 Stock Details rows +2026-06-12 15:44:50 | INFO | ================================================================================ +2026-06-12 15:44:50 | INFO | TABLE=Attendance | TYPE=FACT | OPERATION=DELETE+INSERT +2026-06-12 15:44:50 | INFO | Fetching Attendance data from 2026-05-27 to 2026-06-11 +2026-06-12 15:44:51 | INFO | Fetched 11,918 attendance rows for 592 employees +2026-06-12 19:03:59 | INFO | ================================================================================ +2026-06-12 19:03:59 | INFO | Hello from data-move Python data pipeline ! +2026-06-12 19:03:59 | INFO | Data-pipeline running Date is -:2026-06-11 +2026-06-12 19:03:59 | INFO | connecting with both db servers sql-serveras well as clickhouse DB +2026-06-12 19:04:01 | INFO | +2026-06-12 19:04:03 | INFO | +2026-06-12 19:04:03 | INFO | Both databases connected successfully +2026-06-12 19:04:03 | INFO | Collecting MIDs for: 2026-06-11 +2026-06-12 19:04:05 | INFO | Found 818 MIDs +2026-06-12 19:04:05 | INFO | ================================================================================ +2026-06-12 19:04:05 | INFO | TABLE=SOS_OneApp | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:05 | INFO | Fetching data for 818 MIDs +2026-06-12 19:04:12 | INFO | Fetched 3,677 rows from SQL Server +2026-06-12 19:04:12 | INFO | Table ready: SOS_OneApp +2026-06-12 19:04:13 | INFO | SOS_OneApp: inserted 3,677 rows into ClickHouse +2026-06-12 19:04:13 | INFO | ================================================================================ +2026-06-12 19:04:13 | INFO | TABLE=OQaD | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:13 | INFO | Fetching OQaD data for 818 MIDs +2026-06-12 19:04:26 | INFO | Fetched 464 rows +2026-06-12 19:04:27 | INFO | Table ready: OQaD +2026-06-12 19:04:27 | INFO | OQaD: inserted 464 rows into ClickHouse +2026-06-12 19:04:27 | INFO | ================================================================================ +2026-06-12 19:04:27 | INFO | TABLE=Survey | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:27 | INFO | Fetching Survey data for 818 MIDs +2026-06-12 19:04:28 | INFO | Fetched 142 Survey rows +2026-06-12 19:04:29 | INFO | Table ready: Survey +2026-06-12 19:04:29 | INFO | Survey: inserted 142 rows into ClickHouse +2026-06-12 19:04:29 | INFO | ================================================================================ +2026-06-12 19:04:29 | INFO | TABLE=additional_visibility | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:29 | INFO | Fetching Additional Visibility data for 818 MIDs +2026-06-12 19:04:30 | INFO | Fetched 1,922 Additional Visibility rows +2026-06-12 19:04:30 | INFO | Table ready: additional_visibility +2026-06-12 19:04:31 | INFO | additional_visibility: inserted 1,922 rows into ClickHouse +2026-06-12 19:04:31 | INFO | ================================================================================ +2026-06-12 19:04:31 | INFO | TABLE=Coverage | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:31 | INFO | Fetching coverage data for 818 MIDs +2026-06-12 19:04:34 | INFO | Fetched 761 rows from SQL Server +2026-06-12 19:04:35 | INFO | Table ready: Coverage +2026-06-12 19:04:35 | INFO | Coverage: inserted 761 rows into ClickHouse +2026-06-12 19:04:35 | INFO | ================================================================================ +2026-06-12 19:04:35 | INFO | TABLE=Login | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:35 | INFO | Fetching Login data for yesterday +2026-06-12 19:04:36 | INFO | Fetched 479 Login rows +2026-06-12 19:04:36 | INFO | Table ready: Login +2026-06-12 19:04:36 | INFO | Login: inserted 479 rows into ClickHouse +2026-06-12 19:04:36 | INFO | ================================================================================ +2026-06-12 19:04:36 | INFO | TABLE=Stock_Details | TYPE=FACT | OPERATION=INSERT +2026-06-12 19:04:36 | INFO | Fetching Stock Details data for 818 MIDs +2026-06-12 19:04:49 | INFO | Fetched 41,628 Stock Details rows +2026-06-12 19:04:49 | INFO | Table ready: Stock_Details +2026-06-12 19:04:49 | INFO | Stock_Details: inserted 41,628 rows into ClickHouse +2026-06-12 19:04:49 | INFO | ================================================================================ +2026-06-12 19:04:49 | INFO | TABLE=Attendance | TYPE=FACT | OPERATION=DELETE+INSERT +2026-06-12 19:04:49 | INFO | Fetching Attendance data from 2026-05-27 to 2026-06-11 +2026-06-12 19:04:50 | INFO | Fetched 11,918 attendance rows for 592 employees +2026-06-12 19:04:51 | INFO | Table ready: Attendance +2026-06-12 19:04:51 | INFO | Attendance: inserted 11,918 rows into ClickHouse +2026-06-12 19:04:51 | INFO | ================================================================================ +2026-06-12 19:04:51 | INFO | TABLE=Store_Master | TYPE=DIMENSION | OPERATION=DELETE+INSERT +2026-06-12 19:04:51 | INFO | Fetching Store Master data +2026-06-12 19:04:53 | INFO | Fetched 5,984 stores +2026-06-12 19:04:53 | INFO | Table Store_Master truncated successfully. +2026-06-12 19:04:53 | INFO | Truncate a ClickHouse table - Store_Master diff --git a/main.py b/main.py index 8709564..bf402e7 100644 --- a/main.py +++ b/main.py @@ -19,7 +19,7 @@ from __future__ import annotations import os # import pyarrow import sys -from datetime import date, timedelta +from datetime import date, timedelta, datetime import polars as pl import yaml @@ -72,7 +72,28 @@ def main(): log.info("Both databases connected successfully") mids=collect_mids(sql_engine , run_date) - + mid_list = ",".join(map(str, mids)) + conditions = { + "mids": f"MID IN ({mid_list})", + + "j_plan": ( + f"MONTH(VisitDate) = {run_date.month} " + f"AND YEAR(VisitDate) = {run_date.year}" + ), + + "mapping": ( + f"CAST(Z.FromDate AS DATE) <= '{run_date}' " + f"AND CAST(Z.ToDate AS DATE) >= '{run_date}'" + ), + + "web": ( + f"CAST(login_date AS DATE) = '{run_date}'" + ), + + "none": None, +} + + # fetching polar df from sql-server @@ -88,69 +109,100 @@ def main(): table_name=table["name"] table_type=table["type"] operation=table["operation"] + condition=table["condition"] + + c = conditions.get(condition) log.info("=" * 80) log.info("TABLE=%s | TYPE=%s | OPERATION=%s", table_name, table_type, operation) - fn=f"fetch_{table_name}" - list=["Attendance", "Journey_Plan", "Web_Logins"] - if table_type =="FACT" : - if table_name in list : - df = globals()[fn](sql_engine, run_date) - - else: - df = globals()[fn](sql_engine, mids) - - - elif table_type =="BRIDGE" : - - df = globals()[fn](sql_engine, run_date) - - else: - - df = globals()[fn](sql_engine) - - - - # Step 2 + fn=f"fetch_{table_name}" + fetch_by = table["fetch_by"] if operation == "DELETE+INSERT" : - truncate_table(client , table_name ) - log.info(f"Truncate a ClickHouse table - {table_name}") + if fetch_by == "mids": + df = globals()[fn](sql_engine, mids) + check_query=f"EXISTS TABLE {table_name}" + exists = client.command(check_query) + if exists == 0 : + create_clickhouse_table(df, table_name, clickhouse_engine) - load_to_clickhouse( - client=client, - table_name=table_name, - df=df, - ) + else: + truncate_table(client , table_name ) + log.info(f"Truncate a ClickHouse table - {table_name}") + load_to_clickhouse(client=client,table_name=table_name,df=df) + + elif fetch_by == "run_date": + df = globals()[fn](sql_engine, run_date) + check_query=f"EXISTS TABLE {table_name}" + exists = client.command(check_query) + if exists == 0 : + create_clickhouse_table(df, table_name, clickhouse_engine) + + else: + truncate_table(client , table_name ) + log.info(f"Truncate a ClickHouse table - {table_name}") + + load_to_clickhouse(client=client,table_name=table_name,df=df) + else: + df = globals()[fn](sql_engine) + check_query=f"EXISTS TABLE {table_name}" + exists = client.command(check_query) + if exists == 0 : + create_clickhouse_table(df, table_name, clickhouse_engine) + + else: + truncate_table(client , table_name ) + log.info(f"Truncate a ClickHouse table - {table_name}") + + load_to_clickhouse(client=client,table_name=table_name,df=df) else: - print("table is fact ") - + if fetch_by == "mids": + df = globals()[fn](sql_engine, mids) + check_query=f"EXISTS TABLE {table_name}" + exists = client.command(check_query) + if exists == 0 : + create_clickhouse_table(df, table_name, clickhouse_engine) + + else: + + delete_rows(client, table_name, c ) + + load_to_clickhouse(client=client,table_name=table_name,df=df) + + elif fetch_by == "run_date": + df = globals()[fn](sql_engine, run_date) + check_query=f"EXISTS TABLE {table_name}" + exists = client.command(check_query) + if exists == 0 : + create_clickhouse_table(df, table_name, clickhouse_engine) + + else: + + delete_rows(client, table_name, c) + + load_to_clickhouse(client=client,table_name=table_name,df=df) + else: + df = globals()[fn](sql_engine) + check_query=f"EXISTS TABLE {table_name}" + exists = client.command(check_query) + if exists == 0 : + create_clickhouse_table(df, table_name, clickhouse_engine) + + else: + truncate_table(client , table_name ) + log.info(f"Truncate a ClickHouse table - {table_name}") + + load_to_clickhouse(client=client,table_name=table_name,df=df) - - - - - - - - - - - #fetch table details - - - - - if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/tables.yml b/tables.yml index 6d22e1d..a13e628 100644 --- a/tables.yml +++ b/tables.yml @@ -2,83 +2,125 @@ tables: - name: SOS_OneApp type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: OQaD type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: Survey type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: additional_visibility type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: Coverage type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: Login type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: Stock_Details type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: Attendance type: FACT operation: DELETE+INSERT + fetch_by: run_date + condition: none - name: Store_Master type: DIMENSION operation: DELETE+INSERT + fetch_by: none + condition: none - name: SKU_Master type: DIMENSION operation: DELETE+INSERT + fetch_by: none + condition: none - name: display_master type: DIMENSION operation: DELETE+INSERT + fetch_by: none + condition: none - name: Employee_Master type: DIMENSION operation: DELETE+INSERT + fetch_by: none + condition: none - name: Journey_Plan type: FACT operation: INSERT + fetch_by: run_date + condition: j_plan - name: coverage_remarks type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: mapping_storevisibility type: BRIDGE operation: INSERT + fetch_by: run_date + condition: mapping - name: Master_VisibilityReason type: DIMENSION - operation: INSERT + operation: DELETE+INSERT + fetch_by: none + condition: none - name: Master_VisibilityDefinition type: DIMENSION - operation: INSERT + operation: DELETE+INSERT + fetch_by: none + condition: none - name: Web_Logins type: FACT operation: INSERT + fetch_by: run_date + condition: web - name: Promotion type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: PaidVisibility type: FACT operation: INSERT + fetch_by: mids + condition: mids - name: Master_Salesterritorylayer type: DIMENSION - operation: INSERT \ No newline at end of file + operation: DELETE+INSERT + fetch_by: none + condition: none \ No newline at end of file diff --git a/test.py b/test.py index 8517bfb..7e0d556 100644 --- a/test.py +++ b/test.py @@ -1,47 +1,41 @@ - -from __future__ import annotations - -import os -# import pyarrow -import sys +import yaml from datetime import date, timedelta -import polars as pl -import yaml -from dotenv import load_dotenv - -from sqlalchemy import create_engine, text -from sqlalchemy.engine import Engine, URL - -import clickhouse_connect - -from log import log - - - -from clickhouse_task.create_table import * -from clickhouse_task.delete_task import * -from clickhouse_task.load_table import * -from db_con.connection import * -from mids import * -from masters.dimensions import * -from masters.bridge import * -from kpi.facts import * +mids_list=[1,3] +run_date=date.today() +conditions = { + "mids": f"MID IN ({','.join(map(str, mids_list))})", + "j_plan": ( + f"MONTH(VisitDate) = {run_date.month} " + f"AND YEAR(VisitDate) = {run_date.year}" + ), + "mapping": ( + f"CAST(Z.FromDate AS DATE) <= '{run_date}' " + f"AND CAST(Z.ToDate AS DATE) >= '{run_date}'" + ), + "web": ( + f"CAST(login_date AS DATE) = '{run_date}'" + ), + "none": None, +} with open("tables.yml", "r") as file: config = yaml.safe_load(file) - for table in config["tables"]: - - table_name=table["name"] - table_type=table["type"] - operation=table["operation"] - print(table_name) - print(table_type) - print(operation) \ No newline at end of file +for table in config["tables"]: + + table_name=table["name"] + table_type=table["type"] + operation=table["operation"] + condition=table["condition"] + c = conditions[condition] + print(table_name) + print(table_type) + print(operation) + print(c) \ No newline at end of file