Aller au contenu

Step_00_Download_Inputs - Téléchargement des Données d'Entrée

Vue d'ensemble

Cette étape constitue le point d'entrée de la pipeline d'excellence promotionnelle Lactalis. Elle est responsable du téléchargement sécurisé des fichiers d'entrée depuis un serveur SFTP distant vers l'environnement local de traitement.

Objectif principal

Récupérer de manière fiable et automatisée l'ensemble des données nécessaires au traitement promotionnel depuis les serveurs centraux de Lactalis.

Position dans la pipeline

  • Étape précédente : Aucune (première étape)
  • Étape suivante : Step_01_Specific_Pre_Processing (pour l'Italie) ou Step_02_Data_Integration

Architecture technique

Flux de données

Serveur SFTP Lactalis
    ├── /data/to/startflow/{pays}/
    │   ├── {business_unit}/
    │   └── {business_unit avec __}/
Application locale
    └── /app/inputs/{business_unit}/

Variables d'environnement requises

Variable Description Exemple
SFTP_HOST Adresse du serveur SFTP sftp.lactalis.com
SFTP_PORT Port de connexion SFTP 22
SFTP_USERNAME Nom d'utilisateur SFTP promo_user
SFTP_PASSWORD Mot de passe SFTP ****
BUSINESS_UNIT Unité commerciale concernée italy_galbani

Concepts clés

1. Business Unit (Unité commerciale)

L'unité commerciale détermine : - Le chemin distant des fichiers sources - Le répertoire local de destination - Les traitements spécifiques ultérieurs (ex: pré-processing Italie)

Format standard : {pays}_{marque} Exemples : italy_galbani, italy_parmalat, france_president

2. Structure des répertoires distants

Le script gère deux formats de nommage possibles : - Format standard : /data/to/startflow/{pays}/{business_unit}/ - Format alternatif : /data/to/startflow/{pays}/{business_unit avec __}/

Cette flexibilité permet de gérer les variations de nommage historiques.

3. Mécanisme de retry avec backoff exponentiel

Pour garantir la robustesse face aux problèmes réseau : - Nombre maximum de tentatives : 5 - Délai initial : 5 secondes - Délai maximum : 60 secondes - Progression : Doublement du délai à chaque échec (5s → 10s → 20s → 40s → 60s)

Implémentation détaillée

1. Fonction principale : download_files_from_sftp()

Paramètres

  • max_retries (int, défaut=5) : Nombre maximum de tentatives
  • base_delay (int, défaut=5) : Délai initial entre les tentatives (secondes)
  • max_delay (int, défaut=60) : Délai maximum entre les tentatives (secondes)

Processus d'exécution

  1. Initialisation de la connexion

    transport = paramiko.Transport((sftp_host, sftp_port))
    transport.banner_timeout = 120  # Timeout étendu pour connexions lentes
    transport.auth_timeout = 120
    transport.set_keepalive(30)  # Maintien de la connexion active
    

  2. Authentification et création du client SFTP

    transport.connect(username=sftp_username, password=sftp_password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    

  3. Détermination du répertoire distant

  4. Appel à get_remote_directory() pour identifier le bon chemin
  5. Gestion automatique des variantes de nommage

  6. Téléchargement des fichiers

  7. Listage du contenu du répertoire distant
  8. Filtrage (exclusion des fichiers cachés et répertoires)
  9. Téléchargement individuel de chaque fichier

  10. Gestion des erreurs et retry

  11. Capture spécifique de chaque type d'erreur
  12. Application du backoff exponentiel
  13. Nettoyage systématique des ressources

2. Fonction auxiliaire : get_remote_directory()

Paramètres

  • sftp : Client SFTP actif
  • business_unit : Unité commerciale cible

Logique de résolution

  1. Extraction du pays depuis le business_unit
  2. Construction des chemins possibles
  3. Test séquentiel de l'existence de chaque chemin
  4. Retour du premier chemin valide trouvé

3. Gestion des erreurs

Le script gère spécifiquement plusieurs types d'erreurs :

Type d'erreur Action Retry
AuthenticationException Arrêt immédiat Non
SSHException Log et retry Oui
SFTPError Log et retry Oui
EOFError Log et retry Oui
Autres exceptions Log et retry Oui

Logging et monitoring

Fichiers de log

  1. Log principal : Via le module logger personnalisé
  2. Log Paramiko : paramiko_debug.log pour le débogage SSH/SFTP détaillé

Messages de log clés

  • Début/fin de connexion
  • Identification du répertoire distant utilisé
  • Progression du téléchargement de chaque fichier
  • Erreurs et tentatives de retry
  • Nettoyage des ressources

Intégration dans la pipeline

Appel depuis le script bash

run_step "Step_00_Download_Inputs" "download_files_from_sftp" "false"

Paramètres : - Script : Step_00_Download_Inputs - Fonction : download_files_from_sftp - Needs DB : false (pas de connexion base de données requise)

Prérequis système

  • Python 3.x avec module paramiko
  • Accès réseau au serveur SFTP
  • Permissions d'écriture dans /app/inputs/

Considérations de sécurité

  1. Authentification
  2. Utilisation de variables d'environnement pour les credentials
  3. Pas de stockage en dur des mots de passe

  4. Timeouts étendus

  5. Banner timeout : 120s
  6. Auth timeout : 120s
  7. Keepalive : 30s Ces valeurs permettent de gérer les connexions lentes ou instables

  8. Nettoyage des ressources

  9. Fermeture systématique des connexions SFTP et transport
  10. Bloc finally pour garantir le nettoyage même en cas d'erreur

Points d'attention pour la maintenance

1. Évolution de la structure des répertoires

Si de nouveaux formats de nommage apparaissent, modifier la fonction get_remote_directory() :

paths_to_try = [
    f"{base_path}/{business_unit}",
    f"{base_path}/{business_unit.replace('_', '__')}",
    # Ajouter de nouveaux formats ici
]

2. Performance sur gros volumes

Pour des répertoires contenant de nombreux fichiers volumineux : - Considérer l'ajout d'une barre de progression - Implémenter un téléchargement parallèle - Ajouter une vérification de checksum

3. Gestion des échecs partiels

Actuellement, un échec sur un fichier interrompt tout le processus. Pour plus de robustesse : - Implémenter un mode "best effort" avec log des échecs - Ajouter une reprise sur échec partiel

Exemples d'utilisation

Cas nominal - Italy Galbani

BUSINESS_UNIT=italy_galbani
Remote: /data/to/startflow/italy/italy__galbani/
Local: /app/inputs/italy_galbani/

Cas avec nommage alternatifs

BUSINESS_UNIT=france_president
Remote: /data/to/startflow/france/france__president/
Local: /app/inputs/france_president/

Troubleshooting

Problème : "No valid remote directory found"

Causes possibles : - Le répertoire n'existe pas sur le serveur - Permissions insuffisantes - Nouveau format de nommage non géré

Solution : 1. Vérifier manuellement l'existence du répertoire sur le serveur 2. Contrôler les permissions de l'utilisateur SFTP 3. Adapter get_remote_directory() si nécessaire

Problème : Timeouts fréquents

Causes possibles : - Connexion réseau instable - Serveur SFTP surchargé - Fichiers très volumineux

Solution : 1. Augmenter les valeurs de timeout 2. Implémenter une reprise de téléchargement partiel 3. Optimiser la bande passante disponible

Problème : "Authentication failed"

Causes possibles : - Credentials incorrects - Compte verrouillé - Changement de configuration serveur

Solution : 1. Vérifier les variables d'environnement 2. Tester la connexion manuellement 3. Contacter l'administrateur du serveur SFTP