Aller au contenu

Step_01_Specific_Pre_Processing - Pré-traitement spécifique Italie

Vue d'ensemble

Cette étape effectue des pré-traitements spécifiques pour les business units italiennes (Galbani et Parmalat) de Lactalis. Elle transforme et enrichit les données brutes téléchargées pour les adapter aux spécificités du marché italien et aux exigences particulières de chaque marque.

Objectif principal

Préparer et structurer les données spécifiques à l'Italie en : - Transformant les fichiers Excel en formats standardisés - Créant ou complétant les fichiers ZIP requis - Chargeant les données de référence dans la base de données - Gérant les particularités de chaque marque (Galbani vs Parmalat)

Position dans la pipeline

  • Étape précédente : Step_00_Download_Inputs
  • Étape suivante : Step_02_Data_Integration
  • Condition d'exécution : Uniquement si BUSINESS_UNIT = italy_galbani ou italy_parmalat

Architecture technique

Flux de données

Entrée (fichiers téléchargés)
    ├── Galbani
    │   ├── GALBANI dati Sell-Out.xlsx
    │   ├── SELLOUT_*.xlsx (3 fichiers)
    │   └── Fichiers de référence (.xlsx)
    └── Parmalat
        ├── Fichiers de configuration (.xlsx)
        └── Pas de données sell-out

Traitement
    ├── Transformation des données
    │   ├── Normalisation des caractères
    │   ├── Agrégation et mapping
    │   └── Validation des formats
    └── Chargement en base
        ├── Tables de référence
        └── Tables de données

Sortie
    ├── Fichier ZIP enrichi
    │   └── SELLOUT-BASE.txt ajouté/modifié
    └── Tables MariaDB
        ├── Tables spécifiques Galbani
        └── Tables spécifiques Parmalat

Structure des données

Fichiers d'entrée Galbani (13 fichiers)

Fichier Description Table de destination
GALBANI dati Sell-Out.xlsx Données sell-out principales Transformé en SELLOUT-BASE.txt dans le ZIP
SELLOUT_private_label.xlsx Ventes marques distributeurs sell_out_base_private_label
SELLOUT_Competitors.xlsx Ventes concurrents sell_out_base_competitors
SELLOUT_allmarket.xlsx Données marché global sell_out_base_allmarket
Forward_Buying_Classification.xlsx Classification risque forward buying Forward_buying_classification_Italy_galbani
Out_products_Italy.xlsx Produits hors périmètre Out_products_Italy
EANs_Replacement_List.xlsx Correspondance EAN EANs_Replacement_List_Italy
Specific_credit_notes_allocation.xlsx Allocations notes de crédit Specific_credit_notes_allocation_Italy
In_Out_Comparable_Products.xlsx Produits comparables IN/OUT In_Out_Comparable_products_Italy
Retailer_group_classification_italy.xlsx Classification des retailers Retailer_Group_Italy
Sub_Category_Mapping_Italy_Galbani.xlsx Mapping sous-catégories Sub_Category_Mapping_Italy
Specific_discount_only_promo_products_Italy.xlsx Produits avec remises promo uniquement Specific_discount_only_promo_products_Italy
Italy_category_filtering.xlsx Filtrage des catégories Italy_category_filtering
Italy_product_reclassification.xlsx Reclassification des produits Italy_product_reclassification

Fichiers d'entrée Parmalat (10 fichiers)

Fichier Description Table de destination
Campaign_discount_parmalat.xlsx Remises campagnes Campaign_discount_parmalat
Category_filtering_parmalat.xlsx Filtrage catégories Category_filtering_parmalat
Exposers_mapping_parmalat.xlsx Mapping exposants/produits Exposers_mapping_parmalat
Forward_Buying_weeks_window_parmalat.xlsx Fenêtre temporelle forward buying Forward_Buying_weeks_window_parmalat
Product_reclassification_parmalat.xlsx Reclassification produits Product_reclassification_parmalat
Promo_price_adjustment_parmalat.xlsx Ajustements prix promotionnels Promo_price_adjustment_parmalat
Retailer_group_classification_parmalat.xlsx Classification groupes retailers Retailer_group_classification_parmalat
Statistical_methodology_parameters_parmalat.xlsx Paramètres méthodologie statistique Statistical_methodology_parameters_parmalat
Promotion_code_substitution_parmalat.xlsx Substitution codes promotionnels Promotion_code_substitution_parmalat
In_Out_Comparable_Products_Parmalat.xlsx Produits comparables IN/OUT In_Out_Comparable_products_parmalat

Concepts clés

1. Normalisation des caractères italiens

La fonction normalize_italian_chars() traite systématiquement tous les caractères accentués : - Problématique : Encodages multiples possibles (UTF-8, CP1252, etc.) - Solution : Normalisation NFKD + suppression des diacritiques - Exemple : cittàcitta, perchéperche

