Aller au contenu

Step_03_Data_Cleaning_and_Data_Quality_Checks - Nettoyage et contrôle qualité des données

Vue d'ensemble

Cette étape effectue le nettoyage des données et applique une série de contrôles qualité pour garantir l'intégrité et la cohérence des données avant les étapes de modélisation. Elle comprend deux phases distinctes : le nettoyage (main) et les contrôles qualité (main_quality_checks).

Objectif principal

  • Nettoyer et enrichir les données selon les spécificités de chaque business unit
  • Effectuer des transformations complexes (reclassification, calcul de coûts promotionnels, etc.)
  • Générer des rapports de contrôle qualité pour validation
  • Préparer les données pour la phase de modélisation

Position dans la pipeline

  • Étape précédente : Step_02_Data_Integration
  • Étape suivante : Step_04_Building_Model_Database
  • Appels distincts :
  • main avec paramètre : /app/inputs/{business_unit}/
  • main_quality_checks avec paramètre : /app/outputs/{business_unit}/

Architecture technique

Flux de données - Phase de nettoyage

Entrée (Tables step_2_*)
    ├── Configuration
    │   └── promo_config.json
    │       ├── Specific_re_classification
    │       ├── Categories_scoping
    │       └── Promo_costs_recomputation
    └── Tables de référence (Italie)
        ├── Galbani (13 tables)
        └── Parmalat (10 tables)

Traitement par opération
    ├── 1. Re-classification (step_2_01_*)
    │   ├── Ajout colonne New_category
    │   └── Remplacement EAN (Galbani)
    ├── 2. Categories scoping (step_2_02_*)
    │   ├── Flag_exclusion_non_cheese_products (Galbani)
    │   └── Flag_exclusion (Parmalat)
    └── 3. Promo costs recomputation (step_2_03_*)
        ├── Calculs coûts promotionnels
        ├── Marges (Parmalat)
        └── Substitution exposants (Parmalat)

Sortie
    ├── Tables nettoyées step_2_0X_*
    ├── Tables audit (promo_costs_Italy_*)
    └── Tables de traçabilité

Flux de données - Phase contrôles qualité

8 contrôles qualité
    ├── QC1: Analyse valeurs négatives
    ├── QC2: Analyse waterfall
    ├── QC3: Unicité EAN/Product_code (sell-in)
    ├── QC4: Alignement calendrier promo
    ├── QC5: Volumes par catégorie
    ├── QC6: Détection doublons
    ├── QC7: Unicité EAN/Product_code (sell-out)
    └── QC8: Comparaison sell-in/sell-out

Sortie (/app/outputs/{business_unit}/Quality checks/)
    ├── qc1_negative_values_{business_unit}.csv
    ├── qc2_waterfall_{business_unit}.csv
    ├── qc3_ean_uniqueness_{business_unit}.csv
    ├── qc4_promo_alignment_{business_unit}.csv
    ├── qc5_volumes_by_category_{business_unit}.csv
    ├── qc6_duplicates_summary_{business_unit}.txt
    ├── qc7_sellout_ean_uniqueness_{business_unit}.csv
    └── qc8_sellin_sellout_comparison_{business_unit}.csv

Concepts clés

1. Opérations conditionnelles

Les opérations sont activées/désactivées via promo_config.json : - Specific_re_classification (0/1) - Categories_scoping (0/1) - Promo_costs_recomputation (0/1)

2. Préfixes de tables progressifs

  • Tables source : sans préfixe
  • Après re-classification : step_2_01_*
  • Après categories scoping : step_2_02_*
  • Après promo costs : step_2_03_* (sell-in uniquement)

3. Spécificités business unit

  • Galbani : Remplacement EAN, flag produits non-fromagers
  • Parmalat : Calcul marges, substitution exposants, codes promo

4. Division sécurisée

Fonctions safe_divide et safe_divide_scalar pour éviter divisions par zéro

Implémentation détaillée - Phase nettoyage

1. Fonction principale : main()

