iGrafx Aggregation Description
- module : aggregation
- package : com.igrafx.kafka.sink.aggregation
This connector is designed to aggregate multiple records originating from the same partition into a single, structured array. For example, if the incoming data contains two columns with the following types:
LINE1 VARCHAR,
LINE2 VARCHAR
The result of the aggregation will be sent to a Kafka topic in the following format:
LINEAG ARRAY<STRUCT<LINE1 VARCHAR, LINE2 VARCHAR>>
Each record from Kafka will be aggregated with others in an array structure.
In this case, LINEAG is used as the value for the aggregationColumnName connector property, defining the name of the aggregated column.
Here, the aggregation column
LINEAG ARRAY<STRUCT<...>>
is added on top of the columns of the incoming data to hold the aggregation results. The ARRAY contains the aggregated records, while the STRUCT preserves the columns of each input record.
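As an illustration of the resulting shape only, the following minimal Python sketch wraps two example records in a single aggregated value. The `aggregate` function and the sample values are hypothetical and are not part of the connector, which works on Kafka Connect records rather than Python dictionaries.

```python
from typing import Any, Dict, List


def aggregate(records: List[Dict[str, Any]], aggregation_column_name: str) -> Dict[str, List[Dict[str, Any]]]:
    """Wrap the records in one value: a single array column holding one struct per record.

    Hypothetical helper for illustration; it only mirrors the
    ARRAY<STRUCT<LINE1 VARCHAR, LINE2 VARCHAR>> layout described above.
    """
    return {aggregation_column_name: [dict(record) for record in records]}


# Two incoming records with the columns LINE1 and LINE2 (sample values are made up).
records = [
    {"LINE1": "a1", "LINE2": "b1"},
    {"LINE1": "a2", "LINE2": "b2"},
]

# "LINEAG" plays the role of the aggregationColumnName property.
result = aggregate(records, "LINEAG")
print(result)
# {'LINEAG': [{'LINE1': 'a1', 'LINE2': 'b1'}, {'LINE1': 'a2', 'LINE2': 'b2'}]}
```

Each element of the array keeps the original column names, so downstream consumers can still address LINE1 and LINE2 inside every aggregated entry.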
The aggregation is triggered based on several thresholds:
- Element Number: When the number of aggregated elements reaches a specified count, the aggregation result is sent to Kafka.
- Value Pattern: A regular expression (regex) pattern can be defined; if an incoming sink record's value matches it, the current aggregation is flushed to Kafka.
- Timeout: After a certain period since the last aggregation was sent, the current aggregated data is pushed to Kafka, even if the element count threshold hasn't been met.
- Retention: This threshold is governed by the retention.ms configuration in the source Kafka topic. It is not set by the user in the connector’s properties but can impact data retention within the aggregation window. See the Retention section below for more details.
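The first three thresholds can be sketched as a small in-memory buffer. This is a simplified Python illustration, not the connector's implementation: the `AggregationBuffer` class, its parameter names, and the injected clock are all hypothetical. It also assumes that a record matching the value pattern is included in the batch it flushes and that a partial regex match is enough, neither of which is specified above. The retention threshold is not modeled, since it depends on the source topic's retention.ms.

```python
import re
import time
from typing import Callable, List, Optional


class AggregationBuffer:
    """Hypothetical buffer that flushes on element count, value pattern, or timeout."""

    def __init__(self, max_elements: int, value_pattern: Optional[str],
                 timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_elements = max_elements
        self.value_pattern = re.compile(value_pattern) if value_pattern else None
        self.timeout_seconds = timeout_seconds
        self.clock = clock
        self.elements: List[str] = []
        self.last_flush = clock()

    def _flush(self) -> List[str]:
        # Hand back the current batch and restart the timeout window.
        flushed, self.elements = self.elements, []
        self.last_flush = self.clock()
        return flushed

    def add(self, value: str) -> Optional[List[str]]:
        """Buffer one record value; return the batch if a threshold is reached."""
        self.elements.append(value)
        # Assumption: a partial match (re.search) counts as matching the pattern.
        pattern_hit = (self.value_pattern is not None
                       and self.value_pattern.search(value) is not None)
        if len(self.elements) >= self.max_elements or pattern_hit:
            return self._flush()
        return None

    def tick(self) -> Optional[List[str]]:
        """Periodic check; flush a non-empty buffer once the timeout has elapsed."""
        if self.elements and self.clock() - self.last_flush >= self.timeout_seconds:
            return self._flush()
        return None


# Usage with a fake clock so the timeout can be exercised deterministically.
now = [0.0]
buf = AggregationBuffer(max_elements=3, value_pattern=r"END",
                        timeout_seconds=30, clock=lambda: now[0])
first = buf.add("a")            # no threshold reached yet
by_pattern = buf.add("b END")   # value matches the pattern, so the batch is flushed
buf.add("c")
now[0] = 31.0
by_timeout = buf.tick()         # 31 s since the last flush, so the batch is flushed
by_count = [buf.add(v) for v in ("x", "y", "z")][-1]  # third element reaches the count
```

Injecting the clock keeps the sketch testable without sleeping; a real sink task would rely on its own scheduling to check the timeout.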
Note: The aggregation schema is obtained from the Kafka output topic specified by the topicOut property. Therefore, this topic’s schema must be created before any data is sent to the connector.