PostgreSQL für Change Data Capture konfigurieren

Database

Vor einigen Tagen wurde ich bei einem Gespräch mit Kunden gefragt, welche Möglichkeiten es in PostgreSQL für Change Data Capture (CDC) gibt. Die offensichtliche Antwort war natürlich erstmal: Trigger! - Und so ist es auch bei der zu migrierenden Datenbank aktuell implementiert. Das Nutzen von Triggern kann jedoch einen nicht zu vernachlässigenden Einfluss auf die Performance haben. Trigger sind Datenbank Operationen und werden entweder vor oder nach einem DML (Data Manipulation Language) Statement wie z.B. Insert, Update oder Delete ausgeführt.

Gibt es also eine Alternative für PostgreSQL, um CDC zu ermöglichen?

Ja, und diese lässt sich sogar sehr einfach implementieren. Dafür nutze ich den Transaktionslog (Write Ahead Log / WAL) von PostgreSQL, der jedes ausgeführte DML enthält. Das Verfahren nennt sich Log Based CDC. Als Demo Setup nutze ich einen PostgreSQL v12.3 Docker Container auf meinem Windows 10 Notebook mit WSL2.

Der erste Schritt ist nun den PostgreSQL Container zu definieren. Aus Bequemlichkeit nutze ich docker-compose mit einer minimalen Konfiguration. Es ist dabei wichtig, dass der postgresql.conf Konfigurationsparameter ‘wal_level’ auf den Wert ‘logical’ gesetzt wird. Dadurch enthält der WAL die notwendigen Informationen für logische Replikation.

Meine docker-compose.yml sieht also so aus:

 

version: "3.8"
services:
  db:
    image: "postgres:latest"
    container_name: "pg_cdc_test"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=testdb
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    volumes:
      - pg_cdc_data:/var/lib/postgresql/data
volumes:
  pg_cdc_data:

 

Nun starte ich den Container und lasse mir seine Status anzeigen.

 

docker-compose up -d

Creating network "postgres-cdc_default" with the default driver
Creating volume "postgres-cdc_my_dbdata" with default driver
Pulling db (postgres:latest)...
latest: Pulling from library/postgres
8559a31e96f4: Pull complete
04866763fec8: Pull complete
1705d51f48e5: Pull complete
e59f13162b50: Pull complete
f34bb6f66594: Pull complete
cbfb60b6801a: Pull complete
e8207269011b: Pull complete
89bccd0fcae0: Pull complete
d3be4c4d3a6e: Pull complete
6593b341f133: Pull complete
b63c7214eb05: Pull complete
a4594bc5ebc6: Pull complete
462172dd94a5: Pull complete
abac28c8c3a0: Pull complete
Digest: sha256:9ba6355d27ba9cd0acda1e28afaae4a5b7b2301bbbdc91794dcfca95ab08d2ef
Status: Downloaded newer image for postgres:latest
Creating postgres_cdc_test ... done

docker ps

CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS          NAMES
2fed14ed0def        postgres:latest     "docker-entrypoint.s…"   3 minutes ago       Up 2 minutes        0.0.0.0:5432->5432/tcp   pg_cdc_test

 

Klasse! Der Container läuft und ich kann mich als nächstes direkt in psql einloggen. Zuerst prüfe ich, ob auch das WAL-Level auf logisch konfiguriert ist.

Hinweis: Der postgresql.conf Parameter ‘max_replication_slots’ muss mindestens den Wert 1 (Standard: 10) haben.

 

docker exec -it pg_cdc_test psql -U postgres postgres

psql (12.3 (Debian 12.3-1.pgdg100+1))
Type "help" for help.

postgres=# show wal_level;
 wal_level
-----------
 logical
(1 row)

postgres=# show max_replication_slots;
 max_replication_slots
-----------------------
 10
(1 row)

 

Als Nächstes erzeuge ich eine Tabelle, auf die später das DML abzielt. 

 

postgres=# CREATE TABLE ice_cream (id int primary key, name varchar, price numeric(3,2));
CREATE TABLE

 

