
Full Data Pipeline Example

In this example, we will create a full data pipeline to run within the infrastructure.

The original data comes from two sources, provided as two separate CSV files that share a common column.

The illustrated use case comes from the IT ticketing domain, where ServiceNow manages end-user tickets and Jira tracks the related technical issues submitted to developers or technical staff.

Data Pipeline Overview

[Figure: data_pipeline]

Preparation for the pipeline

Source Files

The two source files can be found under the path /examples/example_files.

The jira_0_stream.csv file should be placed in the sources/input/jira folder of the Docker Compose module, while the snow_0_stream.csv file should go in the sources/input/snow folder of the same module.

Connectors and UDF Installation

Two different connectors will be required for this test: one is a custom iGrafx connector used to send data from Kafka to a process mining platform, and the other is used to fetch data from CSV files into Kafka.

To use them, the infrastructure needs the JARs of both connectors as well as the JAR containing the custom UDFs.

Connector for Process Mining Platform

You can install the first connector by following the steps outlined in the iGrafx connector section of this document.

Connector for CSV Files

The second connector can be retrieved from the Confluent Hub, where a variety of connectors for common data sources are available.

The connector is named SpoolDirCsvSourceConnector. You can download it by clicking the Download button on this page, which will provide a ZIP archive containing the connector, among other files.

After downloading, extract the contents of the archive and place them in the docker-compose/connect-plugins folder of the Docker Compose project.

Custom UDFs

You can install the custom UDFs by following the steps described in the iGrafx UDFs section of this document.

Steps for the pipeline:

1. Source Connectors

Launch Docker Compose with the connector JARs and the Jira/Snow files placed in the appropriate folders. Once it is running, run the following statements in ksqlDB (either via the ksqlDB CLI or a graphical UI).

Important: The two connectors created below must have at least one file to load into Kafka upon creation. Otherwise, the connector creation will return an error.

Creation of the Connector for the Jira File:

CREATE SOURCE CONNECTOR jira_k WITH (
 'connector.class' = 'com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector',
 'key.converter' = 'io.confluent.connect.avro.AvroConverter',
 'key.converter.schema.registry.url' = 'http://schema-registry:8081',
 'tasks.max'= '1',
 'topic'= 'jira_k',
 'cleanup.policy'= 'MOVE',
 'input.path'= '/data/jira',
 'finished.path'= '/data/processed',
 'error.path'= '/data/error',
 'input.file.pattern'= '^jira_._stream\.csv$',
 'halt.on.error'= 'false',
 'file.buffer.size.bytes'= '1000000',
 'schema.generation.enabled'= 'true',
 'schema.generation.key.fields'= 'Number',
 'schema.generation.key.name'= 'jira_key',
 'schema.generation.value.name'= 'jira_value',
 'csv.keep.carriage.return'= 'false',
 'csv.null.field.indicator'= 'BOTH',
 'csv.first.row.as.header'= 'true',
 'csv.case.sensitive.field.names'= 'true'
);

This connector reads all files matching the ^jira_._stream\.csv$ pattern located in the /data/jira folder. After processing a file, it is moved to either the /data/processed folder or the /data/error folder, depending on the success of the operation. The data from the processed files is stored in the jira_k Kafka topic. In this example, the connector loads data from the jira_0_stream.csv file into the corresponding Kafka topic.

Creation of the Connector for the Snow File:

CREATE SOURCE CONNECTOR snow_k WITH (
 'connector.class' = 'com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector',
 'key.converter' = 'io.confluent.connect.avro.AvroConverter',
 'key.converter.schema.registry.url' = 'http://schema-registry:8081',
 'tasks.max'= '1',
 'topic'= 'snow_k',
 'cleanup.policy'= 'MOVE',
 'input.path'= '/data/snow',
 'finished.path'= '/data/processed',
 'error.path'= '/data/error',
 'input.file.pattern'= '^snow_._stream\.csv$',
 'halt.on.error'= 'false',
 'file.buffer.size.bytes'= '1000000',
 'schema.generation.enabled'= 'true',
 'schema.generation.key.fields'= 'Number',
 'schema.generation.key.name'= 'snow_key',
 'schema.generation.value.name'= 'snow_value',
 'csv.keep.carriage.return'= 'false',
 'csv.null.field.indicator'= 'BOTH',
 'csv.first.row.as.header'= 'true',
 'csv.case.sensitive.field.names'= 'true'
);

This connector reads all files matching the ^snow_._stream\.csv$ pattern located in the /data/snow folder. After processing a file, it is moved to either the /data/processed folder or the /data/error folder, depending on the success of the operation. The data from the processed files is stored in the snow_k Kafka topic. In this example, the connector loads data from the snow_0_stream.csv file into the corresponding Kafka topic.
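Before moving on, it can help to confirm that both connectors are running and that the topics contain data. The statements below are a minimal check using standard ksqlDB commands. The LIMIT value of 3 is an arbitrary choice for illustration, and the connector names assume that ksqlDB resolves the unquoted identifiers the same way it did at creation time.

```sql
-- List all connectors known to the Kafka Connect cluster, with their state.
SHOW CONNECTORS;

-- Show the status and tasks of each source connector created above.
DESCRIBE CONNECTOR jira_k;
DESCRIBE CONNECTOR snow_k;

-- Print the first records of each topic to confirm the CSV rows were loaded.
-- LIMIT 3 is arbitrary; it only stops the output after three records.
PRINT 'jira_k' FROM BEGINNING LIMIT 3;
PRINT 'snow_k' FROM BEGINNING LIMIT 3;
```

If a topic stays empty, check whether the source file was moved to /data/error rather than /data/processed.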

2. Streams and Data Transformation

Let us create the necessary Streams on top of Kafka topics.

To do that, we create a ksqlDB stream on top of the Kafka topic that has been created by the Jira Connector:

CREATE STREAM JIRA_00
 WITH (
 KAFKA_TOPIC='jira_k',
 PARTITIONS=1,
 REPLICAS=1,
 VALUE_FORMAT='AVRO'
 );

SELECT * FROM JIRA_00 EMIT CHANGES;

And another ksqlDB stream on top of the Kafka topic created by the Snow Connector:

CREATE STREAM SNOW_00
 WITH (
 KAFKA_TOPIC='snow_k',
 PARTITIONS=1,
 REPLICAS=1,
 VALUE_FORMAT='AVRO'
 );

SELECT * FROM SNOW_00 EMIT CHANGES;
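A push query started after the connector has already loaded its file may print nothing, because ksqlDB queries can be configured to start reading from the latest offset. Whether that applies here depends on how the ksqlDB server in the Docker Compose project is configured, which this example does not state, so treat the following as an optional sketch. The LIMIT value is an arbitrary illustration.

```sql
-- Make queries started in this session read their source topics
-- from the beginning instead of only seeing new records.
SET 'auto.offset.reset' = 'earliest';

-- Inspect the columns that ksqlDB inferred from the Avro schemas.
DESCRIBE JIRA_00;
DESCRIBE SNOW_00;

-- Re-run a bounded push query; it stops after five rows.
SELECT * FROM JIRA_00 EMIT CHANGES LIMIT 5;
```

The setting is a session property, so it also applies to the persistent queries (CREATE STREAM ... AS SELECT) created later in the same session.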

Now we remove the unwanted columns and transform the dates.

First we remove the unwanted Jira columns, and transform the dates:

CREATE STREAM JIRA_01 AS
SELECT
    JIRA_KEY,
    JIRA_ID,
    IMPACT AS JIRA_IMPACT,
    TIMESTAMPTOSTRING(STRINGTOTIMESTAMP(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(CREATION, 'september', '09'), 'january', '01'), 'february', '02'), 'mars', '03'), 'april', '04'), 'may', '05'), 'june', '06'), 'july', '07'), 'august', '08'), 'october', '10'), 'november', '11'), 'december', '12') ,'d M y H:m'), 'dd-MM-yyyy HH:mm:ss' ) AS JIRA_CREATION,
    TIMESTAMPTOSTRING(STRINGTOTIMESTAMP(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(UPDATED, 'september', '09'), 'january', '01'), 'february', '02'), 'mars', '03'), 'april', '04'), 'may', '05'), 'june', '06'), 'july', '07'), 'august', '08'), 'october', '10'), 'november', '11'), 'december', '12') ,'d M y H:m'), 'dd-MM-yyyy HH:mm:ss' ) AS JIRA_UPDATED,
    TIMESTAMPTOSTRING(STRINGTOTIMESTAMP(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(RESOLVED, 'september', '09'), 'january', '01'), 'february', '02'), 'mars', '03'), 'april', '04'), 'may', '05'), 'june', '06'), 'july', '07'), 'august', '08'), 'october', '10'), 'november', '11'), 'december', '12') ,'d M y H:m'), 'dd-MM-yyyy HH:mm:ss' ) AS JIRA_RESOLVED,
    NUMBER AS JIRA_NUMBER
FROM JIRA_00
EMIT CHANGES;

SELECT * FROM JIRA_01 EMIT CHANGES;
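The nested REPLACE calls in JIRA_01 substitute each month name with its two-digit number so that STRINGTOTIMESTAMP can parse the value with the 'd M y H:m' pattern. To see the intermediate steps on real rows, a reduced query such as the one below can be run against JIRA_00. It is only a sketch: it handles a single month name ('mars'), and the aliases RAW_VALUE, MONTH_REPLACED, and EPOCH_MILLIS are names chosen for this illustration.

```sql
-- Show the raw date, the date after one month-name substitution,
-- and the parsed timestamp (a BIGINT in epoch milliseconds).
SELECT
    CREATION AS RAW_VALUE,
    REPLACE(CREATION, 'mars', '03') AS MONTH_REPLACED,
    STRINGTOTIMESTAMP(REPLACE(CREATION, 'mars', '03'), 'd M y H:m') AS EPOCH_MILLIS
FROM JIRA_00
EMIT CHANGES
LIMIT 5;
```

Rows whose month is not covered by the single REPLACE are expected to fail parsing, which typically shows up as a null EPOCH_MILLIS. This is why the full query chains one REPLACE per month.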

Next, we remove the unwanted Snow columns and rename the remaining ones. Unlike the Jira query, this query does not convert the date columns:

CREATE STREAM SNOW_01 AS
SELECT
    NUMBER AS SNOW_NUMBER,
    PRIORITY AS SNOW_PRIORITY,
    TICKET_STATUS AS SNOW_TICKET_STATUS,
    UPDATED AS SNOW_UPDATED,
    DATE_LAST_MODIF AS SNOW_DATE_LAST_MODIF,
    DATE_END_HOUR AS SNOW_DATE_END_HOUR,
    DATE_HAPPENING_HOUR AS SNOW_DATE_HAPPENING_HOUR
FROM SNOW_00
EMIT CHANGES;

SELECT * FROM SNOW_01 EMIT CHANGES;

The dates in the JIRA_01 and SNOW_01 streams are now consistent, as they share the same format.

We must now do a JOIN between the Jira and Snow streams.

A left join is then performed on the JIRA_NUMBER and SNOW_NUMBER columns of the JIRA_01 and SNOW_01 streams.

Because of the left join, events with a JIRA_NUMBER value that do not match any SNOW_NUMBER value are discarded, while events with a SNOW_NUMBER value that do not correspond to any JIRA_NUMBER still appear but have null values for the Jira fields.

In ksqlDB, a WITHIN clause is required when joining two streams. Events from the two streams match only if their NUMBER values are equal and their record timestamps are no more than 600 minutes apart.

For more information on joining collections in ksqlDB, see the ksqlDB documentation on joins.

CREATE STREAM SNOW_JIRA_00 AS
SELECT
    snow.SNOW_NUMBER,
    SNOW_PRIORITY,
    SNOW_TICKET_STATUS,
    SNOW_UPDATED,
    SNOW_DATE_LAST_MODIF,
    SNOW_DATE_END_HOUR,
    SNOW_DATE_HAPPENING_HOUR,
    jira.JIRA_KEY,
    JIRA_ID,
    JIRA_IMPACT,
    JIRA_CREATION,
    JIRA_UPDATED,
    JIRA_RESOLVED,
    jira.JIRA_NUMBER
FROM SNOW_01 snow
LEFT JOIN JIRA_01 jira WITHIN 600 MINUTES ON snow.SNOW_NUMBER = jira.JIRA_NUMBER
EMIT CHANGES;

SELECT * FROM SNOW_JIRA_00 EMIT CHANGES;
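To observe the effect of the left join, the joined stream can be filtered on whether the Jira side is present. This is a small sketch that relies only on the SNOW_NUMBER and JIRA_KEY columns of SNOW_JIRA_00, as used by the later streams in this example.

```sql
-- Snow events that found no Jira match inside the join window:
-- the Jira columns are null for these rows.
SELECT SNOW_NUMBER, JIRA_KEY
FROM SNOW_JIRA_00
WHERE JIRA_KEY IS NULL
EMIT CHANGES;

-- Snow events that were matched with a Jira event.
SELECT SNOW_NUMBER, JIRA_KEY
FROM SNOW_JIRA_00
WHERE JIRA_KEY IS NOT NULL
EMIT CHANGES;
```

Each push query runs until it is cancelled, so run them one at a time.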

Now it's time to apply a transposition function to the data. This function is implemented as a User Defined Function (UDF) and was specifically created to support iGrafx's data processing needs. You can get more information about this function by running the following command:

DESCRIBE FUNCTION igrafx_transposition;
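If DESCRIBE FUNCTION reports that the function cannot be found, the UDF JAR was probably not loaded by the ksqlDB server. A quick way to check, using a standard ksqlDB command, is to list every registered function and look for the iGrafx entries:

```sql
-- List all functions known to the ksqlDB server, built-in and custom.
-- igrafx_transposition should appear in this list once the UDF JAR is loaded.
SHOW FUNCTIONS;
```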

The next two streams, TRANSPOSE1_JIRA and TRANSPOSE1_SNOW, do not invoke the UDF yet; they prepare the data for it. The UDF is called on an array, and each element of that array must have the following format:

STRUCT<TASK VARCHAR(STRING), TIME VARCHAR(STRING)>

Here, we create the TRANSPOSE1_JIRA and TRANSPOSE1_SNOW streams:

CREATE STREAM TRANSPOSE1_JIRA AS
SELECT
    SNOW_NUMBER,
    ARRAY[
        STRUCT(task := 'jira_Creation', time := JIRA_CREATION),
        STRUCT(task := 'jira_Updated', time := JIRA_UPDATED),
        STRUCT(task := 'jira_Resolved', time := JIRA_RESOLVED)
    ] AS steps,
    SNOW_PRIORITY,
    SNOW_TICKET_STATUS,
    SNOW_UPDATED,
    SNOW_DATE_LAST_MODIF,
    SNOW_DATE_END_HOUR,
    SNOW_DATE_HAPPENING_HOUR,
    JIRA_KEY,
    JIRA_ID,
    JIRA_IMPACT
FROM SNOW_JIRA_00
EMIT CHANGES;

SELECT * FROM TRANSPOSE1_JIRA EMIT CHANGES;

CREATE STREAM TRANSPOSE1_SNOW AS
SELECT
    SNOW_NUMBER,
    ARRAY[
        STRUCT(task := 'Snow_Updated', time := SNOW_UPDATED),
        STRUCT(task := 'Snow_Date_last_modif', time := SNOW_DATE_LAST_MODIF),
        STRUCT(task := 'Snow_Date_end_hour', time := SNOW_DATE_END_HOUR),
        STRUCT(task := 'Snow_Date_happening_hour', time := SNOW_DATE_HAPPENING_HOUR)
    ] AS steps,
    SNOW_PRIORITY,
    SNOW_TICKET_STATUS,
    SNOW_UPDATED,
    SNOW_DATE_LAST_MODIF,
    SNOW_DATE_END_HOUR,
    SNOW_DATE_HAPPENING_HOUR
FROM SNOW_01
EMIT CHANGES;

SELECT * FROM TRANSPOSE1_SNOW EMIT CHANGES;

Note that in this case, the TRANSPOSE1_JIRA stream prepares the dates for Jira events that have a corresponding NUMBER in the Snow data, while the TRANSPOSE1_SNOW stream prepares the dates for every Snow event.

We can now call the UDF for the Jira dates:

CREATE STREAM TRANSPOSE2_JIRA AS
SELECT
    SNOW_NUMBER,
    igrafx_transposition(steps) AS steps,
    SNOW_PRIORITY,
    SNOW_TICKET_STATUS
FROM TRANSPOSE1_JIRA
EMIT CHANGES;

SELECT * FROM TRANSPOSE2_JIRA EMIT CHANGES;

And the UDF for the Snow dates:

CREATE STREAM TRANSPOSE2_SNOW AS
SELECT
    SNOW_NUMBER,
    igrafx_transposition(steps) AS steps,
    SNOW_PRIORITY,
    SNOW_TICKET_STATUS
FROM TRANSPOSE1_SNOW
EMIT CHANGES;

SELECT * FROM TRANSPOSE2_SNOW EMIT CHANGES;

Since the UDF returns data in a single column with the format STRUCT<TASK VARCHAR(STRING), TIME VARCHAR(STRING)>, we now extract the TASK and TIME fields into separate columns for both Jira and Snow data.

CREATE STREAM SNOW_JIRA_01 AS
SELECT
    SNOW_NUMBER,
    steps->TASK,
    steps->TIME,
    SNOW_PRIORITY AS PRIORITY,
    SNOW_TICKET_STATUS AS TICKET_STATUS
FROM TRANSPOSE2_JIRA
EMIT CHANGES;

SELECT * FROM SNOW_JIRA_01 EMIT CHANGES;

CREATE STREAM SNOW_JIRA_02 AS
SELECT
    SNOW_NUMBER,
    steps->TASK,
    steps->TIME,
    SNOW_PRIORITY AS PRIORITY,
    SNOW_TICKET_STATUS AS TICKET_STATUS
FROM TRANSPOSE2_SNOW
PARTITION BY SNOW_NUMBER
EMIT CHANGES;

SELECT * FROM SNOW_JIRA_02 EMIT CHANGES;

Now the Jira and Snow data can be merged into a single stream, with each source contributing its own tasks and dates.

CREATE STREAM SNOW_JIRA_UNION_00 (
    SNOW_NUMBER VARCHAR KEY,
    TASK VARCHAR,
    TIME VARCHAR,
    PRIORITY VARCHAR,
    TICKET_STATUS VARCHAR
) WITH (
    kafka_topic='SNOW_JIRA_UNION_00',
    partitions=1,
    value_format='avro'
);

INSERT INTO SNOW_JIRA_UNION_00 SELECT * FROM SNOW_JIRA_02 EMIT CHANGES;

INSERT INTO SNOW_JIRA_UNION_00 SELECT * FROM SNOW_JIRA_01 EMIT CHANGES;

SELECT * FROM SNOW_JIRA_UNION_00 EMIT CHANGES;
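Each INSERT INTO ... SELECT statement starts its own persistent query that keeps writing into SNOW_JIRA_UNION_00. As a quick check, using standard ksqlDB commands, both queries should be listed as running and the stream should report them as writers:

```sql
-- List the persistent queries; two of them should write to SNOW_JIRA_UNION_00.
SHOW QUERIES;

-- Show the stream's schema, its backing topic, and the queries that write to it.
DESCRIBE SNOW_JIRA_UNION_00 EXTENDED;
```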

3. Sending Data to the iGrafx Mining Platform

The last stream is used to prepare the data for the connector in the correct format.

CREATE STREAM IGRAFX_00
 WITH (
 kafka_topic = 'igrafx_00',
 partitions = 1,
 value_format = 'AVRO'
 ) AS
SELECT
    SNOW_NUMBER,
    ARRAY[
        STRUCT(columnId := 0, text := SNOW_NUMBER, quote := false),
        STRUCT(columnId := 1, text := TASK, quote := false),
        STRUCT(columnId := 2, text := TIME, quote := false),
        STRUCT(columnId := 3, text := PRIORITY, quote := false),
        STRUCT(columnId := 4, text := TICKET_STATUS, quote := false)
    ] AS dataArray
FROM SNOW_JIRA_UNION_00
EMIT CHANGES;

SELECT * FROM IGRAFX_00 EMIT CHANGES;

In the following command, the values of the api.url, api.authUrl, workGroupId, workGroupKey, and projectId properties must be replaced with your own. The first four values can be found on the iGrafx Mining platform, in the OpenAPI section of the Workgroup settings.

Note that the following command works as written in the ksqlDB CLI. To run it in Kafka-UI, remove the doubled escape character from the csv.endOfLine and csv.escape values:

'csv.endOfLine' = '\n',
'csv.escape' = '\',

We can now create the final element of the pipeline with the following command:

CREATE SINK CONNECTOR IGrafxConnectorCM WITH (
 'connector.class' = 'com.igrafx.kafka.sink.aggregationmain.domain.IGrafxAggregationSinkConnector',
 'tasks.max' = '1',
 'topics' = 'igrafx_00',
 'api.url' = '???',
 'api.authUrl' = '???',
 'workGroupId' = '???',
 'workGroupKey' = '???',
 'projectId' = '???',
 'csv.encoding' = 'UTF-8',
 'csv.separator' = ';',
 'csv.quote' = '"',
 'csv.fieldsNumber' = '5',
 'csv.header' = 'true',
 'csv.defaultTextValue' = '',
 'retentionTimeInDay' = '100',
 'columnMapping.create' = 'true',
 'columnMapping.caseIdColumnIndex' = '0',
 'columnMapping.activityColumnIndex' = '1',
 'columnMapping.timeInformationList' = '{2;dd-MM-yyyy HH:mm:ss}',
 'columnMapping.dimensionsInformationList' = '[{"columnIndex": 3, "name": "PRIORITY", "isCaseScope": true, "aggregation": "FIRST"},{"columnIndex": 4, "name": "TICKET_STATUS", "isCaseScope": true, "aggregation": "FIRST"}]',
 'csv.endOfLine' = '\\n',
 'csv.escape' = '\\',
 'csv.comment' = '#',
 'threshold.elementNumber' = '6575',
 'threshold.timeoutInSeconds' = 20,
 'bootstrap.servers' = 'broker:29092',
 'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
 'value.converter' = 'io.confluent.connect.avro.AvroConverter',
 'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);

As there are fewer than 6575 events, the element-count threshold (threshold.elementNumber) is never reached, so the timeout threshold applies and the data is sent to the Process Mining Platform after 20 seconds.
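If no data appears on the platform after the timeout, the following sketch may help narrow down where the pipeline stops. It uses standard ksqlDB commands only; the LIMIT value is arbitrary, and the connector name assumes that ksqlDB resolves the unquoted identifier the same way it did at creation time.

```sql
-- Check that the sink connector and its task are running,
-- and look for error traces in the task status.
DESCRIBE CONNECTOR IGrafxConnectorCM;

-- Confirm that the topic read by the sink connector actually contains records.
PRINT 'igrafx_00' FROM BEGINNING LIMIT 3;
```

An empty igrafx_00 topic points to a problem upstream in the ksqlDB streams, while a populated topic with no data on the platform points to the connector configuration (for example the api.url or workgroup values).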