Step_07_Aggregation_at_promo_level - Promotion aggregation and analysis¶
Overview¶
This step is the core of the promotional analysis: it aggregates volume effects at the level of promotional events, computes performance KPIs (ROI, uplift), and classifies promotions by performance. It is split into four distinct phases that turn granular weekly data into actionable promotion-level insights.
Main objectives¶
- Aggregate weekly effects at the promotional-event level
- Compute incremental margins and ROI per promotion
- Classify promotions (value/volume generator/destroyer)
- Prepare the YTD vs LYTD performance-evolution analyses
Position in the pipeline¶
- Previous step: Step_06_Valuation_of_volume_effects
- Next step: Step_08_Simulation_Inputs
- Entry points:
  - main_promo_level_aggregation: initial aggregation with forward buying
  - main_promo_effects_allocation: proportional allocation of effects
  - main_promo_event_filtering: promotion filtering and classification
  - main_performance_evolution: YTD/LYTD comparative analysis
Technical architecture¶
Data flow¶
Phase 1: Promo-level aggregation
─────────────────────────────────
Input
├── promo_calendar (promo dates and mechanics)
├── step_2_03_sell_in_base (costs and margins)
├── step_5_03_model_cannibalization_effects (weekly volumes)
└── forward_buying_windows (Galbani/Parmalat)
Processing
├── Enrichment with weekly promo codes
├── Forward-buying and cannibalization computation
├── Aggregation of adjusted volumes per promo
└── Promotional margin computation
Output → step_6_01_promo_level_table (enriched weekly data)
Phase 2: Effects allocation
───────────────────────────
Input
└── step_6_01_promo_level_table
Processing
├── Aggregation by Promotion_Code × Year
├── Baseline_Margin and adjusted-margin computation
└── Metadata enrichment (retailer, category)
Output → step_6_02_final_promo_table (promotion level)
Phase 3: Filtering and classification
────────────────────────────────────
Input
└── step_6_02_final_promo_table
Processing
├── ROI and Volume_Uplift_% computation
├── In_Scope filtering (baseline ≥ 0, costs > 0)
├── Event classification (4 types)
└── Visualization color assignment
Output → step_6_03_final_with_scope_promo_table
Phase 4: Performance evolution
───────────────────────────────
Input
├── step_6_01_promo_level_table
├── step_3_00_sell_in
└── step_6_03_final_with_scope_promo_table
Processing
├── YTD/LYTD split at the max week
├── Effect computation (cost, price, promo)
├── Comparative metrics (ROI, uplift)
└── Aggregation by retailer_group and category
Output → step_6_04_perf_evolution_* (2 tables)
Key concepts¶
1. Forward buying (Galbani/Parmalat)¶
- Definition: consumer stockpiling after a promo, causing negative post-promo sales
- Window: 2-4 weeks, configurable per retailer/category
- Computation (sketched below):
  - Halo = post-promo volumes > baseline
  - Pantry loading = post-promo volumes < baseline
  - Forward buying = min(|Halo + Pantry|, promo uplift)
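A minimal sketch of that window computation, assuming a post-promo frame carrying the Week_total_volume and Baseline_Volume columns used later in this step:
import pandas as pd

def forward_buying(post_promo: pd.DataFrame, promo_uplift: float) -> float:
    """Illustrative forward-buying computation over one post-promo window."""
    diffs = post_promo['Week_total_volume'] - post_promo['Baseline_Volume']
    halo = diffs[diffs > 0].sum()    # halo: volume above baseline
    pantry = diffs[diffs < 0].sum()  # pantry loading: volume below baseline
    # Forward buying = min(|Halo + Pantry|, promo uplift), floored at zero
    return min(abs(halo + pantry), max(promo_uplift, 0.0))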
2. Cannibalization (Parmalat-specific)¶
- Eligible products: In&Out + promo + UHT normal milk
- Mechanism: receivers are credited with the volume cannibalized from the givers
- Split: proportional to the receivers' uplift (sketched below)
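A minimal sketch of that proportional split (argument and column names are illustrative):
import pandas as pd

def allocate_cannibalized_volume(receiver_uplifts: pd.Series,
                                 net_cannibalization: float) -> pd.Series:
    """Split the givers' cannibalized volume across receivers, pro rata to uplift."""
    total_uplift = receiver_uplifts.sum()
    if total_uplift <= 0:
        # No positive uplift to allocate against: nothing is redistributed
        return pd.Series(0.0, index=receiver_uplifts.index)
    return net_cannibalization * (receiver_uplifts / total_uplift)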
3. ROI and performance metrics¶
- ROI = (Incremental margin - Promo costs) / Promo costs
- Volume uplift % = (Promo volume / Baseline) - 1
- Incremental margin = (Promo margin × Promo volume) - (Baseline margin × Baseline volume)
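A worked example of the three formulas (all numbers invented for illustration):
promo_margin, promo_volume = 2.0, 1500        # margin per unit, units sold on promo
baseline_margin, baseline_volume = 2.5, 1000  # counterfactual without the promo
promo_costs = 800.0

incremental_margin = promo_margin * promo_volume - baseline_margin * baseline_volume
# 2.0 * 1500 - 2.5 * 1000 = 500.0
roi = (incremental_margin - promo_costs) / promo_costs
# (500 - 800) / 800 = -0.375 -> ROI < 0, a value destroyer
volume_uplift = promo_volume / baseline_volume - 1
# 1500 / 1000 - 1 = 0.5 (+50%)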
4. Event classification¶
- Value & Volume generator: ROI ≥ 75th percentile AND uplift ≥ average
- Volume generator: ROI ≥ 0 AND uplift ≥ average
- Value generator: ROI ≥ 0 AND uplift < average
- Value destroyer: ROI < 0
- Not in scope (Parmalat): ROI > 0 AND uplift < 0
Detailed implementation¶
1. Phase 1: Promo-level aggregation¶
Galbani: aggregate_promo_level_italy_galbani()¶
Promo code enrichment:
# Propagate promo codes onto flagged weeks
def ensure_promotion_code_carryover(df_effects, df_sell_in):
    mask_promo = df_effects['Flag_init_Promo'] == '1'
    # Keep sell-in rows that carry a promo code
    df_sell_in_filtered = df_sell_in[
        ['Retailer_name', 'EAN', 'EAN_desc', 'Promotion_Code', 'Year', 'Week']
    ].drop_duplicates().dropna(subset=['Promotion_Code'])
    # Merge on promo rows only
    df_effects_promo = df_effects[mask_promo].merge(
        df_sell_in_filtered,
        on=['Retailer_name', 'EAN', 'EAN_desc', 'Year', 'Week'],
        how='left'
    )
    # Cluster ID to group promo weeks together
    combined_key = (df_effects_promo['Retailer_name'].astype(str) + '_' +
                    df_effects_promo['EAN'].astype(str) + '_' +
                    df_effects_promo['Year'].astype(str) + '_' +
                    df_effects_promo['Week'].astype(str))
    df_effects_promo['Cluster_Promotion_ID'] = pd.factorize(combined_key)[0]
    return df_effects_promo
Forward-buying computation:
# Retailer behavior factor
def calculate_retailer_behavior_factor(df):
    # Keep non-promo weeks only
    non_promo_df = df[df['Week_promo_volume'] == 0]
    # Off-promo uplift = (total volume / baseline) - 1
    grouped = non_promo_df.groupby(['Retailer_name', 'Year']).agg({
        'Week_total_volume': 'sum',
        'Baseline_Volume': 'sum'
    })
    grouped['Yearly_no_promo_uplift'] = (
        grouped['Week_total_volume'] / grouped['Baseline_Volume'] - 1
    )
    # Map the uplift to a behavior factor
    def map_behavior_factor(uplift):
        if uplift >= 0:
            return 0
        elif uplift >= -0.25:
            return 1
        else:
            return 1.1
    grouped['Retailer_behavior_factor'] = grouped['Yearly_no_promo_uplift'].apply(
        map_behavior_factor
    )
    return grouped.reset_index()
Promotional margins:
def compute_promotional_margin(df_sell_in, df_ultime):
    # Average costs per period: yearly in 2023, half-yearly in 2024, quarterly from 2025
    def assign_period(row):
        week = row['Week']
        if row['Year'] == 2023:
            return 1                       # yearly
        elif row['Year'] == 2024:
            return ((week - 1) // 26) + 1  # half-yearly
        elif row['Year'] >= 2025:
            return ((week - 1) // 13) + 1  # quarterly
        return 1
    df_sell_in['period_temp'] = df_sell_in.apply(assign_period, axis=1)
    # Adjusted margin
    df_sell_in['Adjusted_gross_margin'] = (
        df_sell_in['Net_net_sales'] +
        df_sell_in['Promo_funding_off_invoice_Detail_6'] +
        (df_sell_in['Average_unit_Costs_Product_and_Other'] * df_sell_in['Volumes']) +
        df_sell_in['Logistics_Cost'] -
        (df_sell_in['Average_unit_Costs_Level_Two'] * df_sell_in['Volumes']) +
        (df_sell_in['Supply_chain_gap'] * df_sell_in['Volumes'])
    )
    # Weighted average per promo
    weighted_promo_margin = df_sell_in.groupby(
        ['Retailer_name', 'EAN', 'EAN_desc', 'Year', 'Promotion_Code']
    ).apply(compute_weighted_average).reset_index(name='Promo_margin_at_event_level')
Parmalat: aggregate_promo_level_italy_parmalat()¶
Specifics:
- Uses Retailer_classification instead of Retailer_name
- No EAN_desc in the aggregations
- Forward buying computed week by week
def calculate_halo_pantry_and_forward_buying_parmalat(df_effects_enriched, engine):
    # Forward-buying window configuration
    forward_buying_windows = pd.read_sql("""
        SELECT Retailer_classification as Retailer_name,
               New_category, Forward_buying_weeks_window
        FROM Forward_Buying_weeks_window_parmalat
    """, engine)

    # Process per Retailer × EAN × Year group
    def calculate_forward_buying_for_group(group):
        # Flag the last week of each promo block
        for i in range(len(group)):
            if promo_mask[i]:
                if i == len(group) - 1:      # last week of the year
                    group.loc[i, 'Is_Last_Promo_Week'] = True
                elif not promo_mask[i + 1]:  # next week is non-promo
                    group.loc[i, 'Is_Last_Promo_Week'] = True
                elif group.loc[i, 'Promotion_Code'] != group.loc[i + 1, 'Promotion_Code']:
                    group.loc[i, 'Is_Last_Promo_Week'] = True

        # Compute FB on last promo weeks only
        for promo_idx in last_promo_indices:
            # Post-promo window
            post_promo_mask = (
                (group['Week_int'] > promo_week_int) &
                (group['Week_int'] <= promo_week_int + window_size) &
                (group['Promotion_Code'].isna())  # non-promo weeks only
            )
            # Volume deltas vs baseline
            volume_diffs = valid_data['Week_total_volume'] - valid_data['Baseline_Volume']
            total_halo = volume_diffs[volume_diffs > 0].sum()
            total_pantry = volume_diffs[volume_diffs < 0].sum()
            # FB logic
            if total_pantry < 0 and total_halo > 0:
                forward_buying_volume = abs(total_halo + total_pantry)
UHT cannibalization:
def calculate_cannibalization_adjustment_parmalat(df_effects_enriched, engine):
    # UHT products only
    uht_mask = df_effects_enriched['New_category'] == 'UHT normal milk'
    # Every UHT product is a giver
    df_effects_enriched.loc[all_givers_idx, 'Cannibalization_role'] = 'Giver'
    volume_cannibalized = np.maximum(0, baseline_vols - actual_vols)
    # Receivers: In&Out products on promo
    receiver_mask = (
        group['EAN'].isin(inout_eans) &
        group['Promotion_Code'].notna()
    )
    # Split proportional to uplift
    if total_receiver_uplift > 0:
        weights = receiver_uplifts / total_receiver_uplift
        volumes_received = net_cannibalization * weights
2. Phase 2: Effects allocation¶
Galbani: compute_promo_effects_allocation_italy_galbani()¶
# Promotion-level aggregation
aggregated_df = df_promo.groupby([
    'Retailer_name', 'EAN', 'EAN_desc', 'New_category', 'Promotion_Code', 'Year'
]).agg({
    'Valid_Baseline_Volume': 'sum',          # only where Week_promo_volume != 0
    'Promo_margin_at_event_level': 'first',
    'NSV_at_event_level': 'first',
    'Total_promo_costs_yearly': 'first',
    'Total_promo_volume_yearly': 'first',
    'adj_Week_promo_volume': 'sum'           # FB-adjusted volume
})
# Baseline margin = margin without promo costs
aggregated_df['Baseline_Margin'] = aggregated_df.apply(
    lambda row: row['Promo_margin_at_event_level'] +
                (row['Total_promo_costs_yearly'] / row['Total_promo_volume_yearly'])
    if row['Total_promo_volume_yearly'] > 0 else row['Promo_margin_at_event_level'],
    axis=1
)
# Cap the baseline at the promo volume
aggregated_df['Valid_finale_Baseline_Volume'] = aggregated_df.apply(
    lambda x: min(x['adj_Week_promo_volume'], x['Valid_Baseline_Volume']),
    axis=1
)
Parmalat: compute_promo_effects_allocation_italy_parmalat()¶
Similar, but:
- Also aggregates Forward_Buying_Volume and Volume_cannibalized_received
- Uses the promo duration from the calendar (Promo_start_WEEK_ID, Promo_end_WEEK_ID); see the sketch below
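A minimal sketch of those two differences, reusing the Galbani groupby pattern above (the WEEK_ID duration arithmetic is an assumption, not taken from the project's code):
# Illustrative Parmalat variant: extra aggregated columns, calendar-based duration
aggregated_df = df_promo.groupby([
    'Retailer_name', 'EAN', 'New_category', 'Promotion_Code', 'Year'  # no EAN_desc
]).agg({
    'adj_Week_promo_volume': 'sum',
    'Forward_Buying_Volume': 'sum',           # Parmalat-specific
    'Volume_cannibalized_received': 'sum',    # Parmalat-specific
    'Promo_start_WEEK_ID': 'first',
    'Promo_end_WEEK_ID': 'first'
}).reset_index()
# Promo duration taken from the calendar rather than counted weeks
# (assumes WEEK_IDs are consecutive integers within a year)
aggregated_df['Promo_duration_weeks'] = (
    aggregated_df['Promo_end_WEEK_ID'] - aggregated_df['Promo_start_WEEK_ID'] + 1
)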
3. Phase 3: Filtering and classification¶
compute_performance_metrics()¶
# Base metrics
df['Incremental_Margin'] = (
    df['Promo_margin_at_event_level'] * df['Adj_Promo_Volume'] -
    df['Baseline_Margin'] * df['Baseline_Volume']
)
df['Volume_Uplift_%'] = df['Adj_Promo_Volume'] / df['Baseline_Volume'] - 1
# Handle infinities
df['Volume_Uplift_%'] = df['Volume_Uplift_%'].replace(
    [float('inf'), -float('inf')], np.nan
)
# Standard In_Scope filter
df['In_Scope_Performance'] = (
    (df['ROI'].between(-4, 4)) &
    (df['Volume_Uplift_%'].between(-1, 8))
).astype(int)
Galbani classification: classify_events()¶
# Per Year × Category
for (year, category), group in grouped:
    in_scope = group[group['In_Scope'] == 1]
    # Weighted metrics
    weighted_avg_uplift = (
        (in_scope['Adj_Promo_Volume'].sum() - in_scope['Baseline_Volume'].sum()) /
        in_scope['Baseline_Volume'].sum()
    )
    # Percentiles (NaN excluded)
    roi_clean = in_scope['ROI'].dropna()
    roi_percentile_75 = np.percentile(roi_clean, 75)
    # Classification
    for idx, row in group.iterrows():
        roi = row['ROI']
        uplift = row['Volume_Uplift_%']
        if roi >= abs(roi_percentile_75) and uplift >= weighted_avg_uplift:
            df.at[idx, 'Event_Type'] = 'Value & Volume generator'
        elif roi >= 0 and uplift >= weighted_avg_uplift:
            df.at[idx, 'Event_Type'] = 'Volume generator'
        elif roi >= 0 and uplift < weighted_avg_uplift:
            df.at[idx, 'Event_Type'] = 'Value generator'
        elif roi < 0:
            df.at[idx, 'Event_Type'] = 'Value destroyer'
Parmalat classification: classify_events_parmalat()¶
Strict rules:
- First exclude: ROI > 0 AND uplift < 0 → "Not in scope events"
- Then apply the standard classification to the remaining valid events
# Flag weird events first
df.loc[(df['ROI'] > 0) & (df['Volume_Uplift_%'] < 0), 'Event_Type'] = 'Not in scope events'
# Classify In_Scope_event rows only
in_scope = group[(group['In_Scope'] == 1) & (group['In_Scope_event'] == True)]
4. Phase 4: Performance evolution¶
performance_evolution_italy_galbani()¶
Period determination:
def determine_period_status(df):
    max_year = df['Year'].max()
    max_week = df[df['Year'] == max_year]['Week'].max()
    def period_status(year, week):
        if year == max_year and week <= max_week:
            return 'YTD'
        elif year == max_year - 1 and week <= max_week:
            return 'LYTD'
        else:
            return 'Irrelevant'
    df['Period_Status'] = df.apply(
        lambda row: period_status(row['Year'], row['Week']), axis=1
    )
    return df
Effect computation:
# Cost effect
cost_effect = (((costs_current - costs_last) * volume_total_last) -
               ((costs_current - costs_last) * volume_baseline_last))
# Price effect
price_effect = (((no_promo_price_current * (1 - promo_discount) - promo_price_last) *
                 volume_total_last) -
                ((no_promo_price_current - no_promo_price_last) * volume_baseline_last))
# Promo effect (residual)
promo_effect = (inc_margin_current - inc_margin_last) - (cost_effect + price_effect)
Comparative metrics:
# Variations in percentage points
uplift_variation = (uplift_current - uplift_last) * 100
roi_variation = (roi_current - roi_last) * 100
value_destroyer_variation = safe_divide(
    (count_current - count_last), count_last
) * 100
Error handling¶
1. Division safeguards¶
def safe_divide_new(numerator, denominator, default_value=0):
    if denominator == 0 or pd.isna(denominator) or np.isinf(numerator):
        return default_value
    result = numerator / denominator
    if pd.isna(result) or np.isinf(result):
        return default_value
    return result
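For instance, the ROI formula from the key concepts can be routed through this guard (an illustrative call, not necessarily the project's exact wiring):
df['ROI'] = df.apply(
    lambda row: safe_divide_new(
        row['Incremental_Margin'] - row['Promo_Costs'],  # numerator of the ROI formula
        row['Promo_Costs']                               # zero-cost promos fall back to 0
    ),
    axis=1
)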
2. Memory management (large tables)¶
import gc

def save_dataframe_in_batches(df, table_name, engine, batch_size=10000):
    total_rows = len(df)
    num_batches = (total_rows + batch_size - 1) // batch_size
    # First batch: replace the table
    batch_df = df.iloc[0:batch_size]
    batch_df.to_sql(table_name, con=engine, if_exists='replace', index=False)
    # Subsequent batches: append
    for batch_num in range(1, num_batches):
        start_idx = batch_num * batch_size
        end_idx = min((batch_num + 1) * batch_size, total_rows)
        batch_df = df.iloc[start_idx:end_idx]
        batch_df.to_sql(table_name, con=engine, if_exists='append', index=False)
        del batch_df
        gc.collect()
3. Missing columns¶
# EAN_desc safeguard for Parmalat
if business_unit == 'italy_parmalat':
    # No EAN_desc in the keys
    merge_keys = ['Retailer_name', 'EAN', 'Year', 'Week']
else:
    merge_keys = ['Retailer_name', 'EAN', 'EAN_desc', 'Year', 'Week']
Performance and optimization¶
1. Chunking for large datasets¶
# Process retailer groups one chunk at a time
processed_groups = 0
result_dfs = []
total_groups = df_sorted.groupby(['Retailer_name', 'EAN', 'Year']).ngroups
for (retailer, ean, year), group_indices in df_sorted.groupby(
    ['Retailer_name', 'EAN', 'Year']
).groups.items():
    group_data = df_sorted.loc[group_indices].copy()
    processed_group = calculate_forward_buying_for_group(group_data)
    result_dfs.append(processed_group)
    processed_groups += 1
    if processed_groups % 5000 == 0:
        log_message(f"Processed {processed_groups}/{total_groups} groups...")
2. Memory optimization¶
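A minimal sketch of the usual pandas pattern for this, downcasting numeric columns before the heavy groupbys (illustrative only; nothing below is taken from the project's code):
import pandas as pd

def downcast_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Shrink the memory footprint by downcasting numeric dtypes."""
    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    return df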
3. Vectorized aggregations¶
# Avoid apply() when possible
# Bad
df['result'] = df.apply(lambda x: complex_function(x), axis=1)
# Good
df['result'] = np.where(
    condition,
    df['col1'] * df['col2'],
    default_value
)
Maintenance watch points¶
1. Critical execution order¶
The 4 phases MUST run in this order (a minimal orchestration sketch follows the list):
1. main_promo_level_aggregation
2. main_promo_effects_allocation
3. main_promo_event_filtering
4. main_performance_evolution
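The sketch below enforces that ordering; the main() wrapper is an assumption, only the four entry-point names come from this step:
def main():
    # The output table of each phase is the input of the next,
    # so these calls must not be reordered or run in parallel.
    main_promo_level_aggregation()    # -> step_6_01_promo_level_table
    main_promo_effects_allocation()   # -> step_6_02_final_promo_table
    main_promo_event_filtering()      # -> step_6_03_final_with_scope_promo_table
    main_performance_evolution()      # -> step_6_04_perf_evolution_*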
2. Business-unit differences¶
Aspect | Galbani | Parmalat
---|---|---
Retailer key | Retailer_name | Retailer_classification
Product key | EAN + EAN_desc | EAN only
Forward buying | Whole promo block | Week by week
Cannibalization | No | Yes (UHT)
Excluded events | No | ROI > 0 & uplift < 0
3. Intermediate tables¶
- forward_buying_mapping_*: forward-buying traceability
- cannibalization_mapping_parmalat: cannibalization traceability
- KPIs_classification_*.csv: classification debugging
4. Qliksense compatibility¶
- The category table actually holds SKU-level data (EAN_combined)
- Retailer_group: duplicates Retailer_name for Parmalat
- Colors: assigned per Category and Retailer_group (illustrative sketch below)
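A minimal sketch of such a stable color assignment (the palette and helper are assumptions, not the project's actual mapping):
import itertools

PALETTE = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']

def assign_colors(df, key):
    # Sort values so the same group always gets the same color across runs
    colors = dict(zip(sorted(df[key].dropna().unique()), itertools.cycle(PALETTE)))
    return df[key].map(colors)

df['Category_color'] = assign_colors(df, 'Category')
df['Retailer_group_color'] = assign_colors(df, 'Retailer_group')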
Troubleshooting¶
Issue: extreme or infinite ROI values¶
Diagnosis:
SELECT Promotion_Code, ROI, Promo_Costs, Incremental_Margin
FROM step_6_03_final_with_scope_promo_table
WHERE ABS(ROI) > 10 OR ROI IS NULL
ORDER BY ABS(ROI) DESC;
Causes:
- Promo costs = 0 → division by zero
- Baseline = 0 → infinite uplift
- Missing data
Solution: route the ROI computation through safe_divide_new() and rely on the In_Scope_Performance bounds (ROI between -4 and 4) to exclude remaining outliers.
Issue: negative forward buying¶
Symptom: Forward_Buying_Volume < 0
Check:
SELECT Retailer_name, EAN, Week,
Forward_Buying_Volume, Total_Halo_Volume, Total_Pantry_Loading_Volume
FROM forward_buying_mapping_parmalat
WHERE Forward_Buying_Volume < 0;
Cause: logic error in the net-volume computation
Solution: check the abs() in the forward-buying calculation
Issue: inconsistent classification¶
Symptom: a Value destroyer with ROI > 0
Debug:
# Add logging inside classify_events
log_message(f"Year {year}, Category {category}:")
log_message(f" Weighted avg uplift: {weighted_avg_uplift:.2%}")
log_message(f" ROI P75: {roi_percentile_75:.2f}")
log_message(f" Total events: {len(group)}")
Problème : Performance evolution vide¶
Causes possibles : 1. Pas assez de données LYTD 2. Filtre In_Scope trop restrictif 3. Mauvaise détection max_week
Vérification :
SELECT Year, Period_Status, COUNT(*) as nb_rows
FROM (
SELECT Year,
CASE
WHEN Year = (SELECT MAX(Year) FROM step_3_00_sell_in) THEN 'YTD'
WHEN Year = (SELECT MAX(Year) FROM step_3_00_sell_in) - 1 THEN 'LYTD'
ELSE 'Irrelevant'
END as Period_Status
FROM step_3_00_sell_in
) t
GROUP BY Year, Period_Status;
Usage examples¶
Adding a new classification¶
# In classify_events_custom()
def classify_events_custom(df):
    # New rule: super performers
    super_performer_mask = (
        (df['ROI'] > 2) &
        (df['Volume_Uplift_%'] > 3)
    )
    df.loc[super_performer_mask, 'Event_Type'] = 'Super performer'
    # Then the standard classification
    df = classify_events(df)
    return df
Exporting data for analysis¶
# After phase 3
debug_df = df[df['Event_Type'] == 'Value destroyer'][
    ['Promotion_Code', 'Retailer_name', 'EAN', 'ROI',
     'Volume_Uplift_%', 'Promo_Costs', 'Baseline_Volume']
]
debug_df.to_excel(f'value_destroyers_{business_unit}.xlsx', index=False)
Customizing the forward-buying window¶
-- Customize per-retailer/category windows in Forward_Buying_weeks_window_parmalat
INSERT INTO Forward_Buying_weeks_window_parmalat
VALUES ('Carrefour', 'UHT normal milk', 3),
       ('Leclerc', 'Fresh cheese', 2);
Performance monitoring¶
import time

# Add to main()
start_time = time.time()
main_promo_level_aggregation()
log_message(f"Phase 1 completed in {time.time() - start_time:.1f}s")
# Check table sizes
for table in ['step_6_01_promo_level_table', 'step_6_02_final_promo_table']:
    count = pd.read_sql(f"SELECT COUNT(*) as c FROM {table}", engine).iloc[0, 0]
    log_message(f"{table}: {count:,} rows")
Adapting to a new country¶
def aggregate_promo_level_new_country(engine):
    # Minimal structure
    df_promo = pd.read_sql("SELECT * FROM promo_calendar", engine)
    df_sell_in = pd.read_sql("SELECT * FROM step_2_03_sell_in_base", engine)
    # No forward buying
    df_promo['Forward_Buying_Volume'] = 0
    # Simple aggregation
    result = df_promo.groupby(['Promotion_Code', 'Year']).agg({
        'Promo_Volume': 'sum',
        'Promo_Costs': 'sum',
        'Promo_Margin': 'sum',   # needed below for the ROI
        'Baseline_Volume': 'sum'
    })
    # Basic classification
    result['ROI'] = (result['Promo_Margin'] - result['Promo_Costs']) / result['Promo_Costs']
    result['Event_Type'] = np.where(result['ROI'] > 0, 'Winner', 'Loser')
    return result