2. Structure des données sell-out

Format standardisé avec 23 colonnes :

Week_ID, Week_start, Retailer, Channel, Supplier, Brand, Category,
Sub_Category_1, Sub_Category_2, EAN, EAN_desc, Format, Kg_L,
Sales_Volumes, Sales_Volume_Promo, Sales_Volume_Non_promo, Sales_Value,
Sales_Value_Promo, Sales_Value_Non_promo, Weighted_distribution,
Weighted_distribution_Promo, Weighted_distribution_Non_promo, 
Sales_Volume_Promo_Incremental

3. Mapping des mesures

Transformation des libellés italiens vers les colonnes standardisées : - Vendite in VolumeSales_Volumes - Vendite in Volume PromoSales_Volume_Promo - Vendite in Volume Senza promozioniSales_Volume_Non_promo - Vendite in ValoreSales_Value - Vendite in Valore PromoSales_Value_Promo - Vendite in Valore Senza promozioniSales_Value_Non_promo - Vendite in Volume Incrementali PromoSales_Volume_Promo_Incremental - DPWeighted_distribution - DP PromoWeighted_distribution_Promo - DP Senza promozioniWeighted_distribution_Non_promo

4. Gestion des duplicatas

Stratégies différentes selon les tables : - Sans déduplication : Tables transactionnelles et de mapping - Déduplication simple : Sur une colonne (ex: EAN pour Product_reclassification_parmalat) - Déduplication composite : Sur plusieurs colonnes (ex: Retailer + Product + Year pour Campaign_discount_parmalat)

Implémentation détaillée

1. Fonction principale : main_Italy()

Point d'entrée qui : 1. Détermine la business unit active 2. Crée la connexion base de données 3. Route vers la fonction spécifique (Galbani ou Parmalat)

