Files
data_pipeline/masters/dimensions.py
T
Ankit Malik 7fbbffec65 first commit
2026-06-12 10:54:00 +05:30

363 lines
8.7 KiB
Python

import os
import pyarrow
import sys
import logging
from datetime import date, timedelta
import polars as pl
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, URL
import clickhouse_connect
from dotenv import load_dotenv
from log import log
from clickhouse_task.create_table import create_clickhouse_table , check
from db_con.connection import *
from mids import *
# NOTE(review): `p` appears to hold the project ID -- it sits where a
# `PROJECT_ID = 40148` constant used to be. Confirm against callers before
# renaming it to something more descriptive.
p = 40148
# SQL that projects the store-detail view onto the pipeline's column names.
# The text is kept flush-left so the statement sent to the server is unchanged.
_STORE_MASTER_QUERY = """
SELECT
RegionId AS region_id,
RegionName AS region,
StateId AS state_id,
StateName AS state,
CityId AS city_id,
CityName AS city,
CityCode AS cpm_city_id,
ChannelId AS channel_id,
ChannelName AS channel,
DistributorId AS distributor_id,
Distributor AS distributor_name,
ChainId AS keyaccount_id,
ChainName AS keyaccount,
StoreUniqueCode AS insight_store_id,
StoreCode AS client_store_code,
Latitude AS latitude,
Longitude AS longitude,
StoreCategoryId AS store_category_id,
StoreCategory AS store_category,
StoreTypeId AS store_type_id,
StoreType AS store_type,
StoreClassId AS store_classification_id,
StoreClass AS store_classification,
StLayerFourId,
StoreId AS store_id,
StoreName AS store_name,
Address AS address
FROM OneApp_KelloggsMT.dbo.vw_storedetail
"""


def fetch_Store_master(engine: Engine) -> pl.DataFrame:
    """Fetch Store Master data.

    Source: vw_storedetail

    Args:
        engine: SQLAlchemy engine handed to ``pl.read_database`` as the
            connection.

    Returns:
        A Polars DataFrame holding every row the view returns. Columns use
        the aliases from the query; ``StLayerFourId`` is selected without an
        alias and therefore keeps its source name.
    """
    log.info("Fetching Store Master data")
    stores = pl.read_database(query=_STORE_MASTER_QUERY, connection=engine)
    store_count = len(stores)
    log.info(f"Fetched {store_count:,} stores")
    return stores
# SQL that flattens the product hierarchy (category -> sub-category -> brand
# -> sub-brand -> product) together with flavour and company attributes.
# The text is kept flush-left so the statement sent to the server is unchanged.
_SKU_MASTER_QUERY = """
SELECT
CM.CategoryId AS category_id,
CM.CategoryCode AS category_code,
CM.CategoryName AS category_name,
SCA.SubCategoryId AS sub_category_id,
SCA.SubCategoryCode AS sub_category_code,
SCA.SubCategoryName AS sub_category_name,
BR.BrandId AS brand_id,
BR.BrandCode AS brand_code,
BR.BrandName AS brand_name,
SB.SubBrandId AS sub_brand_id,
SB.SubBrandCode AS sub_brand_code,
SB.SubBrandName AS sub_brand_name,
P.ProductId AS product_id,
P.ProductName AS product_name,
P.ProductCode AS product_code,
P.MRP AS mrp,
FL.FlavourId AS flavour_id,
FL.Flavour AS flavour,
P.Grammage AS grammage,
P.ProductSequence AS product_sequence,
P.CaseSize AS case_size,
MC.Company AS company_name,
MC.IsCompetitor AS is_competitor,
P.PTR AS ptr
FROM OneApp_KelloggsMT.dbo.Master_Product P
RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Flavour FL
ON P.FlavourId = FL.FlavourId
RIGHT JOIN OneApp_KelloggsMT.dbo.Master_SubBrand SB
ON P.SubBrandId = SB.SubBrandId
RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Brand BR
ON SB.BrandId = BR.BrandId
RIGHT JOIN OneApp_KelloggsMT.dbo.Master_SubCategory SCA
ON BR.SubCategoryId = SCA.SubCategoryId
RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Category CM
ON SCA.CategoryId = CM.CategoryId
RIGHT JOIN OneApp_KelloggsMT.dbo.Master_Company MC
ON MC.CompanyId = BR.CompanyId
"""


