Alerting with Kafka using Mail
Before setting up this workflow, make sure to implement the Regular Project Unarchiving Workflow, as it is necessary to keep your project accessible.
It is possible to send alerts to a specified email address with Kafka.
Here, we will send a notification to a given email address whenever a new version of a case ID matches a specific pattern: in this example, a duration greater than 40000000000.
First, we must get set up.
We need the Kafka JDBC Source connector, which can be found here.
Note that for this connector to work with Druid, we also need the Avatica Connector. It can be found here.
Make sure to take the Shaded version, and place it at the root of the docker-compose directory so that it can be mounted into the connect container as described below.
Then, add the following line to the volumes section of the connect container in the docker-compose.yml file:
- ./avatica-1.25.0.jar:/usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar
You must then add this variable to the environment section of the connect container in the docker-compose.yml file:
CLASSPATH: /usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar
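To check that the jar is actually picked up, you can, for example, recreate the connect container and look inside it (this assumes the Kafka Connect service is named connect in your docker-compose.yml):
```shell
# Recreate the connect container so the new volume and CLASSPATH are taken into account
docker compose up -d connect

# The Avatica jar should be visible at the mounted path
docker compose exec connect ls /usr/share/java/kafka-connect-jdbc/

# The CLASSPATH variable should point to it
docker compose exec connect printenv CLASSPATH
```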
Now, check the .env file and make sure you have the following versions for CONFLUENT_PLATFORM and KSQLDB_VERSION:
CONFLUENT_PLATFORM=7.8.0
KSQLDB_VERSION=0.29.0
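As a quick check, you can, for example, grep the two variables from the .env file in the docker-compose directory:
```shell
grep -E '^(CONFLUENT_PLATFORM|KSQLDB_VERSION)=' .env
```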
You must now retrieve the Camel Mail Sink Kafka Connector, which can be found here. You can also find the matching documentation here.
Place the Camel Mail Sink Kafka Connector and the Kafka JDBC Source connector in the docker-compose/connect-plugins/ directory, as referenced by the CONNECT_PLUGIN_PATH variable in the docker-compose.yml.

Place the Avatica Connector at the root of the docker-compose directory.
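Assuming the layout described above, the files should end up roughly as follows (the exact archive names depend on the versions you downloaded):
```shell
# Run from the root of the repository
ls docker-compose/connect-plugins/   # JDBC Source connector and Camel Mail Sink connector
ls docker-compose/avatica-1.25.0.jar # shaded Avatica jar mounted into the connect container
```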
Everything is now in place, and we can create the necessary connectors and streams.
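The connector and stream statements below are executed from the ksqlDB CLI. One way to open it, assuming your docker-compose.yml uses the usual ksqldb-cli and ksqldb-server service names, is:
```shell
docker compose exec ksqldb-cli ksql http://ksqldb-server:8088
```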
The first step is to create the JDBC Connector. That can be done with the following command:
CREATE SOURCE CONNECTOR jdbc_source_alerts WITH (
'connector.class'= 'io.confluent.connect.jdbc.JdbcSourceConnector',
'errors.log.include.messages'= 'true',
'incrementing.column.name'= 'version',
'connection.password'= '<Workgroup Key>',
'validate.non.null'= 'false',
'tasks.max'= '1',
'query'= 'SELECT * FROM (SELECT caseid, version FROM "<Project ID>" WHERE LOOKUP("case_version", ''<Project ID>'') IS NULL AND duration>40000000000 GROUP BY caseid, version ORDER BY version ASC)',
'mode'= 'incrementing',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'topic.prefix'= 'alerts_',
'connection.user'= '<Workgroup ID>',
'poll.interval.ms'= '3000',
'name'= 'JDBC_SOURCE_ALERTS',
'errors.tolerance'= 'all',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'connection.url'= '<JDBC Connection URL>',
'errors.log.enable'= 'true',
'key.converter'= 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url'= 'http://schema-registry:8081'
);
Replace the Workgroup Key with your Workgroup Key, Workgroup ID with your Workgroup ID, Project ID with your Project ID and JDBC Connection URL with your JDBC Connection URL. This information can be found in the iGrafx workgroup settings, under the Open API tab.
You may also change the query if you need other information.
The SQL query in the query parameter selects the case IDs whose duration is greater than 40000000000, so only those cases are sent to Kafka; since the connector runs in incrementing mode on the version column, each new version of a matching case produces a new record.
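To confirm that the source connector was registered and is running, you can query the Kafka Connect REST API; the connector should appear in the list returned by the first command (adjust the name in the second command accordingly):
```shell
# List the connectors currently registered on the Connect worker
curl -s http://localhost:8083/connectors | jq

# Inspect the state of the source connector and its task
curl -s http://localhost:8083/connectors/JDBC_SOURCE_ALERTS/status | jq
```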
Now let us create the Camel Mail Sink Kafka Connector. That can be done with the following command:
```shell
CREATE SINK CONNECTOR camel_mail_sink WITH (
'connector.class' = 'org.apache.camel.kafkaconnector.mailsink.CamelMailsinkSinkConnector',
'topics' = 'email_alerts_topic',
'camel.kamelet.mail-sink.connectionHost' = '<Mail Connection Host>',
'camel.kamelet.mail-sink.connectionPort' = '<Mail Connection Port>',
'camel.kamelet.mail-sink.username' = '<Mail Username>',
'camel.kamelet.mail-sink.password' = '<Mail Password>',
'camel.kamelet.mail-sink.from' = '<From Email>',
'camel.kamelet.mail-sink.to' = '<Email Recipient>'
);
```
Replace the Mail Connection Host with the mail server host (example: smtp.gmail.com), Mail Username with the username used to access the mailbox, Mail Password with the password used to access the mailbox, Mail Connection Port with the mail server port, From Email with the sender's email address, and Email Recipient with the recipient's email address.
If this connector is not detected, make sure your Docker images are up to date. If they are not, check Docker Hub here for the latest version of each image.
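For example, from the docker-compose directory:
```shell
# Pull the latest images referenced by the docker-compose.yml, then recreate the containers
docker compose pull
docker compose up -d
```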
With the connectors created, we can now define the ksqlDB streams. First, run this command:
SET 'auto.offset.reset'='earliest';
This command sets the offset to the earliest, ensuring that the CLI reads from the beginning of each topic.
Now we create a stream to read the retrieved cases from the JDBC connector. The topic is named alerts_ because, in query mode, the JDBC Source connector uses topic.prefix as the full topic name.
CREATE STREAM alerts_stream WITH (
KAFKA_TOPIC='alerts_',
VALUE_FORMAT='AVRO'
);
Finally, we create a stream to send the alerts to the Camel Mail Sink Kafka Connector.
CREATE STREAM email_alerts_stream WITH (
KAFKA_TOPIC = 'email_alerts_topic',
VALUE_FORMAT = 'DELIMITED'
) AS
SELECT
'New alert for case ' + caseid +
' has been detected! For more details, visit: ' +
'https://<Your Mining Platform>/workgroups/' +
CAST('<Your Workgroup ID>' AS VARCHAR) +
'/projects/<Your Project ID>/case-explorer/' + caseid + ' '
AS alert_message
FROM alerts_stream
EMIT CHANGES;
Replace the Workgroup ID with your Workgroup ID, Project ID with your Project ID, and Your Mining Platform with the address of your Mining Platform.
Now, whenever an updated case has a duration greater than 40000000000, an alert will be sent to the specified email address.
The email body will look like this:
New alert for case <Case ID> has been detected! For more details, visit: https://<Your Mining Platform>/workgroups/<Workgroup ID>/projects/<Project ID>/case-explorer/<Case ID>
If you want to check the detected connector plugins, you can run the following:
curl -s http://localhost:8083/connector-plugins | jq
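If the email still does not arrive, a useful check is to confirm that messages actually reach the email_alerts_topic topic, for example with the console consumer shipped in the broker image (the broker service name and bootstrap address below are assumptions; adjust them to your docker-compose.yml):
```shell
docker compose exec broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic email_alerts_topic \
  --from-beginning
```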