# Basic ksqlDB Example
This initial example offers a quick look at ksqlDB's capabilities for building a pipeline within the environment. Make sure the Docker Compose environment is running before going through this example.
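As a sketch, assuming a Confluent-quickstart-style compose file that defines services named `ksqldb-server` and `ksqldb-cli` (adapt these names to your own compose file), you can start the environment and open an interactive ksqlDB CLI session like this:

```sh
# Start the services defined in docker-compose.yml in detached mode
docker compose up -d

# Open the ksqlDB CLI against the server (service names are assumptions)
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
```

All the SQL statements in this example are meant to be run from that CLI session.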
## Creation of a pipeline
First, run the following command so that queries read topics from the earliest available offset. This ensures that a STREAM also processes data that was inserted before the STREAM was created or displayed:

```sql
SET 'auto.offset.reset'='earliest';
```
Then create the first stream of the pipeline. It receives new messages and, in this example, serves as the entry point of the pipeline:

```sql
CREATE STREAM STEP_1 (
  message VARCHAR,
  message_value INT
) WITH (
  kafka_topic = 'topic_step_1',
  partitions = 1,
  value_format = 'AVRO'
);
```
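Optionally, you can check that the stream was registered correctly. `SHOW STREAMS` and `DESCRIBE` are standard ksqlDB statements:

```sql
-- List all registered streams
SHOW STREAMS;

-- Display the schema of STEP_1 (message VARCHAR, message_value INT)
DESCRIBE STEP_1;
```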
Create the second stream of the pipeline.
Here, the data to process comes from the STEP_1 stream, and only the events whose message_value is greater than or equal to 1 are processed.
The stream applies an uppercase function to the message parameter of each processed event.

```sql
CREATE STREAM STEP_2 AS
  SELECT UCASE(one.message) AS upper_message, message_value
  FROM STEP_1 one
  WHERE one.message_value >= 1;
```
Now, create the last stream of the pipeline.
Data comes from the STEP_2 stream, and the stream applies a lowercase function to the message parameter of the events whose message_value is greater than or equal to 2.

```sql
CREATE STREAM STEP_3 AS
  SELECT LCASE(two.upper_message) AS lower_message
  FROM STEP_2 two
  WHERE two.message_value >= 2;
```
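Each `CREATE STREAM ... AS SELECT` statement above starts a persistent query that continuously feeds the new stream. You can list these queries with the standard ksqlDB statement:

```sql
-- List the persistent queries backing STEP_2 and STEP_3
SHOW QUERIES;
```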
## Adding data
Now that the pipeline has been created, we can add data to it.
The following events will be inserted only in the STEP_1 stream,
but will still be processed throughout the entire pipeline,
with each stream applying its own transformations and filters.
Add the events with the following statements:

```sql
INSERT INTO STEP_1 (message, message_value) VALUES ('LiVeCoNnEcT', 1);
INSERT INTO STEP_1 (message, message_value) VALUES ('iGrafx', 2);
INSERT INTO STEP_1 (message, message_value) VALUES ('ksqlDB', 0);
```
Now, you can view the contents of each stream by running the relevant query below:

```sql
SELECT * FROM STEP_1 EMIT CHANGES;
SELECT * FROM STEP_2 EMIT CHANGES;
SELECT * FROM STEP_3 EMIT CHANGES;
```
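Note that these are push queries: they keep running and emit new rows as they arrive, until you cancel them (Ctrl+C in the CLI). If you only want a fixed number of rows, ksqlDB push queries support a `LIMIT` clause:

```sql
-- Return the first 2 rows, then terminate the push query automatically
SELECT * FROM STEP_2 EMIT CHANGES LIMIT 2;
```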
You will notice that in STEP_1, the data appears exactly as it was inserted.
In STEP_2, only two events appear: the third one, whose message_value is less than 1, is filtered out. The message parameter of both events should be in uppercase, as the stream applies an uppercase function to the data it receives.
In STEP_3, only one event is processed, as it is the only one with a message_value greater than or equal to 2. Its message parameter should be in lowercase, as the stream applies a lowercase function to the data it receives.
Note that ksqlDB processes data in real time: any new event arriving in the pipeline, at any moment, also flows through the entire pipeline. You can see this by adding a fourth event:

```sql
INSERT INTO STEP_1 (message, message_value) VALUES ('Hi', 2);
```

By running the same queries as before, you will see that the new event goes through each stream, with the respective transformations applied.
This is a straightforward example of ksqlDB's capabilities: using SQL-like syntax to process data in real time while leveraging the advantages of distributed systems, such as data replication, scalability, and load balancing, by running ksqlDB on top of Kafka.