Plusieurs méthodes de seize de données modifiées sont disponibles lors de l’utilisation d’une base de données MySQL ou Postgres. Certaines de ces méthodes se chevauchent et sont très similaires quelle que soit la technologie de base de données que vous utilisez, d’autres sont différentes. En fin de compte, nous avons besoin d’un moyen de spécifier et de détecter ce qui a changé et d’une méthode pour envoyer ces modifications à un système cible.
Cet article suppose que vous êtes familiarisé avec la seize de données modifiées, sinon lisez l’article d’introduction précédent ici « Modification de la seize de données : de quoi s’agit-il et remark l’utiliser.” Dans cet article, nous allons approfondir les différentes façons d’implémenter CDC si vous avez une base de données MySQL et Postgres et comparer les approches.
CDC avec horodatage de mise à jour et Kafka
L’un des moyens les plus simples d’implémenter une answer CDC dans MySQL et Postgres consiste à utiliser des horodatages de mise à jour. Chaque fois qu’un enregistrement est inséré ou modifié, l’horodatage de mise à jour est mis à jour à la date et à l’heure actuelles et vous permet de savoir quand cet enregistrement a été modifié pour la dernière fois.
Nous pouvons ensuite créer des options sur mesure pour interroger la base de données à la recherche de nouveaux enregistrements et les écrire dans un système cible ou dans un fichier CSV à traiter ultérieurement. Ou nous pouvons utiliser une answer pré-construite comme Kafka et Kafka Connexion qui a des connecteurs prédéfinis qui interrogent les tables et publient des lignes dans une file d’attente lorsque l’horodatage de mise à jour est supérieur au dernier enregistrement traité. Kafka Join dispose également de connecteurs pour cibler les systèmes qui peuvent ensuite écrire ces enregistrements pour vous.
Récupérer les mises à jour et les publier dans la base de données cible à l’aide de Kafka
Kafka est une plate-forme de diffusion d’événements qui go well with un modèle de pub-sub. Les éditeurs envoient des données à une file d’attente et un ou plusieurs consommateurs peuvent ensuite lire les messages de cette file d’attente. Si nous voulions capturer les modifications d’une base de données MySQL ou Postgres et les envoyer à un entrepôt de données ou à une plate-forme d’analyse, nous devons d’abord configurer un éditeur pour envoyer les modifications, puis un consommateur qui pourrait lire les modifications et les appliquer à notre cible. système.
Pour simplifier ce processus, nous pouvons utiliser Kafka Join. Kafka Join fonctionne comme un intermédiaire avec des connecteurs pré-construits pour publier et consommer des données qui peuvent simplement être configurées avec un fichier de configuration.
Fig 1. Structure CDC avec MySQL, Postgres et Kafka
Comme le montre la determine 1, nous pouvons configurer un connecteur JDBC pour Kafka Join qui spécifie quelle desk nous aimerions consommer, remark détecter les changements qui, dans notre cas, seront en utilisant l’horodatage de mise à jour et dans quel sujet (file d’attente) les publier. . L’utilisation de Kafka Join pour gérer cela signifie que toute la logique nécessaire pour détecter les lignes qui ont changé est faite pour nous. Nous devons seulement nous assurer que le champ d’horodatage de mise à jour est mis à jour (traité dans la part suivante) et Kafka Join s’occupera de :
- Garder une hint de l’horodatage de mise à jour most du dernier enregistrement qu’il a publié
- Interrogation de la base de données pour tous les enregistrements avec des champs d’horodatage de mise à jour plus récents
- Écriture des données dans une file d’attente à consommer en aval
Nous pouvons alors soit configurer des « puits » qui définissent où sortir les données, soit faire en sorte que le système supply communique directement avec Kafka. Encore une fois, Kafka Join possède de nombreux connecteurs de récepteur prédéfinis que nous pouvons simplement configurer pour envoyer les données à de nombreux systèmes cibles différents. Des providers comme Fusée peuvent parler directement à Kafka et ne nécessitent donc pas la configuration d’un récepteur.
Encore une fois, l’utilisation de Kafka Join signifie que non seulement nous pouvons écrire des données à de nombreux endroits différents avec très peu de codage requis, mais nous obtenons également le débit et la tolérance aux pannes de Kafka qui nous aideront à faire évoluer notre answer à l’avenir.
Pour que cela fonctionne, nous devons nous assurer que nous avons mis à jour les champs d’horodatage sur les tables que nous voulons capturer et que ces champs sont toujours mis à jour chaque fois que l’enregistrement est mis à jour. Dans la part suivante, nous expliquons remark implémenter cela dans MySQL et Postgres.
Utilisation de déclencheurs pour les horodatages de mise à jour (MySQL et Postgres)
MySQL et Postgres prennent tous deux en cost les déclencheurs. Les déclencheurs vous permettent d’effectuer des actions dans la base de données immédiatement avant ou après qu’une autre motion se produise. Pour cet exemple, chaque fois qu’une commande de mise à jour est détectée sur une ligne de notre desk supply, nous voulons déclencher une autre mise à jour sur la ligne affectée qui définit l’horodatage de mise à jour sur la date et l’heure actuelles.
Nous souhaitons uniquement que le déclencheur s’exécute sur une commande de mise à jour, automotive dans MySQL et Postgres, vous pouvez définir la colonne d’horodatage de mise à jour pour utiliser automatiquement la date et l’heure actuelles lorsqu’un nouvel enregistrement est inséré. La définition de desk dans MySQL ressemblerait à ceci (la syntaxe Postgres serait très similaire). Notez le COURANT PAR DÉFAUTMots clés TIMESTAMP lors de la déclaration de la mise à jourcolonne d’horodatage qui garantit qu’au second de l’insertion d’un enregistrement, la date et l’heure actuelles sont utilisées par défaut.
CREATE TABLE consumer
(
id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
firstname VARCHAR(30) NOT NULL,
lastname VARCHAR(30) NOT NULL,
e-mail VARCHAR(50),
update_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Cela signifie que notre colonne update_timestamp est définie sur la date et l’heure actuelles pour tout nouvel enregistrement. Nous devons maintenant définir un déclencheur qui mettra à jour ce champ chaque fois qu’un enregistrement est mis à jour dans la desk utilisateur. L’implémentation de MySQL est easy et se présente comme go well with.
DELIMITER $$
CREATE TRIGGER user_update_timestamp
BEFORE UPDATE ON consumer
FOR EACH ROW BEGIN
SET NEW.update_timestamp = CURRENT_TIMESTAMP;
END$$
DELIMITER ;
Pour Postgres, vous devez d’abord définir une fonction qui définira le champ update_timestamp sur l’horodatage actuel, puis le déclencheur exécutera la fonction. Il s’agit d’une différence subtile, mais légèrement plus lourde, automotive vous avez maintenant une fonction et un déclencheur à maintenir dans la base de données postgres.
Utilisation de la syntaxe de mise à jour automatique dans MySQL
Si vous utilisez MySQL, il existe un autre moyen beaucoup plus easy d’implémenter un horodatage de mise à jour. Lors de la définition de la desk dans MySQL, vous pouvez définir la valeur à laquelle définir une colonne lorsque l’enregistrement est mis à jour, ce qui dans notre cas serait de le mettre à jour avec l’horodatage actuel.
CREATE TABLE consumer
(
id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
firstname VARCHAR(30) NOT NULL,
lastname VARCHAR(30) NOT NULL,
e-mail VARCHAR(50),
update_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
L’avantage est que nous n’avons plus à maintenir le code du déclencheur (ou le code de la fonction dans le cas de Postgres).
CDC avec Debezium, Kafka et Amazon DMS
Une autre choice pour implémenter une answer CDC consiste à utiliser les journaux de base de données natifs que MySQL et Postgres peuvent produire lorsqu’ils sont configurés pour le faire. Ces journaux de base de données enregistrent chaque opération exécutée sur la base de données qui peut ensuite être utilisée pour répliquer ces modifications dans un système cible.
L’avantage d’utiliser les journaux de base de données est que, premièrement, vous n’avez pas besoin d’écrire de code ni d’ajouter de logique supplémentaire à vos tables comme vous le faites avec les horodatages de mise à jour. Deuxièmement, il prend également en cost la suppression d’enregistrements, ce qui n’est pas potential avec les horodatages de mise à jour.
Dans MySQL, vous faites cela en activant le binlog et dans Postgres, vous configurez le Journal d’écriture anticipée (WAL) pour la réplication. Une fois la base de données configurée pour écrire ces journaux, vous pouvez choisir un système CDC pour vous aider à capturer les modifications. Deux choices populaires sont Debezium et Amazon Database Migration Service (DMS). Ces deux systèmes utilisent le binlog pour MySQL et WAL pour Postgres.
Debezium fonctionne nativement avec Kafka. Il récupère les modifications pertinentes, les convertit en un objet JSON qui contient une cost utile décrivant ce qui a changé et le schéma de la desk et le place sur un sujet Kafka. Cette cost utile contient tout le contexte requis pour appliquer ces modifications à notre système cible, il nous suffit d’écrire un consommateur ou d’utiliser un récepteur Kafka Join pour écrire les données. Comme Debezium utilise Kafka, nous bénéficions de tous les avantages de Kafka tels que la tolérance aux pannes et l’évolutivité.
Fig 2. Structure Debezium CDC pour MySQL et Postgres
AWS DMS fonctionne de manière similaire à Debezium. Il prend en cost de nombreux systèmes supply et cible différents et s’intègre de manière native à tous les providers de données AWS populaires, notamment Kinesis et Redshift.
Le principal avantage de l’utilisation de DMS sur Debezium est qu’il s’agit en fait d’une offre « sans serveur ». Avec Debezium, si vous voulez la flexibilité et la tolérance aux pannes de Kafka, vous avez la cost de déployer un cluster Kafka. DMS comme son nom l’indique est un service. Vous configurez les factors de terminaison supply et cible et AWS se cost de gérer l’infrastructure pour gérer la surveillance des journaux de base de données et la copie des données vers la cible.
Cependant, cette approche sans serveur a ses inconvénients, principalement dans son ensemble de fonctionnalités.
Quelle choice pour CDC ?
Lorsque vous évaluez le modèle à suivre, il est vital d’évaluer votre cas d’utilisation spécifique. L’utilisation des horodatages de mise à jour fonctionne lorsque vous souhaitez uniquement capturer des insertions et des mises à jour. Si vous disposez déjà d’un cluster Kafka, vous pouvez vous lancer très rapidement, surtout si la plupart des tables incluent déjà une sorte d’horodatage de mise à jour.
Si vous préférez utiliser l’approche du journal de base de données, peut-être parce que vous voulez une réplication exacte, vous devriez envisager d’utiliser un service comme Debezium ou AWS DMS. Je suggérerais de vérifier d’abord quel système prend en cost les systèmes supply et cible dont vous avez besoin. Si vous avez des cas d’utilisation plus avancés tels que le masquage de données sensibles ou le réacheminement des données vers différentes recordsdata d’attente en fonction de leur contenu, Debezium est probablement le meilleur choix. Si vous recherchez simplement une réplication easy avec peu de frais généraux, DMS fonctionnera pour vous s’il prend en cost votre système supply et cible.
Si tu as analyse en temps réel besoins, vous pouvez envisager d’utiliser une base de données cible comme Rockset comme couche de service d’analyse. Rockset s’intègre avec MySQL et postgres, à l’aide d’AWS DMS, pour ingérer des flux CDC et indexer les données pour des analyses en moins d’une seconde à grande échelle. Rockset peut également lire les flux CDC à partir de bases de données NoSQL, telles que MongoDB et Amazon DynamoDB.
La bonne réponse dépend de votre cas d’utilisation spécifique et il y a beaucoup plus d’choices que celles discutées ici, ce ne sont là que quelques-unes des façons les plus populaires d’implémenter un système CDC moderne.
Lewis Gavin est ingénieur de données depuis cinq ans et blogue également sur les compétences au sein de la communauté Information depuis quatre ans sur un weblog personnel et Medium. Pendant ses études en informatique, il a travaillé pour l’équipe d’Airbus Helicopter à Munich en améliorant les logiciels de simulation pour les hélicoptères militaires. Il a ensuite travaillé pour Capgemini où il a aidé le gouvernement britannique à entrer dans le monde du Large Information. Il utilise actuellement cette expérience pour aider à transformer le paysage des données à easyfundraising.org.ukun website de cashback caritatif en ligne, où il contribue à façonner leur entreposage de données et capacité de rapport à partir de zéro.