def fetch_sku_master(engine: Engine) -> pl.DataFrame:
    """Fetch SKU Master data.

    Source: Master_Product joined (via RIGHT JOINs) with Master_Flavour,
    Master_SubBrand, Master_Brand, Master_SubCategory, Master_Category and
    Master_Company.

    Args:
        engine: SQLAlchemy engine handed to ``pl.read_database`` as the
            connection.

    Returns:
        A Polars DataFrame with the snake_case columns aliased in the query.
    """
    log.info("Fetching SKU Master data")
    skus = pl.read_database(query=_SKU_MASTER_QUERY, connection=engine)
    sku_count = len(skus)
    log.info(f"Fetched {sku_count:,} SKU Master rows")
    return skus
# SQL that reads the display catalogue and renames its columns.
# The text is kept flush-left so the statement sent to the server is unchanged.
_DISPLAY_MASTER_QUERY = """
SELECT
DisplayId AS display_id,
DisplayCode AS display_code,
DisplayName AS display_name,
DisplayRefImage AS display_ref_url
FROM OneApp_KelloggsMT.dbo.Master_Display
"""


def fetch_display_master(engine: Engine) -> pl.DataFrame:
    """Fetch Display Master data.

    Source: Master_Display
    Target: display_master

    Args:
        engine: SQLAlchemy engine handed to ``pl.read_database`` as the
            connection.

    Returns:
        A Polars DataFrame with the columns ``display_id``, ``display_code``,
        ``display_name`` and ``display_ref_url``.
    """
    log.info("Fetching Display Master data")
    displays = pl.read_database(query=_DISPLAY_MASTER_QUERY, connection=engine)
    display_count = len(displays)
    log.info(f"Fetched {display_count:,} Display Master records")
    return displays
import polars as pl
from sqlalchemy.engine import Engine
from loguru import logger as log
# SQL that enriches the employee view with the position code of the mapping
# whose FromDate..ToDate window contains today (compared at day granularity).
# The text is kept flush-left so the statement sent to the server is unchanged.
# NOTE(review): a mapping row whose ToDate is NULL will not satisfy the
# DATEDIFF filter -- confirm open-ended mappings are never stored that way.
# NOTE(review): an employee with several currently valid mappings would be
# returned once per mapping -- confirm whether that is expected downstream.
_EMPLOYEE_MASTER_QUERY = """
SELECT
RegionId AS region_id,
RegionName AS region,
StateId AS state_id,
StateName AS state,
CityId AS city_id,
CityName AS city,
A.EmpId AS employee_id,
EmpName AS employee_name,
Gender AS gender,
A.DesignationId AS designation_id,
DesignationName AS designation,
SupervisorId AS manager_id,
SupervisorName AS manager_name,
JoinDate AS employee_joining_date,
ResignDate AS employee_resign_date,
C.PositionCode AS position_code,
EmpCode AS employee_legacy_code,
RIGHTNAME AS employee_role,
CASE
WHEN RIGHTNAME IN ('Client', 'Client HO')
THEN 'NON CPM'
ELSE 'CPM'
END AS employee_type
FROM OneApp_KelloggsMT.dbo.vw_Employee_Detail A
LEFT JOIN
(
SELECT DISTINCT
PositionId,
EmpId
FROM OneApp_KelloggsMT.dbo.Mapping_PositionUser
WHERE DATEDIFF(DAY, FromDate, GETDATE()) >= 0
AND DATEDIFF(DAY, ToDate, GETDATE()) <= 0
) B
ON A.EmpId = B.EmpId
LEFT JOIN
(
SELECT
PositionId,
PositionCode
FROM OneApp_KelloggsMT.dbo.Master_Position
) C
ON B.PositionId = C.PositionId
"""


def fetch_employee_master(engine: Engine) -> pl.DataFrame:
    """Fetch Employee Master data.

    Source: vw_Employee_Detail + Mapping_PositionUser + Master_Position
    Target: Employee_Master

    Every row of ``vw_Employee_Detail`` is kept (both joins are LEFT JOINs),
    so ``position_code`` is null for employees without a currently valid
    position mapping. ``employee_type`` is ``'NON CPM'`` when ``RIGHTNAME``
    is ``'Client'`` or ``'Client HO'`` and ``'CPM'`` otherwise.

    Args:
        engine: SQLAlchemy engine handed to ``pl.read_database`` as the
            connection.

    Returns:
        A Polars DataFrame with the snake_case columns aliased in the query.
    """
    log.info("Fetching Employee Master data")
    employees = pl.read_database(query=_EMPLOYEE_MASTER_QUERY, connection=engine)
    employee_count = len(employees)
    log.info(f"Fetched {employee_count:,} Employee Master records")
    return employees