chevron up
10 minutes de lecture
Orchestration de jobs et tables delta dans Databricks
Tim - il y a 9 mois
L’objectif de cet article est de décrire l’orchestration de taches dans Databricks à travers les jobs et les delta live tables ainsi que de passer en revue les possibilités de monitorer et contrôler la qualité de la donnée.

Table of contents

Introduction

Prérequis

Structured Streaming

Autoloader

Transformations SQL

Écriture des données live

Traitement effectué

Orchestration

Jobs

Tasks

Delta Live Table

Conclusion

Introduction

L’objectif de cet article est de décrire l’orchestration de tâches dans Databricks à travers les jobs et les delta live tables. Nous dériverons les traitements réalisés dans l’article précédent sous la forme de nouveaux notebooks, pour que les données puissent être traitées directement par Databricks en temps réel. Une orchestration dans Databricks permet également un monitoring fin, notamment sur la qualité des données traitées.

Prérequis

Pour reproduire les manipulations de cet article, il est nécessaire d’avoir une source de données contenant les fichiers appropriés et accessibles depuis Databricks. Les articles précédents peuvent aider à cela. Il faut que la ressource Databricks utilisée soit « Premium », sans quoi la partie Delta Live Table ne vous sera pas accessible.

Structured Streaming

Autoloader

Databricks met à disposition un mécanisme de lecture de données, configurable en micro batch, multi batch, single batch ou en temps réel. Nous déclarons un tel mécanisme en ajoutant le mot clé cloudFiles. Cette fonctionnalité est fault-tolerant grâce à un checkpointing qui permet de savoir quels fichiers ont déjà été ajoutés et de ne pas avoir de doublons. Pour cela, nous devons déclarer un chemin pour le stockage de ces checkpoints. Cela évite d’avoir à bouger les fichiers de work à inprogress à done comme nous l’avons fait dans l’article précédent. Il est nécessaire d’avoir un emplacement par stream lu, ici un pour station_information et un pour station_status.

monitoring_1.png
Figure 1 : Autoloader des fichiers station_information

Enfin, nous déclarons la structure des fichiers attendus. Cela permet de pouvoir manipuler directement les propriétés tel qu’attendu sans avoir à les caster. Cela a du sens principalement dans le cas de structures imbriquées, pour pouvoir accéder facilement aux sous-propriétés en faisant data.stations par exemple, ou de pouvoir utiliser des fonctions sur des tableaux comme explode.

monitoring_2.png
Figure 2 : Structure de la donnée brute

Transformations SQL

Pour faire des manipulation SQL , il faut stocker les données des dataframes dans des tables ou des vues. Nous choisissons de stocker dans des vues temporaires pour les transformations intermédiaires car les vues temporaires ne persistent pas au-delà de la session et ne consomment donc pas d’espace de stockage. Quand les résultats auront besoin de persister, nous stockerons les données dans des tables. Ici, nous stockons les données lues en faisant .createOrReplaceTempView(nom_de_la_vue)

monitoring_3.png
Figure 3 : Création d'une vue temporaire

Écriture des données live

Pour écrire des données live, nous appliquons la méthode writeStream. Cette méthode utilise 3 paramètres : le checkpoint, le trigger et l’outputmode. Similaire à la lecture, le checkpoint de l’écriture permet à Databricks d’être fault-tolerant et de ne pas avoir de doublons. Le trigger défini le mode de fonctionnement du traitement. Once=True signifie de traiter l’ensemble de données présentes encore non traitées et de les traiter en un unique batch. availableOnce=True est également un traitement one-shot mais divisera le travail en micro-batch. Enfin avec la propriété processingTime nous pouvons définir l’intervalle de temps entre chaque batch.

monitoring_4.png
Figure 4 : Écriture en table Delta

Avec outputmode nous choisissons entre append ou complete. Append ajoutera les données de façon incrémentale. C’est l’option utilisée en général pour les tables bronze et silver. Pour les tables gold qui sont souvent le résultat d’agrégation nous choisirons l’option outputmode=complete. Cela réécrira totalement la table.

Traitement effectué

Nous commencons donc par indiquer où se situe notre source de données qu’il faudra lire de façon incrémentale, à savoir notre bronze work.

monitoring_5_1.png
Figure 5 : Lecture des fichiers station_information

Nous créons une vue temporaire pour ces données brutes pour faire une première transformation : aplatir le tableau des stations et sauvegardons le résultat dans une nouvelle table Delta station_status_bronze.

monitoring_5.png
Figure 6 : Traitements intermédaires
monitoring_5_2.png
Figure 7 : Écriture dans le bronze

