16-06-2026 1st commit
This commit is contained in:
+164
-59
@@ -432,65 +432,170 @@ def fetch_Coverage(engine: Engine, mids: list[int]) -> pl.DataFrame:
|
|||||||
mid_list = ",".join(str(mid) for mid in mids)
|
mid_list = ",".join(str(mid) for mid in mids)
|
||||||
|
|
||||||
sql = f"""
|
sql = f"""
|
||||||
SELECT
|
WITH SOS_BASE AS
|
||||||
{p} AS project_id,
|
(
|
||||||
JP.MID,
|
SELECT
|
||||||
sm.StoreId AS store_id,
|
sm.CountryName,
|
||||||
JP.EmpId AS employee_id,
|
sc.MID,
|
||||||
CONVERT(date, JP.VisitDate) AS visit_date,
|
sm.RegionName,
|
||||||
JP.InTime AS in_time,
|
sm.StateName,
|
||||||
JP.OutTime AS out_time,
|
sm.CityName,
|
||||||
CASE
|
Em.SupervisorName,
|
||||||
WHEN JP.OutTime IS NULL OR JP.InTime IS NULL THEN NULL
|
em.EmpId,
|
||||||
WHEN JP.OutTime < JP.InTime THEN 0
|
Em.EmpName AS EmployeeName,
|
||||||
ELSE DATEDIFF(SECOND, JP.InTime, JP.OutTime) / 60
|
Em.DesignationName AS Designation,
|
||||||
END AS duration_minutes,
|
sm.StoreId,
|
||||||
CASE
|
CONVERT(varchar, sc.VisitDate, 101) AS VisitDate,
|
||||||
WHEN (
|
sm.StoreCode,
|
||||||
SELECT TOP 1 EmpId
|
sm.StoreName,
|
||||||
FROM OneApp_KelloggsMT.dbo.T_StoreCoverage SC
|
sm.Address,
|
||||||
WHERE SC.EmpId = JP.EmpId
|
sm.StoreTypeid,
|
||||||
AND SC.StoreId = JP.StoreId
|
sm.ChannelId,
|
||||||
AND SC.VisitDate = JP.VisitDate
|
sm.ChainName,
|
||||||
AND SC.ReasonId IN (0,1,3,9,10,19,20)
|
|
||||||
) > 0
|
MSD.SOSDefinitionName,
|
||||||
THEN 'Y' ELSE 'N'
|
|
||||||
END AS is_covered,
|
CASE
|
||||||
CASE JP.Deviation
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_Category' THEN 'Category'
|
||||||
WHEN 0 THEN 'Planned'
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_SubCategory' THEN 'SubCategory'
|
||||||
WHEN 1 THEN 'Adhoc'
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_Brand' THEN 'Brand'
|
||||||
WHEN 2 THEN 'Beat Plan'
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_SubBrand' THEN 'SubBrand'
|
||||||
WHEN 3 THEN 'Non Merchandised'
|
END AS SOSHeaderDeatils,
|
||||||
WHEN 4 THEN 'Add New Store'
|
|
||||||
WHEN 5 THEN 'Non Program'
|
TS.SOSHeaderName,
|
||||||
ELSE ''
|
TS.SOSHeaderValue AS SOSHeaderID,
|
||||||
END AS coverage_type,
|
|
||||||
sm.StoreTypeId AS storetype_id,
|
'Header_Image' AS HDR1,
|
||||||
Em.SupervisorId AS supervisor_id,
|
|
||||||
ISNULL(JP.ReasonId, 0) AS reason_id,
|
CASE
|
||||||
sm.CameraAllow AS camera_allow,
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_Category' THEN 'Category'
|
||||||
CAST(
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_SubCategory' THEN 'SubCategory'
|
||||||
CASE
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_Brand' THEN 'Brand'
|
||||||
WHEN sm.Latitude IS NULL OR sm.Latitude = 0 THEN 0
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_SubBrand' THEN 'SubBrand'
|
||||||
WHEN sm.Longitude IS NULL OR sm.Longitude = 0 THEN 0
|
END AS SOSChildDeatils,
|
||||||
WHEN JP.Latitude IS NULL OR JP.Latitude = 0 THEN 0
|
|
||||||
WHEN JP.Longitude IS NULL OR JP.Longitude = 0 THEN 0
|
TSC.SOSChildName,
|
||||||
ELSE SQRT(
|
TSC.SOSChildValue AS SOSChildID,
|
||||||
POWER(69.1 * (JP.Latitude - sm.Latitude), 2) +
|
TSC.ChildTotalFacing,
|
||||||
POWER(69.1 * (sm.Longitude - JP.Longitude)
|
TS.SOSHeaderFacing,
|
||||||
* COS(JP.Latitude / 57.3), 2)
|
TSC.ChildSelfFacing,
|
||||||
) * 1000
|
|
||||||
END AS FLOAT
|
(
|
||||||
) AS distance_meters,
|
SELECT TOP 1 SOSTarget
|
||||||
GETDATE() AS update_date,
|
FROM OneApp_KelloggsMT.dbo.Mapping_StoreShareOfShelfTarget a
|
||||||
'ETL-SQLAlchemy' AS update_by
|
WHERE a.SOSDefinitionId = MSD.SOSDefinitionId
|
||||||
FROM OneApp_KelloggsMT.dbo.T_StoreCoverage JP WITH (NOLOCK)
|
AND a.StoreId = sm.StoreId
|
||||||
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
|
AND a.FromDate <= sc.VisitDate
|
||||||
ON JP.StoreId = sm.StoreId
|
AND a.ToDate >= sc.VisitDate
|
||||||
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
|
) AS SOSTarget,
|
||||||
ON JP.EmpId = Em.EmpId
|
|
||||||
WHERE JP.MID IN ({mid_list})
|
CASE
|
||||||
AND Em.UserName NOT LIKE 'test%'
|
WHEN ISNULL(SHI.SOSHeaderImage,'') = ''
|
||||||
|
THEN ''
|
||||||
|
ELSE
|
||||||
|
'https://kimt1.parinaam.in/Upload/SOSImages/'
|
||||||
|
+ SHI.SOSHeaderImage
|
||||||
|
END AS SOSHeaderImg
|
||||||
|
|
||||||
|
FROM OneApp_KelloggsMT.dbo.T_ShareOfShelfHeader ts
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
|
||||||
|
ON ts.MID = sc.MID
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
|
||||||
|
ON sc.StoreId = sm.StoreId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
|
||||||
|
ON sc.EmpId = Em.EmpId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfChild tsc
|
||||||
|
ON ts.SOSId = tsc.SOSId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.Master_ShareOfShelfDefinition msd
|
||||||
|
ON msd.SOSDefinitionId = tsc.SOSDefinitionId
|
||||||
|
|
||||||
|
LEFT JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfHeaderImages SHI
|
||||||
|
ON ts.SOSId = SHI.SOSId
|
||||||
|
AND ts.SOSHeaderValue = SHI.SOSHeaderValue
|
||||||
|
|
||||||
|
LEFT JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfChildImages SCI
|
||||||
|
ON tsc.SOSId = SCI.SOSId
|
||||||
|
AND tsc.SOSChildValue = SCI.SOSChildValue
|
||||||
|
|
||||||
|
WHERE Em.EmpName NOT LIKE 'test%'
|
||||||
|
AND sc.MID IN ({mid_list})
|
||||||
|
),
|
||||||
|
|
||||||
|
SOS_PIVOT AS
|
||||||
|
(
|
||||||
|
SELECT *
|
||||||
|
FROM SOS_BASE
|
||||||
|
PIVOT
|
||||||
|
(
|
||||||
|
MIN(SOSHeaderImg)
|
||||||
|
FOR HDR1 IN ([Header_Image])
|
||||||
|
) pvt
|
||||||
|
)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
'40148' AS ProjectId,
|
||||||
|
MID,
|
||||||
|
EmpId AS employee_id,
|
||||||
|
StoreId AS store_id,
|
||||||
|
VisitDate AS visit_date,
|
||||||
|
StoreTypeid AS storetype_id,
|
||||||
|
ChannelId AS channel_id,
|
||||||
|
SOSDefinitionName,
|
||||||
|
SOSHeaderDeatils,
|
||||||
|
SOSHeaderName,
|
||||||
|
SOSHeaderID,
|
||||||
|
SOSChildDeatils,
|
||||||
|
SOSChildName,
|
||||||
|
SOSChildID,
|
||||||
|
SOSHeaderFacing,
|
||||||
|
ChildTotalFacing,
|
||||||
|
ChildSelfFacing,
|
||||||
|
SOSTarget,
|
||||||
|
GETDATE() AS update_date,
|
||||||
|
'SP-Pius' AS update_by
|
||||||
|
|
||||||
|
FROM SOS_PIVOT
|
||||||
|
|
||||||
|
GROUP BY
|
||||||
|
CountryName,
|
||||||
|
MID,
|
||||||
|
RegionName,
|
||||||
|
StateName,
|
||||||
|
CityName,
|
||||||
|
SupervisorName,
|
||||||
|
EmpId,
|
||||||
|
EmployeeName,
|
||||||
|
Designation,
|
||||||
|
StoreId,
|
||||||
|
VisitDate,
|
||||||
|
StoreCode,
|
||||||
|
StoreName,
|
||||||
|
Address,
|
||||||
|
StoreTypeid,
|
||||||
|
ChannelId,
|
||||||
|
ChainName,
|
||||||
|
SOSDefinitionName,
|
||||||
|
SOSHeaderDeatils,
|
||||||
|
SOSHeaderName,
|
||||||
|
SOSHeaderID,
|
||||||
|
SOSChildDeatils,
|
||||||
|
SOSChildName,
|
||||||
|
SOSChildID,
|
||||||
|
ChildTotalFacing,
|
||||||
|
SOSHeaderFacing,
|
||||||
|
ChildSelfFacing,
|
||||||
|
SOSTarget
|
||||||
|
|
||||||
|
ORDER BY
|
||||||
|
RegionName,
|
||||||
|
StateName,
|
||||||
|
CityName,
|
||||||
|
VisitDate;
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,164 @@
|
|||||||
|
WITH SOS_BASE AS
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
sm.CountryName,
|
||||||
|
sc.MID,
|
||||||
|
sm.RegionName,
|
||||||
|
sm.StateName,
|
||||||
|
sm.CityName,
|
||||||
|
Em.SupervisorName,
|
||||||
|
em.EmpId,
|
||||||
|
Em.EmpName AS EmployeeName,
|
||||||
|
Em.DesignationName AS Designation,
|
||||||
|
sm.StoreId,
|
||||||
|
CONVERT(varchar, sc.VisitDate, 101) AS VisitDate,
|
||||||
|
sm.StoreCode,
|
||||||
|
sm.StoreName,
|
||||||
|
sm.Address,
|
||||||
|
sm.StoreTypeid,
|
||||||
|
sm.ChannelId,
|
||||||
|
sm.ChainName,
|
||||||
|
|
||||||
|
MSD.SOSDefinitionName,
|
||||||
|
|
||||||
|
CASE
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_Category' THEN 'Category'
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_SubCategory' THEN 'SubCategory'
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_Brand' THEN 'Brand'
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_SubBrand' THEN 'SubBrand'
|
||||||
|
END AS SOSHeaderDeatils,
|
||||||
|
|
||||||
|
TS.SOSHeaderName,
|
||||||
|
TS.SOSHeaderValue AS SOSHeaderID,
|
||||||
|
|
||||||
|
'Header_Image' AS HDR1,
|
||||||
|
|
||||||
|
CASE
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_Category' THEN 'Category'
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_SubCategory' THEN 'SubCategory'
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_Brand' THEN 'Brand'
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_SubBrand' THEN 'SubBrand'
|
||||||
|
END AS SOSChildDeatils,
|
||||||
|
|
||||||
|
TSC.SOSChildName,
|
||||||
|
TSC.SOSChildValue AS SOSChildID,
|
||||||
|
TSC.ChildTotalFacing,
|
||||||
|
TS.SOSHeaderFacing,
|
||||||
|
TSC.ChildSelfFacing,
|
||||||
|
|
||||||
|
(
|
||||||
|
SELECT TOP 1 SOSTarget
|
||||||
|
FROM OneApp_KelloggsMT.dbo.Mapping_StoreShareOfShelfTarget a
|
||||||
|
WHERE a.SOSDefinitionId = MSD.SOSDefinitionId
|
||||||
|
AND a.StoreId = sm.StoreId
|
||||||
|
AND a.FromDate <= sc.VisitDate
|
||||||
|
AND a.ToDate >= sc.VisitDate
|
||||||
|
) AS SOSTarget,
|
||||||
|
|
||||||
|
CASE
|
||||||
|
WHEN ISNULL(SHI.SOSHeaderImage,'') = ''
|
||||||
|
THEN ''
|
||||||
|
ELSE
|
||||||
|
'https://kimt1.parinaam.in/Upload/SOSImages/'
|
||||||
|
+ SHI.SOSHeaderImage
|
||||||
|
END AS SOSHeaderImg
|
||||||
|
|
||||||
|
FROM OneApp_KelloggsMT.dbo.T_ShareOfShelfHeader ts
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
|
||||||
|
ON ts.MID = sc.MID
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
|
||||||
|
ON sc.StoreId = sm.StoreId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
|
||||||
|
ON sc.EmpId = Em.EmpId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfChild tsc
|
||||||
|
ON ts.SOSId = tsc.SOSId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.Master_ShareOfShelfDefinition msd
|
||||||
|
ON msd.SOSDefinitionId = tsc.SOSDefinitionId
|
||||||
|
|
||||||
|
LEFT JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfHeaderImages SHI
|
||||||
|
ON ts.SOSId = SHI.SOSId
|
||||||
|
AND ts.SOSHeaderValue = SHI.SOSHeaderValue
|
||||||
|
|
||||||
|
LEFT JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfChildImages SCI
|
||||||
|
ON tsc.SOSId = SCI.SOSId
|
||||||
|
AND tsc.SOSChildValue = SCI.SOSChildValue
|
||||||
|
|
||||||
|
WHERE Em.EmpName NOT LIKE 'test%'
|
||||||
|
AND sc.MID IN ({mid_list})
|
||||||
|
),
|
||||||
|
|
||||||
|
SOS_PIVOT AS
|
||||||
|
(
|
||||||
|
SELECT *
|
||||||
|
FROM SOS_BASE
|
||||||
|
PIVOT
|
||||||
|
(
|
||||||
|
MIN(SOSHeaderImg)
|
||||||
|
FOR HDR1 IN ([Header_Image])
|
||||||
|
) pvt
|
||||||
|
)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
'40148' AS ProjectId,
|
||||||
|
MID,
|
||||||
|
EmpId AS employee_id,
|
||||||
|
StoreId AS store_id,
|
||||||
|
VisitDate AS visit_date,
|
||||||
|
StoreTypeid AS storetype_id,
|
||||||
|
ChannelId AS channel_id,
|
||||||
|
SOSDefinitionName,
|
||||||
|
SOSHeaderDeatils,
|
||||||
|
SOSHeaderName,
|
||||||
|
SOSHeaderID,
|
||||||
|
SOSChildDeatils,
|
||||||
|
SOSChildName,
|
||||||
|
SOSChildID,
|
||||||
|
SOSHeaderFacing,
|
||||||
|
ChildTotalFacing,
|
||||||
|
ChildSelfFacing,
|
||||||
|
SOSTarget,
|
||||||
|
GETDATE() AS update_date,
|
||||||
|
'SP-Pius' AS update_by
|
||||||
|
|
||||||
|
FROM SOS_PIVOT
|
||||||
|
|
||||||
|
GROUP BY
|
||||||
|
CountryName,
|
||||||
|
MID,
|
||||||
|
RegionName,
|
||||||
|
StateName,
|
||||||
|
CityName,
|
||||||
|
SupervisorName,
|
||||||
|
EmpId,
|
||||||
|
EmployeeName,
|
||||||
|
Designation,
|
||||||
|
StoreId,
|
||||||
|
VisitDate,
|
||||||
|
StoreCode,
|
||||||
|
StoreName,
|
||||||
|
Address,
|
||||||
|
StoreTypeid,
|
||||||
|
ChannelId,
|
||||||
|
ChainName,
|
||||||
|
SOSDefinitionName,
|
||||||
|
SOSHeaderDeatils,
|
||||||
|
SOSHeaderName,
|
||||||
|
SOSHeaderID,
|
||||||
|
SOSChildDeatils,
|
||||||
|
SOSChildName,
|
||||||
|
SOSChildID,
|
||||||
|
ChildTotalFacing,
|
||||||
|
SOSHeaderFacing,
|
||||||
|
ChildSelfFacing,
|
||||||
|
SOSTarget
|
||||||
|
|
||||||
|
ORDER BY
|
||||||
|
RegionName,
|
||||||
|
StateName,
|
||||||
|
CityName,
|
||||||
|
VisitDate;
|
||||||
@@ -159,6 +159,7 @@ def main():
|
|||||||
table_name = table["name"]
|
table_name = table["name"]
|
||||||
operation = table["operation"]
|
operation = table["operation"]
|
||||||
fetch_by = table["fetch_by"]
|
fetch_by = table["fetch_by"]
|
||||||
|
table_type=table["type"]
|
||||||
|
|
||||||
log.info("=" * 80)
|
log.info("=" * 80)
|
||||||
log.info(
|
log.info(
|
||||||
|
|||||||
@@ -0,0 +1,295 @@
|
|||||||
|
# /// script
|
||||||
|
# requires-python = ">=3.11"
|
||||||
|
# dependencies = [
|
||||||
|
# "polars>=0.20.0",
|
||||||
|
# "pyarrow>=18.0.0",
|
||||||
|
# "sqlalchemy>=2.0.0",
|
||||||
|
# "pyodbc>=5.0.0",
|
||||||
|
# "clickhouse-connect>=0.7.0",
|
||||||
|
# "clickhouse-sqlalchemy>=0.3.2",
|
||||||
|
# "pyyaml>=6.0.3",
|
||||||
|
# "python-dotenv>=1.0.0",
|
||||||
|
# ]
|
||||||
|
# ///
|
||||||
|
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from datetime import date, datetime, timedelta
|
||||||
|
|
||||||
|
import polars as pl
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
from log import log
|
||||||
|
|
||||||
|
from clickhouse_task.create_table import create_clickhouse_table
|
||||||
|
from clickhouse_task.delete_task import (
|
||||||
|
delete_existing_data,
|
||||||
|
truncate_table,
|
||||||
|
)
|
||||||
|
|
||||||
|
from clickhouse_task.load_table import load_to_clickhouse
|
||||||
|
|
||||||
|
from db_con.connection import (
|
||||||
|
build_sql_server_engine,
|
||||||
|
build_clickhouse_engine,
|
||||||
|
get_clickhouse_client,
|
||||||
|
)
|
||||||
|
|
||||||
|
from mids import (
|
||||||
|
MID_TABLE_COV,
|
||||||
|
MID_TABLE_COV1,
|
||||||
|
)
|
||||||
|
|
||||||
|
from masters.dimensions import *
|
||||||
|
from masters.bridge import *
|
||||||
|
from kpi.facts import *
|
||||||
|
|
||||||
|
|
||||||
|
# ==========================================================
|
||||||
|
# Helpers
|
||||||
|
# ==========================================================
|
||||||
|
|
||||||
|
def table_exists(
|
||||||
|
client,
|
||||||
|
table_name: str,
|
||||||
|
) -> bool:
|
||||||
|
|
||||||
|
return bool(
|
||||||
|
client.command(
|
||||||
|
f"EXISTS TABLE {table_name}"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_dataframe(
|
||||||
|
fn_name: str,
|
||||||
|
fetch_by: str,
|
||||||
|
sql_engine,
|
||||||
|
mids,
|
||||||
|
run_date,
|
||||||
|
):
|
||||||
|
|
||||||
|
fn = globals()[fn_name]
|
||||||
|
|
||||||
|
if fetch_by == "mids":
|
||||||
|
return fn(sql_engine, mids)
|
||||||
|
|
||||||
|
if fetch_by == "run_date":
|
||||||
|
return fn(sql_engine, run_date)
|
||||||
|
|
||||||
|
return fn(sql_engine)
|
||||||
|
|
||||||
|
def get_employee_ids(
|
||||||
|
client,
|
||||||
|
clickhouse_engine: Engine,
|
||||||
|
table_name: str = "OQaD",
|
||||||
|
) -> list[int]:
|
||||||
|
|
||||||
|
if not table_exists(client, table_name):
|
||||||
|
log.warning(f"Table '{table_name}' does not exist.")
|
||||||
|
return []
|
||||||
|
|
||||||
|
query = f"""
|
||||||
|
SELECT DISTINCT employee_id
|
||||||
|
FROM {table_name}
|
||||||
|
"""
|
||||||
|
|
||||||
|
return (
|
||||||
|
pl.read_database(query, clickhouse_engine)
|
||||||
|
.get_column("employee_id")
|
||||||
|
.to_list()
|
||||||
|
)
|
||||||
|
# ==========================================================
|
||||||
|
# Main
|
||||||
|
# ==========================================================
|
||||||
|
|
||||||
|
def main():
|
||||||
|
|
||||||
|
log.info("=" * 80)
|
||||||
|
log.info("Hello from data-move Python data pipeline!")
|
||||||
|
|
||||||
|
# ------------------------------------------------------
|
||||||
|
# Run Date
|
||||||
|
# ------------------------------------------------------
|
||||||
|
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
run_date = datetime.strptime(
|
||||||
|
sys.argv[1],
|
||||||
|
"%Y-%m-%d",
|
||||||
|
).date()
|
||||||
|
else:
|
||||||
|
run_date = date.today() - timedelta(days=1)
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Pipeline Run Date: %s",
|
||||||
|
run_date,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------
|
||||||
|
# Connections
|
||||||
|
# ------------------------------------------------------
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Connecting to databases..."
|
||||||
|
)
|
||||||
|
|
||||||
|
sql_engine = build_sql_server_engine()
|
||||||
|
clickhouse_engine = build_clickhouse_engine()
|
||||||
|
client = get_clickhouse_client()
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Database connections established"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------
|
||||||
|
# mids Keys
|
||||||
|
# ------------------------------------------------------
|
||||||
|
|
||||||
|
mids = MID_TABLE_COV(
|
||||||
|
sql_engine,
|
||||||
|
run_date,
|
||||||
|
)
|
||||||
|
|
||||||
|
emp_visit_df = MID_TABLE_COV1(
|
||||||
|
sql_engine,
|
||||||
|
run_date,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------
|
||||||
|
# Config
|
||||||
|
# ------------------------------------------------------
|
||||||
|
|
||||||
|
with open(
|
||||||
|
"t.yml",
|
||||||
|
"r",
|
||||||
|
) as file:
|
||||||
|
|
||||||
|
config = yaml.safe_load(file)
|
||||||
|
|
||||||
|
# ------------------------------------------------------
|
||||||
|
# Process Tables
|
||||||
|
# ------------------------------------------------------
|
||||||
|
|
||||||
|
for table in config["tables"]:
|
||||||
|
|
||||||
|
table_name = table["name"]
|
||||||
|
operation = table["operation"]
|
||||||
|
fetch_by = table["fetch_by"]
|
||||||
|
table_type=table["type"]
|
||||||
|
|
||||||
|
log.info("=" * 80)
|
||||||
|
log.info(
|
||||||
|
"Processing Table: %s",
|
||||||
|
table_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
# ------------------------------------------
|
||||||
|
# Fetch Data
|
||||||
|
# ------------------------------------------
|
||||||
|
|
||||||
|
fn_name = f"fetch_{table_name}"
|
||||||
|
|
||||||
|
df = get_dataframe(
|
||||||
|
fn_name=fn_name,
|
||||||
|
fetch_by=fetch_by,
|
||||||
|
sql_engine=sql_engine,
|
||||||
|
mids=mids,
|
||||||
|
run_date=run_date,
|
||||||
|
)
|
||||||
|
|
||||||
|
if df.is_empty():
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
"%s returned no rows",
|
||||||
|
table_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Fetched %s rows",
|
||||||
|
len(df),
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------
|
||||||
|
# Create Table If Missing
|
||||||
|
# ------------------------------------------
|
||||||
|
|
||||||
|
exists = table_exists(
|
||||||
|
client,
|
||||||
|
table_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not exists:
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Creating table %s",
|
||||||
|
table_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
create_clickhouse_table(
|
||||||
|
df=df,
|
||||||
|
table_name=table_name,
|
||||||
|
clickhouse_engine=clickhouse_engine,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------
|
||||||
|
# Existing Table Logic
|
||||||
|
# ------------------------------------------
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
if operation == "DELETE+INSERT":
|
||||||
|
|
||||||
|
truncate_table(
|
||||||
|
client,
|
||||||
|
table_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
delete_existing_data(
|
||||||
|
client=client,
|
||||||
|
table_name=table_name,
|
||||||
|
run_date=run_date,
|
||||||
|
mids=mids,
|
||||||
|
emp_visit_df=emp_visit_df,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------
|
||||||
|
# Load Data
|
||||||
|
# ------------------------------------------
|
||||||
|
|
||||||
|
load_to_clickhouse(
|
||||||
|
client=client,
|
||||||
|
table_name=table_name,
|
||||||
|
df=df,
|
||||||
|
)
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"%s loaded successfully (%s rows)",
|
||||||
|
table_name,
|
||||||
|
len(df),
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
|
||||||
|
log.exception(
|
||||||
|
"Failed processing table %s",
|
||||||
|
table_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
log.info("=" * 80)
|
||||||
|
log.info("Pipeline Completed Successfully")
|
||||||
|
log.info("=" * 80)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
+41
@@ -0,0 +1,41 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
import polars as pl
|
||||||
|
from sqlalchemy import Engine
|
||||||
|
from datetime import date
|
||||||
|
from log import log
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_data(
|
||||||
|
engine: Engine,
|
||||||
|
table_name: str,
|
||||||
|
table_type: str,
|
||||||
|
mids: list[int],
|
||||||
|
run_date: date
|
||||||
|
) -> pl.DataFrame:
|
||||||
|
|
||||||
|
if not mids:
|
||||||
|
log.warning("No MIDs — nothing to fetch.")
|
||||||
|
return pl.DataFrame()
|
||||||
|
|
||||||
|
mid_list = ",".join(str(mid) for mid in mids)
|
||||||
|
|
||||||
|
sql_file = Path("sql") / table_type / f"{table_name}.sql"
|
||||||
|
|
||||||
|
with open(sql_file, "r", encoding="utf-8") as f:
|
||||||
|
sql_template = f.read()
|
||||||
|
|
||||||
|
sql = sql_template.format(
|
||||||
|
mid_list=mid_list,
|
||||||
|
run_date=run_date.strftime("%Y-%m-%d")
|
||||||
|
)
|
||||||
|
|
||||||
|
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||||
|
|
||||||
|
df = pl.read_database(
|
||||||
|
query=sql,
|
||||||
|
connection=engine
|
||||||
|
)
|
||||||
|
|
||||||
|
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||||
|
|
||||||
|
return df
|
||||||
@@ -0,0 +1,164 @@
|
|||||||
|
WITH SOS_BASE AS
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
sm.CountryName,
|
||||||
|
sc.MID,
|
||||||
|
sm.RegionName,
|
||||||
|
sm.StateName,
|
||||||
|
sm.CityName,
|
||||||
|
Em.SupervisorName,
|
||||||
|
em.EmpId,
|
||||||
|
Em.EmpName AS EmployeeName,
|
||||||
|
Em.DesignationName AS Designation,
|
||||||
|
sm.StoreId,
|
||||||
|
CONVERT(varchar, sc.VisitDate, 101) AS VisitDate,
|
||||||
|
sm.StoreCode,
|
||||||
|
sm.StoreName,
|
||||||
|
sm.Address,
|
||||||
|
sm.StoreTypeid,
|
||||||
|
sm.ChannelId,
|
||||||
|
sm.ChainName,
|
||||||
|
|
||||||
|
MSD.SOSDefinitionName,
|
||||||
|
|
||||||
|
CASE
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_Category' THEN 'Category'
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_SubCategory' THEN 'SubCategory'
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_Brand' THEN 'Brand'
|
||||||
|
WHEN ISNULL(TS.SOSHeaderTable,'')='Master_SubBrand' THEN 'SubBrand'
|
||||||
|
END AS SOSHeaderDeatils,
|
||||||
|
|
||||||
|
TS.SOSHeaderName,
|
||||||
|
TS.SOSHeaderValue AS SOSHeaderID,
|
||||||
|
|
||||||
|
'Header_Image' AS HDR1,
|
||||||
|
|
||||||
|
CASE
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_Category' THEN 'Category'
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_SubCategory' THEN 'SubCategory'
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_Brand' THEN 'Brand'
|
||||||
|
WHEN ISNULL(TSC.SOSChildTable,'')='Master_SubBrand' THEN 'SubBrand'
|
||||||
|
END AS SOSChildDeatils,
|
||||||
|
|
||||||
|
TSC.SOSChildName,
|
||||||
|
TSC.SOSChildValue AS SOSChildID,
|
||||||
|
TSC.ChildTotalFacing,
|
||||||
|
TS.SOSHeaderFacing,
|
||||||
|
TSC.ChildSelfFacing,
|
||||||
|
|
||||||
|
(
|
||||||
|
SELECT TOP 1 SOSTarget
|
||||||
|
FROM OneApp_KelloggsMT.dbo.Mapping_StoreShareOfShelfTarget a
|
||||||
|
WHERE a.SOSDefinitionId = MSD.SOSDefinitionId
|
||||||
|
AND a.StoreId = sm.StoreId
|
||||||
|
AND a.FromDate <= sc.VisitDate
|
||||||
|
AND a.ToDate >= sc.VisitDate
|
||||||
|
) AS SOSTarget,
|
||||||
|
|
||||||
|
CASE
|
||||||
|
WHEN ISNULL(SHI.SOSHeaderImage,'') = ''
|
||||||
|
THEN ''
|
||||||
|
ELSE
|
||||||
|
'https://kimt1.parinaam.in/Upload/SOSImages/'
|
||||||
|
+ SHI.SOSHeaderImage
|
||||||
|
END AS SOSHeaderImg
|
||||||
|
|
||||||
|
FROM OneApp_KelloggsMT.dbo.T_ShareOfShelfHeader ts
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.T_StoreCoverage sc
|
||||||
|
ON ts.MID = sc.MID
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.vw_StoreDetail sm
|
||||||
|
ON sc.StoreId = sm.StoreId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail Em
|
||||||
|
ON sc.EmpId = Em.EmpId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfChild tsc
|
||||||
|
ON ts.SOSId = tsc.SOSId
|
||||||
|
|
||||||
|
INNER JOIN OneApp_KelloggsMT.dbo.Master_ShareOfShelfDefinition msd
|
||||||
|
ON msd.SOSDefinitionId = tsc.SOSDefinitionId
|
||||||
|
|
||||||
|
LEFT JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfHeaderImages SHI
|
||||||
|
ON ts.SOSId = SHI.SOSId
|
||||||
|
AND ts.SOSHeaderValue = SHI.SOSHeaderValue
|
||||||
|
|
||||||
|
LEFT JOIN OneApp_KelloggsMT.dbo.T_ShareOfShelfChildImages SCI
|
||||||
|
ON tsc.SOSId = SCI.SOSId
|
||||||
|
AND tsc.SOSChildValue = SCI.SOSChildValue
|
||||||
|
|
||||||
|
WHERE Em.EmpName NOT LIKE 'test%'
|
||||||
|
AND sc.MID IN ({mid_list})
|
||||||
|
),
|
||||||
|
|
||||||
|
SOS_PIVOT AS
|
||||||
|
(
|
||||||
|
SELECT *
|
||||||
|
FROM SOS_BASE
|
||||||
|
PIVOT
|
||||||
|
(
|
||||||
|
MIN(SOSHeaderImg)
|
||||||
|
FOR HDR1 IN ([Header_Image])
|
||||||
|
) pvt
|
||||||
|
)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
'40148' AS ProjectId,
|
||||||
|
MID,
|
||||||
|
EmpId AS employee_id,
|
||||||
|
StoreId AS store_id,
|
||||||
|
VisitDate AS visit_date,
|
||||||
|
StoreTypeid AS storetype_id,
|
||||||
|
ChannelId AS channel_id,
|
||||||
|
SOSDefinitionName,
|
||||||
|
SOSHeaderDeatils,
|
||||||
|
SOSHeaderName,
|
||||||
|
SOSHeaderID,
|
||||||
|
SOSChildDeatils,
|
||||||
|
SOSChildName,
|
||||||
|
SOSChildID,
|
||||||
|
SOSHeaderFacing,
|
||||||
|
ChildTotalFacing,
|
||||||
|
ChildSelfFacing,
|
||||||
|
SOSTarget,
|
||||||
|
GETDATE() AS update_date,
|
||||||
|
'SP-Pius' AS update_by
|
||||||
|
|
||||||
|
FROM SOS_PIVOT
|
||||||
|
|
||||||
|
GROUP BY
|
||||||
|
CountryName,
|
||||||
|
MID,
|
||||||
|
RegionName,
|
||||||
|
StateName,
|
||||||
|
CityName,
|
||||||
|
SupervisorName,
|
||||||
|
EmpId,
|
||||||
|
EmployeeName,
|
||||||
|
Designation,
|
||||||
|
StoreId,
|
||||||
|
VisitDate,
|
||||||
|
StoreCode,
|
||||||
|
StoreName,
|
||||||
|
Address,
|
||||||
|
StoreTypeid,
|
||||||
|
ChannelId,
|
||||||
|
ChainName,
|
||||||
|
SOSDefinitionName,
|
||||||
|
SOSHeaderDeatils,
|
||||||
|
SOSHeaderName,
|
||||||
|
SOSHeaderID,
|
||||||
|
SOSChildDeatils,
|
||||||
|
SOSChildName,
|
||||||
|
SOSChildID,
|
||||||
|
ChildTotalFacing,
|
||||||
|
SOSHeaderFacing,
|
||||||
|
ChildSelfFacing,
|
||||||
|
SOSTarget
|
||||||
|
|
||||||
|
ORDER BY
|
||||||
|
RegionName,
|
||||||
|
StateName,
|
||||||
|
CityName,
|
||||||
|
VisitDate;
|
||||||
@@ -39,6 +39,26 @@ tables:
|
|||||||
operation: DELETE+INSERT
|
operation: DELETE+INSERT
|
||||||
fetch_by: run_date
|
fetch_by: run_date
|
||||||
|
|
||||||
|
- name: Journey_Plan
|
||||||
|
type: FACT
|
||||||
|
operation: INSERT
|
||||||
|
fetch_by: run_date
|
||||||
|
|
||||||
|
- name: Web_Logins
|
||||||
|
type: FACT
|
||||||
|
operation: INSERT
|
||||||
|
fetch_by: run_date
|
||||||
|
|
||||||
|
- name: Promotion
|
||||||
|
type: FACT
|
||||||
|
operation: INSERT
|
||||||
|
fetch_by: mids
|
||||||
|
|
||||||
|
- name: PaidVisibility
|
||||||
|
type: FACT
|
||||||
|
operation: INSERT
|
||||||
|
fetch_by: mids
|
||||||
|
|
||||||
- name: Store_Master
|
- name: Store_Master
|
||||||
type: DIMENSION
|
type: DIMENSION
|
||||||
operation: DELETE+INSERT
|
operation: DELETE+INSERT
|
||||||
@@ -59,10 +79,6 @@ tables:
|
|||||||
operation: DELETE+INSERT
|
operation: DELETE+INSERT
|
||||||
fetch_by: none
|
fetch_by: none
|
||||||
|
|
||||||
- name: Journey_Plan
|
|
||||||
type: FACT
|
|
||||||
operation: INSERT
|
|
||||||
fetch_by: run_date
|
|
||||||
|
|
||||||
- name: coverage_remarks
|
- name: coverage_remarks
|
||||||
type: DIMENSION
|
type: DIMENSION
|
||||||
@@ -84,20 +100,6 @@ tables:
|
|||||||
operation: DELETE+INSERT
|
operation: DELETE+INSERT
|
||||||
fetch_by: none
|
fetch_by: none
|
||||||
|
|
||||||
- name: Web_Logins
|
|
||||||
type: FACT
|
|
||||||
operation: INSERT
|
|
||||||
fetch_by: run_date
|
|
||||||
|
|
||||||
- name: Promotion
|
|
||||||
type: FACT
|
|
||||||
operation: INSERT
|
|
||||||
fetch_by: mids
|
|
||||||
|
|
||||||
- name: PaidVisibility
|
|
||||||
type: FACT
|
|
||||||
operation: INSERT
|
|
||||||
fetch_by: mids
|
|
||||||
|
|
||||||
- name: Master_Salesterritorylayer
|
- name: Master_Salesterritorylayer
|
||||||
type: DIMENSION
|
type: DIMENSION
|
||||||
|
|||||||
Reference in New Issue
Block a user