Step 02 Diagrams - Data Integration
Overview of the integration process
graph TB
Start([main]) --> JSON[get_countries_and_settings_from_json]
JSON --> Loop{For each<br/>country}
Loop --> ZIP[create_zip_if_needed]
ZIP --> Find[find_zip_files]
Find --> Process[process_files]
Process --> Insert[insert_dataframes_to_db]
Insert --> Check{italy_galbani?}
Check -->|Yes| Union[union_all_tables]
Check -->|No| Next
Union --> Next
Next --> Loop
Loop -->|Done| End([Script executed successfully])
subgraph "Configuration"
CFG1[promo_config.json]
CFG2["Tool_architecture/{country}/"]
CFG3[MariaDB Model Schema/]
end
CFG1 -.-> JSON
CFG2 -.-> Process
CFG3 -.-> Insert
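The control flow above can be summarized as a short driver loop. A minimal skeleton, assuming the function names from the diagram; their exact signatures and argument names are assumptions, not the actual implementation:

```python
# Sketch of the Step 02 entry point; signatures are assumptions.
def main():
    # promo_config.json -> list of countries + per-country dayfirst flags
    countries, dayfirst_settings = get_countries_and_settings_from_json()

    for country in countries:
        create_zip_if_needed(country)              # make sure inputs are zipped
        for zip_path in find_zip_files(country):
            dataframes = process_files(zip_path, country,
                                       dayfirst_settings[country])
            insert_dataframes_to_db(dataframes, country)

        # Galbani only: merge the three sell-out tables into one union table
        if country == "Italy_galbani":
            union_all_tables()

    print("Script executed successfully")
```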
Mapping and type architecture
graph LR
subgraph "Structure Tool Architecture"
ROOT["/app/MariaDB/Tool_architecture/"]
ROOT --> COUNTRY["{country}/"]
COUNTRY --> MAPPINGS["Column mappings"]
COUNTRY --> TYPES["Types/"]
MAPPINGS --> M1["product_base_mapping_{country}.txt"]
MAPPINGS --> M2["promo_calendar_mapping_{country}.txt"]
MAPPINGS --> M3["sell_in_base_mapping_{country}.txt"]
MAPPINGS --> M4["sell_out_base_mapping_{country}.txt"]
TYPES --> T1["product_base_mapping_types_{country}.txt"]
TYPES --> T2["promo_calendar_mapping_types_{country}.txt"]
TYPES --> T3["sell_in_base_mapping_types_{country}.txt"]
TYPES --> T4["sell_out_base_mapping_types_{country}.txt"]
ROOT --> SCHEMA["MariaDB Model Schema/"]
SCHEMA --> S1["product_base_model_schema.txt"]
SCHEMA --> S2["promo_calendar_model_schema.txt"]
SCHEMA --> S3["sell_in_base_model_schema.txt"]
SCHEMA --> S4["sell_out_base_model_schema.txt"]
end
style MAPPINGS fill:#e3f2fd
style TYPES fill:#fff3e0
style SCHEMA fill:#e8f5e9
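A small helper makes the naming convention explicit. This is a hypothetical sketch: the root path and the file-name patterns come from the diagram above, but the helper itself (`mapping_paths`) is not part of the script:

```python
from pathlib import Path

# Root of the layout shown above
ROOT = Path("/app/MariaDB/Tool_architecture")

def mapping_paths(country: str, table: str) -> dict:
    """Return the column-mapping, type-mapping and schema files for one table."""
    return {
        "columns": ROOT / country / f"{table}_mapping_{country}.txt",
        "types": ROOT / country / "Types" / f"{table}_mapping_types_{country}.txt",
        "schema": ROOT / "MariaDB Model Schema" / f"{table}_model_schema.txt",
    }

# mapping_paths("Italy_galbani", "sell_out_base")["columns"]
# -> .../Italy_galbani/sell_out_base_mapping_Italy_galbani.txt
```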
Reading files from ZIP archives
sequenceDiagram
participant Main as process_files()
participant ZIP as read_file_from_zip()
participant Mapping as read_mappings()
participant DF as DataFrame
Main->>Main: generate_file_mapping()
Note over Main: PRODUCT-BASE.txt → product_base<br/>PROMOCALENDAR.txt → promo_calendar<br/>etc.
loop For each file in mapping
Main->>Mapping: Read column mapping
Main->>Mapping: Read type mapping
Main->>Main: Extract datetime columns
Main->>ZIP: read_file_from_zip(file_name, types, dates, dayfirst)
alt Italy Business Unit
ZIP->>ZIP: Read bytes → decode CP1252
ZIP->>ZIP: Normalize NFD
ZIP->>ZIP: Remove combining marks
ZIP->>ZIP: Clean double spaces
Note over ZIP: città → citta<br/>perché → perche
else Other Business Units
ZIP->>ZIP: Standard read with encoding
end
ZIP->>ZIP: Remove BOM if present
ZIP->>ZIP: pd.read_csv with parameters
ZIP-->>Main: Raw DataFrame
Main->>DF: Rename columns (mapping)
alt Parmalat
Main->>DF: convert_parmalat_dates()
Note over DF: YYYYMMDD → datetime
Main->>DF: clean_format_column()
Note over DF: '1000 ML' → 1000.0
end
Main->>DF: Add missing columns = None
Main-->>Main: Store in dataframes dict
end
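The Italy-specific branch boils down to standard `unicodedata` handling. A minimal sketch of `read_file_from_zip`, where the parameter names and the `;` separator are assumptions:

```python
import io
import re
import unicodedata
import zipfile

import pandas as pd

def read_file_from_zip(zip_path, file_name, dtypes=None, parse_dates=None,
                       dayfirst=False, italian=False):
    with zipfile.ZipFile(zip_path) as zf:
        raw = zf.read(file_name)

    if italian:
        text = raw.decode("cp1252")
        # NFD splits 'à' into 'a' + a combining accent, which is then dropped:
        # città -> citta, perché -> perche
        text = unicodedata.normalize("NFD", text)
        text = "".join(ch for ch in text if not unicodedata.combining(ch))
        text = re.sub(r" {2,}", " ", text)         # clean double spaces
    else:
        text = raw.decode("utf-8")

    text = text.lstrip("\ufeff")                   # remove BOM if present

    return pd.read_csv(io.StringIO(text), sep=";", dtype=dtypes,
                       parse_dates=parse_dates, dayfirst=dayfirst)
```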
Parmalat date handling
flowchart LR
subgraph "convert_parmalat_dates"
Check{promo_calendar?} -->|Yes| Map(Date mappings)
Check -->|No| Skip(Return df)
Map --> D1(Promo_start_date: %Y%m%d)
Map --> D2(Promo_end_date: %Y%m%d)
Map --> D3(Promo_Sell_Out_Start: %Y%m%d)
Map --> D4(Promo_Sell_Out_End: %Y%m%d)
D1 --> Conv("pd.to_datetime<br/>format='%Y%m%d'<br/>errors='coerce'")
D2 --> Conv
D3 --> Conv
D4 --> Conv
end
subgraph "clean_format_column_parmalat"
Check2{sell_in_base?} -->|Yes| Extract(extract digits)
Check2 -->|No| Skip2(Return df)
Extract --> Float("astype(float)")
Ex1("1000 ML → 1000.0")
Ex2("250 GR → 250.0")
Float -.-> Ex1
Float -.-> Ex2
end
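Both Parmalat fixes are short pandas operations. A sketch, assuming the date column names from the diagram; the `Format` column name in the second helper is a hypothetical placeholder:

```python
import pandas as pd

PARMALAT_DATE_COLUMNS = [
    "Promo_start_date", "Promo_end_date",
    "Promo_Sell_Out_Start", "Promo_Sell_Out_End",
]

def convert_parmalat_dates(df, table_name):
    # Only the promo calendar stores dates as YYYYMMDD
    if table_name != "promo_calendar":
        return df
    for col in PARMALAT_DATE_COLUMNS:
        if col in df.columns:
            # errors='coerce' turns unparseable values into NaT instead of failing
            df[col] = pd.to_datetime(df[col], format="%Y%m%d", errors="coerce")
    return df

def clean_format_column(df, table_name, column="Format"):
    # '1000 ML' -> 1000.0, '250 GR' -> 250.0 ('Format' is an assumed column name)
    if table_name != "sell_in_base":
        return df
    df[column] = (df[column].astype(str)
                            .str.extract(r"(\d+)", expand=False)
                            .astype(float))
    return df
```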
Table creation and insertion
stateDiagram-v2
[*] --> ReadSchema: read_model_schema_and_define_table()
ReadSchema --> ParseSchema: ast.literal_eval(schema)
ParseSchema --> CheckExists: inspector.has_table()
CheckExists --> DropTable: Table exists
CheckExists --> CreateColumns: Table not exists
DropTable --> CreateColumns: Table dropped
state CreateColumns {
[*] --> ParseType: For each field
ParseType --> MapType: Eval type string
MapType --> String: "String" → String(355)
MapType --> Other: Other → eval(type)
String --> Column: Create Column
Other --> Column: Create Column
Column --> CheckKey: Primary key?
CheckKey --> SetPrimary: Key = "PRI"
CheckKey --> CheckNull: Not primary
SetPrimary --> AddColumn: Add to columns
CheckNull --> AddColumn: Nullable check
AddColumn --> [*]
}
CreateColumns --> CreateTable: Table(*columns)
CreateTable --> ExecuteCreate: metadata.create_all()
ExecuteCreate --> InsertData: insert_dataframes_to_db()
InsertData --> ToSQL: df.to_sql()
note right of ToSQL
Parameters:
- if_exists='append'
- index=False
- method='multi'
- chunksize=10000
end note
ToSQL --> [*]: Data inserted
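The state machine above maps onto SQLAlchemy as follows. A sketch, assuming each schema file holds a dict literal like `{"field": {"Type": "String", "Key": "PRI", "Null": "YES"}}`; it resolves type names via `getattr` on `sqlalchemy.types` rather than the `eval(type)` shown in the diagram:

```python
import ast

from sqlalchemy import Column, MetaData, String, Table, inspect, types

def read_model_schema_and_define_table(engine, table_name, schema_path):
    with open(schema_path) as f:
        schema = ast.literal_eval(f.read())        # parse the dict literal safely

    # Drop & recreate if the table already exists
    if inspect(engine).has_table(table_name):
        old = MetaData()
        Table(table_name, old, autoload_with=engine).drop(engine)

    metadata = MetaData()
    columns = []
    for field, spec in schema.items():
        # Bare "String" gets an explicit length; other names are looked up
        # on sqlalchemy.types (safer than eval)
        col_type = (String(355) if spec["Type"] == "String"
                    else getattr(types, spec["Type"])())
        columns.append(Column(field, col_type,
                              primary_key=spec.get("Key") == "PRI",
                              nullable=spec.get("Null") == "YES"))

    table = Table(table_name, metadata, *columns)
    metadata.create_all(engine)
    return table

def insert_dataframes_to_db(engine, dataframes):
    for table_name, df in dataframes.items():
        # Chunked multi-row INSERTs keep memory and round-trips bounded
        df.to_sql(table_name, engine, if_exists="append", index=False,
                  method="multi", chunksize=10000)
```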
Union of the sell-out tables (Galbani)
flowchart TB
subgraph "Tables source"
T1[(sell_out_base)]
T2[(sell_out_base_competitors)]
T3[(sell_out_base_private_label)]
end
subgraph "union_all_tables"
Load("Load each table<br/>pd.read_sql")
Replace("Replace Brand '0' → 'Others'")
Concat("pd.concat(dataframes)")
Save("to_sql('sell_out_base_union')")
end
T1 --> Load
T2 --> Load
T3 --> Load
Load --> Replace
Replace --> Concat
Concat --> Save
Save --> Result[(sell_out_base_union)]
style Result fill:#90EE90
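`union_all_tables` is a straightforward read, fix, concat, write. A sketch; `if_exists="replace"` for the union table is an assumption:

```python
import pandas as pd

SELL_OUT_TABLES = ["sell_out_base", "sell_out_base_competitors",
                   "sell_out_base_private_label"]

def union_all_tables(engine):
    frames = []
    for table in SELL_OUT_TABLES:
        df = pd.read_sql(f"SELECT * FROM {table}", engine)
        df["Brand"] = df["Brand"].replace("0", "Others")   # Brand '0' -> 'Others'
        frames.append(df)

    union = pd.concat(frames, ignore_index=True)           # 3 tables -> 1
    union.to_sql("sell_out_base_union", engine, if_exists="replace",
                 index=False, method="multi", chunksize=10000)
```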
promo_config.json formats and parsing
graph TD
subgraph "Format simple (1 pays)"
J1["{<br/> 'Country': 'France',<br/> 'Dayfirst_datetime': false<br/>}"]
end
subgraph "Format multiple (plusieurs pays)"
J2["[<br/> {<br/> 'Country': 'Italy_galbani',<br/> 'Dayfirst_datetime': true<br/> },<br/> {<br/> 'Country': 'Italy_parmalat',<br/> 'Dayfirst_datetime': true<br/> }<br/>]"]
end
subgraph "Parsing"
P[get_countries_and_settings_from_json]
P --> Check{Type?}
Check -->|dict| Convert[Convert to list]
Check -->|list| Process[Process directly]
Convert --> Process
Process --> Output["unique_countries: ['Italy_galbani', 'Italy_parmalat']<br/>dayfirst_settings: {'Italy_galbani': True, 'Italy_parmalat': True}"]
end
J1 --> P
J2 --> P
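The parsing step normalizes both formats to a list before extracting the countries and settings. A minimal sketch of `get_countries_and_settings_from_json`:

```python
import json

def get_countries_and_settings_from_json(path="promo_config.json"):
    with open(path) as f:
        config = json.load(f)

    if isinstance(config, dict):      # single-country format -> wrap in a list
        config = [config]

    unique_countries = [entry["Country"] for entry in config]
    dayfirst_settings = {entry["Country"]: entry["Dayfirst_datetime"]
                         for entry in config}
    return unique_countries, dayfirst_settings
```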
Per-file processing flow
flowchart LR
subgraph "Pour chaque fichier ZIP"
direction TB
subgraph "1. Configuration"
M1("mapping_{country}.txt")
M2("mapping_types_{country}.txt")
M3("model_schema.txt")
end
subgraph "2. Lecture"
R1(read_file_from_zip)
R2(IT normalization)
R3(Remove BOM)
R4(pd.read_csv)
end
subgraph "3. Transformation"
T1(Rename columns)
T2{Parmalat?}
T3(Convert dates)
T4(Clean format)
T5(Add missing cols)
end
subgraph "4. Insertion"
I1(read_model_schema)
I2(Drop if exists)
I3(Create table)
I4(df.to_sql)
end
end
M1 --> R1
M2 --> R1
R1 --> R2 --> R3 --> R4
R4 --> T1
T1 --> T2
T2 -->|Yes| T3
T2 -->|Yes| T4
T2 -->|No| T5
T3 --> T5
T4 --> T5
M3 --> I1
T5 --> I1
I1 --> I2 --> I3 --> I4
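Step 3 of this flow reduces to a few pandas operations. A sketch of the transformation stage, reusing the Parmalat helpers sketched earlier; the function and parameter names are assumptions:

```python
def transform(df, table_name, column_mapping, expected_columns, country):
    df = df.rename(columns=column_mapping)        # 3.1 rename via the mapping
    if country == "Italy_parmalat":               # 3.2-3.3 Parmalat-only fixes
        df = convert_parmalat_dates(df, table_name)
        df = clean_format_column(df, table_name)
    for col in expected_columns:                  # 3.4 pad missing schema columns
        if col not in df.columns:
            df[col] = None
    return df
```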
Key integration points
mindmap
  root((Step 02<br/>Data<br/>Integration))
    Configuration
      promo_config.json
        Countries list
        Dayfirst settings
      Tool Architecture
        Column mappings
        Column types
        Table schemas
    Normalization
      Italy only
        NFD decomposition
        Remove accents
        CP1252 → UTF-8
      BOM removal
        All countries
    Specifics
      Parmalat
        Dates YYYYMMDD
        Format cleaning
          1000 ML → 1000.0
      Galbani
        Union sell-out
          Brand 0 → Others
          3 tables → 1
    Robustness
      Missing columns
        Add with None
      Schema validation
        Drop & recreate
      Chunked insert
        10000 rows/batch
Error handling
stateDiagram-v2
[*] --> TryProcess: process_files()
TryProcess --> FileError: File not in ZIP
TryProcess --> MappingError: Mapping not found
TryProcess --> DateError: Date parse error
TryProcess --> SchemaError: Schema error
TryProcess --> Success: All OK
FileError --> Log1: Log + continue
MappingError --> Log2: Log + continue
DateError --> Coerce: errors='coerce' → NaT
SchemaError --> Propagate: Exception raised
Log1 --> NextFile: Process next file
Log2 --> NextFile
Coerce --> Continue: Continue with NaT
NextFile --> TryProcess
Continue --> Success
Success --> [*]
Propagate --> [*]: Stop execution
note right of FileError
Non-blocking errors:
- Missing file
- Missing mapping
- Date parsing
end note
note right of SchemaError
Blocking errors:
- Schema read error
- Table creation error
end note
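In code, the non-blocking policy is a per-file `try/except` that logs and moves on, while schema and table-creation errors are left to propagate. A sketch; the exception types are assumptions (`zipfile` raises `KeyError` for a missing member):

```python
import logging

logger = logging.getLogger("step02")

def process_files(zip_path, file_mapping, country):
    # Sketch of the error policy only; the real signature may differ.
    dataframes = {}
    for file_name, table_name in file_mapping.items():
        try:
            column_mapping = read_mappings(country, table_name)  # mapping file
            df = read_file_from_zip(zip_path, file_name)         # ZIP member
        except FileNotFoundError:
            logger.warning("Mapping for %s not found, skipping", table_name)
            continue
        except KeyError:
            logger.warning("%s not in %s, skipping", file_name, zip_path)
            continue
        # Date errors never raise: pd.to_datetime(..., errors='coerce') -> NaT.
        dataframes[table_name] = df.rename(columns=column_mapping)
    # Schema read/creation errors are NOT caught here: they propagate from
    # insert_dataframes_to_db() and stop the run.
    return dataframes
```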