Step_02_Data_Integration - Intégration des données¶
Vue d'ensemble¶
Cette étape réalise l'intégration complète des données depuis les fichiers ZIP vers la base de données MariaDB. Elle applique les mappings spécifiques à chaque pays, gère les conversions de types et crée les tables de base nécessaires pour les étapes suivantes du pipeline.
Objectif principal¶
- Lire les données depuis les fichiers ZIP
- Appliquer les mappings de colonnes spécifiques au pays
- Convertir les types de données selon les schémas définis
- Créer et peupler les tables de base dans MariaDB
- Gérer les spécificités par business unit (notamment Italie)
Position dans la pipeline¶
- Étape précédente : Step_01_Specific_Pre_Processing (pour l'Italie) ou Step_00_Download_Inputs
- Étape suivante : Step_03_Data_Cleaning_and_Data_Quality_Checks
- Paramètres d'appel :
/app/inputs/{business_unit}/
et/app/MariaDB/Tool_architecture/
Architecture technique¶
Flux de données¶
Entrée
│
├── Fichiers ZIP
│ └── compress_input_data.zip
│ ├── PRODUCT-BASE.txt
│ ├── PROMOCALENDAR.txt
│ ├── SELLIN-BASE.txt
│ └── SELLOUT-BASE.txt
│
├── Configuration
│ └── promo_config.json (pays et paramètres dates)
│
└── Mappings Tool Architecture
└── {country}/
├── Mappings colonnes (.txt)
├── Types/ (définitions types)
└── MariaDB Model Schema/
Traitement
│
├── Lecture des ZIP
│ ├── Normalisation caractères (Italie)
│ └── Parsing CSV avec paramètres locaux
│
├── Application mappings
│ ├── Renommage colonnes
│ ├── Conversion types
│ └── Parsing dates
│
└── Gestion spécifique
├── Parmalat : conversion dates YYYYMMDD
└── Galbani : union tables sell-out
Sortie (Tables MariaDB)
│
├── product_base
├── promo_calendar
├── sell_in_base
├── sell_out_base
└── sell_out_base_union (Galbani uniquement)
Structure des fichiers de configuration¶
promo_config.json¶
ou pour plusieurs pays :[
{
"Country": "Italy_galbani",
"Dayfirst_datetime": true
},
{
"Country": "Italy_parmalat",
"Dayfirst_datetime": true
}
]
Fichiers de mapping¶
Pour chaque pays et chaque table, deux fichiers :
1. Mapping colonnes : {table}_mapping_{country}.txt
- Mapping types :
{table}_mapping_types_{country}.txt
Concepts clés¶
1. Gestion multi-pays¶
- Lecture du
promo_config.json
pour identifier les pays - Application des mappings spécifiques à chaque pays
- Paramètre
dayfirst
pour le parsing des dates
2. Normalisation des caractères (Italie)¶
Pour les business units italiennes uniquement :
- Décomposition NFD (Canonical Decomposition)
- Suppression des marques diacritiques
- Exemple : città
→ citta
3. Schémas dynamiques¶
Les tables sont créées/recréées selon les schémas définis : - Drop de la table existante si présente - Création avec les types corrects - Support des clés primaires et contraintes NULL
4. Union des tables sell-out (Galbani)¶
Spécifique à Italy_galbani
:
- Union de 3 tables : sell_out_base
, sell_out_base_competitors
, sell_out_base_private_label
- Remplacement de '0' par 'Others' dans la colonne Brand
- Création de sell_out_base_union
Implémentation détaillée¶
1. Fonction principale : main()
¶
def main(data_path, tool_architecture_path):
json_path = f"/app/inputs/{business_unit}/promo_config.json"
unique_countries, dayfirst_settings = get_countries_and_settings_from_json(json_path)
for country in unique_countries:
# Traitement par pays
create_zip_if_needed(data_path)
zip_files = find_zip_files(data_path)
dataframes = process_files(zip_files, country, tool_architecture_path, dayfirst_settings[country])
insert_dataframes_to_db(dataframes, engine, tool_architecture_path, country)
# Union spécifique Galbani
if country == 'Italy_galbani':
union_all_tables(engine)
2. Lecture des fichiers ZIP : read_file_from_zip()
¶
Processus standard¶
- Ouverture du fichier dans le ZIP
- Suppression du BOM UTF-8 si présent
- Lecture CSV avec pandas
Processus Italie (Galbani/Parmalat)¶
- Lecture en bytes
- Décodage CP1252
- Normalisation NFD des caractères
- Suppression des accents
- Nettoyage des espaces doubles
- Lecture CSV depuis string normalisé
if business_unit.lower() in ["italy_galbani", "italy_parmalat"]:
content_str = unicodedata.normalize('NFD', content_str)
content_str = ''.join(c for c in content_str if unicodedata.category(c) != 'Mn')
3. Traitement des fichiers : process_files()
¶
Pour chaque fichier dans le ZIP :
-
Lecture des mappings
-
Extraction des colonnes datetime
-
Application des conversions spécifiques
- Renommage colonnes selon mapping
- Conversion dates Parmalat (YYYYMMDD)
-
Nettoyage colonne Format Parmalat
-
Ajout colonnes manquantes
4. Spécificités Parmalat¶
Conversion dates : convert_parmalat_dates()
¶
date_mappings = {
'Promo_start_date': '%Y%m%d',
'Promo_end_date': '%Y%m%d',
'Promo_Sell_Out_Start': '%Y%m%d',
'Promo_Sell_Out_End': '%Y%m%d'
}
Nettoyage Format : clean_format_column_parmalat()
¶
- Extraction de la partie numérique uniquement
'1000 ML'
→1000.0
'250 GR'
→250.0
5. Création des tables : read_model_schema_and_define_table()
¶
-
Lecture du schéma
-
Drop si existante
-
Création avec types corrects
- String : longueur par défaut 355
- Gestion des clés primaires
- Gestion des contraintes NULL
6. Insertion en base : insert_dataframes_to_db()
¶
df.to_sql(final_table_name, con=engine, if_exists='append',
index=False, method='multi', chunksize=10000)
append
(table déjà créée)
- Méthode multi
pour performance
Tables créées¶
Tables communes (toutes business units)¶
Table | Description | Colonnes clés |
---|---|---|
product_base |
Référentiel produits | EAN, Product_desc, Brand, Category |
promo_calendar |
Calendrier promotionnel | Promo_ID, Promo_start_date, Promo_end_date |
sell_in_base |
Données sell-in | Week_ID, EAN, Sales_Volumes, Sales_Value |
sell_out_base |
Données sell-out | Week_ID, EAN, Channel, Sales_Volumes |
Table spécifique Galbani¶
Table | Description | Particularité |
---|---|---|
sell_out_base_union |
Union des 3 tables sell-out | Brand '0' → 'Others' |
Gestion des erreurs¶
Types d'erreurs gérés¶
Type | Traitement | Impact |
---|---|---|
Fichier manquant dans ZIP | Log + continue | Non bloquant par fichier |
Erreur mapping | Log + continue | Non bloquant par pays |
Erreur parsing date | errors='coerce' → NaT |
Valeurs manquantes |
Erreur schéma | Exception propagée | Bloquant |
ZIP non trouvé | Log + continue | Non bloquant par pays |
Stratégie de robustesse¶
- Création ZIP si absent : Génération automatique depuis fichiers TXT
- Colonnes manquantes : Ajout avec valeur
None
- BOM UTF-8 : Suppression automatique
- Encodages multiples : CP1252 par défaut, gestion spéciale Italie
Performance et optimisation¶
1. Lecture optimisée¶
- Lecture directe depuis ZIP sans extraction
- Traitement en mémoire des transformations
2. Insertion par chunks¶
Équilibre entre mémoire et performance3. Transactions groupées¶
- Une transaction par table complète
- Rollback automatique en cas d'erreur
4. Types explicites¶
- Évite l'inférence de types pandas
- Réduit la consommation mémoire
Points d'attention maintenance¶
1. Nouveaux pays¶
Pour ajouter un pays :
1. Ajouter dans promo_config.json
2. Créer les fichiers de mapping dans Tool_architecture/{country}/
3. Créer les fichiers de types dans Tool_architecture/{country}/Types/
4. Vérifier le paramètre dayfirst
pour les dates
2. Modification des schémas¶
- Les schémas sont dans Tool_architecture/MariaDB Model Schema/
- Format : liste de dictionnaires avec Field, Type, Key, Null
- La table est recréée à chaque exécution
3. Formats de dates¶
- Standard : selon
dayfirst
du pays - Parmalat : toujours YYYYMMDD
- Vérifier les nouveaux formats dans
convert_parmalat_dates()
Troubleshooting¶
Problème : "Error processing {file} for {country}"¶
Causes possibles : - Fichier absent du ZIP - Mapping manquant - Format de fichier incorrect
Diagnostic :
# Lister contenu du ZIP
from zipfile import ZipFile
with ZipFile(zip_path) as z:
print(z.namelist())
Problème : Caractères mal encodés¶
Symptômes : Caractères � ou illisibles
Solution : 1. Vérifier l'encodage source (devrait être CP1252) 2. Pour l'Italie, vérifier que la normalisation NFD est appliquée 3. Tester avec différents encodages si nécessaire
Problème : Dates non parsées¶
Symptômes : Colonnes dates restent en string
Vérifications :
1. Le type est-il 'datetime64'
dans le fichier de types ?
2. Le format correspond-il au dayfirst
setting ?
3. Pour Parmalat, les dates sont-elles en YYYYMMDD ?
Problème : Table non créée¶
Symptômes : Erreur "table does not exist"
Vérifications : 1. Le fichier schéma existe-t-il dans MariaDB Model Schema/ ? 2. Le format du schéma est-il correct (dict avec 'rows') ? 3. Les permissions DB permettent-elles CREATE/DROP ?
Exemples d'utilisation¶
Ajout d'un nouveau type de fichier¶
Pour ajouter NEWFILE-BASE.txt :
-
Modifier
generate_file_mapping()
: -
Créer les fichiers de mapping pour chaque pays
- Créer le schéma dans MariaDB Model Schema/
Modification du traitement spécifique business unit¶
Pour ajouter un traitement spécifique :
Ajouter la fonction dans le flux de process_files()
après le renommage des colonnes.