def main(data_path):
    countries_operations_df = get_countries_and_operations_from_json(json_path)

    for index, row in countries_operations_df.iterrows():
        country = row['Country']
        years_in_scope = parse_years_in_scope(row['Years_in_scope_for_tool_refresh'])

        # Chargement des DataFrames
        processed_dataframes = {
            'product_base': pd.read_sql('product_base', engine),
            'promo_calendar': pd.read_sql('promo_calendar', engine),
            'sell_in_base': pd.read_sql('sell_in_base', engine),
            'sell_out_base': pd.read_sql('sell_out_base', engine),
        }

        # Application conditionnelle des opérations
        if row['Specific_re_classification'] == 1:
            # Appel fonction spécifique business unit
        if row['Categories_scoping'] == 1:
            # Appel fonction spécifique business unit
        if row['Promo_costs_recomputation'] == 1:
            # Appel fonction spécifique business unit + post-processing

2. Re-classification (Opération 1)

Galbani : re_classification_italy_galbani()

Processus : 1. Remplacement des EAN selon EANs_Replacement_List_Italy 2. Agrégation des KPIs pour les doublons créés 3. Ajout New_category depuis Italy_product_reclassification.xlsx

Spécificité : Traitement différencié par type de table - promo_calendar : merge sur EAN uniquement - product_base : merge sur EAN + EAN_desc - sell_out : merge sur EAN uniquement - Autres : merge sur EAN + EAN_desc

Parmalat : re_classification_italy_parmalat()

Processus : 1. Chargement depuis Product_reclassification_parmalat 2. Déduplication sur EAN 3. Merge sur EAN uniquement pour toutes les tables

3. Categories scoping (Opération 2)

Galbani : categories_scoping_italy_galbani()

  • Ajout Flag_exclusion_non_cheese_products
  • Merge sur Category

Parmalat : categories_scoping_italy_parmalat()

  • Ajout Flag_exclusion
  • Merge sur Brand + Category

4. Promo costs recomputation (Opération 3)

Galbani : promo_costs_recomputation_italy_galbani()

Étapes complexes : 1. Prétraitement et agrégation des codes promo - Calcul On_invoice_discount depuis SQL - Recherche First_Week et Event_duration - Agrégation codes promo similaires (±0.4%)

  1. Calcul des KPIs annuels
  2. Pure savings, On/Off invoice savings
  3. Gestion produits "only promo"

  4. Calcul des coûts unitaires

  5. Direct_Promo_costs
  6. Total_promo_costs_yearly
  7. Unit_promo_cost_yearly

Tables créées : - promo_costs_Italy_galbani : Détail des calculs - step_2_03_sell_in_base : Table principale mise à jour

Parmalat : promo_costs_recomputation_italy_parmalat()

Étapes spécifiques : 1. Substitution codes promotionnels - Exclusion codes annulés ("Annullata") - Agrégation des doublons

  1. Exclusion promotions volumes négatifs

  2. Calculs par SUPERGRUPPO×GRUPPO

  3. Pure savings depuis Campaign_discount
  4. Off-invoice savings sur données promo uniquement
  5. Allocation ajustements prix par Product_code

  6. Post-processing marges (calculate_promo_margin_parmalat())

  7. Marginal contribution réallouée
  8. Delta prix promo/non-promo

  9. Substitution exposants (substitute_exposers_after_margin_parmalat())

  10. Vérification overlap périodes promo
  11. Traçabilité dans exposer_substitution_trace_parmalat

Tables créées : - promo_costs_Italy_parmalat - parmalat_margin_calculation_audit - exposer_substitution_trace_parmalat

Implémentation détaillée - Phase contrôles qualité

1. QC1 : Analyse valeurs négatives

def quality_check_negative_values(engine, country):
    # KPIs analysés : Gross_sales, Net_invoice_sales, Net_net_sales
    # Calcul part des valeurs négatives par année et global

Output : Tableau croisé dynamique Year × KPI

2. QC2 : Analyse waterfall

def waterfall_analysis(engine, country):
    # Agrégation annuelle des KPIs
    # Calcul Net_invoice_sales théorique vs réel
    # Identification des deltas

Métriques calculées : - Net_invoice_sales_calculated - Delta avec/sans Detail_11 - NSV Calculated

3. QC3 : Unicité EAN/Product_code (sell-in)

Identifie les EAN associés à plusieurs Product_code dans sell-in

4. QC4 : Alignement calendrier promotionnel

Compare volumes et net_net_sales entre : - sell_in_base (agrégé par année) - promo_calendar (somme Volumes_Sell_In)

