Diagrams: Step 03 - Data Cleaning and Quality Checks
Data Cleaning Overview
graph TB
Start([main]) --> Config(get_countries_and_operations_from_json)
Config --> Parse(parse_years_in_scope)
Parse --> Load(Load DataFrames from DB)
Load --> Check1{Specific_re_classification?}
Check1 -->|1| ReClass("re_classification_{business_unit}")
Check1 -->|0| Update1(update_table_names step_2_01_)
ReClass --> Check2{Categories_scoping?}
Update1 --> Check2
Check2 -->|1| CatScope("categories_scoping_{business_unit}")
Check2 -->|0| Update2(update_table_names step_2_02_)
CatScope --> Check3{Promo_costs_recomputation?}
Update2 --> Check3
Check3 -->|1| PromoCosts("promo_costs_recomputation_{business_unit}")
Check3 -->|0| Update3(update_table_names step_2_03_)
PromoCosts --> Parmalat{italy_parmalat?}
Parmalat -->|Yes| Margins(calculate_promo_margin_parmalat)
Parmalat -->|No| QC
Margins --> Exposers(substitute_exposers_after_margin)
Exposers --> QC
Update3 --> QC
QC(main_quality_checks) --> End([Script executed successfully])
style ReClass fill:#e3f2fd
style CatScope fill:#fff3e0
style PromoCosts fill:#ffebee
style Margins fill:#f3e5f5
style Exposers fill:#f3e5f5
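Each of the three optional steps above follows the same pattern: when the configuration flag is 1 the business-unit function runs, otherwise only the table names are moved to the next step prefix. A minimal Python sketch of that pattern follows. It assumes tables are tracked as a name mapping; `Tables`, `run_optional_step`, `fake_re_classification`, the `step_1_` name and this body of `update_table_names` are hypothetical stand-ins, not the project's implementation.

```python
from typing import Callable, Dict

# Assumption: logical table name -> physical table name in the database.
Tables = Dict[str, str]


def update_table_names(tables: Tables, prefix: str) -> Tables:
    """Hypothetical stand-in: re-point each table to `prefix`, data untouched."""
    return {name: f"{prefix}{name}" for name in tables}


def run_optional_step(flag: int, step: Callable[[Tables], Tables],
                      tables: Tables, prefix: str) -> Tables:
    """Run `step` when the config flag is 1, otherwise only update table names."""
    if flag == 1:
        return step(tables)
    return update_table_names(tables, prefix)


def fake_re_classification(tables: Tables) -> Tables:
    # Placeholder for a function such as re_classification_{business_unit}.
    return {name: f"reclassified_{name}" for name in tables}


tables = {"sell_in_base": "step_1_sell_in_base"}  # made-up starting name
skipped = run_optional_step(0, fake_re_classification, tables, "step_2_01_")
executed = run_optional_step(1, fake_re_classification, tables, "step_2_01_")
```

Keeping the skip branch inside one helper means every later step can read the same prefix whether or not the previous step actually ran.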
Re-classification Process
flowchart LR
subgraph "Galbani Re-classification"
G1[EANs_Replacement_List_Italy] --> G2[process_sell_in_data]
G2 --> G3[Replace EAN codes]
G3 --> G4[Aggregate duplicate KPIs]
G4 --> G5[Merge New_category]
G5 --> GM{Table type?}
GM -->|promo_calendar| GM1[Merge on EAN only]
GM -->|product_base| GM2[Merge on EAN + EAN_desc]
GM -->|sell_out| GM3[Merge on EAN only]
GM -->|others| GM4[Merge on EAN + EAN_desc]
end
subgraph "Parmalat Re-classification"
P1[Product_reclassification_parmalat] --> P2[Drop duplicates on EAN]
P2 --> P3[Merge on EAN only<br/>for all tables]
P3 --> P4[Check line inflation]
end
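The re-classification steps above (replace EANs, aggregate the rows that become duplicates, merge New_category, check for line inflation) can be sketched with pandas. This is an illustration under assumptions: the function name `reclassify_sketch`, its parameters and the `Volume` KPI column are hypothetical, and the real functions may handle more columns and table types.

```python
import pandas as pd


def reclassify_sketch(df, ean_replacements, categories,
                      merge_keys=("EAN",), kpi_cols=("Volume",)):
    """Replace EANs, re-aggregate duplicated rows, then attach New_category."""
    out = df.copy()
    # 1. Replace old EAN codes; EANs absent from the mapping stay unchanged.
    out["EAN"] = out["EAN"].replace(ean_replacements)
    # 2. Rows that now share all non-KPI columns are summed on the KPI columns.
    key_cols = [c for c in out.columns if c not in kpi_cols]
    out = out.groupby(key_cols, as_index=False, dropna=False)[list(kpi_cols)].sum()
    # 3. De-duplicate the mapping on the merge keys before the left join.
    mapping = categories.drop_duplicates(subset=list(merge_keys))
    rows_before = len(out)
    out = out.merge(mapping, on=list(merge_keys), how="left")
    # 4. Defensive line-inflation check: a left join must not add rows.
    if len(out) != rows_before:
        raise ValueError(f"Line inflation: {rows_before} -> {len(out)} rows")
    return out


sell_in = pd.DataFrame({"EAN": ["A", "B", "B2"], "Week": [1, 1, 1],
                        "Volume": [10, 5, 7]})
categories = pd.DataFrame({"EAN": ["A", "B"],
                           "New_category": ["Cheese", "Milk"]})
result = reclassify_sketch(sell_in, {"B2": "B"}, categories)
```

Passing `merge_keys=("EAN", "EAN_desc")` would mirror the product_base branch of the Galbani diagram, provided both tables carry an `EAN_desc` column.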
Categories Scoping
graph TD
subgraph "Galbani"
GC1[Italy_category_filtering.xlsx] --> GC2[Merge on Category]
GC2 --> GC3[Add Flag_exclusion_non_cheese_products]
end
subgraph "Parmalat"
PC1[Category_filtering_parmalat] --> PC2[Merge on Brand + Category]
PC2 --> PC3[Add Flag_exclusion]
end
Tables[All tables<br/>step_2_01_*] --> GC2
Tables --> PC2
GC3 --> Output1[Tables step_2_02_*]
PC3 --> Output1
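The two scoping branches above differ only in the merge keys and in the flag column. A hedged pandas sketch of that shared shape follows. The helper name `add_exclusion_flag` and the sample values are hypothetical, and the treatment of unmatched rows (left as missing) is an assumption, since the diagram does not say how they are handled.

```python
import pandas as pd


def add_exclusion_flag(df, filtering, keys, flag_col):
    """Left-merge the filtering table on `keys` to add `flag_col`."""
    mapping = filtering[list(keys) + [flag_col]].drop_duplicates(subset=list(keys))
    # validate="many_to_one" makes pandas raise if the mapping is not unique
    # on the keys, so the merge cannot multiply rows.
    return df.merge(mapping, on=list(keys), how="left", validate="many_to_one")


tables = pd.DataFrame({"Brand": ["X", "Y"],
                       "Category": ["Cheese", "Yogurt"],
                       "Volume": [1, 2]})

# Galbani-style scoping: merge on Category only.
galbani_filter = pd.DataFrame({
    "Category": ["Cheese", "Yogurt"],
    "Flag_exclusion_non_cheese_products": [0, 1],
})
galbani = add_exclusion_flag(tables, galbani_filter, ["Category"],
                             "Flag_exclusion_non_cheese_products")

# Parmalat-style scoping: merge on Brand + Category.
parmalat_filter = pd.DataFrame({"Brand": ["X"], "Category": ["Cheese"],
                                "Flag_exclusion": [1]})
parmalat = add_exclusion_flag(tables, parmalat_filter, ["Brand", "Category"],
                              "Flag_exclusion")
```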
Promo Costs Recomputation (Galbani)
sequenceDiagram
participant Main as promo_costs_recomputation_italy_galbani
participant Preprocess as preprocess_and_aggregate_promo_codes
participant Discount as retrieve_on_invoice_discount
participant Duration as compute_event_duration
participant DB as Database
Main->>Preprocess: df_sell_in, engine, years_in_scope
Preprocess->>Preprocess: Extract Year/Week from Period
Preprocess->>Discount: Retrieve on_invoice_discount
Note over Discount: SQL Query:<br/>SUM(Detail_5 + Detail_8 + Detail_9) /<br/>SUM(Gross_sales - Detail_1 - Detail_2 - Detail_3)
Discount-->>Preprocess: On_invoice_discount %
Preprocess->>Preprocess: Find First_Week, Last_Week
Preprocess->>Duration: compute_event_duration
Note over Duration: Count non-zero weeks<br/>per Promotion_Code
Duration-->>Preprocess: Event_duration_in_weeks
loop Aggregate similar discounts
Preprocess->>Preprocess: Group by base attributes
Preprocess->>Preprocess: Find clusters ±0.4%
Preprocess->>Preprocess: Assign same Promotion_Code
end
Preprocess->>Preprocess: find_and_aggregate_duplicate_rows
Preprocess-->>Main: Preprocessed df
Main->>Main: Calculate KPIs on non-promo data
Note over Main: Pure_savings_%<br/>On_invoice_savings_%<br/>Off_invoice_savings_%
Main->>Main: Calculate promo costs
Note over Main: Direct_Promo_costs<br/>Pure_savings_yearly<br/>Total_promo_costs_yearly<br/>Unit_promo_cost_yearly
Main->>DB: Save promo_costs_Italy_galbani
Main->>DB: Save step_2_03_sell_in_base
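Two steps of the sequence above lend themselves to a short illustration: counting non-zero weeks per Promotion_Code and clustering similar discounts. The sketch below uses plain Python and makes two assumptions that the diagram does not confirm: "non-zero" refers to volume, and "±0.4%" means a discount joins a cluster when it lies within 0.4 percentage points of the cluster's smallest member. Both function names are hypothetical.

```python
from collections import defaultdict


def event_duration_sketch(rows):
    """Count distinct weeks with non-zero volume per promotion code.

    `rows` is an iterable of (promotion_code, week_id, volume) tuples.
    Codes that only have zero-volume weeks do not appear in the result.
    """
    weeks = defaultdict(set)
    for code, week_id, volume in rows:
        if volume != 0:
            weeks[code].add(week_id)
    return {code: len(week_ids) for code, week_ids in weeks.items()}


def cluster_discounts_sketch(discounts, tolerance=0.4):
    """Group sorted discount percentages into clusters.

    A value joins the current cluster when it is at most `tolerance`
    above the cluster's first (smallest) member.
    """
    clusters = []
    for value in sorted(discounts):
        if clusters and value - clusters[-1][0] <= tolerance:
            clusters[-1].append(value)
        else:
            clusters.append([value])
    return clusters


durations = event_duration_sketch([
    ("P1", 202301, 10), ("P1", 202302, 0), ("P1", 202303, 4),
    ("P2", 202310, 7),
])
clusters = cluster_discounts_sketch([12.2, 10.0, 15.0, 10.3, 12.0])
```

Rows that fall in the same cluster would then receive the same Promotion_Code before duplicate rows are aggregated.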
Promo Costs Recomputation (Parmalat)
flowchart TB
subgraph "1. Preprocessing"
A[process_promotion_code_substitution] --> B[Substitute promo codes<br/>except 'Annullata']
B --> C[find_and_aggregate_duplicate_rows]
C --> D[Exclude promos with<br/>negative volumes per year]
end
subgraph "2. Reference merges"
E[extract_retailer_codes] --> F[Code before '-']
F --> G[Merge retailer_classification]
G --> H[Merge campaign_discount<br/>with Product_code]
H --> I[Merge promo_price_adjustment<br/>with Product_code]
end
subgraph "3. Calculations"
J[Off_invoice_savings_%<br/>on PROMO data only] --> K[Direct_Promo_costs =<br/>Detail_5 + Detail_11 + adjustment]
K --> L[Pure_savings =<br/>Detail_5 × Campaign_discount]
L --> M[Proportional_off_invoice]
M --> N[Total_promo_costs]
N --> O[Unit_promo_cost]
end
subgraph "4. Final aggregation"
P[Groupby Retailer_classification<br/>× EAN × Promotion_Code<br/>× Year × Week]
P --> Q[Sum volumes and costs]
Q --> R[Merge back to main df]
end
D --> E
I --> J
O --> P
R --> Save[Save step_2_03_sell_in_base]
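The retailer-code extraction and the two cost formulas spelled out in the flowchart above can be written as small Python helpers. This is a sketch only: the function names are hypothetical, a missing campaign discount is treated as 0 (the rule stated for unmatched campaign discounts elsewhere in these diagrams), and Proportional_off_invoice and Total_promo_costs are left out because their formulas are not given here.

```python
from typing import Optional


def extract_retailer_code_sketch(retailer: str) -> str:
    """Keep the part before the first '-' (e.g. 'PA03026609-NAME')."""
    return retailer.split("-", 1)[0].strip()


def direct_promo_costs(detail_5: float, detail_11: float,
                       adjustment: float) -> float:
    """Direct_Promo_costs = Detail_5 + Detail_11 + adjustment."""
    return detail_5 + detail_11 + adjustment


def pure_savings(detail_5: float,
                 campaign_discount: Optional[float]) -> float:
    """Pure_savings = Detail_5 x Campaign_discount (0 when no match)."""
    discount = 0.0 if campaign_discount is None else campaign_discount
    return detail_5 * discount


code = extract_retailer_code_sketch("PA03026609-NAME")
costs = direct_promo_costs(100.0, 20.0, 5.0)
savings_without_match = pure_savings(100.0, None)
```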
Parmalat Margin Calculation
stateDiagram-v2
[*] --> CalcUnitMargins: calculate_promo_margin_parmalat
state CalcUnitMargins {
[*] --> PromoMargin: Promo unit marginal contribution
[*] --> NoPromoMargin: No-promo unit marginal contribution
[*] --> AvgMargin: Average unit marginal contribution
PromoMargin --> PromoPrice: Promo unit net price
NoPromoMargin --> NoPromoPrice: No-promo unit net price
AvgMargin --> AvgPrice: Average unit net price
PromoPrice --> DeltaPromo: Delta promo = Promo - Avg
NoPromoPrice --> DeltaNoPromo: Delta no-promo = NoPromo - Avg
}
CalcUnitMargins --> ApplyFormula: Apply reallocation formula
state ApplyFormula {
[*] --> CheckFlag: Flag_Promo?
CheckFlag --> PromoCalc: = 1
CheckFlag --> NoPromoCalc: = 0
PromoCalc --> Formula1: (avg_margin + delta_promo) × Volume
NoPromoCalc --> Formula2: (avg_margin + delta_no_promo) × Volume
Formula1 --> NewMargin: New_Marginal_Contribution
Formula2 --> NewMargin
}
ApplyFormula --> FinalMargin: Promo_margin_total
note right of FinalMargin
= New_Marginal_Contribution
+ Supply_chain_gap
+ Fixed_industrial_costs
end note
FinalMargin --> UnitMargin: Promo_margin_per_unit
UnitMargin --> [*]
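The reallocation formula in the state diagram above reduces to a few lines of arithmetic per row. The sketch below follows the diagram's definitions (delta = promo or no-promo unit net price minus the average unit net price). The function and parameter names are hypothetical, and the inputs in the example are made-up numbers.

```python
def new_marginal_contribution(flag_promo: int, volume: float,
                              avg_unit_margin: float,
                              promo_unit_price: float,
                              no_promo_unit_price: float,
                              avg_unit_price: float) -> float:
    """(avg_margin + delta) x Volume, with delta chosen by Flag_Promo."""
    delta_promo = promo_unit_price - avg_unit_price
    delta_no_promo = no_promo_unit_price - avg_unit_price
    delta = delta_promo if flag_promo == 1 else delta_no_promo
    return (avg_unit_margin + delta) * volume


def promo_margin_total(new_margin: float, supply_chain_gap: float,
                       fixed_industrial_costs: float) -> float:
    """New_Marginal_Contribution + Supply_chain_gap + Fixed_industrial_costs."""
    return new_margin + supply_chain_gap + fixed_industrial_costs


# Illustrative values only.
promo_row = new_marginal_contribution(1, 100.0, 2.0, 4.5, 5.5, 5.0)
no_promo_row = new_marginal_contribution(0, 100.0, 2.0, 4.5, 5.5, 5.0)
total = promo_margin_total(promo_row, -10.0, -20.0)
```

With these inputs the promo row is priced 0.5 below average, so its unit margin drops from 2.0 to 1.5, while the no-promo row rises to 2.5.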
Parmalat Exposer Substitution
flowchart LR
subgraph "Identification"
EX1[Exposers_mapping_parmalat] --> EX2[Identifier EAN_exposer<br/>et Product_code_exposer]
EX2 --> EX3{Row is exposer?}
end
subgraph "Overlap check"
PC1[promo_calendar] --> PC2[Extract Week_IDs<br/>start/end]
PC2 --> PC3[check_promo_overlap_week_ids]
note1[Week_ID format: YYYYWW<br/>e.g. 202351 → 202402]
end
subgraph "Substitution"
SUB1{Has Promo_Code?}
SUB1 -->|No| SUB2[Substitute directly]
SUB1 -->|Yes| PC3
PC3 -->|Overlap| SUB3[Replace EAN + Promo_Code]
PC3 -->|No overlap| SUB4[Keep original]
end
subgraph "Aggregation"
AGG1[Substitutions done] --> AGG2[find_and_aggregate_duplicate_rows]
AGG2 --> AGG3[Reduce row count]
end
EX3 -->|Yes| SUB1
EX3 -->|No| Skip[Keep original]
SUB2 --> Trace[substitution_trace]
SUB3 --> Trace
SUB4 --> Trace
Trace --> AGG1
Skip --> AGG3
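The overlap check on YYYYWW week IDs, including a range that crosses a year boundary such as 202351 → 202402, can be illustrated as follows. This is a sketch, not the body of `check_promo_overlap_week_ids`: the function names are hypothetical, range ends are assumed inclusive, and the week count assumes ISO week numbering.

```python
from datetime import date
from typing import Tuple


def parse_week_id(week_id: int) -> Tuple[int, int]:
    """Split a YYYYWW integer into (year, week) and validate the week."""
    year, week = divmod(week_id, 100)
    if not 1 <= week <= 53:
        raise ValueError(f"Invalid week in Week_ID {week_id}")
    return year, week


def week_ranges_overlap(start_a: int, end_a: int,
                        start_b: int, end_b: int) -> bool:
    """True when two inclusive YYYYWW ranges share at least one week.

    YYYYWW integers sort chronologically, so plain integer comparison
    also works across a year boundary.
    """
    for week_id in (start_a, end_a, start_b, end_b):
        parse_week_id(week_id)
    return start_a <= end_b and start_b <= end_a


def weeks_in_range(start: int, end: int) -> int:
    """Number of weeks in an inclusive range, assuming ISO week numbering."""
    first = date.fromisocalendar(*parse_week_id(start), 1)
    last = date.fromisocalendar(*parse_week_id(end), 1)
    return (last - first).days // 7 + 1


overlapping = week_ranges_overlap(202351, 202402, 202401, 202403)
disjoint = week_ranges_overlap(202351, 202402, 202403, 202405)
span = weeks_in_range(202351, 202402)
```

Subtracting the raw integers (202402 - 202351) would not give a week count, which is why the duration helper goes through calendar dates.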
Quality Checks Overview
graph TD
subgraph "Quality Checks"
QC1("QC1: Negative values<br/>Share of negative GSV/NIS/NNS by year")
QC2("QC2: Waterfall<br/>Calculated vs actual NIS comparison")
QC3("QC3: EAN uniqueness sell-in<br/>EANs with multiple Product_Codes")
QC4("QC4: Promo alignment<br/>Volumes gap promo calendar vs sell-in")
QC5("QC5: Volumes by category<br/>Pivot table category × year")
QC6("QC6: Duplicates<br/>Count in all step_2_02_* tables")
QC7("QC7: EAN uniqueness sell-out<br/>Join with product_base")
QC8("QC8: Sell-in/out comparison<br/>By category/retailer/year/month")
end
DB[(Database)] --> QC1
DB --> QC2
DB --> QC3
DB --> QC4
DB --> QC5
DB --> QC6
DB --> QC7
DB --> QC8
QC1 --> CSV1("qc1_negative_values_{bu}.csv")
QC2 --> CSV2("qc2_waterfall_{bu}.csv")
QC3 --> CSV3("qc3_ean_uniqueness_{bu}.csv")
QC4 --> CSV4("qc4_promo_alignment_{bu}.csv")
QC5 --> CSV5("qc5_volumes_by_category_{bu}.csv")
QC6 --> TXT("qc6_duplicates_summary_{bu}.txt")
QC7 --> CSV7("qc7_sellout_ean_uniqueness_{bu}.csv")
QC8 --> CSV8("qc8_sellin_sellout_comparison_{bu}.csv")
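As an illustration of one check and of the semicolon-separated output, the sketch below computes a QC1-style share of negative values per year and serializes it with `;` as the delimiter. It assumes that "share" means the fraction of rows with a negative value, which the diagram does not specify. The function names and sample rows are hypothetical.

```python
import csv
import io
from collections import defaultdict


def negative_share_by_year(rows, kpis=("GSV", "NIS", "NNS")):
    """Fraction of rows per year whose KPI value is negative."""
    totals = defaultdict(int)
    negatives = defaultdict(lambda: {kpi: 0 for kpi in kpis})
    for row in rows:
        year = row["Year"]
        totals[year] += 1
        for kpi in kpis:
            if row[kpi] < 0:
                negatives[year][kpi] += 1
    return [
        {"Year": year,
         **{f"{kpi}_negative_share": negatives[year][kpi] / totals[year]
            for kpi in kpis}}
        for year in sorted(totals)
    ]


def to_semicolon_csv(records):
    """Serialize a non-empty list of dicts as CSV text with ';' separator."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(records[0]),
                            delimiter=";", lineterminator="\n")
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


sample = [
    {"Year": 2023, "GSV": -1.0, "NIS": 5.0, "NNS": 4.0},
    {"Year": 2023, "GSV": 10.0, "NIS": 8.0, "NNS": 7.0},
    {"Year": 2024, "GSV": 3.0, "NIS": 2.0, "NNS": 1.0},
]
shares = negative_share_by_year(sample)
csv_text = to_semicolon_csv(shares)
```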
Safe Division Pattern
flowchart LR
subgraph "safe_divide (Series)"
SD0(denominator) --> SD2{denominator = 0?}
SD2 -->|Yes| SD3(Replace with NaN)
SD2 -->|No| SD4(Keep value)
SD1(numerator) --> SD5{numerator = ±inf?}
SD5 -->|Yes| SD6(Replace with NaN)
SD5 -->|No| SD7(Keep value)
SD3 --> SD8(Divide)
SD4 --> SD8
SD6 --> SD8
SD7 --> SD8
SD8 --> SD9(fillna default_value)
end
subgraph "safe_divide_scalar"
SS1(Check denominator = 0)
SS2(Check numerator NaN/inf)
SS3(Return default if unsafe)
SS4(Return division if safe)
SS1 -->|Yes| SS3
SS2 -->|Yes| SS3
SS1 -->|No| SS4
SS2 -->|No| SS4
end
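The two patterns above map directly onto a few lines of pandas and standard-library code. The sketch below follows the diagram step by step. The signatures and the default of 0.0 are assumptions, and the project's own `safe_divide` and `safe_divide_scalar` may differ in detail.

```python
import math

import numpy as np
import pandas as pd


def safe_divide(numerator: pd.Series, denominator: pd.Series,
                default_value: float = 0.0) -> pd.Series:
    """Element-wise division that never yields inf or NaN."""
    # Zero denominators become NaN so the division cannot produce inf.
    safe_denominator = denominator.replace(0, np.nan)
    # Infinite numerators become NaN as well.
    safe_numerator = numerator.replace([np.inf, -np.inf], np.nan)
    # Every NaN left after the division is replaced by the default value.
    return (safe_numerator / safe_denominator).fillna(default_value)


def safe_divide_scalar(numerator: float, denominator: float,
                       default_value: float = 0.0) -> float:
    """Scalar variant: return the default when the division is unsafe."""
    if denominator == 0 or math.isnan(numerator) or math.isinf(numerator):
        return default_value
    return numerator / denominator


ratios = safe_divide(pd.Series([1.0, 2.0, np.inf]),
                     pd.Series([2.0, 0.0, 1.0]))
```

Replacing unsafe inputs with NaN before dividing keeps the operation vectorized, and a single `fillna` at the end handles every unsafe case at once.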
Table Flow Across Steps
stateDiagram-v2
[*] --> Raw: Raw tables from DB
Raw --> Step201: re_classification
state Step201 {
[*] --> AddNewCategory: Add New_category
AddNewCategory --> ReplaceEAN: Replace EANs (Galbani)
ReplaceEAN --> [*]
}
Step201 --> Step202: categories_scoping
state Step202 {
[*] --> AddFlag: Add Flag_exclusion
AddFlag --> [*]
}
Step202 --> Step203: promo_costs_recomputation
state Step203 {
[*] --> CalcCosts: Calculate promo costs
CalcCosts --> CalcMargins: Calculate margins (Parmalat)
CalcMargins --> SubstExposers: Substitute exposers (Parmalat)
SubstExposers --> [*]
}
Step203 --> QualityChecks: 8 Quality Checks
QualityChecks --> [*]: CSV/TXT outputs
Data Cleaning Key Points
mindmap
  root((Step 03<br/>Data<br/>Cleaning))
    Operations
      Re-classification
        EAN replacement (Galbani)
        New_category addition
        Product_reclassification merge
      Categories scoping
        Flag_exclusion
        Non-cheese products (Galbani)
        Brand+Category (Parmalat)
      Promo costs
        Aggregation ±0.4% (Galbani)
        Code substitution (Parmalat)
        Margin calculation (Parmalat)
        Exposer substitution (Parmalat)
    Galbani specifics
      KPIs calculation
        Pure_savings_%
        On_invoice_savings_%
        Off_invoice_savings_%
      Event duration
        Non-zero weeks count
      Discount aggregation
        Similar discounts grouped
    Parmalat specifics
      Retailer codes
        Extract before dash
        PA03026609-NAME → PA03026609
      Campaign discount
        Merge with Product_code
        Set to 0 if no match
      Margin reallocation
        Delta price methodology
        Supply chain gap inclusion
      Exposer logic
        Promo overlap check
        YYYYWW format handling
    Quality Checks
      8 checks
        Negative values
        Waterfall analysis
        EAN uniqueness
        Promo alignment
        Volumes by category
        Duplicates detection
        Sell-in/out comparison
      Output formats
        CSV with separator ;
        TXT for duplicates