Nous réitérons le même enchainement pour passer du bronze au silver : on lit à partir de notre source de données, ici la table station_status_bronze que nous venons de créer pour la placer dans une vue temporaire, puis nous appliquons une transformation (choix des colonnes et cast de la date) enregistrons le résultat dans une nouvelle table station_status_silver.

monitoring_6.png
Figure 8 : Traitement de bronze à silver

Orchestration

Jobs

Pour orchestrer des tâches, Databricks mets à disposition le panneau Workflow où nous pouvons créer des Jobs. Un Job est composé d’une ou plusieurs taches, que l’on lié avec des dépendances, planifier dans le temps, choisir le mécanisme de retry souhaité, etc.

monitoring_7.png
Figure 9 : Onglet Workflow

Cette partie est l’équivalent de ce que faisait ADF avec les pipelines et les activités. En choisissant une exécution en cours ou passées, nous pouvons consulter l’état de chaque commande des notebooks exécutés.

Tasks

Les taches peuvent être de plusieurs types mais dans cet article nous ne couvrirons que le type Notebook et Delta Live Table.

monitoring_8.png
Figure 10 : Création d'une tâche d'un job

Nous créons donc un job avec nos 2 notebooks créés, 1 pour station_information et 1 pour station_status. Dans notre cas les tâches ne sont pas dépendantes l’une de l’autre donc nous n’avons pas à créer de dépendances entre elles.

monitoring_9.png
Figure 11 : Job traitant station_status et information de bronze à silver

Les Delta Live Table (DLT) se prêtent aussi bien à notre cas d’usage car elles permettent de monitorer de façon plus fine la qualité des données ingérées dans nos tables. Pour l’observer on crée un nouveau notebook, identique à celui de la transformation des données de station status mais en utilisant les delta live table.

monitoring_10.png
Figure 12 : Supervision du statut du job

Delta Live Table

Les Delta Live Table (DLT) se prêtent aussi bien à notre cas d’usage car elles permettent de monitorer de façon plus fine la qualité des données ingérée dans nos tables. Pour l’observer nous créons un nouveau notebook, identique à celui de la transformation des données de station status mais en utilisant les delta live table.

Contrairement à un notebook classique, seul un langage au choix est autorisé (c’est la magic command % qui n’est pas acceptée, or elle est indispensable pour changer la langue de l’instruction, en faisant %sql par exemple). Ici nous choisissons de travailler en SQL, il faut donc choisir la langue par défaut du notebook à SQL.

monitoring_11.png
Figure 13 : Sélection du langage par défaut

Pour définir des DLT, il est impératif d’utiliser certains mots clés. Le premier est le mot clé LIVE. Il sert à indiquer qu’il s’agit d’une DLT. De plus une Delta Live Table sera écrite dans la database live, il faudra donc ajouter en préfixe live. au nom des tables que nous voulons requêter.

monitoring_12.png
Figure 14 : Création d'une Delta Live Table

L’autre mot clé est STREAMING, qui sert lors de la création d’une table à définir qu’elle sera peuplée de façon incrémentale. Le mot clé STREAM est nécessaire lors de l’appel d’une table qui est peuplée de façon incrémentale.

monitoring_13.png
Figure 15 : Traitement intermédiaires à partir d'une DLT

Nous retrouvons donc en 3 requêtes : la création d’une table des données brutes, la création d’une table des données bronze et la création de la table des données silver.

monitoring_14.png
Figure 16 : Création de la DLT de silver

De retour dans l’onglet Workflow, nous choisissons le sous-menu ‘Delta Live Tables’ pour créer un nouveau pipeline en spécifiant un nouveau notebook. En l’exécutant, il est possible de consulter en temps réel le traitement des données.

monitoring_15.png
Figure 17 : Monitoring du pipeline de DLT

Contrairement aux taches des jobs, en cliquant sur une étape du pipeline, un aperçu de la qualité de la donnée est obtenu, notamment si des lignes ont été drop car ne répondant pas à d’éventuelles contraintes.

monitoring_16.png
Figure 18 : Monitoring de la qualité de la donnée insérée

Il est désormais possible de remplacer la tâche de transformation de station_status de type notebook par celle de type DLT en choisissant notre DLT pipeline. En exécutant le job et en cliquant sur l’étape du DLT, nous retombons sur la visualisation de la qualité des tables précédente.

monitoring_17.png
Figure 19 : Job contenant une tâche Notebook et une tâche de pipeline DLT

Nous pouvons donc orchestrer les pipelines DLT grâce aux jobs et leurs ajouter des dépendances et des mécaniques de retry par exemple.

Conclusion

L’orchestration dans Databricks permet un contrôle des actions plus fin que dans un outil externe tel qu’Azure Data Factory. De plus Databricks nous offre la possibilité de travailler avec les DLT, ce qui renforce le contrôle sur la qualité des données et non uniquement sur la bonne exécution ou non des notebooks.