Aggregation Connector Examples

For the following two examples, the threshold.valuePattern property is not set, so flushes are triggered only by the threshold.elementNumber and threshold.timeoutInSeconds properties.
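
Had content-based flushing also been desired, threshold.valuePattern could have been set as well. The line below is only a hypothetical illustration, assuming the property accepts a regular expression that triggers a flush when matched; refer to the connector's reference documentation for the exact matching semantics:

'threshold.valuePattern' = '.*;End',

With such a pattern, an aggregation would presumably be flushed as soon as a matching value arrives, independently of the element count and timeout thresholds.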

Example 1

First, run the following command so that queries on a STREAM also process data inserted before the STREAM was created or displayed:

SET 'auto.offset.reset'='earliest';

In this first example, we simply aggregate messages that have a single column of the following type:

line VARCHAR

Therefore, the first command to run in ksqlDB creates the STREAM that feeds the aggregation connector with data:

CREATE STREAM INPUT_DATA (
    line VARCHAR
) WITH (
    KAFKA_TOPIC='aggregation_input', 
    PARTITIONS=1, 
    REPLICAS=1, 
    VALUE_FORMAT='AVRO'
);
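
As an optional sanity check, we can confirm that the stream was registered with the expected schema (DESCRIBE is a built-in ksqlDB command):

DESCRIBE INPUT_DATA;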

We then add a STREAM over the output topic to manipulate the aggregated data. Remember to declare the aggregation column as aggregationColumnName ARRAY<STRUCT<...[INPUT_DATA columns]...>>:

CREATE STREAM OUTPUT_DATA (
    LINEAG ARRAY<STRUCT<LINE VARCHAR>>
) WITH (
    KAFKA_TOPIC='aggregation_output', 
    PARTITIONS=1, 
    REPLICAS=1, 
    VALUE_FORMAT='AVRO'
);

We can then create the connector with the correct aggregationColumnName:

CREATE SINK CONNECTOR AggregationConnectorTest WITH (
    'connector.class' = 'com.igrafx.kafka.sink.aggregation.adapters.AggregationSinkConnector', 
    'tasks.max' = '1',
    'topics' = 'aggregation_input',
    'topicOut' = 'aggregation_output',
    'aggregationColumnName' = 'LINEAG',
    'threshold.elementNumber' = '6',
    'threshold.timeoutInSeconds' = '30',
    'bootstrap.servers' = 'broker:29092',
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);
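
Before inserting data, we can check that the connector was created and is running (SHOW CONNECTORS and DESCRIBE CONNECTOR are built-in ksqlDB commands):

SHOW CONNECTORS;
DESCRIBE CONNECTOR AGGREGATIONCONNECTORTEST;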

And insert new data to aggregate:

INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;1;appli1;Start');
INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;1;appli1;event1');
INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;1;appli1;End');
INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;2;appli2;Start');
INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;2;appli2;event2');
INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;2;appli2;End');
INSERT INTO INPUT_DATA (line) VALUES ('2020-06-16T04;3;appli3;Start');

The OUTPUT_DATA STREAM contains the results of the aggregation, which we can display with:

SELECT * FROM OUTPUT_DATA EMIT CHANGES;

To then break down the generated STRUCT and manipulate a plain ARRAY<VARCHAR>, we can use the following command. Note that the transform function is only available in versions 0.17.0 or higher of ksqlDB:

CREATE STREAM CORRECT_DATA AS SELECT transform(LINEAG, s => s->LINE) AS LINEAG FROM OUTPUT_DATA EMIT CHANGES;
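
To confirm that the new stream exposes LINEAG as a plain ARRAY<VARCHAR>, we can inspect its schema:

DESCRIBE CORRECT_DATA;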

We can then display its results with:

SELECT * FROM CORRECT_DATA EMIT CHANGES;

In this example, the data in the INPUT_DATA STREAM corresponds to:

LINE
2020-06-16T04;1;appli1;Start
2020-06-16T04;1;appli1;event1
2020-06-16T04;1;appli1;End
2020-06-16T04;2;appli2;Start
2020-06-16T04;2;appli2;event2
2020-06-16T04;2;appli2;End
2020-06-16T04;3;appli3;Start

After the aggregation, the result in the OUTPUT_DATA STREAM should be as follows. The second row only appears after 30 seconds, which corresponds to the threshold.timeoutInSeconds property:

LINEAG
[{LINE=2020-06-16T04;1;appli1;Start}, {LINE=2020-06-16T04;1;appli1;event1}, {LINE=2020-06-16T04;1;appli1;End}, {LINE=2020-06-16T04;2;appli2;Start}, {LINE=2020-06-16T04;2;appli2;event2}, {LINE=2020-06-16T04;2;appli2;End}]
[{LINE=2020-06-16T04;3;appli3;Start}]

And the result in the CORRECT_DATA STREAM should be:

LINEAG
[2020-06-16T04;1;appli1;Start, 2020-06-16T04;1;appli1;event1, 2020-06-16T04;1;appli1;End, 2020-06-16T04;2;appli2;Start, 2020-06-16T04;2;appli2;event2, 2020-06-16T04;2;appli2;End]
[2020-06-16T04;3;appli3;Start]

Here, the first row corresponds to the aggregation of the first 6 lines of INPUT_DATA, since the connector's threshold.elementNumber property was set to 6. The second row corresponds to the aggregation of the last line alone: threshold.timeoutInSeconds is set to 30, and in the 30 seconds that followed the flush of the first result, only "2020-06-16T04;3;appli3;Start" was received by the connector.

When the testing is done, you can delete the connector with:

DROP CONNECTOR AGGREGATIONCONNECTORTEST;
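
If you also want to remove the test streams, DROP STREAM can be used; the optional DELETE TOPIC clause additionally deletes the underlying Kafka topic. A minimal cleanup sketch (the persistent query behind CORRECT_DATA must be terminated first, its id is visible via SHOW QUERIES):

-- SHOW QUERIES; then TERMINATE <query id> of the CSAS query behind CORRECT_DATA
DROP STREAM CORRECT_DATA;
DROP STREAM OUTPUT_DATA;
DROP STREAM INPUT_DATA DELETE TOPIC;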

Example 2

First, run the following command so that queries on a STREAM also process data inserted before the STREAM was created or displayed:

SET 'auto.offset.reset'='earliest';

In this second example, we aggregate messages that have a single column of the following, more complex type, to show how the aggregation handles richer data; the principles are the same as in the first example:

dataArray ARRAY<STRUCT<columnID INT, text VARCHAR, quote BOOLEAN>>

The first command to run in ksqlDB creates the STREAM that feeds the aggregation connector with data:

CREATE STREAM INPUT_DATA2 (
    dataArray ARRAY<STRUCT<columnID INT, text VARCHAR, quote BOOLEAN>>
) WITH (
    KAFKA_TOPIC='aggregation_input2', 
    PARTITIONS=1, 
    REPLICAS=1, 
    VALUE_FORMAT='AVRO'
);

We then add a STREAM over the output topic to manipulate the aggregated data. Remember to declare the aggregation column as aggregationColumnName ARRAY<STRUCT<...[INPUT_DATA2 columns]...>>:

CREATE STREAM OUTPUT_DATA2 (
    LINEAG ARRAY<STRUCT<DATAARRAY ARRAY<STRUCT<columnID INT, text VARCHAR, quote BOOLEAN>>>>
) WITH (
    KAFKA_TOPIC='aggregation_output2', 
    PARTITIONS=1, 
    REPLICAS=1, 
    VALUE_FORMAT='AVRO'
);

We can then create the connector with the correct aggregationColumnName:

CREATE SINK CONNECTOR AggregationConnectorTest2 WITH (
    'connector.class' = 'com.igrafx.kafka.sink.aggregation.adapters.AggregationSinkConnector', 
    'tasks.max' = '1',
    'topics' = 'aggregation_input2',
    'topicOut' = 'aggregation_output2',
    'aggregationColumnName' = 'LINEAG',
    'threshold.elementNumber' = '3',
    'threshold.timeoutInSeconds' = '30',
    'bootstrap.servers' = 'broker:29092',
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);
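
As in the first example, we can verify that the connector is up before inserting data:

DESCRIBE CONNECTOR AGGREGATIONCONNECTORTEST2;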

And insert new data to aggregate:

INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'A', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:05', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:10', quote := false)]);
INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'B', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:15', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:16', quote := false)]);
INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'C', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:16', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:17', quote := false)]);
INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'D', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:26', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:27', quote := false)]);
INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'E', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:29', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:31', quote := false)]);
INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'F', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:29', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:31', quote := false)]);
INSERT INTO INPUT_DATA2 (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'G', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:32', quote := false)]);

The OUTPUT_DATA2 STREAM contains the results of the aggregation, which we can display with:

SELECT * FROM OUTPUT_DATA2 EMIT CHANGES;

To then break down the generated STRUCT and manipulate a plain ARRAY<ARRAY<STRUCT<columnID INT, text VARCHAR, quote BOOLEAN>>>, we can use the following command. Note that the transform function is only available in versions 0.17.0 or higher of ksqlDB:

CREATE STREAM CORRECT_DATA2 AS SELECT transform(LINEAG, s => s->DATAARRAY) AS LINEAG FROM OUTPUT_DATA2 EMIT CHANGES;
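
To confirm that the new stream exposes LINEAG with the expected nested array type, we can again inspect its schema:

DESCRIBE CORRECT_DATA2;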

Now, we can display its results with:

SELECT * FROM CORRECT_DATA2 EMIT CHANGES;

In this example, the data in the INPUT_DATA2 STREAM corresponds to:

DATAARRAY
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=A, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:05, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:10, QUOTE=false}]
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=B, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:15, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:16, QUOTE=false}]
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=C, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:16, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:17, QUOTE=false}]
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=D, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:26, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:27, QUOTE=false}]
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=E, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:29, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:31, QUOTE=false}]
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=F, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:29, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:31, QUOTE=false}]
[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=G, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:31, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:32, QUOTE=false}]

After the aggregation, the result in the OUTPUT_DATA2 STREAM should be as follows. The third row only appears after 30 seconds, which corresponds to the threshold.timeoutInSeconds property:

LINEAG
[{DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=A, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:05, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:10, QUOTE=false}]}, {DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=B, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:15, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:16, QUOTE=false}]}, {DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=C, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:16, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:17, QUOTE=false}]}]
[{DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=D, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:26, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:27, QUOTE=false}]}, {DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=E, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:29, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:31, QUOTE=false}]}, {DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=F, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:29, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:31, QUOTE=false}]}]
[{DATAARRAY=[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=G, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:31, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:32, QUOTE=false}]}]

Finally, the result in the CORRECT_DATA2 STREAM should be:

LINEAG
[[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=A, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:05, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:10, QUOTE=false}], [{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=B, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:15, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:16, QUOTE=false}], [{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=C, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:16, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:17, QUOTE=false}]]
[[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=D, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:26, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:27, QUOTE=false}], [{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=E, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:29, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:31, QUOTE=false}], [{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=F, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:29, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:31, QUOTE=false}]]
[[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=G, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:31, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:32, QUOTE=false}]]

Here, the first two rows correspond to the aggregation of the first six lines of INPUT_DATA2 because the threshold.elementNumber property of the connector was set to 3. This results in two rows with three lines in each aggregation. The third row corresponds to the aggregation of only the last line of INPUT_DATA2, as the threshold.timeoutInSeconds property of the connector is set to 30. Within the 30 seconds following the flush of the first result, only "[{COLUMNID=0, TEXT=3, QUOTE=false}, {COLUMNID=1, TEXT=G, QUOTE=true}, {COLUMNID=2, TEXT=10/10/10 08:31, QUOTE=false}, {COLUMNID=3, TEXT=10/10/10 08:32, QUOTE=false}]" was received by the connector.

When the testing is done, you can delete the connector with:

DROP CONNECTOR AGGREGATIONCONNECTORTEST2;
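
As before, the test streams can be removed with DROP STREAM, optionally deleting the backing topics (terminate the persistent query behind CORRECT_DATA2 first):

-- SHOW QUERIES; then TERMINATE <query id> of the CSAS query behind CORRECT_DATA2
DROP STREAM CORRECT_DATA2;
DROP STREAM OUTPUT_DATA2;
DROP STREAM INPUT_DATA2 DELETE TOPIC;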