Introduction
L’objectif de cet article est de décrire l’orchestration de tâches dans Databricks à travers les jobs et les delta live tables. Nous dériverons les traitements réalisés dans l’article précédent sous la forme de nouveaux notebooks, pour que les données puissent être traitées directement par Databricks en temps réel. Une orchestration dans Databricks permet également un monitoring fin, notamment sur la qualité des données traitées.
Prérequis
Pour reproduire les manipulations de cet article, il est nécessaire d’avoir une source de données contenant les fichiers appropriés et accessibles depuis Databricks. Les articles précédents peuvent aider à cela. Il faut que la ressource Databricks utilisée soit « Premium », sans quoi la partie Delta Live Table ne vous sera pas accessible.
Structured Streaming
Autoloader
Databricks met à disposition un mécanisme de lecture de données, configurable en micro batch, multi batch, single batch ou en temps réel. Nous déclarons un tel mécanisme en ajoutant le mot clé cloudFiles. Cette fonctionnalité est fault-tolerant grâce à un checkpointing qui permet de savoir quels fichiers ont déjà été ajoutés et de ne pas avoir de doublons. Pour cela, nous devons déclarer un chemin pour le stockage de ces checkpoints. Cela évite d’avoir à bouger les fichiers de work à inprogress à done comme nous l’avons fait dans l’article précédent. Il est nécessaire d’avoir un emplacement par stream lu, ici un pour station_information et un pour station_status.
![]() |
---|
Figure 1 : Autoloader des fichiers station_information |
Enfin, nous déclarons la structure des fichiers attendus. Cela permet de pouvoir manipuler directement les propriétés tel qu’attendu sans avoir à les caster. Cela a du sens principalement dans le cas de structures imbriquées, pour pouvoir accéder facilement aux sous-propriétés en faisant data.stations par exemple, ou de pouvoir utiliser des fonctions sur des tableaux comme explode.
![]() |
---|
Figure 2 : Structure de la donnée brute |
Transformations SQL
Pour faire des manipulation SQL , il faut stocker les données des dataframes dans des tables ou des vues. Nous choisissons de stocker dans des vues temporaires pour les transformations intermédiaires car les vues temporaires ne persistent pas au-delà de la session et ne consomment donc pas d’espace de stockage. Quand les résultats auront besoin de persister, nous stockerons les données dans des tables. Ici, nous stockons les données lues en faisant .createOrReplaceTempView(nom_de_la_vue)
![]() |
---|
Figure 3 : Création d'une vue temporaire |
Écriture des données live
Pour écrire des données live, nous appliquons la méthode writeStream. Cette méthode utilise 3 paramètres : le checkpoint, le trigger et l’outputmode. Similaire à la lecture, le checkpoint de l’écriture permet à Databricks d’être fault-tolerant et de ne pas avoir de doublons. Le trigger défini le mode de fonctionnement du traitement. Once=True signifie de traiter l’ensemble de données présentes encore non traitées et de les traiter en un unique batch. availableOnce=True est également un traitement one-shot mais divisera le travail en micro-batch. Enfin avec la propriété processingTime nous pouvons définir l’intervalle de temps entre chaque batch.
![]() |
---|
Figure 4 : Écriture en table Delta |
Avec outputmode nous choisissons entre append ou complete. Append ajoutera les données de façon incrémentale. C’est l’option utilisée en général pour les tables bronze et silver. Pour les tables gold qui sont souvent le résultat d’agrégation nous choisirons l’option outputmode=complete. Cela réécrira totalement la table.
Traitement effectué
Nous commencons donc par indiquer où se situe notre source de données qu’il faudra lire de façon incrémentale, à savoir notre bronze work.
![]() |
---|
Figure 5 : Lecture des fichiers station_information |
Nous créons une vue temporaire pour ces données brutes pour faire une première transformation : aplatir le tableau des stations et sauvegardons le résultat dans une nouvelle table Delta station_status_bronze.
![]() |
---|
Figure 6 : Traitements intermédaires |
![]() |
---|
Figure 7 : Écriture dans le bronze |
Nous réitérons le même enchainement pour passer du bronze au silver : on lit à partir de notre source de données, ici la table station_status_bronze que nous venons de créer pour la placer dans une vue temporaire, puis nous appliquons une transformation (choix des colonnes et cast de la date) enregistrons le résultat dans une nouvelle table station_status_silver.
![]() |
---|
Figure 8 : Traitement de bronze à silver |
Orchestration
Jobs
Pour orchestrer des tâches, Databricks mets à disposition le panneau Workflow où nous pouvons créer des Jobs. Un Job est composé d’une ou plusieurs taches, que l’on lié avec des dépendances, planifier dans le temps, choisir le mécanisme de retry souhaité, etc.
![]() |
---|
Figure 9 : Onglet Workflow |
Cette partie est l’équivalent de ce que faisait ADF avec les pipelines et les activités. En choisissant une exécution en cours ou passées, nous pouvons consulter l’état de chaque commande des notebooks exécutés.
Tasks
Les taches peuvent être de plusieurs types mais dans cet article nous ne couvrirons que le type Notebook et Delta Live Table.
![]() |
---|
Figure 10 : Création d'une tâche d'un job |
Nous créons donc un job avec nos 2 notebooks créés, 1 pour station_information et 1 pour station_status. Dans notre cas les tâches ne sont pas dépendantes l’une de l’autre donc nous n’avons pas à créer de dépendances entre elles.
![]() |
---|
Figure 11 : Job traitant station_status et information de bronze à silver |
Les Delta Live Table (DLT) se prêtent aussi bien à notre cas d’usage car elles permettent de monitorer de façon plus fine la qualité des données ingérées dans nos tables. Pour l’observer on crée un nouveau notebook, identique à celui de la transformation des données de station status mais en utilisant les delta live table.
![]() |
---|
Figure 12 : Supervision du statut du job |
Delta Live Table
Les Delta Live Table (DLT) se prêtent aussi bien à notre cas d’usage car elles permettent de monitorer de façon plus fine la qualité des données ingérée dans nos tables. Pour l’observer nous créons un nouveau notebook, identique à celui de la transformation des données de station status mais en utilisant les delta live table.
Contrairement à un notebook classique, seul un langage au choix est autorisé (c’est la magic command % qui n’est pas acceptée, or elle est indispensable pour changer la langue de l’instruction, en faisant %sql par exemple). Ici nous choisissons de travailler en SQL, il faut donc choisir la langue par défaut du notebook à SQL.
![]() |
---|
Figure 13 : Sélection du langage par défaut |
Pour définir des DLT, il est impératif d’utiliser certains mots clés. Le premier est le mot clé LIVE. Il sert à indiquer qu’il s’agit d’une DLT. De plus une Delta Live Table sera écrite dans la database live, il faudra donc ajouter en préfixe live. au nom des tables que nous voulons requêter.
![]() |
---|
Figure 14 : Création d'une Delta Live Table |
L’autre mot clé est STREAMING, qui sert lors de la création d’une table à définir qu’elle sera peuplée de façon incrémentale. Le mot clé STREAM est nécessaire lors de l’appel d’une table qui est peuplée de façon incrémentale.
![]() |
---|
Figure 15 : Traitement intermédiaires à partir d'une DLT |
Nous retrouvons donc en 3 requêtes : la création d’une table des données brutes, la création d’une table des données bronze et la création de la table des données silver.
![]() |
---|
Figure 16 : Création de la DLT de silver |
De retour dans l’onglet Workflow, nous choisissons le sous-menu ‘Delta Live Tables’ pour créer un nouveau pipeline en spécifiant un nouveau notebook. En l’exécutant, il est possible de consulter en temps réel le traitement des données.
![]() |
---|
Figure 17 : Monitoring du pipeline de DLT |
Contrairement aux taches des jobs, en cliquant sur une étape du pipeline, un aperçu de la qualité de la donnée est obtenu, notamment si des lignes ont été drop car ne répondant pas à d’éventuelles contraintes.
![]() |
---|
Figure 18 : Monitoring de la qualité de la donnée insérée |
Il est désormais possible de remplacer la tâche de transformation de station_status de type notebook par celle de type DLT en choisissant notre DLT pipeline. En exécutant le job et en cliquant sur l’étape du DLT, nous retombons sur la visualisation de la qualité des tables précédente.
![]() |
---|
Figure 19 : Job contenant une tâche Notebook et une tâche de pipeline DLT |
Nous pouvons donc orchestrer les pipelines DLT grâce aux jobs et leurs ajouter des dépendances et des mécaniques de retry par exemple.
Conclusion
L’orchestration dans Databricks permet un contrôle des actions plus fin que dans un outil externe tel qu’Azure Data Factory. De plus Databricks nous offre la possibilité de travailler avec les DLT, ce qui renforce le contrôle sur la qualité des données et non uniquement sur la bonne exécution ou non des notebooks.