Alerting with Kafka using Slack

Before setting up this workflow, make sure to implement the Regular Project Unarchiving Workflow, as it ensures your project remains accessible.

With Kafka, it is possible to send alerts to a destination of your choice.

In this example, we will send a notification to Slack whenever a new version of a case matches a specific pattern: here, a duration greater than 40000000000.

First, we must get set up.

We need the Kafka JDBC Source connector, which can be found here.

Note that for this connector to work with Druid, we also need the Avatica connector. It can be found here. Make sure to take the shaded version.
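If you prefer the command line, the jar can also be fetched directly; the Maven Central coordinates below are an assumption (the shaded client is usually published as the avatica artifact rather than avatica-core):

# Possible way to download the shaded Avatica jar; the Maven Central path is an assumption.
curl -L -o avatica-1.25.0.jar \
  https://repo1.maven.org/maven2/org/apache/calcite/avatica/avatica/1.25.0/avatica-1.25.0.jar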

Then, add the following line to the volumes section of the connect container in the docker-compose.yml file:

      - ./avatica-1.25.0.jar:/usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar

You must then add this variable to the environment section of the connect container in the docker-compose.yml file:

      CLASSPATH: /usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar
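After editing the docker-compose.yml file, you can restart the container and confirm that the jar is mounted where the CLASSPATH expects it. The commands below assume the service is named connect in your docker-compose.yml:

# Recreate the connect service so the new volume and CLASSPATH take effect,
# then check that the Avatica jar is visible inside the container.
docker compose up -d connect
docker compose exec connect ls /usr/share/java/kafka-connect-jdbc/ | grep avatica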

Now, check the .env file and make sure you have the following versions for CONFLUENT_PLATFORM and KSQLDB_VERSION:

CONFLUENT_PLATFORM=7.8.0
KSQLDB_VERSION=0.29.0

You must now retrieve the Camel Slack Sink Kafka connector, which can be found here. You can also find the matching documentation here.

Place the Camel Slack Sink Kafka connector and the Kafka JDBC Source connector in the docker-compose/connect-plugins/ directory, as referenced by the CONNECT_PLUGIN_PATH variable in the docker-compose.yml.

Place the Avatica Connector at the root of the docker-compose directory.
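As a sanity check, the resulting layout should look roughly like this. The extracted connector directory names below are only illustrative and depend on the archives you downloaded:

# Illustrative layout; actual connector directory names depend on the downloaded archives.
ls docker-compose/
#   avatica-1.25.0.jar  connect-plugins/  docker-compose.yml  ...
ls docker-compose/connect-plugins/
#   camel-slack-sink-kafka-connector/  kafka-connect-jdbc/  ...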

Before going further you must create a Slack App to be able to retrieve a webhook URL. You can do so here. Under Incoming Webhooks you will find the webhook URL.
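To confirm the webhook works before wiring it into Kafka, you can post a test message to it directly. Replace the URL below with your own webhook URL; Slack answers with ok when the message is accepted:

# Post a test message straight to the incoming webhook; Slack replies with "ok".
curl -X POST -H 'Content-type: application/json' \
  --data '{"text": "Test message from the Kafka alerting setup"}' \
  'https://hooks.slack.com/services/XXXX/XXXX/XXXX'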

With the setup complete, we can now create the necessary connectors and streams.

The first step is to create the JDBC Connector. That can be done with the following command:

CREATE SOURCE CONNECTOR jdbc_source_alerts WITH (
'connector.class'= 'io.confluent.connect.jdbc.JdbcSourceConnector',
'errors.log.include.messages'= 'true',
'incrementing.column.name'= 'version',
'connection.password'= '<Workgroup Key>',
'validate.non.null'= 'false',
'tasks.max'= '1',
'query'= 'SELECT * FROM (SELECT caseid, version FROM "<Project ID>" WHERE LOOKUP("case_version", ''<Project ID>'') IS NULL  AND duration>40000000000 GROUP BY caseid, version ORDER BY version ASC)',
'mode'= 'incrementing',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'topic.prefix'= 'alerts_',
'connection.user'= '<Workgroup ID>',
'poll.interval.ms'= '3000',
'name'= 'JDBC_SOURCE_ALERTS',
'errors.tolerance'= 'all',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'connection.url'= '<JDBC Connection URL>',
'errors.log.enable'= 'true',
'key.converter'= 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url'= 'http://schema-registry:8081'
);

Replace the Workgroup Key with your Workgroup Key, the Workgroup ID with your Workgroup ID, the Project ID with your Project ID, and the JDBC Connection URL with your JDBC Connection URL. This information can be found in the iGrafx workgroup settings, under the Open API tab.

The SQL query under the query parameter selects the cases whose duration is greater than 40000000000; combined with the incrementing mode on the version column, only new case versions matching that condition are sent to Kafka.
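Once the connector has been created, you can check through the Kafka Connect REST API that it is registered and that its task is running. The registered name may be the one from the CREATE statement or the name property, so list the connectors first:

# List registered connectors, then inspect the status of the source connector.
curl -s http://localhost:8083/connectors | jq
curl -s http://localhost:8083/connectors/JDBC_SOURCE_ALERTS/status | jq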

Now let us create the Camel Slack Sink Kafka connector. That can be done with the following command:

CREATE SINK CONNECTOR camel_slack_sink WITH (
'connector.class'= 'org.apache.camel.kafkaconnector.slacksink.CamelSlacksinkSinkConnector',
'camel.kamelet.slack-sink.channel'= '#test-alerts',
'camel.kamelet.slack-sink.webhookUrl'= '<Webhook URL>',
'camel.kamelet.slack-sink.iconEmoji'= ':bell:',
'camel.kamelet.slack-sink.username'= 'Kafka Alerts',
'tasks.max'= '1',
'topics'= 'slack_alerts_topic',
'value.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter'
);

Replace the Webhook URL with your Webhook URL.

If this connector is not being detected, make sure your images are up to date. If they are not, check Docker Hub here for the latest version of each image.

Now that the connectors have been created, we can create the ksqlDB streams. First, run this command:

SET 'auto.offset.reset'='earliest'; 

This command sets the offset to the earliest, ensuring that the CLI reads from the beginning of each topic.

Now, we create a stream to read the retrieved cases from the JDBC connector:

CREATE STREAM alerts_stream WITH (
KAFKA_TOPIC='alerts_',
VALUE_FORMAT='AVRO'
);
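To verify that rows are actually reaching this stream, you can pull one record through the ksqlDB REST API. This assumes ksqlDB Server listens on its default port 8088:

# Stream one row from alerts_stream through the ksqlDB REST API (default port assumed).
curl -s -X POST http://localhost:8088/query \
  -H 'Content-Type: application/vnd.ksql.v1+json' \
  -d '{"ksql": "SELECT * FROM alerts_stream EMIT CHANGES LIMIT 1;", "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}'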

Finally, we create a stream to send the alerts to the Camel Slack Sink Kafka connector:

CREATE STREAM slack_alerts_stream WITH (
KAFKA_TOPIC='slack_alerts_topic',
VALUE_FORMAT='DELIMITED'
) AS
SELECT
'New alert for case ' + caseid + ' has been detected! ' +
'For more details, visit: ' +
'https://<your-mining-platform>/workgroups/' + CAST('<Workgroup ID>' AS VARCHAR) +
'/projects/<Project ID>/case-explorer/' + caseid AS alert_message
FROM alerts_stream
EMIT CHANGES;

Replace the Workgroup ID with your Workgroup ID, the Project ID with your Project ID, and your-mining-platform with the address of your Mining platform.
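To see exactly what the sink will post to Slack, you can consume a message from the output topic. The broker service name and port below are assumptions; adapt them to your docker-compose.yml:

# Read one message from the topic feeding the Slack sink.
# "broker" and port 9092 are assumptions; adjust to your setup.
docker compose exec broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic slack_alerts_topic \
  --from-beginning --max-messages 1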

Now, whenever an updated case has a duration greater than 40000000000, an alert will be sent to the Slack channel.

It will look like this:

[Image: example alert message posted to the Slack channel]

If you want to check the detected connector plugins, you can run the following:

curl -s http://localhost:8083/connector-plugins | jq
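To narrow the output down to the two connectors used in this workflow, you can filter the plugin list:

# Keep only the JDBC and Slack connector classes from the plugin list.
curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep -iE 'jdbc|slack'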