Data-Transform Database
A PostgreSQL database is available for data transformations that ksqlDB does not yet support. With this setup you can add columns or apply more advanced processing to the data before it is written back to Kafka.
The following example demonstrates how to use an intermediate PostgreSQL database to generate an additional column cnt, which numbers events within each case (identified here by the INCIDENT column).
Step 1: Create a Table with Auto-Increment Index id
The auto-increment id index lets the JDBC source connector (created from ksqlDB in Step 3) pick up only the rows added since its last poll. If available, other fields (such as a timestamp or a unique event identifier) can also serve this purpose.
Note: This table can be created automatically on initial launch (see conf/pg-initdb.d/).
Example command to create the table in your local PostgreSQL instance (accessible on the default port, with connection details in the .env file):
CREATE TABLE public."JDBC_TABLE" (
id SERIAL PRIMARY KEY NOT NULL
);
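Where the events already carry a reliable timestamp, the table could expose that field in addition to id. The following is only a sketch: the table name JDBC_TABLE_TS and the column event_ts are hypothetical and do not appear elsewhere in this setup.

```sql
-- Hypothetical variant of the table with a timestamp column next to id.
CREATE TABLE public."JDBC_TABLE_TS" (
    id SERIAL PRIMARY KEY,
    -- Assumed column; falls back to the insertion time when no value is supplied.
    event_ts TIMESTAMP NOT NULL DEFAULT now()
);
```

With such a column, the JDBC source connector can be configured with 'mode' = 'timestamp' (or 'timestamp+incrementing') and 'timestamp.column.name' = 'event_ts' instead of relying on id alone.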
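Step 2: Feed the Table from a Kafka Topic Using a JDBC Sink Connector
The following JDBC sink connector populates the PostgreSQL JDBC_TABLE table from the JDBC_TABLE Kafka topic. Because the table is created with only the id column, 'auto.evolve' = 'true' lets the connector add the remaining columns from the topic's record schema: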
CREATE SINK CONNECTOR JDBC_SINK_01 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'JDBC_TABLE',
'table.name.format' = 'JDBC_TABLE',
'connection.url' = 'jdbc:postgresql://data-transform:5432/transform?verifyServerCertificate=false',
'connection.user' = 'datamanager',
'connection.password' = '1r8P!eXx',
'auto.evolve' = 'true'
);
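To check that the sink is running and that rows are arriving, something like the following can be used. The first two statements are meant for the ksqlDB CLI and the last one for psql connected to the transform database; the LIMIT of 5 is arbitrary.

```sql
-- In ksqlDB: list all connectors, then inspect the sink's state and tasks.
SHOW CONNECTORS;
DESCRIBE CONNECTOR JDBC_SINK_01;

-- In PostgreSQL: look at the most recently inserted rows.
-- The table name is double-quoted because it was created in upper case.
SELECT * FROM public."JDBC_TABLE" ORDER BY id DESC LIMIT 5;
```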
Step 3: Read Data from the Table Using a JDBC Source Connector
The following JDBC source connector reads data from the PostgreSQL table and includes a generated cnt column that assigns sequential numbers to events within each INCIDENT case.
The query parameter in the connector specifies this transformation:
CREATE SOURCE CONNECTOR JDBCSOURCEConnector1 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'tasks.max' = '1',
'connection.url' = 'jdbc:postgresql://data-transform:5432/transform?verifyServerCertificate=false',
'connection.user' = 'datamanager',
'connection.password' = '1r8P!eXx',
'mode' = 'incrementing',
'incrementing.column.name' = 'id',
'numeric.mapping' = 'best_fit',
'topic.prefix' = 'jdbc_cnt_case_lines',
'query' = 'SELECT *, ROW_NUMBER() OVER (PARTITION BY "INCIDENT" ORDER BY id ASC) AS cnt FROM "JDBC_TABLE"'
);
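To see what the window expression produces, it can be tried in psql on a small throwaway table. This is a sketch with invented data: the table demo_events, the ACTIVITY column, and the incident values are all hypothetical.

```sql
-- Hypothetical sample data in a temporary table; "ACTIVITY" is an assumed column.
CREATE TEMP TABLE demo_events (
    id SERIAL PRIMARY KEY,
    "INCIDENT" TEXT,
    "ACTIVITY" TEXT
);

INSERT INTO demo_events ("INCIDENT", "ACTIVITY") VALUES
    ('INC-1', 'opened'),
    ('INC-2', 'opened'),
    ('INC-1', 'assigned'),
    ('INC-1', 'closed'),
    ('INC-2', 'closed');

-- Same window expression as in the connector query.
SELECT *, ROW_NUMBER() OVER (PARTITION BY "INCIDENT" ORDER BY id ASC) AS cnt
FROM demo_events
ORDER BY id;
--  id | INCIDENT | ACTIVITY | cnt
--   1 | INC-1    | opened   |   1
--   2 | INC-2    | opened   |   1
--   3 | INC-1    | assigned |   2
--   4 | INC-1    | closed   |   3
--   5 | INC-2    | closed   |   2

-- Window functions are evaluated after WHERE, so filtering on id
-- restarts the numbering: rows 4 and 5 both get cnt = 1 here.
SELECT *, ROW_NUMBER() OVER (PARTITION BY "INCIDENT" ORDER BY id ASC) AS cnt
FROM demo_events
WHERE id > 3
ORDER BY id;

-- Numbering inside a subquery covers the full table before the filter applies:
-- row 4 gets cnt = 3 and row 5 gets cnt = 2.
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY "INCIDENT" ORDER BY id ASC) AS cnt
    FROM demo_events
) AS numbered
WHERE id > 3
ORDER BY id;
```

The last two queries matter if the connector, in incrementing mode, appends its own filter on id to the configured query. In that case the subquery form may be the safer choice, but this should be verified against the connector version in use.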