Talend ESB: Création d’un curseur sous MongoDB avec Camel

Le but de cet article est l’utilisation du composant Camel MongoDB afin d’exploiter un ‘tailable cursor‘ ouvert sur une collection, ce curseur se comportera comme la fonction ‘tail -f’ en unix pour intercepter les données nouvellement créées et les rediriger vers une destination quelconque.

Le scénario

Afin de bien décomposer les étapes du fonctionnement, j’utiliserai les propriétés JMX afin de contrôler la route Camel, le scénario se déroulera de la sorte:

  • Création de la collection (Manuellement avec Robomongo)
  • Injection des données dans MongoDB avec une route utilisant un ‘delayer’ pour ralentir le flux de données
  • Lancement d’une seconde route qui va créer le ‘tailable cursor’ et intercepter les données pour les envoyer dans la console
  • Arrêt de la seconde route contenant le curseur avec Java mission control
  • Le traitement de la première route se poursuit
  • Relance de la seconde route avec JMC, celle-ci va reprendre à la dernière occurrence consommée (pas de doublons envoyés)

Les sources de cette démo sont bien sûr disponibles sur mon compte Github

Installation de MongoDB avec Docker

  • Télécharger et installer Docker Toolbox
  • Télécharger et installer un client MongoDB, par exemple RoboMongo
  • Installer MongoDB avec Docker

Attention à la compatibilité, si on analyse attentivement les librairies de Talend, on peut s’apercevoir que la version 6.1 de Talend qui embarque camel-mongodb-alldep-2.15.4/ est compatible avec la version 2 de MongoDB tandis que Talend 6.2 qui embarque camel-mongodb-alldep-2.16.3/ est compatible avec MongoDB version 3.

Dans mon cas j’utiliserai la version Talend 6.2 et l’image docker de MongoDB 2.6, qui prouve la rétrocompatibilité du driver, il faut ensuite lancer les commandes suivantes:

# docker-machine start default
# eval $(docker-machine env default)
# docker pull mongo:2.6
# docker run -d -p 27017:27017 mongo:2.6

Récupérer l’IP de la machine docker

docker-machine ip default

Lancer ensuite RoboMongo pour se connecter au serveur

art23_img3_mongodb_camel_tailable_cursor

Créer une DB nommée ‘testDB’

art23_img5_mongodb_camel_tailable_cursor

art23_img6_mongodb_camel_tailable_cursor

Une fois la DB créée il faut créer une collection de type ‘capped collection‘, cette collection un peu spéciale agit comme un buffer circulaire qui ré-écrase les données une fois l’espace rempli.

Ouvrir le shell

art23_img4_mongodb_camel_tailable_cursor

Taper la commande:

 use testDB 

Réouvir le shell et créer la collection avec la commande:

 db.createCollection("camelTestCapped", { capped : true, size : 5242880, max : 5000 } ) 

1ère route

Une fois la base de données et la collection créées, nous allons pouvoir lancer la route d’insertion des données, cette route est décomposée en plusieurs étapes:

art23_img8_mongodb_camel_tailable_cursor

  • Lecture des lignes d’un fichier CSV contenant une colonne texte et une autre de type entier
  • Unmarshal de ces lignes avec Camel bindy pour créer des POJO
  • Split des lignes
  • Un delayer qui va volontairement ralentir le processus et nous laisser le temps de démarrer et de stopper le ‘tailable cursor’
  • Marshal des données pour convertir en JSON, le but du marshal est de garder le type entier du champ « increasing » afin que MongoDB l’interprète correctement et créé les champs de type INT32
  • Insertion dans la collection de type « capped »

2ème route

Lançons ensuite avec Talend la route qui va créer le curseur

art23_img9_mongodb_camel_tailable_cursor

La route est paramétrée en mode noautostartup, le contexte sera lancé mais la route non démarrée, il faudra ensuite la démarrer manuellement avec JMC, l’arrêt via le studio n’étant pas propre le curseur est détruit et la dernière valeur en cours ne sera pas sauvegardée, par défaut la sauvegarde de la dernière occurrence se fait toutes les secondes avec l’option cursorRegenerationDelay.

Lancer « Oracle Java Mission Control » qui se trouve dans le répertoire d’installation de JVM (JAVA_HOME\bin\jmc.exe), celui-ci va nous permettre via les propriétés JMX de contrôler le lancement et l’arrêt de la route Camel.

Java Mission Control

Démarrer la route avec le MBean browser

art23_img11_mongodb_camel_tailable_cursor

La route consomme des données arrivant dans la ‘capped’ collection et les envoie dans la console.

art23_img12_mongodb_camel_tailable_cursor

Stoppons ensuite la route avec JMC

art23_img13_mongodb_camel_tailable_cursor

Le résultat s’affiche dans la console de Talend

art23_img14_mongodb_camel_tailable_cursor

Le composant Camel MongoDB peut stocker la dernière occurrence dans une autre DB et dans une autre collection, il suffit d’interroger la DB avec Robomongo

art23_img15_mongodb_camel_tailable_cursor

La dernière occurrence est 46, c’est-à-dire que la route ne consommera les données qu’à partir de cette valeur sinon elle risque de reprendre depuis le départ lors de sa relance, ce qui créera des doublons dans la destination.

Relançons maintenant la route tailable_cursor avec JMC, les données consommées débutent à partir de l’ID 47.

art23_img16_mongodb_camel_tailable_cursor

Conclusion

Voici une façon très pratique de faire du ‘streaming’ de données avec Camel et MongoDB, utile si vous voulez intercepter vos données en direct sans créer de batch journalier pour les extraire.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong> 

Petit calcul pour valider votre commentaire! merci * Time limit is exhausted. Please reload CAPTCHA.