chevron up
12 minutes de lecture
Comment implémenter un pipeline databricks Delta Live Tables ?
Hassane - il y a 8 mois
Le but de cet article est de voir un exemple d’implémentation d’un pipeline de données end-to-end en utilisant les Delta Live Tables (DLT) et ses avantages par rapport à un Auto Loader classique.

Table of contents

Contrôle de la qualité des données

Visualisation des résultats

Conclusion

Pour notre Platform Lakehouse de notre démo, nous suivrons un format de stockage Delta Lake (architecture multicouches).

Tout d’abord, on utilisera le mécanisme Auto Loader dans notre début de pipeline DLT afin de lire nos données en streaming à partir de notre source de données DBFS (on peut aussi utiliser ADLS Gen2, Azure Blob Storage, S3 bucket comme source de stockage).

Nous pourrons aussi créer notre table DLT bronze qui contiendra nos données brutes. Par la suite, nous effectuerons des fonctions de nettoyage, de transformation et d’enrichissement sur les données qui seront stockées sur une table de type DLT nommée silver. Finalement, nous analyserons nos données en calculant une mesure qui sera notre table DLT gold comme le montre la figure suivante :

figure1_deltahassane.PNG

Figure 1 : Ingestion des données avec notre pipeline Delta Live Tables

L’implémentation des pipelines DLT peut se faire en utilisant l’API Python ou bien l’API SQL. Pour notre traitement, nous utiliserons l’interface Python qui est définie avec le module dlt. Vous devez importer la bibliothèque dlt dans vos pipelines Delta Live Tables. On utilise l’élément décoratif @dlt.table à une fonction pour définir une table DLT. Pour le nom de notre table DLT nous pouvons soit choisir le nom de la fonction, soit le paramètre name. Les DLT peuvent avoir des propriétés qui sont optionnelles comme :

  • comment : rajouter un commentaire ou une description à notre table.
  • path : définir un emplacement de stockage pour les données de la table.
  • Les contraintes de la qualité de nos données. Les différentes propriétés seront visibles dans la partie monitoring de notre pipeline DLT sur le Workfow databricks.

figure2_deltahassane.PNG

Figure 2 : Table DLT bronze en utilisant un Auto Loader à partir de ma source de données

Pour la gestion des répertoires de schéma et de checkpoint, elle se fait d’une manière automatique contrairement à un Auto Loader classique où la configuration manuelle de ces répertoires est obligatoire.

Contrôle de la qualité des données

Pour gérer la qualité des données avec Delta Live table, on utilise les contraintes sur le contenu de nos tables DL. Elles permettent de valider facilement les données avant qu’elles ne soient ajoutées à nos tables Delta en utilisant les éléments décoratifs @expect, @expect_or_drop et @expect_or_fail avec des requêtes Python ou SQL.

  • @expect : permet de voir les enregistrements non valides qui violent la contrainte mais ils seront conservés dans notre jeu de données cible.
  • @expect_or_drop : permet de supprimer les enregistrements non valides dans notre table DLT qui violent la contrainte.
  • @expect_or_fail : permet d’arrêter directement l’exécution de notre pipeline lorsque des lignes non valides existent dans nos données.

Dans notre démo nous avons utilisé deux métriques pour contrôler la qualité de notre table DLT silver :

  • Contrainte1 : empêcher l’ajout des lignes non valides lorsque le nom du device (colonne device_name) est à NULL.
  • Contrainte2 : une métrique sur les lignes où le niveau de batterie (colonne bettery_level) est égal à zéro mais sans les supprimer sur notre dataset silver.

Pour notre table DLT gold, nous allons calculer une mesure qui permet de compter le nombre de devices pour chaque niveau de batterie en utilisant une fonction d’agrégation. Pour accéder à une table de type DLT dans notre pipeline avec l’API Python, nous utilisons soit la fonction read() de notre bibliothèque dlt soit la fonction spark.table() en ajoutant le mot clé LIVE avant le nom de notre table DLT dans l’argument de fonction.

figure3_deltahassane.PNG

Figure 3 : Implémentation de la table DLT silver en utilisant les contraintes pour la qualité de nos données et la partie transformation

figure4_deltahassane.PNG

Figure 4 : Implémentation de la table DLT gold

Visualisation des résultats

Pour afficher les détails de notre traitement de pipeline, nous utiliserons l’interface User offerte par le Workflow databricks qui nous donne une visualisation sous forme de graphe orienté acyclique DAG (Directed Acyclic Graphic). Celle-ci nous montre l’enchaînement des différentes Delta Live Tables créées et leurs dépendances. Après initialisation et exécution de notre pipeline, les détails de notre traitement s’afficheront sur le panel droit de l’interface (identifiant, Statut, la durée …etc.) comme le montre la figure ci-dessous :

figure5_deltahassane.PNG

Figure 5 : Vue graphique du résultat de l’exécution notre pipeline

En cliquant sur l’une de nos tables DLT, nous afficherons des informations détaillées comme le schéma de notre jeu de données, le statut, le nombre de lignes traitées et aussi les contraintes implémentées de la qualité des données. Pour la partie qualité des données, nous avons des informations sur nos deux métriques (Nom, le nombre de lignes non valides et son pourcentage …etc.)

figure6_deltahassane.PNG

Figure 6 : Vue détaillée de la table DLT silver

Il existe une autre manière de maintenir notre système de monitoring : en utilisant les événements qui sont stockés automatiquement par le système lors du déploiement de notre pipeline dans une table nommée events du format Delta dans notre emplacement de stockage. En utilisant cette table des événements, nous pouvons personnaliser notre code en fonction de nos propres exigences de monitoring. Pour notre traitement qui est exécuté, nous avons implémenté un système de monitoring qui permet d’avoir des détails d’une manière chronologique sur les deux contraintes de qualité des données :

figure7_deltahassane.PNG

Figure 7 : Chargement de la table delta des évènements

figure8_deltahassane.PNG

Figure 8 : Informations détaillées et personnalisées sur la contrainte 1

figure9_deltahassane.PNG

Figure 9 : Informations détaillées et personnalisées sur la contrainte 2

Conclusion

Avec Delta Live Tables, nous pouvons valoriser et contrôler nos données en appliquant les meilleures pratiques telles que la gestion des erreurs, les tests, le monitoring et la documentation afin de déployer des pipelines fiables qui seront utilisés comme tâches dans un job automatisé.