Jetzt lege ich einen Replicaton Slot mit dem Namen ‘cdc_slot’ an. Dieser verwendet das ‘test_decoding’ Plugin, welches als Modul mit PostgreSQL verfügbar ist. Der Replication Slot ist dann auch in der Katalogview pg_replication_slots ersichtlich.

 

SELECT pg_create_logical_replication_slot('cdc_slot', 'test_decoding');
 pg_create_logical_replication_slot
------------------------------------
 (cdc_slot,0/1663650)
(1 row)

postgres=# SELECT * FROM pg_replication_slots;
-[ RECORD 1 ]-------+--------------
slot_name           | cdc_slot
plugin              | test_decoding
slot_type           | logical
datoid              | 13408
database            | postgres
temporary           | f
active              | f
active_pid          |
xmin                |
catalog_xmin        | 489
restart_lsn         | 0/1663618
confirmed_flush_lsn | 0/1663650

 

Als nächstes führe ich einen Insert auf der ice_cream Tabelle aus und schaue den Output des Replication Slots mit der Funktion pg_logical_slot_peek_changes an.

 

postgres=# INSERT INTO ice_cream (id, name, price) values (1, 'Vanilla', '1.00');
INSERT 0 1

postgres=# SELECT * FROM pg_logical_slot_peek_changes('cdc_slot', NULL, NULL);
-[ RECORD 1 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663650
xid  | 489
data | BEGIN 489
-[ RECORD 2 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663650
xid  | 489
data | table public.ice_cream: INSERT: id[integer]:1 name[character varying]:'Vanilla' price[numeric]:1.00
-[ RECORD 3 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663768
xid  | 489
data | COMMIT 489

 

Der Replication Slot zeigt die Transaktion unseres Inserts.

Was passiert aber nun, wenn ich den Replication Slot erneut abfrage?

 

postgres=# SELECT * FROM pg_logical_slot_peek_changes('cdc_slot', NULL, NULL);
-[ RECORD 1 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663650
xid  | 489
data | BEGIN 489
-[ RECORD 2 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663650
xid  | 489
data | table public.ice_cream: INSERT: id[integer]:1 name[character varying]:'Vanilla' price[numeric]:1.00
-[ RECORD 3 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663768
xid  | 489
data | COMMIT 489

 

Die Insert Transaktion ist immer noch zu sehen. Sie wird also vom Replication Slot bei Nutzung der Funktion pg_logical_slot_peek_changes weiter vorgehalten. Ich “spähen” [engl. peek] also nur in den Replication Slot, ohne den Eintrag dabei zu entfernen.

Um den Transaktionslog aus dem Slot zu konsumieren (nach dem Lesen entfernen), wende ich die Funktion pg_logical_slot_get_changes an. Wiederholt man die Query, erhält man den Eintrag nicht mehr.

 

postgres=# SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL);
-[ RECORD 1 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663650
xid  | 489
data | BEGIN 489
-[ RECORD 2 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663650
xid  | 489
data | table public.ice_cream: INSERT: id[integer]:1 name[character varying]:'Vanilla' price[numeric]:1.00
-[ RECORD 3 ]---------------------------------------------------------------------------------------------
lsn  | 0/1663768
xid  | 489
data | COMMIT 489

postgres=# SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL);
(0 rows)

 

Der Replication Slot der PostgreSQL Instanz (= Publisher) hält nun alle DML Statments solange vor, bis diese durch eine andere Anwendung (= Consumer) abgeholt werden. Damit haben wir die Basis für PostgreSQL mit CDC geschaffen. Mit einem entsprechenden Interface, z.B. auf Basis von Rest, könnte nun Kafka als Consumer des Replication Slots genutzt werden und der DML Transaktionslog andersweitig verwendet werden.

Dirk Aumueller Autor

Dirk Aumueller arbeitet als Associate Partner für die Proventa AG. Sein technologischer Schwerpunkt liegt bei Datenbankarchitekturen mit PostgreSQL sowie Data Management Lösungen mit Pentaho. Zusätzlich zu seinen Datenbanktransformations-Projekteinsätzen ist er regelmäßig als PostgreSQL Trainer unterwegs und betreut Studenten bei ihren Abschlussarbeiten. Seine fachlichen Erfahrungen erstrecken sich über die Branchen Telco und Financial Services.

Tags