chevron up
10 minutes de lecture
Orchestration de notebooks Databricks par Azure Data Factory (ADF)
Tim - il y a 9 mois
L’objectif de cet article est de transformer des données brutes pour leur donner de la valeur via des notebooks Databricks, l’ensemble orchestré par Azure Data Factory (ADF)

Table of contents

Introduction

Contexte

Prérequis

ADLS

Databricks

Utilisation générale

Connexion aux données bronze

Dataframe

Transformations et actions

Table delta

Orchestration dans ADF

Linked services (Databricks + DeltaLake)

Datasets

Pipeline et activités

Monitoring et alerting

Pourquoi monitorer

Dans ADF

Dans Databricks

Dans Azure

Conclusion

Introduction

Contexte

Dans l’article précédent nous avons récupéré des données d’une api dans notre Azure DataLake Storage (ADLS). Ces données sont brutes et peu exploitables facilement en tant que tel. L’objectif de cet article est de transformer des données brutes pour leur donner de la valeur. Plus concrètement nous avons récupéré des données externes dans notre bronze et nous allons créer les pipelines permettant de peupler le silver à l’aide de Azure Data Factory (ADF) et d’un notebook hébergé sous Databricks.

Prérequis

Pour pouvoir reproduire toutes les étapes de l’article, il est indispensable d’avoir complété les étapes de l’article précédent, à savoir créer les pipelines de récupération de données et de stockage dans le DataLake. Pour faire le traitement des données, on a besoin de d’un cluster Databricks dont la configuration n’est pas couverte dans cet article.

ADLS

Adaptation de l’existant à work, in progress, done On modifie le dataset pour que tous les fichiers provenant de l’api arrivent dans un sous-dossier work. Au moment du traitement des fichiers on les déplacera dans un sous-dossier inprogress et enfin une fois que le traitement sera terminé on les déplacera dans un sous-dossier done, et dans une arborescence avec l’année, le mois et le jour comme on avait au départ.

orchestration_1.png
Figure 1 : Nouvelle arborescence dans le bronze
orchestration_2.png
Figure 2 : Adaptation du dataset existant

Ne pas travailler les fichiers directement dans work évite le problème suivant : comment savoir quels fichiers ont déjà été traités ? Si on déplace / supprime les fichiers présents dans le bronze à la fin de chaque traitement, les fichiers qui sont arrivés en cours de traitement risquent d’être déplacés eux aussi sans avoir été traités. Ainsi on crée une zone de stockage de fichiers non traités (work), une zone de fichiers en cours de traitement (inprogress) et une zone de fichier déjà traité (done).

Databricks

Utilisation générale

Afin de pouvoir effectuer le traitement des données, Databricks a besoin de ressources de calcul, sous la forme d’un cluster. La configuration de cluster n’entre pas dans le cadre de cet article. Dans l’onglet Workspace, ou Repos si vous utilisez un dépôt Git, on va créer un notebook dans lequel les instructions du traitement seront codées. Une fois le notebook attaché au cluster, il sera possible de jouer les instructions. De plus dans le cadre de cet article, il est nécessaire que le notebook puisse accéder aux fichiers présents sur l’ADLS (fichiers bronze). Une fois les données nettoyées, on les écrira dans une table delta qui sera notre silver.

Connexion aux données bronze

Pour que le cluster soit en mesure de travailler sur les fichiers présents dans le container bronze, une solution est de faire un point de montage. Il s’agit de créer un lien entre le file system propre à Databricks et l’ALDS. Dans mon cas l’ADLS est protégé et nécessite une configuration supplémentaire pour autoriser l’accès.

orchestration_3.png
Figure 3 : Création du mountpoint

Une fois le mountpoint crée, on peut consulter les fichiers directement dans le file system de Databricks.

orchestration_4.png
Figure 4 : Liste des mountpoints

Dataframe

Dans Databricks et plus précisément dans Spark, on préférera manipuler les données dans des dataframes. Spark est fait pour être efficace lors des manipulations de dataframe. Chaque transformation, comme un filtre ou une agrégation par exemple, retournera un nouveau dataframe en sortie. On aura donc un enchainement de dataframe que spark analysera et pourra optimiser. On commence donc par instancier un dataframe avec le contenu de nos fichiers bronze présents dans inprogress. On peut tout de suite consulter le schéma des données récupérées en dépliant le schéma juste en dessous.

orchestration_5.png
Figure 5 : Création des dataframes de données brutes

Transformations et actions

On nettoie les données en enchainant les transformations sur le dataframe. 2 types d’opérations sont possible sur les dataframes : les transformations et les actions. Les transformations sont les instructions qui génèreront un nouveau dataframe (un filtre, un tri, une agrégation par exemple), tandis que les actions sont les commandes qui permettent de faire remonter les données à l’utilisateur, par exemple en demandant d’afficher un dataframe. Plus précisément, les transformations sont des opérations qui s’enchainent mais qui n’exécutent par, et pouvant être combinées, alors que les actions lancent un calcul distribué et déclenchent toute la chaine de transformations qui la précède.

