Connector Properties

To set up the connector, specify the following properties (example values provided):

'connector.class' = 'com.igrafx.kafka.sink.aggregation.adapters.AggregationSinkConnector', 
'tasks.max' = '1',
'topics' = 'aggregation_input_topic',
'topicOut' = 'aggregation_output_topic',
'aggregationColumnName' = 'aggregationColumnNameTest',
'threshold.elementNumber' = '6',
'threshold.valuePattern' = '.*regex_example.*',
'threshold.timeoutInSeconds' = 3000,
'bootstrap.servers' = 'broker:29092',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081'

The following properties should keep the values shown above:

  • connector.class (String): Specifies the connector class to be used.
  • key.converter (String): Defines the converter for Kafka record keys.
  • value.converter (String): Defines the converter for Kafka record values.

The other properties can be customized based on your requirements:

  • tasks.max (Int): Number of tasks to instantiate for this connector.
  • topics (String): Comma-separated list of Kafka topics that contain the data to aggregate.
  • topicOut (String): Kafka topic to which the aggregated results are written.
  • aggregationColumnName (String): Name of the column that stores the aggregation result in ksqlDB.
  • threshold.elementNumber (Int): Maximum number of elements in a single aggregation batch; once reached, the aggregation is sent to topicOut.
  • threshold.valuePattern (String): Optional. A regex pattern that, when matched by the incoming record’s value, triggers the aggregation to be published immediately to topicOut, regardless of the element count. If this property is not defined, this threshold does not apply. The pattern must match the format of the SinkRecord value as a whole (e.g., its JSON structure).
  • threshold.timeoutInSeconds (Int): Maximum time (in seconds) since the last aggregation was sent; once exceeded, the current aggregation is pushed to topicOut, even if threshold.elementNumber is not met.
  • bootstrap.servers (String): Comma-separated list of Kafka brokers (host:port).
  • value.converter.schema.registry.url (String): URL for the Kafka Schema Registry.
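
The interaction of the three threshold.* properties can be sketched roughly as follows. This is a minimal illustration in Python, not the connector's actual implementation: the AggregationBuffer class, its method names, and the injectable clock are hypothetical, and the sketch assumes that the timeout is only checked when a record arrives and that the pattern must match the whole value.

```python
import re
import time
from typing import Callable, List, Optional


class AggregationBuffer:
    """Hypothetical buffer that flushes when any of the three thresholds fires."""

    def __init__(
        self,
        element_number: int,
        timeout_seconds: float,
        value_pattern: Optional[str] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.element_number = element_number      # threshold.elementNumber
        self.timeout_seconds = timeout_seconds    # threshold.timeoutInSeconds
        # threshold.valuePattern is optional; None disables that threshold.
        self.value_pattern = re.compile(value_pattern) if value_pattern else None
        self.clock = clock
        self.items: List[str] = []
        self.last_flush = clock()

    def add(self, value: str) -> Optional[List[str]]:
        """Buffer one value; return the flushed batch if a threshold fired, else None."""
        self.items.append(value)
        full = len(self.items) >= self.element_number
        # Assumption: the whole value must match the pattern.
        matched = (
            self.value_pattern is not None
            and self.value_pattern.fullmatch(value) is not None
        )
        timed_out = self.clock() - self.last_flush > self.timeout_seconds
        if full or matched or timed_out:
            return self.flush()
        return None

    def flush(self) -> List[str]:
        """Hand back the current batch (the data that would go to topicOut) and reset."""
        batch, self.items = self.items, []
        self.last_flush = self.clock()
        return batch
```

With element_number=3, the third call to add returns a batch of three values. A value matching the pattern returns the batch immediately, and a value arriving after the timeout has elapsed flushes whatever is buffered, even if fewer than three elements are present.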

For more information on regular expressions used in threshold.valuePattern, refer to this regex guide.
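
Because the pattern is applied to the record value as a whole, it helps to try it against a serialized sample before deploying. The snippet below is a rough check in Python, assuming the value is compared as a JSON string and that the entire string must match. The field names are made up for illustration, and Python's re module is only an approximation of the regex engine the connector uses.

```python
import json
import re

# Example pattern from the property list above.
PATTERN = re.compile(r".*regex_example.*")


def triggers_flush(serialized_value: str) -> bool:
    """Return True if the whole serialized value matches the pattern."""
    return PATTERN.fullmatch(serialized_value) is not None


# Hypothetical record values, serialized the way the pattern would see them.
matching = json.dumps({"id": 1, "comment": "contains regex_example here"})
other = json.dumps({"id": 2, "comment": "nothing special"})

print(triggers_flush(matching))
print(triggers_flush(other))
```

The leading and trailing .* in the example pattern are what allow it to match a value in which regex_example appears anywhere inside the serialized structure; without them, only a value consisting solely of regex_example would match under the whole-value assumption.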