
Step_07_Aggregation_at_promo_level - Promotion-level aggregation and analysis

Overview

This step is the core of the promotional analysis: it aggregates volume effects at the promotional-event level, computes performance KPIs (ROI, uplift), and classifies promotions by performance. It breaks down into four distinct phases that turn granular weekly data into actionable promotion-level insights.

Main objectives

  • Aggregate weekly effects at the promotional-event level
  • Compute incremental margins and ROI per promotion
  • Classify promotions (value/volume generator/destroyer)
  • Prepare the YTD vs LYTD performance evolution analyses

Position in the pipeline

  • Previous step: Step_06_Valuation_of_volume_effects
  • Next step: Step_08_Simulation_Inputs
  • Distinct entry points:
      • main_promo_level_aggregation: initial aggregation with forward buying
      • main_promo_effects_allocation: proportional allocation of effects
      • main_promo_event_filtering: promotion filtering and classification
      • main_performance_evolution: YTD/LYTD comparative analysis

Technical architecture

Data flow

Phase 1: Promo-level aggregation
─────────────────────────────────
Input
    ├── promo_calendar (promo dates and mechanics)
    ├── step_2_03_sell_in_base (costs and margins)
    ├── step_5_03_model_cannibalization_effects (weekly volumes)
    └── forward_buying_windows (Galbani/Parmalat)

Processing
    ├── Weekly promo code enrichment
    ├── Forward buying and cannibalization computation
    ├── Aggregation of adjusted volumes per promo
    └── Promotional margin computation

Output → step_6_01_promo_level_table (enriched weekly data)

Phase 2: Effect allocation
───────────────────────────────
Input
    └── step_6_01_promo_level_table

Processing
    ├── Aggregation by Promotion_Code × Year
    ├── Baseline_Margin and adjusted margin computation
    └── Metadata enrichment (retailer, category)

Output → step_6_02_final_promo_table (promotion level)

Phase 3: Filtering and classification
────────────────────────────────────
Input
    └── step_6_02_final_promo_table

Processing
    ├── ROI and Volume_Uplift_% computation
    ├── In_Scope filtering (baseline ≥ 0, costs > 0)
    ├── Event classification (4 types)
    └── Visualization color assignment

Output → step_6_03_final_with_scope_promo_table

Phase 4: Performance evolution
───────────────────────────────
Input
    ├── step_6_01_promo_level_table
    ├── step_3_00_sell_in
    └── step_6_03_final_with_scope_promo_table

Processing
    ├── YTD/LYTD split by max week
    ├── Effect computation (cost, price, promo)
    ├── Comparative metrics (ROI, uplift)
    └── Aggregation by retailer_group and category

Output → step_6_04_perf_evolution_* (2 tables)

Key concepts

1. Forward Buying (Galbani/Parmalat)

  • Definition: post-promo consumer stockpiling that pushes subsequent sales below baseline
  • Window: 2-4 weeks, configurable per retailer/category
  • Computation (see the sketch below):
      • Halo = post-promo volumes above baseline
      • Pantry Loading = post-promo volumes below baseline
      • Forward Buying = min(|Halo + Pantry|, promo uplift)
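
As a minimal numeric sketch of that formula (illustrative values and variable names, not the pipeline's code):

import numpy as np

# Post-promo window: observed volumes vs expected baselines (made-up numbers)
post_promo_volume = np.array([120.0, 80.0, 95.0])
baseline_volume = np.array([100.0, 100.0, 100.0])
promo_uplift = 50.0  # incremental volume sold during the promo

diffs = post_promo_volume - baseline_volume   # [20, -20, -5]
halo = diffs[diffs > 0].sum()                 # 20: volumes above baseline
pantry = diffs[diffs < 0].sum()               # -25: pantry loading

# Forward Buying = min(|Halo + Pantry|, promo uplift) = min(5, 50) = 5
forward_buying = min(abs(halo + pantry), promo_uplift)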

2. Cannibalization (Parmalat-specific)

  • Eligible products: In&Out + on promo + UHT normal milk
  • Mechanism: receivers are credited with the volume cannibalized from the givers
  • Allocation: proportional to each receiver's uplift (see the sketch below)
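
A minimal sketch of the proportional split (hypothetical numbers; only the weighting logic mirrors the pipeline):

import numpy as np

# 30 units cannibalized from the givers, split across two receivers
net_cannibalization = 30.0
receiver_uplifts = np.array([60.0, 40.0])  # each receiver's promo uplift

total_receiver_uplift = receiver_uplifts.sum()
if total_receiver_uplift > 0:
    weights = receiver_uplifts / total_receiver_uplift  # [0.6, 0.4]
    volumes_received = net_cannibalization * weights    # [18.0, 12.0]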

3. ROI and performance metrics

  • ROI = (Incremental margin - Promo costs) / Promo costs
  • Volume Uplift % = (Promo volume / Baseline) - 1
  • Incremental margin = (Promo margin × Promo volume) - (Baseline margin × Baseline volume)
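
A worked example of the three formulas (illustrative numbers only):

# One promotion, illustrative values
promo_margin, baseline_margin = 2.0, 2.5        # margin per unit
promo_volume, baseline_volume = 1500.0, 1000.0  # units
promo_costs = 400.0

incremental_margin = promo_margin * promo_volume - baseline_margin * baseline_volume  # 500
volume_uplift = promo_volume / baseline_volume - 1       # 0.5 -> +50%
roi = (incremental_margin - promo_costs) / promo_costs   # (500 - 400) / 400 = 0.25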

4. Event classification

  • Value & Volume generator: ROI ≥ 75th percentile AND uplift ≥ average
  • Volume generator: ROI ≥ 0 AND uplift ≥ average
  • Value generator: ROI ≥ 0 AND uplift < average
  • Value destroyer: ROI < 0
  • Not in scope (Parmalat): ROI > 0 AND uplift < 0

Detailed implementation

1. Phase 1: Promo-level aggregation

Galbani: aggregate_promo_level_italy_galbani()

Promo code enrichment:

# Propagate promo codes onto flagged weeks
def ensure_promotion_code_carryover(df_effects, df_sell_in):
    mask_promo = df_effects['Flag_init_Promo'] == '1'

    # Keep sell-in rows that carry a promo code
    df_sell_in_filtered = df_sell_in[
        ['Retailer_name', 'EAN', 'EAN_desc', 'Promotion_Code', 'Year', 'Week']
    ].drop_duplicates().dropna(subset=['Promotion_Code'])

    # Merge on promo rows only
    df_effects_promo = df_effects[mask_promo].merge(
        df_sell_in_filtered,
        on=['Retailer_name', 'EAN', 'EAN_desc', 'Year', 'Week'],
        how='left'
    )

    # Cluster ID to group the weeks of a promo block
    combined_key = (df_effects_promo['Retailer_name'] + '_' +
                    df_effects_promo['EAN'].astype(str) + '_' +
                    df_effects_promo['Year'].astype(str) + '_' +
                    df_effects_promo['Week'].astype(str))
    df_effects_promo['Cluster_Promotion_ID'] = pd.factorize(combined_key)[0]

Forward buying computation:

# Retailer behavior factor
def calculate_retailer_behavior_factor(df):
    # Keep non-promo weeks only
    non_promo_df = df[df['Week_promo_volume'] == 0]

    # Off-promo uplift = (total volume / baseline) - 1
    grouped = non_promo_df.groupby(['Retailer_name', 'Year']).agg({
        'Week_total_volume': 'sum',
        'Baseline_Volume': 'sum'
    })

    grouped['Yearly_no_promo_uplift'] = (
        grouped['Week_total_volume'] / grouped['Baseline_Volume'] - 1
    )

    # Map the yearly uplift to a behavior factor
    def map_behavior_factor(uplift):
        if uplift >= 0: return 0
        elif uplift >= -0.25: return 1
        else: return 1.1

    grouped['Retailer_behavior_factor'] = grouped['Yearly_no_promo_uplift'].apply(
        map_behavior_factor
    )

    return grouped.reset_index()

Promotional margins:

def compute_promotional_margin(df_sell_in, df_ultime):
    # Average-cost period: yearly in 2023, half-yearly in 2024, quarterly from 2025
    def period_for_row(row):
        week = row['Week']
        if row['Year'] == 2023:
            return 1                       # yearly
        elif row['Year'] == 2024:
            return ((week - 1) // 26) + 1  # half-yearly
        elif row['Year'] >= 2025:
            return ((week - 1) // 13) + 1  # quarterly
        return 1

    df_sell_in['period_temp'] = df_sell_in.apply(period_for_row, axis=1)

    # Adjusted margin
    df_sell_in['Adjusted_gross_margin'] = (
        df_sell_in['Net_net_sales'] +
        df_sell_in['Promo_funding_off_invoice_Detail_6'] +
        (df_sell_in['Average_unit_Costs_Product_and_Other'] * df_sell_in['Volumes']) +
        df_sell_in['Logistics_Cost'] -
        (df_sell_in['Average_unit_Costs_Level_Two'] * df_sell_in['Volumes']) +
        (df_sell_in['Supply_chain_gap'] * df_sell_in['Volumes'])
    )

    # Volume-weighted average margin per promo (compute_weighted_average defined elsewhere)
    weighted_promo_margin = df_sell_in.groupby(
        ['Retailer_name', 'EAN', 'EAN_desc', 'Year', 'Promotion_Code']
    ).apply(compute_weighted_average).reset_index(name='Promo_margin_at_event_level')

Parmalat: aggregate_promo_level_italy_parmalat()

Specifics:

  • Uses Retailer_classification instead of Retailer_name
  • No EAN_desc in the aggregations
  • Forward buying computed week by week

def calculate_halo_pantry_and_forward_buying_parmalat(df_effects_enriched, engine):
    # FB window configuration
    forward_buying_windows = pd.read_sql("""
        SELECT Retailer_classification as Retailer_name, 
               New_category, Forward_buying_weeks_window
        FROM Forward_Buying_weeks_window_parmalat
    """, engine)

    # Process per Retailer × EAN × Year group (excerpt: promo_mask,
    # last_promo_indices, valid_data and window_size are built upstream)
    def calculate_forward_buying_for_group(group):
        # Mark the last week of each promo block
        for i in range(len(group)):
            if promo_mask[i]:
                if i == len(group) - 1:  # last week of the year
                    group.loc[i, 'Is_Last_Promo_Week'] = True
                elif not promo_mask[i + 1]:  # next week is non-promo
                    group.loc[i, 'Is_Last_Promo_Week'] = True
                elif group.loc[i, 'Promotion_Code'] != group.loc[i + 1, 'Promotion_Code']:
                    group.loc[i, 'Is_Last_Promo_Week'] = True

        # Compute FB only on last promo weeks
        for promo_idx in last_promo_indices:
            # Post-promo window
            post_promo_mask = (
                (group['Week_int'] > promo_week_int) & 
                (group['Week_int'] <= promo_week_int + window_size) &
                (group['Promotion_Code'].isna())  # non-promo weeks only
            )

            # Volume deltas vs baseline
            volume_diffs = valid_data['Week_total_volume'] - valid_data['Baseline_Volume']
            total_halo = volume_diffs[volume_diffs > 0].sum()
            total_pantry = volume_diffs[volume_diffs < 0].sum()

            # FB logic
            if total_pantry < 0 and total_halo > 0:
                forward_buying_volume = abs(total_halo + total_pantry)

UHT cannibalization:

def calculate_cannibalization_adjustment_parmalat(df_effects_enriched, engine):
    # UHT rows only
    uht_mask = df_effects_enriched['New_category'] == 'UHT normal milk'

    # All UHT products are givers
    df_effects_enriched.loc[all_givers_idx, 'Cannibalization_role'] = 'Giver'
    volume_cannibalized = np.maximum(0, baseline_vols - actual_vols)

    # Receivers: In&Out products on promo
    receiver_mask = (
        group['EAN'].isin(inout_eans) & 
        group['Promotion_Code'].notna()
    )

    # Proportional allocation by uplift
    if total_receiver_uplift > 0:
        weights = receiver_uplifts / total_receiver_uplift
        volumes_received = net_cannibalization * weights

2. Phase 2: Effect allocation

Galbani: compute_promo_effects_allocation_italy_galbani()

# Promotion-level aggregation
aggregated_df = df_promo.groupby([
    'Retailer_name', 'EAN', 'EAN_desc', 'New_category', 'Promotion_Code', 'Year'
]).agg({
    'Valid_Baseline_Volume': 'sum',  # only where Week_promo_volume != 0
    'Promo_margin_at_event_level': 'first',
    'NSV_at_event_level': 'first',
    'Total_promo_costs_yearly': 'first',
    'Total_promo_volume_yearly': 'first',
    'adj_Week_promo_volume': 'sum'  # FB-adjusted volume
})

# Baseline Margin = margin with promo costs added back
aggregated_df['Baseline_Margin'] = aggregated_df.apply(
    lambda row: row['Promo_margin_at_event_level'] + 
                (row['Total_promo_costs_yearly'] / row['Total_promo_volume_yearly'])
    if row['Total_promo_volume_yearly'] > 0 else row['Promo_margin_at_event_level'],
    axis=1
)

# Cap the baseline at the promo volume
aggregated_df['Valid_finale_Baseline_Volume'] = aggregated_df.apply(
    lambda x: min(x['adj_Week_promo_volume'], x['Valid_Baseline_Volume']),
    axis=1
)

Parmalat: compute_promo_effects_allocation_italy_parmalat()

Similar, but:

  • Also aggregates Forward_Buying_Volume and Volume_cannibalized_received
  • Uses the promo duration from the calendar (Promo_start_WEEK_ID, Promo_end_WEEK_ID)
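
No Parmalat snippet is reproduced here; a minimal sketch of the extra aggregations, assuming the column names used in the surrounding sections (Retailer_name is the aliased Retailer_classification), might look like:

# Sketch only: Parmalat-specific aggregation (assumed column names)
aggregated_df = df_promo.groupby(
    ['Retailer_name', 'EAN', 'New_category', 'Promotion_Code', 'Year']
).agg({
    'Valid_Baseline_Volume': 'sum',
    'adj_Week_promo_volume': 'sum',
    'Forward_Buying_Volume': 'sum',          # aggregated, unlike Galbani
    'Volume_cannibalized_received': 'sum',   # cannibalization credit
    'Promo_start_WEEK_ID': 'first',          # promo duration from the calendar
    'Promo_end_WEEK_ID': 'first'
})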

3. Phase 3: Filtering and classification

compute_performance_metrics()

# Base metrics
df['Incremental_Margin'] = (
    df['Promo_margin_at_event_level'] * df['Adj_Promo_Volume'] - 
    df['Baseline_Margin'] * df['Baseline_Volume']
)
df['Volume_Uplift_%'] = df['Adj_Promo_Volume'] / df['Baseline_Volume'] - 1

# Replace infinities
df['Volume_Uplift_%'] = df['Volume_Uplift_%'].replace(
    [float('inf'), -float('inf')], np.nan
)

# Standard In_Scope filter
df['In_Scope_Performance'] = (
    (df['ROI'].between(-4, 4)) &
    (df['Volume_Uplift_%'].between(-1, 8))
).astype(int)
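
The snippet above reads df['ROI'] without showing how it is computed; a hedged sketch consistent with the ROI formula under Key concepts and the safe_divide_new() helper from the Error handling section:

import numpy as np

# Sketch: ROI with a guarded division (safe_divide_new is defined further below)
df['ROI'] = df.apply(
    lambda row: safe_divide_new(
        row['Incremental_Margin'] - row['Promo_Costs'],
        row['Promo_Costs'],
        default_value=np.nan
    ),
    axis=1
)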

Galbani classification: classify_events()

# Per Year × Category
for (year, category), group in grouped:
    in_scope = group[group['In_Scope'] == 1]

    # Weighted metrics
    weighted_avg_uplift = (
        (in_scope['Adj_Promo_Volume'].sum() - in_scope['Baseline_Volume'].sum())
        / in_scope['Baseline_Volume'].sum()
    )

    # Percentiles (NaN excluded)
    roi_clean = in_scope['ROI'].dropna()
    roi_percentile_75 = np.percentile(roi_clean, 75)

    # Classification
    for idx, row in group.iterrows():
        roi = row['ROI']
        uplift = row['Volume_Uplift_%']

        if roi >= abs(roi_percentile_75) and uplift >= weighted_avg_uplift:
            df.at[idx, 'Event_Type'] = 'Value & Volume generator'
        elif roi >= 0 and uplift >= weighted_avg_uplift:
            df.at[idx, 'Event_Type'] = 'Volume generator'
        elif roi >= 0 and uplift < weighted_avg_uplift:
            df.at[idx, 'Event_Type'] = 'Value generator'
        elif roi < 0:
            df.at[idx, 'Event_Type'] = 'Value destroyer'

Parmalat classification: classify_events_parmalat()

Strict rules:

  • First exclude ROI > 0 AND uplift < 0 → "Not in scope events"
  • Then apply the standard classification to the remaining valid events

# Flag weird events first
df.loc[(df['ROI'] > 0) & (df['Volume_Uplift_%'] < 0), 'Event_Type'] = 'Not in scope events'

# Classify In_Scope_event rows only
in_scope = group[(group['In_Scope'] == 1) & (group['In_Scope_event'] == True)]

4. Phase 4: Performance evolution

performance_evolution_italy_galbani()

Period determination:

def determine_period_status(df):
    max_year = df['Year'].max()
    max_week = df[df['Year'] == max_year]['Week'].max()

    def period_status(year, week):
        if year == max_year and week <= max_week:
            return 'YTD'
        elif year == max_year - 1 and week <= max_week:
            return 'LYTD'
        else:
            return 'Irrelevant'

    df['Period_Status'] = df.apply(
        lambda row: period_status(row['Year'], row['Week']), axis=1
    )
    return df

Effect computation:

# Cost effect
cost_effect = (((costs_current - costs_last) * volume_total_last) -
               ((costs_current - costs_last) * volume_baseline_last))

# Price effect
price_effect = (((no_promo_price_current * (1 - promo_discount) - promo_price_last) *
                 volume_total_last) -
                ((no_promo_price_current - no_promo_price_last) * volume_baseline_last))

# Promo effect (residual)
promo_effect = (inc_margin_current - inc_margin_last) - (cost_effect + price_effect)

Comparative metrics:

# Variations in percentage points
uplift_variation = (uplift_current - uplift_last) * 100
roi_variation = (roi_current - roi_last) * 100
value_destroyer_variation = safe_divide(
    (count_current - count_last), count_last
) * 100

Error handling

1. Division protection

def safe_divide_new(numerator, denominator, default_value=0):
    # Return default_value for zero/NaN denominators and non-finite results
    if denominator == 0 or pd.isna(denominator) or np.isinf(numerator):
        return default_value
    result = numerator / denominator
    if pd.isna(result) or np.isinf(result):
        return default_value
    return result
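
For whole columns, a vectorized variant avoids the per-row Python loop; a sketch under the assumption that both inputs are pandas Series (not the pipeline's code):

import numpy as np

def safe_divide_series(numerator, denominator, default_value=0):
    # NaN out zero denominators, divide element-wise,
    # then replace any inf/NaN with the default value
    result = numerator / denominator.replace(0, np.nan)
    return result.replace([np.inf, -np.inf], np.nan).fillna(default_value)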

2. Memory management (large tables)

def save_dataframe_in_batches(df, table_name, engine, batch_size=10000):
    total_rows = len(df)
    num_batches = (total_rows + batch_size - 1) // batch_size

    # First batch: replace the table
    batch_df = df.iloc[0:batch_size]
    batch_df.to_sql(table_name, con=engine, if_exists='replace', index=False)

    # Subsequent batches: append
    for batch_num in range(1, num_batches):
        start_idx = batch_num * batch_size
        end_idx = min((batch_num + 1) * batch_size, total_rows)
        batch_df = df.iloc[start_idx:end_idx]
        batch_df.to_sql(table_name, con=engine, if_exists='append', index=False)
        del batch_df
        gc.collect()
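
Note: pandas' to_sql also accepts a chunksize argument; the explicit loop is kept here because it switches from if_exists='replace' to 'append' between batches and frees memory with gc.collect().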

3. Missing columns

# Guard for the missing EAN_desc column (Parmalat)
if business_unit == 'italy_parmalat':
    # No EAN_desc in the keys
    merge_keys = ['Retailer_name', 'EAN', 'Year', 'Week']
else:
    merge_keys = ['Retailer_name', 'EAN', 'EAN_desc', 'Year', 'Week']

Performance and optimization

1. Chunking large datasets

# Process retailer groups one chunk at a time
for (retailer, ean, year), group_indices in df_sorted.groupby(
    ['Retailer_name', 'EAN', 'Year']
).groups.items():
    group_data = df_sorted.loc[group_indices].copy()
    processed_group = calculate_forward_buying_for_group(group_data)
    result_dfs.append(processed_group)

    processed_groups += 1
    if processed_groups % 5000 == 0:
        log_message(f"Processed {processed_groups}/{total_groups} groups...")

2. Memory optimization

# Explicit cleanup before saving
del df_promo_calendar, df_sell_in, df_effects
gc.collect()

3. Vectorized aggregations

# Avoid apply() when possible
# Bad
df['result'] = df.apply(lambda x: complex_function(x), axis=1)

# Good
df['result'] = np.where(
    condition,
    df['col1'] * df['col2'],
    default_value
)

Maintenance notes

1. Critical execution order

The 4 phases MUST run in this order (a minimal orchestration sketch follows):

  1. main_promo_level_aggregation
  2. main_promo_effects_allocation
  3. main_promo_event_filtering
  4. main_performance_evolution
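
The four entry points exist in the pipeline; the loop and logging below are illustrative:

# Sketch: run the four phases strictly in order; an exception aborts the rest
def main():
    for phase in (
        main_promo_level_aggregation,
        main_promo_effects_allocation,
        main_promo_event_filtering,
        main_performance_evolution,
    ):
        log_message(f"Starting {phase.__name__}...")
        phase()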

2. Business unit differences

Aspect            Galbani          Parmalat
Retailer key      Retailer_name    Retailer_classification
Product key       EAN + EAN_desc   EAN only
Forward buying    Whole block      Week by week
Cannibalization   No               Yes (UHT)
Excluded events   None             ROI > 0 & uplift < 0

3. Intermediate tables

  • forward_buying_mapping_*: FB traceability
  • cannibalization_mapping_parmalat: cannibalization traceability
  • KPIs_classification_*.csv: classification debugging

4. Qliksense compatibility

  • category table: actually contains SKU-level data (EAN_combined)
  • Retailer_group: duplicates Retailer_name for Parmalat
  • Colors: assigned by Category and Retailer_group

Troubleshooting

Problem: extreme or infinite ROI

Diagnosis:

SELECT Promotion_Code, ROI, Promo_Costs, Incremental_Margin
FROM step_6_03_final_with_scope_promo_table
WHERE ABS(ROI) > 10 OR ROI IS NULL
ORDER BY ABS(ROI) DESC;

Causes:

  • Promo costs = 0 → division by zero
  • Baseline = 0 → infinite uplift
  • Missing data

Solution:

# Add a promo-cost condition to the In_Scope filter
df['In_Scope'] = df['In_Scope'] & (df['Promo_Costs'] > 0)

Problem: negative forward buying

Symptom: Forward_Buying_Volume < 0

Check:

SELECT Retailer_name, EAN, Week, 
       Forward_Buying_Volume, Total_Halo_Volume, Total_Pantry_Loading_Volume
FROM forward_buying_mapping_parmalat
WHERE Forward_Buying_Volume < 0;

Cause: logic error in the net calculation

Solution: check the abs() in the calculation
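
As a defensive guard while the root cause is investigated, the FB column can be clamped and reported (a sketch, not the actual fix):

# Sketch: report, then clamp negative forward buying volumes at zero
negative_fb = df['Forward_Buying_Volume'] < 0
if negative_fb.any():
    log_message(f"WARNING: {negative_fb.sum()} rows with negative forward buying")
df['Forward_Buying_Volume'] = df['Forward_Buying_Volume'].clip(lower=0)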

Problem: inconsistent classification

Symptom: a Value destroyer with ROI > 0

Debug:

# Add logging inside classify_events
log_message(f"Year {year}, Category {category}:")
log_message(f"  Weighted avg uplift: {weighted_avg_uplift:.2%}")
log_message(f"  ROI P75: {roi_percentile_75:.2f}")
log_message(f"  Total events: {len(group)}")

Problem: empty performance evolution

Possible causes:

  1. Not enough LYTD data
  2. Too restrictive In_Scope filter
  3. Wrong max_week detection

Check:

SELECT Year, Period_Status, COUNT(*) as nb_rows
FROM (
    SELECT Year, 
           CASE 
               WHEN Year = (SELECT MAX(Year) FROM step_3_00_sell_in) THEN 'YTD'
               WHEN Year = (SELECT MAX(Year) FROM step_3_00_sell_in) - 1 THEN 'LYTD'
               ELSE 'Irrelevant'
           END as Period_Status
    FROM step_3_00_sell_in
) t
GROUP BY Year, Period_Status;

Usage examples

Adding a new classification

# Inside classify_events_custom()
def classify_events_custom(df):
    # New rule: super performers
    super_performer_mask = (
        (df['ROI'] > 2) & 
        (df['Volume_Uplift_%'] > 3)
    )
    df.loc[super_performer_mask, 'Event_Type'] = 'Super performer'

    # Then apply the standard classification
    df = classify_events(df)
    return df

Exporting data for analysis

# After phase 3
debug_df = df[df['Event_Type'] == 'Value destroyer'][
    ['Promotion_Code', 'Retailer_name', 'EAN', 'ROI', 
     'Volume_Uplift_%', 'Promo_Costs', 'Baseline_Volume']
]
debug_df.to_excel(f'value_destroyers_{business_unit}.xlsx', index=False)

Customizing the forward buying window

-- Add rows to Forward_Buying_weeks_window_parmalat
INSERT INTO Forward_Buying_weeks_window_parmalat
    (Retailer_classification, New_category, Forward_buying_weeks_window)
VALUES ('Carrefour', 'UHT normal milk', 3),
       ('Leclerc', 'Fresh cheese', 2);

Performance monitoring

# Add inside main()
start_time = time.time()
main_promo_level_aggregation()
log_message(f"Phase 1 completed in {time.time() - start_time:.1f}s")

# Check table sizes
for table in ['step_6_01_promo_level_table', 'step_6_02_final_promo_table']:
    count = pd.read_sql(f"SELECT COUNT(*) as c FROM {table}", engine).iloc[0,0]
    log_message(f"{table}: {count:,} rows")

Adapting to a new country

def aggregate_promo_level_new_country(engine):
    # Minimal structure
    df_promo = pd.read_sql("SELECT * FROM promo_calendar", engine)
    df_sell_in = pd.read_sql("SELECT * FROM step_2_03_sell_in_base", engine)

    # No forward buying
    df_promo['Forward_Buying_Volume'] = 0

    # Simple aggregation
    result = df_promo.groupby(['Promotion_Code', 'Year']).agg({
        'Promo_Volume': 'sum',
        'Promo_Costs': 'sum',
        'Promo_Margin': 'sum',   # needed for the ROI below
        'Baseline_Volume': 'sum'
    })

    # Basic classification
    result['ROI'] = (result['Promo_Margin'] - result['Promo_Costs']) / result['Promo_Costs']
    result['Event_Type'] = np.where(result['ROI'] > 0, 'Winner', 'Loser')

    return result