orchestration_6.png
Figure 6 : Transformations et action

Ainsi on peut consulter les dataframes en exécutant l’action suivante : display

orchestration_7.png
Figure 7 : Affichage des données brutes

On remarque que la colonne data contient un tableau de json. On va aplatir la structure en faisant un explode qui va créer une colonne par élément du tableau. On en profite aussi pour cast la colonne lastUpdatedOther en timestamp pour une meilleure interprétation, et pour renommer la colonne en date.

orchestration_8.png
Figure 8 : Dataframe dont le tableau a été explode

Puis on va sélectionner uniquement les colonnes qui nous intéresse.

orchestration_9.png
Figure 9 : On select uniquement certaines colonnes

Table delta

Une fois le dataframe souhaité obtenu, on va sauvegarder le résultat pour faire persister les résultats en dehors de la session. Databricks mets à disposition le format ‘Delta’ qui est une surcouche applicable aux datalakes. Cette surcouche permet d’avoir des transactions ACID pour une grande fiabilité des données. De plus des fichiers d’historisation sont peuplés lors de chaque modification dans le DeltaLake. Ces fichiers (le delta_log) permettent de consulter l’état des données à n’importe quel moment, utile pour debugger ou monitorer. On va sauvegarder les résultats dans le container silver.

orchestration_10.png
Figure 10 : Écriture des données dans le Deltalake

Les données sont sauvegardées sous format Delta et donc des tables Delta ont automatiquement été créées à l’emplacement suivant.

orchestration_21.png

Pour créer manuellement des tables Delta à l’emplacement souhaité (ici dans ‘default’ et pas dans ‘delta’), on joue les instructions suivantes, ce qui va créer des tables en utilisant la localisation des données du silver.

orchestration_11.png
Figure 11 : Création des tables Delta

Orchestration dans ADF

Linked services (Databricks + DeltaLake)

Pour lier le notebook à l’orchestrateur (ADF), il faut ajouter un nouveau service linked, Databricks. Ce service est dans la partie compute et pas Data store comme les linked services déjà existant. On peut aussi ajouter le service linked Azure Databricks Delta Lake pour pouvoir accéder aux tables delta du silver à partir d’ADF. (voir article précédent pour la création de service linked).

Datasets

Pour créer un pipeline où on manipulera les données de bronze, les déplacer de work à inprogress, puis de inprogress à done il faut créer des nouveaux datasets. Pour plus d’efficacité lors de la copie des fichiers, on ne réutilisera pas le dataset existant du bronze work sous format JSON, mais on créera des datasets sous format binaire pour work, inprogress et done.

orchestration_12.png
Figure 12 : Dataset inprogress

La configuration des datasets est décrite en détails dans l’article précédent. On peut aussi ajouter les datasets des tables Delta de silver. Pour qu’ADF puisse détecter la database et les tables, il faut s’assurer que le cluster de Databricks soit actif.

orchestration_13.png
Figure 13 : Dataset d'une table Delta

Pipeline et activités

On construit un pipeline pour déclencher le traitement du notebook. Il faut en amont s’assurer que les données soient déplacées de work à inprogress et en aval de inprogress à done. Pour cela on utilise des activités de copy sur les datasets créés précédemment et des dépendances entre les activités. Il existe 4 types de dépendances possibles. Par défaut, encadré vert sur le côté droit des activités, ce sont des dépendances en cas de succès de l’activité. En cliquant sur le + gris on peut créer des dépendances sur l’échec de l’activité, sur la complétion de l’activité (succès ou échec), ou sur le skip de l’activité. Ici on ne souhaite poursuivre les traitements que si toutes les activités de copy se déroulent correctement. On obtient alors le pipeline suivant.

orchestration_14.png
Figure 14 : Pipeline de traitement de bronze à silver

L'activité de Databricks est référencée sous le nom Notebook (car il existe d’autres types d’exécutable sous Databricks).

Dans l’onglet Azure Databricks on choisit le linked service, et dans l’onglet Settings on ajoute le chemin vers le notebook. Si le cluster est actif alors on peut parcourir en direct les fichiers existant sur le file system du cluster.

orchestration_15.png
Figure 15 : Activité du Notebook Databricks

Monitoring et alerting

Pourquoi monitorer

