Alerting on Predicted Data with Kafka using Mail
Before setting up this workflow, make sure to implement the Regular Project Unarchiving Workflow, as it is necessary to keep your project accessible.
It is possible to use predicted data to perform comparisons and send alerts to a specified email address with Kafka.
Here, we want to compare the predicted end date with the given deadline dimension. If the predicted end date is greater than the deadline dimension, then we send an alert.
Don't forget to set the business rule to get the prediction going.
First, we must get set up.
We need the Kafka JDBC Source connector, which can be found here.
Note that for this connector to work with Druid we also need the Avatica Connector. It can be found here.
Make sure to take the shaded version and place it at the root of the docker-compose directory, so that the volume mount below can find it.
Then, add the following line to the volumes section of the connect container in the docker-compose.yml file:
- ./avatica-1.25.0.jar:/usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar
You must then add this variable to the environment section of the connect container in the docker-compose.yml file:
CLASSPATH: /usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar
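Put together, the relevant part of the connect service in docker-compose.yml should look roughly like this (only the lines added above are shown; the rest of the service definition is left unchanged):
connect:
  # ... existing service configuration ...
  volumes:
    # ... existing volume mounts ...
    - ./avatica-1.25.0.jar:/usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar
  environment:
    # ... existing environment variables ...
    CLASSPATH: /usr/share/java/kafka-connect-jdbc/avatica-1.25.0.jar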
Now, check the .env file and make sure you have the following versions for CONFLUENT_PLATFORM and KSQLDB_VERSION:
CONFLUENT_PLATFORM=7.8.0
KSQLDB_VERSION=0.29.0
You must now retrieve the Camel Mail Sink Kafka Connector, which can be found here. You can also find the matching documentation here.
Place the Camel Mail Sink Kafka Connector and the Kafka JDBC Source connector in the docker-compose/connect-plugins/ directory, as referenced by the CONNECT_PLUGIN_PATH variable in the docker-compose.yml.

Place the Avatica Connector at the root of the docker-compose directory.
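If everything is in place, the docker-compose directory should look roughly like this (the connector folder names below are illustrative and depend on the archives you downloaded):
docker-compose/
├── docker-compose.yml
├── .env
├── avatica-1.25.0.jar
└── connect-plugins/
    ├── confluentinc-kafka-connect-jdbc/
    └── camel-mail-sink-kafka-connector/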
Now that everything is set up, we can create the necessary connectors and streams to make it work.
The first step is to create the JDBC Connector. That can be done with the following command:
CREATE SOURCE CONNECTOR jdbc_source_alerts WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'errors.log.include.messages' = 'true',
'incrementing.column.name' = 'version',
'connection.password' = '<Workgroup Key>',
'validate.non.null' = 'false',
'tasks.max' = '1',
'query' = 'WITH predicted_cases AS (SELECT enddate, caseid, version FROM "<Project ID>" WHERE is_predicted = TRUE AND LOOKUP("case_version", ''<Project ID>_excluded_cases'') IS NULL), real_data AS (SELECT caseid, <Deadline Dimension>, version FROM "<Project ID>" WHERE <Deadline Dimension> IS NOT NULL AND (is_predicted IS NULL OR is_predicted = FALSE) AND LOOKUP("case_version", ''<Project ID>_excluded_cases'') IS NULL) SELECT * FROM (SELECT p.caseid, p.version FROM predicted_cases p JOIN real_data r ON p.caseid = r.caseid AND p.version = r.version WHERE TIME_FLOOR(MILLIS_TO_TIMESTAMP(CAST(p.enddate AS BIGINT)), ''P1D'') > TIME_PARSE(r.<Deadline Dimension>, ''dd/MM/yyyy''))',
'mode' = 'incrementing',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'topic.prefix' = 'alerts_',
'connection.user' = '<Workgroup ID>',
'poll.interval.ms' = '3000',
'name' = 'JDBC_SOURCE_ALERTS',
'errors.tolerance' = 'all',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'connection.url' = '<JDBC Connection URL>',
'errors.log.enable' = 'true',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081'
);
Replace the Workgroup Key with your Workgroup Key, Workgroup ID with your Workgroup ID, Project ID with your Project ID, and JDBC Connection URL with your JDBC Connection URL. This information can be found in the iGrafx workgroup settings, under the Open API tab. The Deadline Dimension is the dimension that we want to compare with the predicted end date.
The SQL query under the query parameter selects the CASEIDs for which the predicted end date falls after the given deadline date.
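For readability, here is the same Druid SQL query laid out over several lines (in the connector configuration above, single quotes inside the query are doubled because it is embedded in a ksqlDB string literal):
WITH predicted_cases AS (
  SELECT enddate, caseid, version
  FROM "<Project ID>"
  WHERE is_predicted = TRUE
    AND LOOKUP("case_version", '<Project ID>_excluded_cases') IS NULL
),
real_data AS (
  SELECT caseid, <Deadline Dimension>, version
  FROM "<Project ID>"
  WHERE <Deadline Dimension> IS NOT NULL
    AND (is_predicted IS NULL OR is_predicted = FALSE)
    AND LOOKUP("case_version", '<Project ID>_excluded_cases') IS NULL
)
SELECT * FROM (
  SELECT p.caseid, p.version
  FROM predicted_cases p
  JOIN real_data r
    ON p.caseid = r.caseid AND p.version = r.version
  WHERE TIME_FLOOR(MILLIS_TO_TIMESTAMP(CAST(p.enddate AS BIGINT)), 'P1D')
      > TIME_PARSE(r.<Deadline Dimension>, 'dd/MM/yyyy')
)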
Now let us create the Camel Mail Sink Kafka Connector. That can be done with the following command:
CREATE SINK CONNECTOR camel_mail_sink WITH (
'connector.class' = 'org.apache.camel.kafkaconnector.mailsink.CamelMailsinkSinkConnector',
'topics' = 'email_alerts_topic',
'camel.kamelet.mail-sink.connectionHost' = '<Mail Connection Host>',
'camel.kamelet.mail-sink.connectionPort' = '<Mail Connection Port>',
'camel.kamelet.mail-sink.username' = '<Mail Username>',
'camel.kamelet.mail-sink.password' = '<Mail Password>',
'camel.kamelet.mail-sink.from' = '<From email>',
'camel.kamelet.mail-sink.to' = '<Email Recipient>'
);
Replace the Mail Connection Host with the mail server host (example: smtp.gmail.com), Mail Username with the username used to access the mailbox, Mail Password with the password used to access the mailbox, Mail Connection Port with the mail server port, From email with the sender email address, and Email Recipient with the recipient email address. The From email, Email Recipient, and Mail Username can all be the same.
If this connector is not detected, make sure your images are up to date. If they are not, check the Docker Hub here for the latest version of each image.
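As a quick sanity check, you can list the connectors from the ksqlDB CLI; both the JDBC source and the mail sink should appear with a RUNNING state:
SHOW CONNECTORS;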
With the connectors created, we can now define the ksqlDB streams. First, run this command:
SET 'auto.offset.reset'='earliest';
This command sets the offset to the earliest, ensuring that the CLI reads from the beginning of each topic.
Now we create a stream to read the retrieved cases from the JDBC Connector.
CREATE STREAM alerts_stream WITH (
KAFKA_TOPIC='alerts_',
VALUE_FORMAT='AVRO'
);
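To verify that the rows selected by the JDBC connector are arriving, you can query the stream directly from the ksqlDB CLI (the LIMIT clause is optional; press Ctrl+C to stop a continuous query):
SELECT * FROM alerts_stream EMIT CHANGES LIMIT 5;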
Finally, we create a stream to send the alerts to the Camel Mail Sink Kafka Connector.
CREATE STREAM email_alerts_stream WITH (
KAFKA_TOPIC = 'email_alerts_topic',
VALUE_FORMAT = 'KAFKA'
) AS
SELECT
'⚠️ Alert ⚠️
Case '+ caseid + ' is predicted to exceed its deadline.
For more details, visit:
' + 'https://<Your Mining Platform>/workgroups/' +
CAST('<Your Workgroup ID>' AS VARCHAR) +
'/projects/<Your Project ID>/case-explorer/' + REPLACE(REPLACE(CASEID, ' ', '%20'), '#', '%23')
AS alert_message
FROM alerts_stream
EMIT CHANGES;
Replace the Workgroup ID with your Workgroup ID, Project ID with your Project ID, and Your Mining Platform with the host name of your Mining Platform.
Now, when updated cases have a predicted end date later than their deadline, an alert will be sent to the specified email address.
By setting the VALUE_FORMAT to KAFKA, the alert is written to the specified topic as a plain Kafka message, which avoids the double quotes that a JSON-formatted value would add around it.
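To inspect the exact message that will be handed to the mail connector, you can print the underlying topic from the ksqlDB CLI:
PRINT 'email_alerts_topic' FROM BEGINNING;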
Furthermore, the REPLACE(REPLACE(CASEID, ' ', '%20'), '#', '%23') expression is necessary here because the case ID may contain a space or the character #, neither of which is allowed in a URL.
It replaces each space with %20 and each # with %23 so that the URL is correctly formatted; for example, Case #12 becomes Case%20%2312.
If the case ID contains neither a space nor a # character, this expression is not necessary and can simply be replaced by + caseid.
When the alert is triggered, a message like the following is delivered to the email address:

You can then click on the link to see the case details.