def main_Italy():
    log_message(f"Starting pre-processing for business unit: {business_unit}")

    global engine
    engine = create_engine(f'mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

    if business_unit.lower() == 'italy_galbani':
        main_galbani()
    elif business_unit.lower() == 'italy_parmalat':
        main_parmalat()
    else:
        log_message(f"Unknown business unit: {business_unit}. No processing performed.")

2. Traitement Galbani : main_galbani()

Étapes du processus

  1. Création/vérification du ZIP
    create_zip_if_needed(base_path)
    
  2. Vérifie si un ZIP existe déjà
  3. Crée compress_input_data.zip si nécessaire
  4. Inclut les fichiers TXT requis (PRODUCT-BASE, SELLIN-BASE, etc.)

  5. Chargement des 10 tables de référence Ordre d'exécution :

  6. Forward buying classification
  7. Out products Italy
  8. EANs replacement list
  9. Specific credit notes allocation
  10. In/Out comparable products
  11. Retailer group classification
  12. Sub-category mapping
  13. Specific discount only promo products
  14. Italy category filtering
  15. Italy product reclassification

  16. Traitement du fichier sell-out principal

    excel_file_path = find_excel_file_galbani(base_path)
    if excel_file_path:
        df = process_excel_to_df(excel_file_path, None, mapping)
        aggregated_df = aggregate_data(df)
        save_df_to_existing_zip(aggregated_df, base_path, 'SELLOUT-BASE')
    

  17. Traitement des 3 fichiers sell-out spécifiques Via process_sell_out_data_galbani() :

  18. SELLOUT_private_label.xlsxsell_out_base_private_label
  19. SELLOUT_Competitors.xlsxsell_out_base_competitors
  20. SELLOUT_allmarket.xlsxsell_out_base_allmarket (format différent)

Particularités Galbani

  • Données hebdomadaires : Format "Settimana al DD-MM-YYYY"
  • Calcul Week_ID : Numéro de semaine ISO sur 2 chiffres
  • Colonnes spécifiques Excel :
  • Measures ou Measures Causal Periodicity
  • GeographyChannel
  • Sotto Tipo ou Info Sotto TipoCategory
  • Info Major BrandBrand

3. Traitement Parmalat : main_parmalat()

Étapes du processus

  1. Création/vérification du ZIP Même logique que Galbani

  2. Chargement des 10 tables de configuration Ordre d'exécution :

  3. Campaign discount
  4. Category filtering
  5. Exposers mapping
  6. Forward buying weeks window
  7. Product reclassification
  8. Promo price adjustment
  9. Retailer group classification
  10. Statistical methodology parameters
  11. Promotion code substitution
  12. In/Out comparable products

  13. Création d'un fichier sell-out vide

    create_empty_sell_out_parmalat(base_path)
    

  14. Crée un DataFrame vide avec les 23 colonnes standard
  15. L'ajoute au ZIP comme SELLOUT-BASE.txt

Particularités Parmalat

  • Pas de données sell-out réelles : Uniquement un fichier vide structuré
  • Focus configuration : Tables de paramétrage métier
  • Gestion systématique des duplicatas avec log :
    initial_rows = len(df)
    df = df.drop_duplicates(subset=['key_columns'])
    final_rows = len(df)
    if initial_rows > final_rows:
        log_message(f"Table: Removed {initial_rows - final_rows} duplicate rows")
    

4. Fonctions utilitaires clés

create_zip_if_needed()

def create_zip_if_needed(input_directory):
    # Vérifie l'existence d'un ZIP
    existing_zips = [f for f in os.listdir(input_directory) if f.endswith('.zip')]
    if existing_zips:
        return

    # Liste des fichiers requis
    required_files = [
        'PRODUCT-BASE.txt',
        'SELLIN-BASE.txt',
        'PROMOCALENDAR.txt',
        'SELLOUT-BASE.txt'
    ]
    # Création du ZIP avec les fichiers disponibles

process_excel_to_df()

Transforme un Excel avec colonnes pivotées en DataFrame normalisé : 1. Lecture avec typage forcé (EAN en string) 2. Identification des colonnes semaines 3. Calcul des dates de début de semaine 4. Dépivotage : une ligne par semaine × mesure × produit 5. Application du mapping des mesures

aggregate_data()

Agrégation intelligente des données : - Somme : Sales_Volumes, Sales_Value et variantes - Moyenne : Weighted_distribution et variantes - Groupement : Sur 13 dimensions (Week_ID, EAN, etc.)

save_df_to_existing_zip()

Gestion sécurisée des ZIP : 1. Recherche du ZIP existant via find_zip_files_in_directory() 2. Sauvegarde temporaire en CSV (; comme séparateur, CP1252) 3. Suppression de l'ancien fichier si présent via remove_if_exists() 4. Ajout du nouveau fichier au ZIP 5. Nettoyage du fichier temporaire

sqlcol()

Génère le mapping des types SQL depuis un DataFrame : - objectNVARCHAR(255) - datetimeDateTime() - floatNUMERIC(15, 5) - intInteger()

Gestion des erreurs

Types d'erreurs gérés

Type Traitement Exemple
FileNotFoundError Log + continuation Fichier Excel manquant
ValueError Exception si feuille Excel manquante Sheet non trouvée
Duplicatas Suppression + log (Parmalat) Lignes en double
Erreurs SQL Propagation exception Contraintes DB

Logs spécifiques

Chaque opération importante est loggée : - Début/fin de traitement par business unit - Fichiers trouvés ou manquants - Nombre de lignes insérées par table - Duplicatas supprimés (avec décompte) - Création/modification de fichiers ZIP

Performance et optimisation

1. Traitement par lots

df.to_sql(..., method='multi', chunksize=10000)
- Insertion par paquets de 10 000 lignes - Réduit la charge mémoire et les timeouts

2. Typage explicite à la lecture

dtype_spec = {
    'EAN': str,
    'Product_code': str,
    'Year': str,
    # etc.
}
df = pd.read_excel(file_path, dtype=dtype_spec)

3. Normalisation systématique

df = normalize_italian_chars(df)
Appliquée immédiatement après chaque lecture Excel

Tables créées dans MariaDB

Tables Galbani (13 tables)

Table Colonnes principales Déduplication
Forward_buying_classification_Italy_galbani M7_SUB_CAT_PROD, SUB_CAT_PROD_LIV2, Forward_Buying_Risk, Category_sell_out Non
Out_products_Italy EAN, EAN_desc Non
EANs_Replacement_List_Italy EAN_to_replace, EAN_to_keep, EAN_desc_to_replace, EAN_desc_to_keep Non
Specific_credit_notes_allocation_Italy Retailer_name, EAN_desc, années 2022-2026 (BOOLEAN) Non
In_Out_Comparable_products_Italy EAN_inout, EAN_desc_inout, EAN_substitute, EAN_desc_substitute, Apply_Ontop_Substitution_Rules Non
Retailer_Group_Italy Retailer_name, Retailer_group Non
Sub_Category_Mapping_Italy Sub_Category_1, Category Non
Specific_discount_only_promo_products_Italy year, Retailer_group, Retailer_name, EAN, EAN_desc, Discount_Type, Discount Non
Italy_category_filtering Toutes colonnes en NVARCHAR(255) Non
Italy_product_reclassification Toutes colonnes en NVARCHAR(255) Non
sell_out_base_private_label Structure sell-out standard (23 colonnes) Via aggregate_data()
sell_out_base_competitors Structure sell-out standard (23 colonnes) Via aggregate_data()
sell_out_base_allmarket Year, Channel, Category, EAN_desc + métriques ventes Via groupby

Tables Parmalat (10 tables)

Table Colonnes principales Déduplication
Campaign_discount_parmalat Retailer_hierarchy, Retailer_name, Product_code, Product, EAN, Year, Campaign_discount Sur 5 colonnes
Category_filtering_parmalat Brand, Category, Flag_exclusion Sur Brand + Category
Exposers_mapping_parmalat Product_exposer, Product_code_exposer, EAN_exposer, Product_code, EAN Non
Forward_Buying_weeks_window_parmalat Retailer_classification, New_category, Forward_buying_weeks_window Sur 2 colonnes
Product_reclassification_parmalat Product_code, EAN, New_category Sur EAN
Promo_price_adjustment_parmalat Retailer_hierarchy, Retailer_name, Product_code, Product, EAN, Year, Promo_price_adjustment Sur 5 colonnes
Retailer_group_classification_parmalat Retailer_hierarchy, Retailer_name, Retailer_classification Sur 2 colonnes
Statistical_methodology_parameters_parmalat New_category + colonnes 1-52 (valeurs L/H) Non
Promotion_code_substitution_parmalat Promotion_code_to_replace, Promotion_code_to_keep, Cancelled_promotions_to_exclude Sur code_to_replace
In_Out_Comparable_products_parmalat Package_info, EAN_inout, EAN_substitute Non

Points d'attention pour la maintenance

1. Évolution des formats Excel

  • Noms de colonnes : Vérifier les mappings si renommage
  • Structure des dates : Format "Settimana al" pour Galbani
  • Nouvelles mesures : Étendre le dictionnaire mapping

2. Gestion des caractères spéciaux

  • La normalisation supprime TOUS les accents
  • Impact possible sur les recherches textuelles
  • Cohérence nécessaire avec les autres étapes

3. Performance sur gros volumes

Les fichiers sell-out peuvent être volumineux : - SELLOUT_allmarket.xlsx : Potentiellement > 100K lignes - Surveiller la mémoire lors du dépivotage - Considérer le traitement par chunks si nécessaire

Troubleshooting

Problème : "Sheet 'X' not found in Excel file"

Cause : Le fichier Excel n'a pas la feuille attendue

Diagnostic :

import pandas as pd
xl_file = pd.ExcelFile(file_path)
print(xl_file.sheet_names)  # Liste les feuilles disponibles

Solution : Adapter le code ou corriger le fichier source

Problème : Duplicatas inattendus (Parmalat)

Symptôme : Logs montrant "Removed X duplicate rows"

Analyse : 1. Vérifier la source des duplicatas 2. Confirmer que la déduplication est appropriée 3. Ajuster les colonnes de déduplication si nécessaire

Problème : ZIP non trouvé après création

Cause possible : Permissions ou espace disque

Vérification :

ls -la /app/inputs/{business_unit}/
df -h  # Vérifier l'espace disque

Problème : Erreur d'encodage

Symptôme : Caractères illisibles après traitement

Solution : 1. Vérifier l'encodage du fichier source 2. S'assurer que normalize_italian_chars() est appelée 3. Valider l'encodage CP1252 pour les exports

Exemples d'utilisation

Ajout d'un nouveau fichier de référence Galbani

  1. Créer la fonction de recherche :

    def find_excel_file_for_new_reference_galbani(base_path):
        target_file_name = "New_Reference_File.xlsx"
        search_path = os.path.join(base_path)
    
        for root, dirs, files in os.walk(search_path):
            if 'Trash' in root:
                continue
            if target_file_name in files:
                return os.path.join(root, target_file_name)
        return None
    

  2. Créer la fonction d'insertion :

    def insert_new_reference_galbani(engine, excel_file_path):
        try:
            dtype_spec = {'key_column': str}  # Définir les types
            df = pd.read_excel(excel_file_path, dtype=dtype_spec)
            df = normalize_italian_chars(df)
    
            sql_dtypes = {column: NVARCHAR(length=255) for column in df.columns}
    
            df.to_sql('New_Reference_Table', con=engine, if_exists='replace', 
                      index=False, dtype=sql_dtypes, method='multi', chunksize=10000)
            log_message("Data inserted successfully into New_Reference_Table.")
        except FileNotFoundError:
            log_message(f"The file at {excel_file_path} was not found.")
    

  3. Ajouter l'appel dans main_galbani() :

    new_ref_path = find_excel_file_for_new_reference_galbani(base_path)
    if new_ref_path:
        insert_new_reference_galbani(engine, new_ref_path)
    

Modification du mapping des mesures

Pour ajouter une nouvelle mesure dans le mapping Galbani :

mapping = {
    'Vendite in Volume': 'Sales_Volumes',
    'Vendite in Valore': 'Sales_Value',
    # ... existant ...
    'Nuovo Indicatore Volume': 'New_Volume_Metric',  # Ajout
}

Ne pas oublier d'ajouter la colonne correspondante dans la structure de sortie.