Lors de la phase de production, quand la phase de développement est terminée et que le produit est utilisé de façon industrielle, il est commun que les pipelines s’exécutent automatiquement via des triggers (time trigger, blob trigger, etc.). À moins de regarder manuellement dans l’outil si tout se déroule comme prévu (où que l’on ait mis en place un système de monitoring ou d’alerting), on ne sait pas quand il y a un problème, et d’où vient le problème. Dans ce genre de cas, le problème peut être remonté par les utilisateurs finaux qui obtiennent un résultat inattendu. Il faut alors attendre une remonté du problème jusqu’à la personne en charge de la maintenance et qu’il remonte toute la chaine de traitement pour découvrir l’anomalie. Cette perte de temps peut être grandement raccourcie en étant averti en amont, au moment du bug, par une alerte qui pointe vers l’étape en défaut. Ou bien avoir des outils pour monitorer et vérifier la qualité des résultats en cours de traitement et ainsi vérifier que les données sont correctes. Un autre avantage du monitoring est que l’utilisateur final ne découvre pas par lui-même le problème car on peut soit résoudre le problème avant, soit le prévenir. Plusieurs outils nous sont disponibles pour monitorer et créer des alertes dans notre cas.

Dans ADF

ADF nous offre un onglet « Monitor » où on peut consulter les statuts des dernières exécutions de pipeline.

orchestration_16.png
Figure 16 : Monitoring dans ADF

En utilisant les filtres on peut facilement retrouver les pipelines souhaités jusque sur les 45 derniers jours. Une fois un pipeline sélectionné, on peut regarder l’état de chacune des activités, les paramètres utilisés, les inputs, les outputs et le détail des activités.

orchestration_17.png
Figure 17 : Statut d'un pipeline et de ses activités
orchestration_18.png
Figure 18 : Détail d'une activité

On ne peut pas directement créer d’alerting dans ADF, il faudra passer par Azure Monitoring, ce que nous verrons plus loin.

Dans Databricks

Tel que nous l’avons utilisé, les notebook Databricks sont orchestré par ADF. Pour retrouver facilement le détail d’une exécution d’un notebook Databricks, il faut donc passer par l’outil de monitoring d’ADF et utiliser le lien vers le notebook

orchestration_19.png
Figure 19 : Lien vers Databricks dans le détail de l'activité

On retrouve alors le notebook tel qu’il a été exécuté lorsqu’il a été appelé par ADF. On peut retrouver la stacktrace s’il y a eu une exception et les éventuelles debug, comme lors de la phase de développement. En revanche, nativement, Databricks ne sait pas envoyer de logs à Azure, ce qui ne permet pas de monitorer de façon très personnalisée directement dans les autres outils comme ADF (ou Azure App Insight) ce qui se passe lors de l’exécution d’un notebook. On peut néanmoins utiliser des bibliothèques externes pour faire communiquer Databricks avec App Insight.

Dans Azure

L’écosystème Azure propose de nombreux outils permettant un monitoring plus ou moins fin de tous ses services. Nous n’en couvrirons qu’une partie, relative à notre cas d’usage.

Pour le monitoring, on peut utiliser Azure App Insight et Azure Dashboard. App Insight, via l’onglet ‘Logs’ permet de query des informations des autres services. Par exemple query les activités ou pipelines qui ont fail ces dernières 24h, ou l’heure de la dernière exécution terminée avec succès des pipelines. Ces query sont codées en Kusto Query Language (ou KQL) et sont un point d’entrée à une grande partie du monitoring et de l’alerting. Azure Application Insight étant une ressource à part qu’il faut ajouter et payer, on n’entrera pas dans les détails de son utilisation dans cet article.

À partir de ces query, on peut créer dans Dashboard en indiquant que le résultat d’une query sera affiché sous la forme d’un tableau ou d’un graph par exemple, dans Azure Dashboard. Ainsi on peut créer un Dashboard regroupant toutes les informations utiles pour monitorer nos traitements et vérifier en un même endroit si tout se passe bien.

À l’aide des query, on peut aussi utiliser l’outil d’alerting d’Azure qui permet de trigger ou non des alertes sur le résultat des query. On peut par exemple programmer une alerte qui toutes les 30min vérifiera que le pipeline principal n’a pas été en erreur.

On peut aussi directement passer par la ressource ADF du portail Azure pour accéder à l’alerting ce qui permet d’avoir des conditions « préfabriquées » sans avoir à passer par App Insight

orchestration_20.png
Figure 20 : Création d'alerte pour ADF

On choisit ensuite l’action qui sera exécuté si la condition programmée est vraie. Ainsi on peut programmer des envoies de mails aux personnes en charge de la maintenance en cas d’erreur.

Conclusion

Tout est maintenant configuré pour que le traitement, orchestré par ADF et réalisé en partie par Databricks fonctionne à la demande. En revanche le monitoring d’un tel pipeline est perfectible car la plupart des détails de ce qui se passe dans le notebook reste invisible pour ADF. De plus le monitoring se fait dans un autre outil Azure et demande de la configuration supplémentaire. Une solution est de déplacer l’orchestration directement dans Databricks. Cela permet de gérer la totalité du monitoring directement dans Databricks et d’avoir une vision plus fine. Dans le prochain article on verra comment déporter l’orchestration dans Databricks via les Delta Live Tables.