Aller au contenu

Step_02_Data_Integration - Intégration des données

Vue d'ensemble

Cette étape réalise l'intégration complète des données depuis les fichiers ZIP vers la base de données MariaDB. Elle applique les mappings spécifiques à chaque pays, gère les conversions de types et crée les tables de base nécessaires pour les étapes suivantes du pipeline.

Objectif principal

  • Lire les données depuis les fichiers ZIP
  • Appliquer les mappings de colonnes spécifiques au pays
  • Convertir les types de données selon les schémas définis
  • Créer et peupler les tables de base dans MariaDB
  • Gérer les spécificités par business unit (notamment Italie)

Position dans la pipeline

  • Étape précédente : Step_01_Specific_Pre_Processing (pour l'Italie) ou Step_00_Download_Inputs
  • Étape suivante : Step_03_Data_Cleaning_and_Data_Quality_Checks
  • Paramètres d'appel : /app/inputs/{business_unit}/ et /app/MariaDB/Tool_architecture/

Architecture technique

Flux de données

Entrée
    ├── Fichiers ZIP
    │   └── compress_input_data.zip
    │       ├── PRODUCT-BASE.txt
    │       ├── PROMOCALENDAR.txt
    │       ├── SELLIN-BASE.txt
    │       └── SELLOUT-BASE.txt
    ├── Configuration
    │   └── promo_config.json (pays et paramètres dates)
    └── Mappings Tool Architecture
        └── {country}/
            ├── Mappings colonnes (.txt)
            ├── Types/ (définitions types)
            └── MariaDB Model Schema/

Traitement
    ├── Lecture des ZIP
    │   ├── Normalisation caractères (Italie)
    │   └── Parsing CSV avec paramètres locaux
    ├── Application mappings
    │   ├── Renommage colonnes
    │   ├── Conversion types
    │   └── Parsing dates
    └── Gestion spécifique
        ├── Parmalat : conversion dates YYYYMMDD
        └── Galbani : union tables sell-out

Sortie (Tables MariaDB)
    ├── product_base
    ├── promo_calendar
    ├── sell_in_base
    ├── sell_out_base
    └── sell_out_base_union (Galbani uniquement)

Structure des fichiers de configuration

promo_config.json

{
    "Country": "France",
    "Dayfirst_datetime": false
}
ou pour plusieurs pays :
[
    {
        "Country": "Italy_galbani",
        "Dayfirst_datetime": true
    },
    {
        "Country": "Italy_parmalat",
        "Dayfirst_datetime": true
    }
]

Fichiers de mapping

Pour chaque pays et chaque table, deux fichiers : 1. Mapping colonnes : {table}_mapping_{country}.txt

{
    'Column_origine': 'Column_destination',
    'Ean': 'EAN',
    'Libelle_produit': 'Product_desc'
}

  1. Mapping types : {table}_mapping_types_{country}.txt
    {
        'EAN': 'str',
        'Sales_Volumes': 'float64',
        'Week_start': 'datetime64'
    }
    

Concepts clés

1. Gestion multi-pays

  • Lecture du promo_config.json pour identifier les pays
  • Application des mappings spécifiques à chaque pays
  • Paramètre dayfirst pour le parsing des dates

2. Normalisation des caractères (Italie)

Pour les business units italiennes uniquement : - Décomposition NFD (Canonical Decomposition) - Suppression des marques diacritiques - Exemple : cittàcitta

3. Schémas dynamiques

Les tables sont créées/recréées selon les schémas définis : - Drop de la table existante si présente - Création avec les types corrects - Support des clés primaires et contraintes NULL

4. Union des tables sell-out (Galbani)

Spécifique à Italy_galbani : - Union de 3 tables : sell_out_base, sell_out_base_competitors, sell_out_base_private_label - Remplacement de '0' par 'Others' dans la colonne Brand - Création de sell_out_base_union

Implémentation détaillée

1. Fonction principale : main()

def main(data_path, tool_architecture_path):
    json_path = f"/app/inputs/{business_unit}/promo_config.json"
    unique_countries, dayfirst_settings = get_countries_and_settings_from_json(json_path)

    for country in unique_countries:
        # Traitement par pays
        create_zip_if_needed(data_path)
        zip_files = find_zip_files(data_path)

        dataframes = process_files(zip_files, country, tool_architecture_path, dayfirst_settings[country])
        insert_dataframes_to_db(dataframes, engine, tool_architecture_path, country)

        # Union spécifique Galbani
        if country == 'Italy_galbani':
            union_all_tables(engine)

2. Lecture des fichiers ZIP : read_file_from_zip()

Processus standard

  1. Ouverture du fichier dans le ZIP
  2. Suppression du BOM UTF-8 si présent
  3. Lecture CSV avec pandas

Processus Italie (Galbani/Parmalat)

  1. Lecture en bytes
  2. Décodage CP1252
  3. Normalisation NFD des caractères
  4. Suppression des accents
  5. Nettoyage des espaces doubles
  6. Lecture CSV depuis string normalisé
if business_unit.lower() in ["italy_galbani", "italy_parmalat"]:
    content_str = unicodedata.normalize('NFD', content_str)
    content_str = ''.join(c for c in content_str if unicodedata.category(c) != 'Mn')

3. Traitement des fichiers : process_files()

Pour chaque fichier dans le ZIP :

  1. Lecture des mappings

    mapping_path = os.path.join(tool_architecture_path, country, f"{base_df_name}_mapping_{country}.txt")
    type_mapping_path = os.path.join(tool_architecture_path, country, "Types", f"{base_df_name}_mapping_types_{country}.txt")
    

  2. Extraction des colonnes datetime

    parse_dates = [key for key, value in dtype_dict.items() if value == 'datetime64']
    

  3. Application des conversions spécifiques

  4. Renommage colonnes selon mapping
  5. Conversion dates Parmalat (YYYYMMDD)
  6. Nettoyage colonne Format Parmalat

  7. Ajout colonnes manquantes

    for col in expected_columns:
        if col not in df.columns:
            df[col] = None
    

4. Spécificités Parmalat

Conversion dates : convert_parmalat_dates()

date_mappings = {
    'Promo_start_date': '%Y%m%d',
    'Promo_end_date': '%Y%m%d',
    'Promo_Sell_Out_Start': '%Y%m%d',
    'Promo_Sell_Out_End': '%Y%m%d'
}
Conversion du format YYYYMMDD vers datetime

Nettoyage Format : clean_format_column_parmalat()

  • Extraction de la partie numérique uniquement
  • '1000 ML'1000.0
  • '250 GR'250.0

5. Création des tables : read_model_schema_and_define_table()

  1. Lecture du schéma

    schema_path = f"MariaDB Model Schema/{base_df_name}_model_schema.txt"
    

  2. Drop si existante

    if inspector.has_table(final_table_name):
        Table(final_table_name, metadata, autoload_with=engine).drop(engine)
    

  3. Création avec types corrects

  4. String : longueur par défaut 355
  5. Gestion des clés primaires
  6. Gestion des contraintes NULL

6. Insertion en base : insert_dataframes_to_db()

df.to_sql(final_table_name, con=engine, if_exists='append', 
          index=False, method='multi', chunksize=10000)
- Insertion par chunks de 10 000 lignes - Mode append (table déjà créée) - Méthode multi pour performance

Tables créées

Tables communes (toutes business units)

Table Description Colonnes clés
product_base Référentiel produits EAN, Product_desc, Brand, Category
promo_calendar Calendrier promotionnel Promo_ID, Promo_start_date, Promo_end_date
sell_in_base Données sell-in Week_ID, EAN, Sales_Volumes, Sales_Value
sell_out_base Données sell-out Week_ID, EAN, Channel, Sales_Volumes

Table spécifique Galbani

Table Description Particularité
sell_out_base_union Union des 3 tables sell-out Brand '0' → 'Others'

Gestion des erreurs

Types d'erreurs gérés

Type Traitement Impact
Fichier manquant dans ZIP Log + continue Non bloquant par fichier
Erreur mapping Log + continue Non bloquant par pays
Erreur parsing date errors='coerce' → NaT Valeurs manquantes
Erreur schéma Exception propagée Bloquant
ZIP non trouvé Log + continue Non bloquant par pays

Stratégie de robustesse

  1. Création ZIP si absent : Génération automatique depuis fichiers TXT
  2. Colonnes manquantes : Ajout avec valeur None
  3. BOM UTF-8 : Suppression automatique
  4. Encodages multiples : CP1252 par défaut, gestion spéciale Italie

Performance et optimisation

1. Lecture optimisée

  • Lecture directe depuis ZIP sans extraction
  • Traitement en mémoire des transformations

2. Insertion par chunks

chunksize=10000
Équilibre entre mémoire et performance

3. Transactions groupées

  • Une transaction par table complète
  • Rollback automatique en cas d'erreur

4. Types explicites

  • Évite l'inférence de types pandas
  • Réduit la consommation mémoire

Points d'attention maintenance

1. Nouveaux pays

Pour ajouter un pays : 1. Ajouter dans promo_config.json 2. Créer les fichiers de mapping dans Tool_architecture/{country}/ 3. Créer les fichiers de types dans Tool_architecture/{country}/Types/ 4. Vérifier le paramètre dayfirst pour les dates

2. Modification des schémas

  • Les schémas sont dans Tool_architecture/MariaDB Model Schema/
  • Format : liste de dictionnaires avec Field, Type, Key, Null
  • La table est recréée à chaque exécution

3. Formats de dates

  • Standard : selon dayfirst du pays
  • Parmalat : toujours YYYYMMDD
  • Vérifier les nouveaux formats dans convert_parmalat_dates()

Troubleshooting

Problème : "Error processing {file} for {country}"

Causes possibles : - Fichier absent du ZIP - Mapping manquant - Format de fichier incorrect

Diagnostic :

# Lister contenu du ZIP
from zipfile import ZipFile
with ZipFile(zip_path) as z:
    print(z.namelist())

Problème : Caractères mal encodés

Symptômes : Caractères � ou illisibles

Solution : 1. Vérifier l'encodage source (devrait être CP1252) 2. Pour l'Italie, vérifier que la normalisation NFD est appliquée 3. Tester avec différents encodages si nécessaire

Problème : Dates non parsées

Symptômes : Colonnes dates restent en string

Vérifications : 1. Le type est-il 'datetime64' dans le fichier de types ? 2. Le format correspond-il au dayfirst setting ? 3. Pour Parmalat, les dates sont-elles en YYYYMMDD ?

Problème : Table non créée

Symptômes : Erreur "table does not exist"

Vérifications : 1. Le fichier schéma existe-t-il dans MariaDB Model Schema/ ? 2. Le format du schéma est-il correct (dict avec 'rows') ? 3. Les permissions DB permettent-elles CREATE/DROP ?

Exemples d'utilisation

Ajout d'un nouveau type de fichier

Pour ajouter NEWFILE-BASE.txt :

  1. Modifier generate_file_mapping() :

    base_mapping = {
        'PRODUCT-BASE.txt': 'product_base',
        'PROMOCALENDAR.txt': 'promo_calendar',
        'SELLIN-BASE.txt': 'sell_in_base',
        'SELLOUT-BASE.txt': 'sell_out_base',
        'NEWFILE-BASE.txt': 'new_file_base',  # Ajout
    }
    

  2. Créer les fichiers de mapping pour chaque pays

  3. Créer le schéma dans MariaDB Model Schema/

Modification du traitement spécifique business unit

Pour ajouter un traitement spécifique :

if business_unit.lower() == "new_business_unit":
    df = custom_transformation(df, df_name)

Ajouter la fonction dans le flux de process_files() après le renommage des colonnes.