5. QC5 : Volumes par catégorie

Tableau croisé New_category × Year avec sommes des volumes

6. QC6 : Détection doublons

Compte les lignes dupliquées dans toutes les tables step_2_02_*

7. QC7 : Unicité EAN/Product_code (sell-out)

Joint sell_out_base avec product_base pour enrichir les détails produits

8. QC8 : Comparaison sell-in/sell-out

Granularité : New_category × Retailer × Year × Month - Calcul différences absolues et pourcentages - Gestion division par zéro avec safe_divide()

Gestion des erreurs

Types d'erreurs gérés

Type Traitement Impact
Table manquante Log + skip Non bloquant
Colonne manquante Log + skip table Non bloquant par table
Division par zéro Retour valeur par défaut Transparent
Données manquantes Fillna approprié Continuation
Erreur SQL Try/except + log Non bloquant par QC

Stratégies de robustesse

  1. Vérification colonnes : Test existence avant utilisation
  2. Gestion NaN : Remplacement par valeurs neutres
  3. Déduplication : Logs détaillés des suppressions
  4. Traçabilité : Tables audit pour opérations complexes

Performance et optimisation

1. Requêtes SQL optimisées

  • Agrégations côté base de données quand possible
  • Sélection colonnes nécessaires uniquement

2. Traitement par chunks

df.to_sql(..., method='multi', chunksize=10000)

3. Vectorisation

  • Utilisation apply avec parcimonie
  • Préférence pour opérations vectorisées pandas

4. Gestion mémoire

  • Drop colonnes temporaires après utilisation
  • Copies DataFrame uniquement si nécessaire

Points d'attention maintenance

1. Ajout nouvelle business unit

  1. Créer fonctions spécifiques :
  2. re_classification_{business_unit}()
  3. categories_scoping_{business_unit}()
  4. promo_costs_recomputation_{business_unit}()

  5. Gérer tables de référence spécifiques

  6. Adapter contrôles qualité si structure différente

2. Modification calculs promotionnels

  • Les formules sont codées en dur
  • Documenter tout changement de logique métier
  • Maintenir cohérence entre business units

3. Évolution contrôles qualité

  • Nouveaux QC : ajouter dans dictionnaire quality_checks
  • Format sortie : CSV avec séparateur ; par défaut
  • Nommage : qc{N}_{description}_{business_unit}.csv

Troubleshooting

Problème : "Function {function_name} not found"

Cause : Business unit non implémentée

Solution : 1. Vérifier orthographe dans promo_config.json 2. Implémenter les fonctions manquantes 3. Ou désactiver l'opération (mettre à 0)

Problème : Explosion nombre de lignes

Symptômes : Row count augmente après merge

Vérifications : 1. Clés de jointure uniques ? 2. Déduplication nécessaire avant merge ? 3. Logs : "WARNING: Line inflation detected"

Problème : Calculs incorrects Parmalat

Points de vérification : 1. Campaign_discount : valeurs négatives mises à 0 ? 2. Product_code utilisé comme clé (pas seulement EAN) 3. Codes retailer extraits correctement

Problème : QC échoue silencieusement

Diagnostic :

# Vérifier logs pour chaque QC
# Format : "Error during {qc_name} check for {country}: {error}"

Exemples d'utilisation

Ajout d'un nouveau contrôle qualité

def new_quality_check(engine, country):
    table_name = f"step_2_03_sell_in_base"
    query = f"SELECT ... FROM {table_name}"
    df = pd.read_sql(query, engine)

    # Logique du contrôle
    result_df = ...

    return result_df

# Dans main_quality_checks()
quality_checks = {
    # ... existing checks ...
    new_quality_check: "qc9_new_check"
}

Modification seuil agrégation Galbani

Dans preprocess_and_aggregate_promo_codes_italy_galbani() :

# Seuil actuel : 0.4%
if abs(row['On_invoice_discount'] - prev_discount) <= 0.004:

# Pour passer à 0.5% :
if abs(row['On_invoice_discount'] - prev_discount) <= 0.005:

Debug substitution exposants Parmalat

Consulter la table de trace :

SELECT * FROM exposer_substitution_trace_parmalat
WHERE Substitution_applied = 0
-- Voir les raisons de non-substitution