Step_03_Data_Cleaning_and_Data_Quality_Checks - Nettoyage et contrôle qualité des données¶
Vue d'ensemble¶
Cette étape effectue le nettoyage des données et applique une série de contrôles qualité pour garantir l'intégrité et la cohérence des données avant les étapes de modélisation. Elle comprend deux phases distinctes : le nettoyage (main
) et les contrôles qualité (main_quality_checks
).
Objectif principal¶
- Nettoyer et enrichir les données selon les spécificités de chaque business unit
- Effectuer des transformations complexes (reclassification, calcul de coûts promotionnels, etc.)
- Générer des rapports de contrôle qualité pour validation
- Préparer les données pour la phase de modélisation
Position dans la pipeline¶
- Étape précédente : Step_02_Data_Integration
- Étape suivante : Step_04_Building_Model_Database
- Appels distincts :
main
avec paramètre :/app/inputs/{business_unit}/
main_quality_checks
avec paramètre :/app/outputs/{business_unit}/
Architecture technique¶
Flux de données - Phase de nettoyage¶
Entrée (Tables step_2_*)
│
├── Configuration
│ └── promo_config.json
│ ├── Specific_re_classification
│ ├── Categories_scoping
│ └── Promo_costs_recomputation
│
└── Tables de référence (Italie)
├── Galbani (13 tables)
└── Parmalat (10 tables)
Traitement par opération
│
├── 1. Re-classification (step_2_01_*)
│ ├── Ajout colonne New_category
│ └── Remplacement EAN (Galbani)
│
├── 2. Categories scoping (step_2_02_*)
│ ├── Flag_exclusion_non_cheese_products (Galbani)
│ └── Flag_exclusion (Parmalat)
│
└── 3. Promo costs recomputation (step_2_03_*)
├── Calculs coûts promotionnels
├── Marges (Parmalat)
└── Substitution exposants (Parmalat)
Sortie
│
├── Tables nettoyées step_2_0X_*
├── Tables audit (promo_costs_Italy_*)
└── Tables de traçabilité
Flux de données - Phase contrôles qualité¶
8 contrôles qualité
│
├── QC1: Analyse valeurs négatives
├── QC2: Analyse waterfall
├── QC3: Unicité EAN/Product_code (sell-in)
├── QC4: Alignement calendrier promo
├── QC5: Volumes par catégorie
├── QC6: Détection doublons
├── QC7: Unicité EAN/Product_code (sell-out)
└── QC8: Comparaison sell-in/sell-out
Sortie (/app/outputs/{business_unit}/Quality checks/)
│
├── qc1_negative_values_{business_unit}.csv
├── qc2_waterfall_{business_unit}.csv
├── qc3_ean_uniqueness_{business_unit}.csv
├── qc4_promo_alignment_{business_unit}.csv
├── qc5_volumes_by_category_{business_unit}.csv
├── qc6_duplicates_summary_{business_unit}.txt
├── qc7_sellout_ean_uniqueness_{business_unit}.csv
└── qc8_sellin_sellout_comparison_{business_unit}.csv
Concepts clés¶
1. Opérations conditionnelles¶
Les opérations sont activées/désactivées via promo_config.json
:
- Specific_re_classification
(0/1)
- Categories_scoping
(0/1)
- Promo_costs_recomputation
(0/1)
2. Préfixes de tables progressifs¶
- Tables source : sans préfixe
- Après re-classification :
step_2_01_*
- Après categories scoping :
step_2_02_*
- Après promo costs :
step_2_03_*
(sell-in uniquement)
3. Spécificités business unit¶
- Galbani : Remplacement EAN, flag produits non-fromagers
- Parmalat : Calcul marges, substitution exposants, codes promo
4. Division sécurisée¶
Fonctions safe_divide
et safe_divide_scalar
pour éviter divisions par zéro
Implémentation détaillée - Phase nettoyage¶
1. Fonction principale : main()
¶
def main(data_path):
countries_operations_df = get_countries_and_operations_from_json(json_path)
for index, row in countries_operations_df.iterrows():
country = row['Country']
years_in_scope = parse_years_in_scope(row['Years_in_scope_for_tool_refresh'])
# Chargement des DataFrames
processed_dataframes = {
'product_base': pd.read_sql('product_base', engine),
'promo_calendar': pd.read_sql('promo_calendar', engine),
'sell_in_base': pd.read_sql('sell_in_base', engine),
'sell_out_base': pd.read_sql('sell_out_base', engine),
}
# Application conditionnelle des opérations
if row['Specific_re_classification'] == 1:
# Appel fonction spécifique business unit
if row['Categories_scoping'] == 1:
# Appel fonction spécifique business unit
if row['Promo_costs_recomputation'] == 1:
# Appel fonction spécifique business unit + post-processing
2. Re-classification (Opération 1)¶
Galbani : re_classification_italy_galbani()
¶
Processus :
1. Remplacement des EAN selon EANs_Replacement_List_Italy
2. Agrégation des KPIs pour les doublons créés
3. Ajout New_category
depuis Italy_product_reclassification.xlsx
Spécificité : Traitement différencié par type de table
- promo_calendar
: merge sur EAN uniquement
- product_base
: merge sur EAN + EAN_desc
- sell_out
: merge sur EAN uniquement
- Autres : merge sur EAN + EAN_desc
Parmalat : re_classification_italy_parmalat()
¶
Processus :
1. Chargement depuis Product_reclassification_parmalat
2. Déduplication sur EAN
3. Merge sur EAN uniquement pour toutes les tables
3. Categories scoping (Opération 2)¶
Galbani : categories_scoping_italy_galbani()
¶
- Ajout
Flag_exclusion_non_cheese_products
- Merge sur
Category
Parmalat : categories_scoping_italy_parmalat()
¶
- Ajout
Flag_exclusion
- Merge sur
Brand
+Category
4. Promo costs recomputation (Opération 3)¶
Galbani : promo_costs_recomputation_italy_galbani()
¶
Étapes complexes :
1. Prétraitement et agrégation des codes promo
- Calcul On_invoice_discount
depuis SQL
- Recherche First_Week et Event_duration
- Agrégation codes promo similaires (±0.4%)
- Calcul des KPIs annuels
- Pure savings, On/Off invoice savings
-
Gestion produits "only promo"
-
Calcul des coûts unitaires
- Direct_Promo_costs
- Total_promo_costs_yearly
- Unit_promo_cost_yearly
Tables créées :
- promo_costs_Italy_galbani
: Détail des calculs
- step_2_03_sell_in_base
: Table principale mise à jour
Parmalat : promo_costs_recomputation_italy_parmalat()
¶
Étapes spécifiques : 1. Substitution codes promotionnels - Exclusion codes annulés ("Annullata") - Agrégation des doublons
-
Exclusion promotions volumes négatifs
-
Calculs par SUPERGRUPPO×GRUPPO
- Pure savings depuis
Campaign_discount
- Off-invoice savings sur données promo uniquement
-
Allocation ajustements prix par Product_code
-
Post-processing marges (
calculate_promo_margin_parmalat()
) - Marginal contribution réallouée
-
Delta prix promo/non-promo
-
Substitution exposants (
substitute_exposers_after_margin_parmalat()
) - Vérification overlap périodes promo
- Traçabilité dans
exposer_substitution_trace_parmalat
Tables créées :
- promo_costs_Italy_parmalat
- parmalat_margin_calculation_audit
- exposer_substitution_trace_parmalat
Implémentation détaillée - Phase contrôles qualité¶
1. QC1 : Analyse valeurs négatives¶
def quality_check_negative_values(engine, country):
# KPIs analysés : Gross_sales, Net_invoice_sales, Net_net_sales
# Calcul part des valeurs négatives par année et global
Output : Tableau croisé dynamique Year × KPI
2. QC2 : Analyse waterfall¶
def waterfall_analysis(engine, country):
# Agrégation annuelle des KPIs
# Calcul Net_invoice_sales théorique vs réel
# Identification des deltas
Métriques calculées : - Net_invoice_sales_calculated - Delta avec/sans Detail_11 - NSV Calculated
3. QC3 : Unicité EAN/Product_code (sell-in)¶
Identifie les EAN associés à plusieurs Product_code dans sell-in
4. QC4 : Alignement calendrier promotionnel¶
Compare volumes et net_net_sales entre :
- sell_in_base
(agrégé par année)
- promo_calendar
(somme Volumes_Sell_In)
5. QC5 : Volumes par catégorie¶
Tableau croisé New_category × Year avec sommes des volumes
6. QC6 : Détection doublons¶
Compte les lignes dupliquées dans toutes les tables step_2_02_*
7. QC7 : Unicité EAN/Product_code (sell-out)¶
Joint sell_out_base
avec product_base
pour enrichir les détails produits
8. QC8 : Comparaison sell-in/sell-out¶
Granularité : New_category × Retailer × Year × Month
- Calcul différences absolues et pourcentages
- Gestion division par zéro avec safe_divide()
Gestion des erreurs¶
Types d'erreurs gérés¶
Type | Traitement | Impact |
---|---|---|
Table manquante | Log + skip | Non bloquant |
Colonne manquante | Log + skip table | Non bloquant par table |
Division par zéro | Retour valeur par défaut | Transparent |
Données manquantes | Fillna approprié | Continuation |
Erreur SQL | Try/except + log | Non bloquant par QC |
Stratégies de robustesse¶
- Vérification colonnes : Test existence avant utilisation
- Gestion NaN : Remplacement par valeurs neutres
- Déduplication : Logs détaillés des suppressions
- Traçabilité : Tables audit pour opérations complexes
Performance et optimisation¶
1. Requêtes SQL optimisées¶
- Agrégations côté base de données quand possible
- Sélection colonnes nécessaires uniquement
2. Traitement par chunks¶
3. Vectorisation¶
- Utilisation
apply
avec parcimonie - Préférence pour opérations vectorisées pandas
4. Gestion mémoire¶
- Drop colonnes temporaires après utilisation
- Copies DataFrame uniquement si nécessaire
Points d'attention maintenance¶
1. Ajout nouvelle business unit¶
- Créer fonctions spécifiques :
re_classification_{business_unit}()
categories_scoping_{business_unit}()
-
promo_costs_recomputation_{business_unit}()
-
Gérer tables de référence spécifiques
-
Adapter contrôles qualité si structure différente
2. Modification calculs promotionnels¶
- Les formules sont codées en dur
- Documenter tout changement de logique métier
- Maintenir cohérence entre business units
3. Évolution contrôles qualité¶
- Nouveaux QC : ajouter dans dictionnaire
quality_checks
- Format sortie : CSV avec séparateur
;
par défaut - Nommage :
qc{N}_{description}_{business_unit}.csv
Troubleshooting¶
Problème : "Function {function_name} not found"¶
Cause : Business unit non implémentée
Solution :
1. Vérifier orthographe dans promo_config.json
2. Implémenter les fonctions manquantes
3. Ou désactiver l'opération (mettre à 0)
Problème : Explosion nombre de lignes¶
Symptômes : Row count augmente après merge
Vérifications : 1. Clés de jointure uniques ? 2. Déduplication nécessaire avant merge ? 3. Logs : "WARNING: Line inflation detected"
Problème : Calculs incorrects Parmalat¶
Points de vérification : 1. Campaign_discount : valeurs négatives mises à 0 ? 2. Product_code utilisé comme clé (pas seulement EAN) 3. Codes retailer extraits correctement
Problème : QC échoue silencieusement¶
Diagnostic :
Exemples d'utilisation¶
Ajout d'un nouveau contrôle qualité¶
def new_quality_check(engine, country):
table_name = f"step_2_03_sell_in_base"
query = f"SELECT ... FROM {table_name}"
df = pd.read_sql(query, engine)
# Logique du contrôle
result_df = ...
return result_df
# Dans main_quality_checks()
quality_checks = {
# ... existing checks ...
new_quality_check: "qc9_new_check"
}
Modification seuil agrégation Galbani¶
Dans preprocess_and_aggregate_promo_codes_italy_galbani()
:
# Seuil actuel : 0.4%
if abs(row['On_invoice_discount'] - prev_discount) <= 0.004:
# Pour passer à 0.5% :
if abs(row['On_invoice_discount'] - prev_discount) <= 0.005:
Debug substitution exposants Parmalat¶
Consulter la table de trace :