import os # import pyarrow import sys import logging from datetime import date, timedelta import polars as pl from sqlalchemy import create_engine, text from sqlalchemy.engine import Engine, URL import clickhouse_connect from dotenv import load_dotenv from log import log from clickhouse_task.create_table import * from db_con.connection import * from mids import * #PROJECT_ID = 40148 p=40148 def fetch_Store_Master(engine: Engine) -> pl.DataFrame: sql = """ SELECT RegionId AS region_id, RegionName AS region, StateId AS state_id, StateName AS state, CityId AS city_id, CityName AS city, CityCode AS cpm_city_id, ChannelId AS channel_id, ChannelName AS channel, DistributorId AS distributor_id, Distributor AS distributor_name, ChainId AS keyaccount_id, ChainName AS keyaccount, StoreUniqueCode AS insight_store_id, StoreCode AS client_store_code, Latitude AS latitude, Longitude AS longitude, StoreCategoryId AS store_category_id, StoreCategory AS store_category, StoreTypeId AS store_type_id, StoreType AS store_type, StoreClassId AS store_classification_id, StoreClass AS store_classification, StLayerFourId, StoreId AS store_id, StoreName AS store_name, Address AS address FROM OneApp_KelloggsMT.dbo.vw_storedetail """ log.info("Fetching Store Master data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} stores" ) return df def fetch_SKU_Master(engine: Engine) -> pl.DataFrame: sql = """ SELECT CM.CategoryId AS category_id, CM.CategoryCode AS category_code, CM.CategoryName AS category_name, SCA.SubCategoryId AS sub_category_id, SCA.SubCategoryCode AS sub_category_code, SCA.SubCategoryName AS sub_category_name, BR.BrandId AS brand_id, BR.BrandCode AS brand_code, BR.BrandName AS brand_name, SB.SubBrandId AS sub_brand_id, SB.SubBrandCode AS sub_brand_code, SB.SubBrandName AS sub_brand_name, P.ProductId AS product_id, P.ProductName AS product_name, P.ProductCode AS product_code, P.MRP AS mrp, FL.FlavourId AS flavour_id, FL.Flavour AS flavour, P.Grammage AS grammage, P.ProductSequence AS product_sequence, P.CaseSize AS case_size, MC.Company AS company_name, MC.IsCompetitor AS is_competitor, P.PTR AS ptr FROM OneApp_KelloggsMT.dbo.Master_Product P RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Flavour FL ON P.FlavourId = FL.FlavourId RIGHT JOIN OneApp_KelloggsMT.dbo.Master_SubBrand SB ON P.SubBrandId = SB.SubBrandId RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Brand BR ON SB.BrandId = BR.BrandId RIGHT JOIN OneApp_KelloggsMT.dbo.Master_SubCategory SCA ON BR.SubCategoryId = SCA.SubCategoryId RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Category CM ON SCA.CategoryId = CM.CategoryId RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Company MC ON MC.CompanyId = BR.CompanyId """ log.info("Fetching SKU Master data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} SKU Master rows" ) return df def fetch_display_master(engine: Engine) -> pl.DataFrame: """ Fetch Display Master data. Source: Master_Display Target: display_master """ sql = """ SELECT DisplayId AS display_id, DisplayCode AS display_code, DisplayName AS display_name, DisplayRefImage AS display_ref_url FROM OneApp_KelloggsMT.dbo.Master_Display """ log.info("Fetching Display Master data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} Display Master records" ) return df def fetch_Employee_Master(engine: Engine) -> pl.DataFrame: """ Fetch Employee Master data. Source: vw_Employee_Detail + Mapping_PositionUser + Master_Position Target: Employee_Master """ sql = """ SELECT RegionId AS region_id, RegionName AS region, StateId AS state_id, StateName AS state, CityId AS city_id, CityName AS city, A.EmpId AS employee_id, EmpName AS employee_name, Gender AS gender, A.DesignationId AS designation_id, DesignationName AS designation, SupervisorId AS manager_id, SupervisorName AS manager_name, JoinDate AS employee_joining_date, ResignDate AS employee_resign_date, C.PositionCode AS position_code, EmpCode AS employee_legacy_code, RIGHTNAME AS employee_role, CASE WHEN RIGHTNAME IN ('Client', 'Client HO') THEN 'NON CPM' ELSE 'CPM' END AS employee_type FROM OneApp_KelloggsMT.dbo.vw_Employee_Detail A LEFT JOIN ( SELECT DISTINCT PositionId, EmpId FROM OneApp_KelloggsMT.dbo.Mapping_PositionUser WHERE DATEDIFF(DAY, FromDate, GETDATE()) >= 0 AND DATEDIFF(DAY, ToDate, GETDATE()) <= 0 ) B ON A.EmpId = B.EmpId LEFT JOIN ( SELECT PositionId, PositionCode FROM OneApp_KelloggsMT.dbo.Master_Position ) C ON B.PositionId = C.PositionId """ log.info("Fetching Employee Master data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} Employee Master records" ) return df def fetch_Employee_Master(engine: Engine) -> pl.DataFrame: """ Fetch Employee Master data. Source: vw_Employee_Detail + Mapping_PositionUser + Master_Position Target: Employee_Master """ sql = """ SELECT RegionId AS region_id, RegionName AS region, StateId AS state_id, StateName AS state, CityId AS city_id, CityName AS city, A.EmpId AS employee_id, EmpName AS employee_name, Gender AS gender, A.DesignationId AS designation_id, DesignationName AS designation, SupervisorId AS manager_id, SupervisorName AS manager_name, JoinDate AS employee_joining_date, ResignDate AS employee_resign_date, C.PositionCode AS position_code, EmpCode AS employee_legacy_code, RIGHTNAME AS employee_role, CASE WHEN RIGHTNAME IN ('Client', 'Client HO') THEN 'NON CPM' ELSE 'CPM' END AS employee_type FROM OneApp_KelloggsMT.dbo.vw_Employee_Detail A LEFT JOIN ( SELECT DISTINCT PositionId, EmpId FROM OneApp_KelloggsMT.dbo.Mapping_PositionUser WHERE DATEDIFF(DAY, FromDate, GETDATE()) >= 0 AND DATEDIFF(DAY, ToDate, GETDATE()) <= 0 ) B ON A.EmpId = B.EmpId LEFT JOIN ( SELECT PositionId, PositionCode FROM OneApp_KelloggsMT.dbo.Master_Position ) C ON B.PositionId = C.PositionId """ log.info("Fetching Employee Master data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} Employee Master records" ) return df def fetch_Master_VisibilityReason(engine: Engine) -> pl.DataFrame: """ Source: Master_VisibilityReason Target: Master_VisibilityReason """ sql = """ SELECT DISTINCT 40148 AS project_id, MenuId AS menu_id, VisibilityReasonId AS reason_id, VisibilityReason AS reason FROM OneApp_KelloggsMT.dbo.Master_VisibilityReason """ log.info("Fetching Master Visibility Reason data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} Master Visibility Reason records" ) return df def fetch_Master_VisibilityDefinition(engine: Engine) -> pl.DataFrame: """ Source: OneApp_KelloggsMT.dbo.Master_VisibilityDefinition Target: Master_VisibilityDefinition """ sql = """ SELECT DISTINCT 40148 AS project_id, VisibilityDefinitionId AS visibility_definition_id, VisibilityDefinitionName AS visibility_definition_name, GETDATE() AS create_date, 'SP-Pius' AS create_by FROM OneApp_KelloggsMT.dbo.Master_VisibilityDefinition """ log.info("Fetching Master Visibility Definition data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} Master Visibility Definition records" ) return df def fetch_Master_Salesterritorylayer( engine: Engine ) -> pl.DataFrame: """ Source: Master_SalesTerritoryLayerOne Master_SalesTerritoryLayerTwo Master_SalesTerritoryLayerThree Master_SalesTerritoryLayerFour Target: Master_Salesterritorylayer """ sql = """ SELECT DISTINCT 40148 AS project_id, D.StLayerOneId AS st_layer_one_id, D.StLayerOneName AS st_layer_one_name, C.StLayerTwoId AS st_layer_two_id, C.StLayerTwoName AS st_layer_two_name, B.StLayerThreeId AS st_layer_three_id, B.StLayerThreeName AS st_layer_three_name, A.StLayerFourId AS st_layer_four_id, A.StLayerFourName AS st_layer_four_name, GETDATE() AS create_date, 'SP-Pius' AS create_by FROM OneApp_KelloggsMT.dbo.Master_SalesTerritoryLayerFour A INNER JOIN OneApp_KelloggsMT.dbo.Master_SalesTerritoryLayerThree B ON A.StLayerThreeId = B.StLayerThreeId INNER JOIN OneApp_KelloggsMT.dbo.Master_SalesTerritoryLayerTwo C ON B.StLayerTwoId = C.StLayerTwoId INNER JOIN OneApp_KelloggsMT.dbo.Master_SalesTerritoryLayerOne D ON C.StLayerOneId = D.StLayerOneId """ log.info("Fetching Master Sales Territory Layer data") df = pl.read_database( query=sql, connection=engine ) log.info( f"Fetched {len(df):,} Master Sales Territory Layer records" ) return df