
Step 02 - Data Integration Diagrams

Overview of the integration process

graph TB
    Start([main]) --> JSON[get_countries_and_settings_from_json]
    JSON --> Loop{For each<br/>country}

    Loop --> ZIP[create_zip_if_needed]
    ZIP --> Find[find_zip_files]
    Find --> Process[process_files]

    Process --> Insert[insert_dataframes_to_db]
    Insert --> Check{italy_galbani?}

    Check -->|Yes| Union[union_all_tables]
    Check -->|No| Next
    Union --> Next

    Next --> Loop
    Loop -->|Done| End([Script executed successfully])

    subgraph "Configuration"
        CFG1[promo_config.json]
        CFG2["Tool_architecture/{country}/"]
        CFG3[MariaDB Model Schema/]
    end

    CFG1 -.-> JSON
    CFG2 -.-> Process
    CFG3 -.-> Insert
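The diagram above corresponds to a main loop of roughly the following shape. This is a minimal sketch: the function names come from the diagram, but their signatures and exact wiring are assumptions.

import logging

def main():
    # Function names are taken from the diagram; signatures are assumed.
    countries, dayfirst_settings = get_countries_and_settings_from_json("promo_config.json")
    for country in countries:
        create_zip_if_needed(country)
        for zip_path in find_zip_files(country):
            dataframes = process_files(zip_path, country, dayfirst_settings[country])
            insert_dataframes_to_db(dataframes)
        if country == "Italy_galbani":
            union_all_tables()
    logging.info("Script executed successfully")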

Mapping and type architecture

graph LR
    subgraph "Structure Tool Architecture"
        ROOT["/app/MariaDB/Tool_architecture/"]
        ROOT --> COUNTRY["{country}/"]

        COUNTRY --> MAPPINGS["Column mappings"]
        COUNTRY --> TYPES["Types/"]

        MAPPINGS --> M1["product_base_mapping_{country}.txt"]
        MAPPINGS --> M2["promo_calendar_mapping_{country}.txt"]
        MAPPINGS --> M3["sell_in_base_mapping_{country}.txt"]
        MAPPINGS --> M4["sell_out_base_mapping_{country}.txt"]

        TYPES --> T1["product_base_mapping_types_{country}.txt"]
        TYPES --> T2["promo_calendar_mapping_types_{country}.txt"]
        TYPES --> T3["sell_in_base_mapping_types_{country}.txt"]
        TYPES --> T4["sell_out_base_mapping_types_{country}.txt"]

        ROOT --> SCHEMA["MariaDB Model Schema/"]
        SCHEMA --> S1["product_base_model_schema.txt"]
        SCHEMA --> S2["promo_calendar_model_schema.txt"]
        SCHEMA --> S3["sell_in_base_model_schema.txt"]
        SCHEMA --> S4["sell_out_base_model_schema.txt"]
    end

    style MAPPINGS fill:#e3f2fd
    style TYPES fill:#fff3e0
    style SCHEMA fill:#e8f5e9
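Assuming each mapping and types file stores a Python-literal dict (the model schema files are parsed with ast.literal_eval further down, so the same convention is assumed here), reading one could look like this; the file contents shown in the comments are hypothetical:

import ast

# Hypothetical file contents, for illustration only:
#   product_base_mapping_France.txt       -> {'EAN Code': 'ean', 'Name': 'product_name'}
#   product_base_mapping_types_France.txt -> {'ean': 'str', 'product_name': 'str'}

def read_mappings(path):
    # ast.literal_eval parses the dict without executing arbitrary code
    with open(path, encoding="utf-8") as f:
        return ast.literal_eval(f.read())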

ZIP reading process

sequenceDiagram
    participant Main as process_files()
    participant ZIP as read_file_from_zip()
    participant Mapping as read_mappings()
    participant DF as DataFrame

    Main->>Main: generate_file_mapping()
    Note over Main: PRODUCT-BASE.txt → product_base<br/>PROMOCALENDAR.txt → promo_calendar<br/>etc.

    loop For each file in mapping
        Main->>Mapping: Read column mapping
        Main->>Mapping: Read type mapping
        Main->>Main: Extract datetime columns

        Main->>ZIP: read_file_from_zip(file_name, types, dates, dayfirst)

        alt Italy Business Unit
            ZIP->>ZIP: Read bytes → decode CP1252
            ZIP->>ZIP: Normalize NFD
            ZIP->>ZIP: Remove combining marks
            ZIP->>ZIP: Clean double spaces
            Note over ZIP: città → citta<br/>perché → perche
        else Other Business Units
            ZIP->>ZIP: Standard read with encoding
        end

        ZIP->>ZIP: Remove BOM if present
        ZIP->>ZIP: pd.read_csv with parameters
        ZIP-->>Main: Raw DataFrame

        Main->>DF: Rename columns (mapping)

        alt Parmalat
            Main->>DF: convert_parmalat_dates()
            Note over DF: YYYYMMDD → datetime
            Main->>DF: clean_format_column()
            Note over DF: '1000 ML' → 1000.0
        end

        Main->>DF: Add missing columns = None
        Main-->>Main: Store in dataframes dict
    end
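The Italy branch of read_file_from_zip can be sketched as follows. The helper name and parameter list are assumptions; the decode, normalize, and strip sequence is the one shown in the diagram.

import io
import re
import unicodedata
import zipfile

import pandas as pd

def read_italian_file_from_zip(zf: zipfile.ZipFile, file_name, dtypes, date_cols, dayfirst):
    # Decode CP1252 bytes, then strip accents: NFD splits 'à' into 'a' + a combining mark.
    text = zf.read(file_name).decode("cp1252")
    text = unicodedata.normalize("NFD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))  # città -> citta
    text = re.sub(r" {2,}", " ", text)  # clean double spaces
    text = text.lstrip("\ufeff")        # remove BOM if present
    return pd.read_csv(io.StringIO(text), dtype=dtypes,
                       parse_dates=date_cols, dayfirst=dayfirst)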

Parmalat date handling

flowchart LR
    subgraph "convert_parmalat_dates"
        Check{promo_calendar?} -->|Yes| Map(Date mappings)
        Check -->|No| Skip(Return df)

        Map --> D1(Promo_start_date: %Y%m%d)
        Map --> D2(Promo_end_date: %Y%m%d)
        Map --> D3(Promo_Sell_Out_Start: %Y%m%d)
        Map --> D4(Promo_Sell_Out_End: %Y%m%d)

        D1 --> Conv("pd.to_datetime<br/>format='%Y%m%d'<br/>errors='coerce'")
        D2 --> Conv
        D3 --> Conv
        D4 --> Conv
    end

    subgraph "clean_format_column_parmalat"
        Check2{sell_in_base?} -->|Yes| Extract(extract digits)
        Check2 -->|No| Skip2(Return df)

        Extract --> Float("astype(float)")

        Ex1("1000 ML → 1000.0")
        Ex2("250 GR → 250.0")

        Float -.-> Ex1
        Float -.-> Ex2
    end
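A sketch of the two Parmalat helpers. The date columns, the pd.to_datetime parameters, and the table guards come from the diagrams; the 'Format' column name and the expand=False extraction are assumptions.

import pandas as pd

PARMALAT_DATE_COLS = ["Promo_start_date", "Promo_end_date",
                      "Promo_Sell_Out_Start", "Promo_Sell_Out_End"]

def convert_parmalat_dates(df, table_name):
    # Only promo_calendar ships YYYYMMDD dates; unparseable values become NaT.
    if table_name != "promo_calendar":
        return df
    for col in PARMALAT_DATE_COLS:
        df[col] = pd.to_datetime(df[col].astype(str), format="%Y%m%d", errors="coerce")
    return df

def clean_format_column_parmalat(df, table_name):
    # sell_in_base stores pack sizes like '1000 ML'; keep the digits, cast to float.
    if table_name != "sell_in_base":
        return df
    df["Format"] = df["Format"].astype(str).str.extract(r"(\d+)", expand=False).astype(float)
    return df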

Table creation and insertion

stateDiagram-v2
    [*] --> ReadSchema: read_model_schema_and_define_table()

    ReadSchema --> ParseSchema: ast.literal_eval(schema)
    ParseSchema --> CheckExists: inspector.has_table()

    CheckExists --> DropTable: Table exists
    CheckExists --> CreateColumns: Table not exists
    DropTable --> CreateColumns: Table dropped

    state CreateColumns {
        [*] --> ParseType: For each field
        ParseType --> MapType: Eval type string

        MapType --> String: "String" → String(355)
        MapType --> Other: Other → eval(type)

        String --> Column: Create Column
        Other --> Column: Create Column

        Column --> CheckKey: Primary key?
        CheckKey --> SetPrimary: Key = "PRI"
        CheckKey --> CheckNull: Not primary
        SetPrimary --> AddColumn: Add to columns
        CheckNull --> AddColumn: Nullable check

        AddColumn --> [*]
    }

    CreateColumns --> CreateTable: Table(*columns)
    CreateTable --> ExecuteCreate: metadata.create_all()

    ExecuteCreate --> InsertData: insert_dataframes_to_db()

    InsertData --> ToSQL: df.to_sql()

    note right of ToSQL
        Parameters:
        - if_exists='append'
        - index=False
        - method='multi'
        - chunksize=10000
    end note

    ToSQL --> [*]: Data inserted
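Put together, the state machine corresponds to roughly the following SQLAlchemy code. The String(355) special case, the drop-and-recreate step, the eval of other type strings, and the to_sql parameters are from the diagram; the field-dict keys ('Field', 'Type', 'Key', 'Null') follow the MySQL DESCRIBE convention and are assumptions.

import ast

from sqlalchemy import (Column, DateTime, Float, Integer,  # imported so eval() can resolve them
                        MetaData, String, Table, inspect)

def read_model_schema_and_define_table(engine, table_name, schema_path):
    with open(schema_path, encoding="utf-8") as f:
        schema = ast.literal_eval(f.read())  # Python-literal list of field dicts

    if inspect(engine).has_table(table_name):
        # Drop and recreate so the table always matches the schema file.
        Table(table_name, MetaData(), autoload_with=engine).drop(engine)

    columns = []
    for field in schema:  # field-dict keys are assumed (DESCRIBE-style)
        col_type = String(355) if field["Type"] == "String" else eval(field["Type"])
        columns.append(Column(field["Field"], col_type,
                              primary_key=field.get("Key") == "PRI",
                              nullable=field.get("Null") == "YES"))
    metadata = MetaData()
    Table(table_name, metadata, *columns)
    metadata.create_all(engine)

def insert_dataframes_to_db(dataframes, engine):
    for table_name, df in dataframes.items():
        df.to_sql(table_name, engine, if_exists="append",
                  index=False, method="multi", chunksize=10000)

Note that eval() is only tolerable here because the schema files ship with the tool; untrusted input should never reach it.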

Union of the sell-out tables (Galbani)

flowchart TB
    subgraph "Tables source"
        T1[(sell_out_base)]
        T2[(sell_out_base_competitors)]
        T3[(sell_out_base_private_label)]
    end

    subgraph "union_all_tables"
        Load("Load each table<br/>pd.read_sql")
        Replace("Replace Brand '0' → 'Others'")
        Concat("pd.concat(dataframes)")
        Save("to_sql('sell_out_base_union')")
    end

    T1 --> Load
    T2 --> Load
    T3 --> Load

    Load --> Replace
    Replace --> Concat
    Concat --> Save

    Save --> Result[(sell_out_base_union)]

    style Result fill:#90EE90
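A minimal sketch of union_all_tables; the table and column names come from the diagram, while if_exists='replace' on the target table is an assumption.

import pandas as pd

def union_all_tables(engine):
    tables = ["sell_out_base", "sell_out_base_competitors", "sell_out_base_private_label"]
    dataframes = []
    for table in tables:
        df = pd.read_sql(f"SELECT * FROM {table}", engine)
        df["Brand"] = df["Brand"].replace("0", "Others")  # Brand '0' -> 'Others'
        dataframes.append(df)
    # Stack the three sources into a single union table.
    union = pd.concat(dataframes, ignore_index=True)
    union.to_sql("sell_out_base_union", engine, if_exists="replace", index=False)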

promo_config.json configuration

graph TD
    subgraph "Format simple (1 pays)"
        J1["{<br/>&nbsp;&nbsp;'Country': 'France',<br/>&nbsp;&nbsp;'Dayfirst_datetime': false<br/>}"]
    end

    subgraph "Format multiple (plusieurs pays)"
        J2["[<br/>&nbsp;&nbsp;{<br/>&nbsp;&nbsp;&nbsp;&nbsp;'Country': 'Italy_galbani',<br/>&nbsp;&nbsp;&nbsp;&nbsp;'Dayfirst_datetime': true<br/>&nbsp;&nbsp;},<br/>&nbsp;&nbsp;{<br/>&nbsp;&nbsp;&nbsp;&nbsp;'Country': 'Italy_parmalat',<br/>&nbsp;&nbsp;&nbsp;&nbsp;'Dayfirst_datetime': true<br/>&nbsp;&nbsp;}<br/>]"]
    end

    subgraph "Parsing"
        P[get_countries_and_settings_from_json]
        P --> Check{Type?}
        Check -->|dict| Convert[Convert to list]
        Check -->|list| Process[Process directly]
        Convert --> Process
        Process --> Output["unique_countries: ['Italy_galbani', 'Italy_parmalat']<br/>dayfirst_settings: {'Italy_galbani': True, 'Italy_parmalat': True}"]
    end

    J1 --> P
    J2 --> P
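The parsing branch normalizes both formats to a list before building its two outputs; a minimal sketch:

import json

def get_countries_and_settings_from_json(path):
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    if isinstance(config, dict):  # simple format: wrap the single entry in a list
        config = [config]
    unique_countries = [entry["Country"] for entry in config]
    dayfirst_settings = {entry["Country"]: entry["Dayfirst_datetime"] for entry in config}
    return unique_countries, dayfirst_settings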

Per-file processing flow

flowchart LR
    subgraph "Pour chaque fichier ZIP"
        direction TB

        subgraph "1. Configuration"
            M1("mapping_{country}.txt")
            M2("mapping_types_{country}.txt")
            M3("model_schema.txt")
        end

        subgraph "2. Lecture"
            R1(read_file_from_zip)
            R2(IT normalization)
            R3(Remove BOM)
            R4(pd.read_csv)
        end

        subgraph "3. Transformation"
            T1(Rename columns)
            T2{Parmalat?}
            T3(Convert dates)
            T4(Clean format)
            T5(Add missing cols)
        end

        subgraph "4. Insertion"
            I1(read_model_schema)
            I2(Drop if exists)
            I3(Create table)
            I4(df.to_sql)
        end
    end

    M1 --> R1
    M2 --> R1
    R1 --> R2 --> R3 --> R4
    R4 --> T1
    T1 --> T2
    T2 -->|Yes| T3
    T2 -->|Yes| T4
    T2 -->|No| T5
    T3 --> T5
    T4 --> T5

    M3 --> I1
    T5 --> I1
    I1 --> I2 --> I3 --> I4
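Step 3 of this flow (rename, then pad missing columns) can be sketched like this; the helper name is hypothetical, and expected_columns is assumed to come from the model schema:

def apply_column_mapping(df, column_mapping, expected_columns):
    # Rename raw headers to model names, then add any schema column the file lacks.
    df = df.rename(columns=column_mapping)
    for col in expected_columns:
        if col not in df.columns:
            df[col] = None  # missing columns are added as None
    return df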

Key integration points

mindmap
  root((Step 02<br/>Data<br/>Integration))
    Configuration
      promo_config.json
        Countries list
        Dayfirst settings
      Tool Architecture
        Column mappings
        Column types
        Table schemas

    Normalization
      Italy only
        NFD decomposition
        Remove accents
        CP1252 → UTF-8
      BOM removal
        All countries

    Special cases
      Parmalat
        Dates YYYYMMDD
        Format cleaning
        1000 ML → 1000.0
      Galbani
        Union sell-out
        Brand 0 → Others
        3 tables → 1

    Robustness
      Missing columns
        Add with None
      Schema validation
        Drop & recreate
      Chunked insert
        10000 rows/batch

Error handling

stateDiagram-v2
    [*] --> TryProcess: process_files()

    TryProcess --> FileError: File not in ZIP
    TryProcess --> MappingError: Mapping not found
    TryProcess --> DateError: Date parse error
    TryProcess --> SchemaError: Schema error
    TryProcess --> Success: All OK

    FileError --> Log1: Log + continue
    MappingError --> Log2: Log + continue
    DateError --> Coerce: errors='coerce' → NaT
    SchemaError --> Propagate: Exception raised

    Log1 --> NextFile: Process next file
    Log2 --> NextFile
    Coerce --> Continue: Continue with NaT

    NextFile --> TryProcess
    Continue --> Success
    Success --> [*]
    Propagate --> [*]: Stop execution

    note right of FileError
        Non-blocking errors:
        - Missing file
        - Missing mapping
        - Date parsing
    end note

    note right of SchemaError
        Blocking errors:
        - Schema read error
        - Table creation error
    end note
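The split between non-blocking and blocking errors maps onto a try/except of roughly this shape; the exact exception types are assumptions:

import logging

logger = logging.getLogger(__name__)

def safe_read(zf, file_name):
    # Non-blocking: log and return None so the caller moves on to the next file.
    # Blocking schema/table errors are deliberately NOT caught and stop the run.
    try:
        return read_file_from_zip(zf, file_name)
    except KeyError:
        logger.warning("File %s not found in ZIP, skipping", file_name)
    except FileNotFoundError:
        logger.warning("Mapping for %s not found, skipping", file_name)
    return None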