
iGrafx Transposition Example

Variation 1

Consider the following data as input:

| Case   | Step 1     | Step 2     | Step 3     | Step 4     | Step 5     |
|--------|------------|------------|------------|------------|------------|
| case 1 | 12/01/2020 | 14/01/2020 | 15/01/2020 | 16/01/2020 | 17/01/2020 |
| case 2 | 14/02/2020 | 15/02/2020 | 18/02/2020 |            | 18/02/2020 |
| case 3 | 17/03/2020 |            | 24/03/2020 |            |            |

We expect the following output:

| Case   | Task   | Timestamp  |
|--------|--------|------------|
| case 1 | Step 1 | 12/01/2020 |
| case 1 | Step 2 | 14/01/2020 |
| case 1 | Step 3 | 15/01/2020 |
| case 1 | Step 4 | 16/01/2020 |
| case 1 | Step 5 | 17/01/2020 |
| case 2 | Step 1 | 14/02/2020 |
| case 2 | Step 2 | 15/02/2020 |
| case 2 | Step 3 | 18/02/2020 |
| case 2 | Step 5 | 18/02/2020 |
| case 3 | Step 1 | 17/03/2020 |
| case 3 | Step 3 | 24/03/2020 |

However, the UDF alone cannot transform the first table directly into the second, so we need intermediate ksqlDB streams.

The UDF takes as input a Java List of elements of type STRUCT<TASK VARCHAR(STRING), TIME VARCHAR(STRING)>, where the list represents a single row and each element in the list corresponds to one column of that row. For example, the first element of the list built from the first row of the example would be STRUCT<Step 1, 12/01/2020>.

Here, the output type is identical to the input type. Because the function is tabular (a UDTF), ksqlDB creates one row for each element of the output list. This first variation simply removes from the input list every element whose TIME field is empty or null.
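
To make the behaviour concrete, here is a minimal plain-Java sketch of this first variation (Java 16+ for the record). It is an illustration only, not the actual iGrafx implementation: the Step record below is a hypothetical stand-in for the STRUCT<TASK, TIME> element type.

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the STRUCT<TASK VARCHAR, TIME VARCHAR> element type.
record Step(String task, String time) {}

class TranspositionV1 {
    // Variation 1: keep only the elements whose TIME field is neither null
    // nor empty; each surviving element becomes one output row in ksqlDB.
    static List<Step> transpose(List<Step> row) {
        return row.stream()
                .filter(s -> s.time() != null && !s.time().isEmpty())
                .collect(Collectors.toList());
    }
}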

To test this variation directly in ksqlDB, launch ksqlDB and run the following commands:

  • Create the initial stream representing rows with columns, where each row follows the format: Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5
CREATE STREAM s1 (
    `case` VARCHAR,  -- CASE is a reserved word in ksqlDB, so the name must be backquoted
    step1 VARCHAR,
    step2 VARCHAR,
    step3 VARCHAR,
    step4 VARCHAR,
    step5 VARCHAR
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);
  • Next, create the stream that prepares the call to the UDF. Here, ARRAY[STRUCT(...), STRUCT(...), ...] corresponds to the UDF input parameter, ARRAY being the ksqlDB type that maps to Java's List.
CREATE STREAM s2 AS SELECT
    `case`,
    ARRAY[STRUCT(task := 'Step 1', time := step1),
          STRUCT(task := 'Step 2', time := step2),
          STRUCT(task := 'Step 3', time := step3),
          STRUCT(task := 'Step 4', time := step4),
          STRUCT(task := 'Step 5', time := step5)] AS steps
    FROM s1 EMIT CHANGES;
  • Now create the STREAM that is going to call the igrafx_transposition UDF
CREATE STREAM s3 AS SELECT
    `case`,
    igrafx_transposition(steps) AS steps
    FROM s2 EMIT CHANGES;

With the previous example, the s3 STREAM contains the following data:

| case   | steps                          |
|--------|--------------------------------|
| case 1 | {TASK=Step 1, TIME=12/01/2020} |
| case 1 | {TASK=Step 2, TIME=14/01/2020} |
| case 1 | {TASK=Step 3, TIME=15/01/2020} |
| case 1 | {TASK=Step 4, TIME=16/01/2020} |
| case 1 | {TASK=Step 5, TIME=17/01/2020} |
| case 2 | {TASK=Step 1, TIME=14/02/2020} |
| case 2 | {TASK=Step 2, TIME=15/02/2020} |
| case 2 | {TASK=Step 3, TIME=18/02/2020} |
| case 2 | {TASK=Step 5, TIME=18/02/2020} |
| case 3 | {TASK=Step 1, TIME=17/03/2020} |
| case 3 | {TASK=Step 3, TIME=24/03/2020} |

Finally, we create the last STREAM, which deconstructs the STRUCT from s3 into two separate columns:

CREATE STREAM s4 AS SELECT
    `case`,
    steps->task AS activity,
    steps->time AS `timestamp`
    FROM s3 EMIT CHANGES;

Once all the STREAMS have been created, it is possible to add the data, and here each INSERT represents a row:

INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 1', '12/01/2020', '14/01/2020', '15/01/2020', '16/01/2020', '17/01/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 2', '14/02/2020', '15/02/2020', '18/02/2020', '', '18/02/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 3', '17/03/2020', '', '24/03/2020', '', '');

INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 4', '17/03/2020', '25/03/2020', '', '16/03/2020', '24/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 5', '', '', '', '16/03/2020', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 6', '', '', '', '', '');

INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 7', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 8', '17/03/2020', '16/03/2020', '17/03/2020', '18/03/2020', '17/03/2020');

Finally, we can display the final result in ksqlDB with:

SELECT `case`, activity, `timestamp` FROM s4 EMIT CHANGES;
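
As a cross-check, the hypothetical Java sketch from the start of this section reproduces the expected rows for case 2, whose step4 is empty:

import java.util.List;

class TranspositionV1Demo {
    public static void main(String[] args) {
        // case 2 from the INSERTs above: Step 4 has an empty TIME and is dropped.
        List<Step> case2 = List.of(
                new Step("Step 1", "14/02/2020"),
                new Step("Step 2", "15/02/2020"),
                new Step("Step 3", "18/02/2020"),
                new Step("Step 4", ""),
                new Step("Step 5", "18/02/2020"));
        TranspositionV1.transpose(case2).forEach(System.out::println);
        // Prints Step 1, Step 2, Step 3 and Step 5 with their dates; Step 4 is gone.
    }
}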

Variation 2

Let us consider this case as input:

| Case   | Step 1     | Step 2     | Step 3     | Step 4     | Step 5     |
|--------|------------|------------|------------|------------|------------|
| case 1 | 17/03/2020 | 16/03/2020 | 17/03/2020 | 18/03/2020 | 17/03/2020 |

We get this output if isStartInformation = true and isTaskNameAscending = true:

| Case   | Activity | Start      | End        |
|--------|----------|------------|------------|
| case 1 | Step 2   | 16/03/2020 | 17/03/2020 |
| case 1 | Step 1   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 3   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 5   | 17/03/2020 | 18/03/2020 |
| case 1 | Step 4   | 18/03/2020 | 18/03/2020 |

Otherwise, we get this output if isStartInformation = true and isTaskNameAscending = false:

| Case   | Activity | Start      | End        |
|--------|----------|------------|------------|
| case 1 | Step 2   | 16/03/2020 | 17/03/2020 |
| case 1 | Step 5   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 3   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 1   | 17/03/2020 | 18/03/2020 |
| case 1 | Step 4   | 18/03/2020 | 18/03/2020 |

We get the following output if isStartInformation = false and isTaskNameAscending = true:

| Case   | Activity | Start      | End        |
|--------|----------|------------|------------|
| case 1 | Step 2   | 16/03/2020 | 16/03/2020 |
| case 1 | Step 1   | 16/03/2020 | 17/03/2020 |
| case 1 | Step 3   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 5   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 4   | 17/03/2020 | 18/03/2020 |

We get the following output if isStartInformation = false and isTaskNameAscending = false:

| Case   | Activity | Start      | End        |
|--------|----------|------------|------------|
| case 1 | Step 2   | 16/03/2020 | 16/03/2020 |
| case 1 | Step 5   | 16/03/2020 | 17/03/2020 |
| case 1 | Step 3   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 1   | 17/03/2020 | 17/03/2020 |
| case 1 | Step 4   | 17/03/2020 | 18/03/2020 |
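
Reading the four tables together, the behaviour is: sort the steps by timestamp, break ties by task name (ascending or descending), then derive the missing bound of each interval from its neighbour. When isStartInformation is true, the timestamp is the activity's start and its end is the next activity's start (the last activity ends when it starts); when it is false, the timestamp is the activity's end and its start is the previous activity's end (the first activity starts when it ends). The plain-Java sketch below illustrates this logic; it reuses the hypothetical Step record from Variation 1 and, again, is an illustration rather than the actual iGrafx implementation.

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the output STRUCT with TASK, START and STOP fields.
record Interval(String task, LocalDate start, LocalDate stop) {}

class TranspositionV2 {
    static List<Interval> transpose(List<Step> row, String format,
                                    boolean isStartInformation,
                                    boolean isTaskNameAscending) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern(format);
        // Reuse the Variation 1 filter, then sort by timestamp with the
        // task name as tie-breaker.
        List<Step> steps = new ArrayList<>(TranspositionV1.transpose(row));
        Comparator<Step> byName = Comparator.comparing(Step::task);
        steps.sort(Comparator
                .comparing((Step s) -> LocalDate.parse(s.time(), fmt))
                .thenComparing(isTaskNameAscending ? byName : byName.reversed()));

        List<Interval> out = new ArrayList<>();
        for (int i = 0; i < steps.size(); i++) {
            LocalDate t = LocalDate.parse(steps.get(i).time(), fmt);
            LocalDate start;
            LocalDate stop;
            if (isStartInformation) {
                start = t;                          // the timestamp is the start
                stop = i + 1 < steps.size()         // end = next step's start
                        ? LocalDate.parse(steps.get(i + 1).time(), fmt)
                        : t;
            } else {
                stop = t;                           // the timestamp is the end
                start = i > 0                       // start = previous step's end
                        ? LocalDate.parse(steps.get(i - 1).time(), fmt)
                        : t;
            }
            out.add(new Interval(steps.get(i).task(), start, stop));
        }
        return out;
    }
}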

To test this variation directly in ksqlDB, launch ksqlDB and run the following commands:

The first two STREAMS are the same as the ones in the first variation.

  • Create the initial stream representing rows with columns, where each row follows the format: Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5
CREATE STREAM s1 (
    `case` VARCHAR,
    step1 VARCHAR,
    step2 VARCHAR,
    step3 VARCHAR,
    step4 VARCHAR,
    step5 VARCHAR
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);
  • Next, create the stream that prepares the call to the UDF. Here, ARRAY[STRUCT(...), STRUCT(...), ...] corresponds to the UDF input parameter, ARRAY being the ksqlDB type that maps to Java's List.
CREATE STREAM s2 AS SELECT
    `case`,
    ARRAY[STRUCT(task := 'Step 1', time := step1),
          STRUCT(task := 'Step 2', time := step2),
          STRUCT(task := 'Step 3', time := step3),
          STRUCT(task := 'Step 4', time := step4),
          STRUCT(task := 'Step 5', time := step5)] AS steps
    FROM s1 EMIT CHANGES;
  • Now we create the STREAM that calls the igrafx_transposition UDF.
CREATE STREAM s3 AS SELECT
    `case`,
    igrafx_transposition(steps, 'dd/MM/yyyy', true, true) AS steps
    FROM s2 EMIT CHANGES;

or

CREATE STREAM s3 AS SELECT
    `case`,
    igrafx_transposition(steps, 'dd/MM/yyyy', true, false) AS steps
    FROM s2 EMIT CHANGES;

or

CREATE STREAM s3 AS SELECT
    `case`,
    igrafx_transposition(steps, 'dd/MM/yyyy', false, true) AS steps
    FROM s2 EMIT CHANGES;

or

CREATE STREAM s3 AS SELECT
    `case`,
    igrafx_transposition(steps, 'dd/MM/yyyy', false, false) AS steps
    FROM s2 EMIT CHANGES;
  • Now we create the final STREAM, which deconstructs the STRUCT from s3 into three different columns:
CREATE STREAM s4 AS SELECT
    `case`,
    steps->task AS activity,
    steps->start AS start_date,
    steps->stop AS end_date
    FROM s3 EMIT CHANGES;

Once all the STREAMS have been created, we can add the data. Here, each INSERT represents a row:

INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 1', '12/01/2020', '14/01/2020', '15/01/2020', '16/01/2020', '17/01/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 2', '14/02/2020', '15/02/2020', '18/02/2020', '', '18/02/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 3', '17/03/2020', '', '24/03/2020', '', '');

INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 4', '17/03/2020', '25/03/2020', '', '16/03/2020', '24/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 5', '', '', '', '16/03/2020', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 6', '', '', '', '', '');

INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 7', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 8', '17/03/2020', '16/03/2020', '17/03/2020', '18/03/2020', '17/03/2020');

Finally, we can display the final result with:

SELECT `case`, activity, start_date, end_date FROM s4 EMIT CHANGES;
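
For example, the hypothetical Variation 2 sketch above reproduces the first of the four output tables (isStartInformation = true, isTaskNameAscending = true):

import java.util.List;

class TranspositionV2Demo {
    public static void main(String[] args) {
        List<Step> row = List.of(
                new Step("Step 1", "17/03/2020"),
                new Step("Step 2", "16/03/2020"),
                new Step("Step 3", "17/03/2020"),
                new Step("Step 4", "18/03/2020"),
                new Step("Step 5", "17/03/2020"));
        TranspositionV2.transpose(row, "dd/MM/yyyy", true, true)
                .forEach(System.out::println);
        // Step 2 [16/03 -> 17/03], Step 1 [17/03 -> 17/03], Step 3 [17/03 -> 17/03],
        // Step 5 [17/03 -> 18/03], Step 4 [18/03 -> 18/03]
    }
}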

3rd UDF example

Let us consider that the initial row corresponds to:

| Case   | Step 1     | Step 2     | Step 3     | Step 4     | Step 5     | Total Price |
|--------|------------|------------|------------|------------|------------|-------------|
| case 1 | 12/01/2020 | 14/01/2020 | 15/01/2020 | 16/01/2020 | 17/01/2020 | 240         |

Information about the Total Price of the process is given. When the UDF is used (for instance, the first variation) and the created STREAMS keep the Total Price column, the Total Price is repeated on each of the created rows:

CREATE STREAM s1 (
    `case` VARCHAR,
    step1 VARCHAR,
    step2 VARCHAR,
    step3 VARCHAR,
    step4 VARCHAR,
    step5 VARCHAR,
    price INTEGER
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);
CREATE STREAM s2 AS SELECT
    `case`,
    ARRAY[STRUCT(task := 'Step 1', time := step1),
          STRUCT(task := 'Step 2', time := step2),
          STRUCT(task := 'Step 3', time := step3),
          STRUCT(task := 'Step 4', time := step4),
          STRUCT(task := 'Step 5', time := step5)] AS steps,
    price
    FROM s1 EMIT CHANGES;
CREATE STREAM s3 AS SELECT
    `case`,
    igrafx_transposition(steps) AS steps,
    price
    FROM s2 EMIT CHANGES;
CREATE STREAM s4 AS SELECT
    `case`,
    steps->task AS activity,
    steps->time AS `timestamp`,
    price
    FROM s3 EMIT CHANGES;

The result for the current example is:

| Case   | Activity | Timestamp  | Price |
|--------|----------|------------|-------|
| case 1 | Step 1   | 12/01/2020 | 240   |
| case 1 | Step 2   | 14/01/2020 | 240   |
| case 1 | Step 3   | 15/01/2020 | 240   |
| case 1 | Step 4   | 16/01/2020 | 240   |
| case 1 | Step 5   | 17/01/2020 | 240   |

This can be problematic if, for instance, we then want to sum all the values in the Price column to determine the total price, because the result would not reflect the real total price of each process.

Here, the calculated value would be 5 × 240 = 1200, whereas the real total price of the process is 240.
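
A short plain-Java check makes the arithmetic concrete:

import java.util.Collections;
import java.util.List;

class PricePitfall {
    public static void main(String[] args) {
        // The single process price of 240 is duplicated on all five rows.
        List<Integer> pricePerRow = Collections.nCopies(5, 240);
        int naiveSum = pricePerRow.stream().mapToInt(Integer::intValue).sum();
        System.out.println(naiveSum);  // 1200, whereas the real total price is 240
    }
}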