iGrafx Transposition Example
Variation 1
Consider the following data as input:
| Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5 |
|---|---|---|---|---|---|
| case 1 | 12/01/2020 | 14/01/2020 | 15/01/2020 | 16/01/2020 | 17/01/2020 |
| case 2 | 14/02/2020 | 15/02/2020 | 18/02/2020 | | 18/02/2020 |
| case 3 | 17/03/2020 | | 24/03/2020 | | |
We expect the following output:
| Case | Task | Timestamp |
|---|---|---|
| case 1 | Step 1 | 12/01/2020 |
| case 1 | Step 2 | 14/01/2020 |
| case 1 | Step 3 | 15/01/2020 |
| case 1 | Step 4 | 16/01/2020 |
| case 1 | Step 5 | 17/01/2020 |
| case 2 | Step 1 | 14/02/2020 |
| case 2 | Step 2 | 15/02/2020 |
| case 2 | Step 3 | 18/02/2020 |
| case 2 | Step 5 | 18/02/2020 |
| case 3 | Step 1 | 17/03/2020 |
| case 3 | Step 3 | 24/03/2020 |
Nevertheless, the UDF doesn’t allow us to directly transition from the first table to the second one, so we need to use intermediate ksqlDB streams.
The UDF takes as input a Java List whose elements are of type STRUCT<TASK VARCHAR(STRING), TIME VARCHAR(STRING)>:
the list represents a single row, and each element of the list corresponds to one column of that row.
For example, for the first row of the example above, the first element of the list would be STRUCT<Step 1, 12/01/2020>.
The output type is identical to the input type, and the function is tabular: ksqlDB creates one output row per element of the returned list. The first variation simply removes from the input list the elements whose TIME field is empty or null.
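This row-per-element behaviour is the same as that of ksqlDB's built-in EXPLODE table function, which you can use to get a feel for the mechanism (the stream name below is a placeholder, not part of this example):
-- EXPLODE is a built-in ksqlDB table function: it emits one output row
-- per element of its input array, just like a tabular UDF.
-- some_stream is a placeholder; any existing stream works.
SELECT EXPLODE(ARRAY['a', 'b', 'c']) AS letter FROM some_stream EMIT CHANGES;
-- each input row of some_stream yields three output rows: 'a', 'b', 'c'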
To test this variation, launch ksqlDB and run the following commands:
- Create the initial stream representing rows with columns, where each row follows the format:
Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5
CREATE STREAM s1 (
  `case` VARCHAR,
  step1 VARCHAR,
  step2 VARCHAR,
  step3 VARCHAR,
  step4 VARCHAR,
  step5 VARCHAR
) WITH (
  kafka_topic = 's1',
  partitions = 1,
  value_format = 'avro'
);
Note that `case` must be backquoted because CASE is a reserved word in ksqlDB.
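Optionally, you can check that the stream was registered with the expected schema:
-- Display the columns and types of the newly created stream.
DESCRIBE s1;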
- Next, create the stream that prepares the call to the UDF. Here, the ARRAY[STRUCT(...), STRUCT(...), ...] constructor corresponds to the UDF input parameter, ARRAY being the ksqlDB type that maps to Java's List.
CREATE STREAM s2 AS SELECT
  `case`,
  ARRAY[STRUCT(task := 'Step 1', time := step1),
        STRUCT(task := 'Step 2', time := step2),
        STRUCT(task := 'Step 3', time := step3),
        STRUCT(task := 'Step 4', time := step4),
        STRUCT(task := 'Step 5', time := step5)] AS steps
FROM s1 EMIT CHANGES;
- Now create the STREAM that calls the igrafx_transposition UDF:
CREATE STREAM s3 AS SELECT
  `case`,
  igrafx_transposition(steps) AS steps
FROM s2 EMIT CHANGES;
With the previous example, the s3 STREAM contains the following data:
| case | steps |
|---|---|
| case 1 | STRUCT<Step 1, 12/01/2020> |
| case 1 | STRUCT<Step 2, 14/01/2020> |
| case 1 | STRUCT<Step 3, 15/01/2020> |
| case 1 | STRUCT<Step 4, 16/01/2020> |
| case 1 | STRUCT<Step 5, 17/01/2020> |
| case 2 | STRUCT<Step 1, 14/02/2020> |
| case 2 | STRUCT<Step 2, 15/02/2020> |
| case 2 | STRUCT<Step 3, 18/02/2020> |
| case 2 | STRUCT<Step 5, 18/02/2020> |
| case 3 | STRUCT<Step 1, 17/03/2020> |
| case 3 | STRUCT<Step 3, 24/03/2020> |
Finally, we create the last STREAM, which deconstructs the STRUCT from s3 into two different columns:
CREATE STREAM s4 AS SELECT
  `case`,
  steps->task AS activity,
  steps->time AS `timestamp`
FROM s3 EMIT CHANGES;
Once all the STREAMS have been created, it is possible to add the data, and here each INSERT represents a row:
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 1', '12/01/2020', '14/01/2020', '15/01/2020', '16/01/2020', '17/01/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 2', '14/02/2020', '15/02/2020', '18/02/2020', '', '18/02/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 3', '17/03/2020', '', '24/03/2020', '', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 4', '17/03/2020', '25/03/2020', '', '16/03/2020', '24/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 5', '', '', '', '16/03/2020', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 6', '', '', '', '', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 7', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 8', '17/03/2020', '16/03/2020', '17/03/2020', '18/03/2020', '17/03/2020');
Finally, we can display the final result in ksqlDB with:
SELECT `case`, activity, `timestamp` FROM s4 EMIT CHANGES;
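Note that a push query only reads new records by default: if this SELECT is issued after the INSERT statements, first tell the ksqlDB CLI to replay the topic from the beginning:
-- Replay topics from the earliest offset so already-inserted rows show up.
SET 'auto.offset.reset' = 'earliest';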
Variation 2
Let us consider this case as input:
| Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5 |
|---|---|---|---|---|---|
| case 1 | 17/03/2020 | 16/03/2020 | 17/03/2020 | 18/03/2020 | 17/03/2020 |
In this variation, the UDF sorts the timestamps of a row and derives a start and an end date for each activity. As the four outputs below illustrate, when isStartInformation = true each timestamp marks the start of its activity and the end date is taken from the following activity, whereas when it is false each timestamp marks the end and the start date is taken from the preceding activity; isTaskNameAscending controls the ordering of activities that share the same timestamp.
We get this output if isStartInformation = true and isTaskNameAscending = true:
| Case | Activity | Start | End |
|---|---|---|---|
| case 1 | Step 2 | 16/03/2020 | 17/03/2020 |
| case 1 | Step 1 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 3 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 5 | 17/03/2020 | 18/03/2020 |
| case 1 | Step 4 | 18/03/2020 | 18/03/2020 |
We get this output if isStartInformation = true and isTaskNameAscending = false:
| Case | Activity | Start | End |
|---|---|---|---|
| case 1 | Step 2 | 16/03/2020 | 17/03/2020 |
| case 1 | Step 5 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 3 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 1 | 17/03/2020 | 18/03/2020 |
| case 1 | Step 4 | 18/03/2020 | 18/03/2020 |
We get the following output if isStartInformation = false and isTaskNameAscending = true:
| Case | Activity | Start | End |
|---|---|---|---|
| case 1 | Step 2 | 16/03/2020 | 16/03/2020 |
| case 1 | Step 1 | 16/03/2020 | 17/03/2020 |
| case 1 | Step 3 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 5 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 4 | 17/03/2020 | 18/03/2020 |
We get the following output if isStartInformation = false and isTaskNameAscending = false:
| Case | Activity | Start | End |
|---|---|---|---|
| case 1 | Step 2 | 16/03/2020 | 16/03/2020 |
| case 1 | Step 5 | 16/03/2020 | 17/03/2020 |
| case 1 | Step 3 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 1 | 17/03/2020 | 17/03/2020 |
| case 1 | Step 4 | 17/03/2020 | 18/03/2020 |
To test this variation, launch ksqlDB and run the following commands.
The first two STREAMS are the same as the ones in the first variation.
- Create the initial stream representing rows with columns, where each row follows the format:
Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5
CREATE STREAM s1 (
  `case` VARCHAR,
  step1 VARCHAR,
  step2 VARCHAR,
  step3 VARCHAR,
  step4 VARCHAR,
  step5 VARCHAR
) WITH (
  kafka_topic = 's1',
  partitions = 1,
  value_format = 'avro'
);
- Next, create the stream that prepares the call to the UDF. Here, the ARRAY[STRUCT(...), STRUCT(...), ...] constructor corresponds to the UDF input parameter, ARRAY being the ksqlDB type that maps to Java's List.
CREATE STREAM s2 AS SELECT
  `case`,
  ARRAY[STRUCT(task := 'Step 1', time := step1),
        STRUCT(task := 'Step 2', time := step2),
        STRUCT(task := 'Step 3', time := step3),
        STRUCT(task := 'Step 4', time := step4),
        STRUCT(task := 'Step 5', time := step5)] AS steps
FROM s1 EMIT CHANGES;
- Now create the STREAM that calls the igrafx_transposition UDF. The second argument is the date format used to parse the timestamps, and the last two are isStartInformation and isTaskNameAscending:
CREATE STREAM s3 AS SELECT
  `case`,
  igrafx_transposition(steps, 'dd/MM/yyyy', true, true) AS steps
FROM s2 EMIT CHANGES;
or
CREATE STREAM s3 AS SELECT
  `case`,
  igrafx_transposition(steps, 'dd/MM/yyyy', true, false) AS steps
FROM s2 EMIT CHANGES;
or
CREATE STREAM s3 AS SELECT
  `case`,
  igrafx_transposition(steps, 'dd/MM/yyyy', false, true) AS steps
FROM s2 EMIT CHANGES;
or
CREATE STREAM s3 AS SELECT
  `case`,
  igrafx_transposition(steps, 'dd/MM/yyyy', false, false) AS steps
FROM s2 EMIT CHANGES;
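Only one of these four definitions can exist at a time. To try another parameter combination, drop s3 before recreating it (depending on your ksqlDB version, you may first have to TERMINATE its persistent query):
-- Remove the previous definition of s3; DELETE TOPIC also removes its backing Kafka topic.
DROP STREAM s3 DELETE TOPIC;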
- Now we create the final STREAM, which deconstructs the STRUCT from s3 into four different columns:
CREATE STREAM s4 AS SELECT
  `case`,
  steps->task AS activity,
  steps->start AS start_date,
  steps->stop AS end_date
FROM s3 EMIT CHANGES;
Once all the STREAMS have been created, we can add the data. Here each INSERT represents a row:
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 1', '12/01/2020', '14/01/2020', '15/01/2020', '16/01/2020', '17/01/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 2', '14/02/2020', '15/02/2020', '18/02/2020', '', '18/02/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 3', '17/03/2020', '', '24/03/2020', '', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 4', '17/03/2020', '25/03/2020', '', '16/03/2020', '24/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 5', '', '', '', '16/03/2020', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 6', '', '', '', '', '');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 7', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020', '17/03/2020');
INSERT INTO s1 (`case`, step1, step2, step3, step4, step5) VALUES ('case 8', '17/03/2020', '16/03/2020', '17/03/2020', '18/03/2020', '17/03/2020');
Finally, we can display the final result with:
SELECT `case`, activity, start_date, end_date FROM s4 EMIT CHANGES;
3rd UDF example
Let us consider that the initial row corresponds to:
| Case | Step 1 | Step 2 | Step 3 | Step 4 | Step 5 | Total Price |
|---|---|---|---|---|---|---|
| case 1 | 12/01/2020 | 14/01/2020 | 15/01/2020 | 16/01/2020 | 17/01/2020 | 240 |
Here, information about the Total Price of the whole process is attached to the row. If the UDF is used (for instance with the first variation) and the created STREAMS keep the Total Price column, then the Total Price is repeated on each of the created rows:
CREATE STREAM s1 (
  `case` VARCHAR,
  step1 VARCHAR,
  step2 VARCHAR,
  step3 VARCHAR,
  step4 VARCHAR,
  step5 VARCHAR,
  price INTEGER
) WITH (
  kafka_topic = 's1',
  partitions = 1,
  value_format = 'avro'
);
CREATE STREAM s2 AS SELECT
  `case`,
  ARRAY[STRUCT(task := 'Step 1', time := step1),
        STRUCT(task := 'Step 2', time := step2),
        STRUCT(task := 'Step 3', time := step3),
        STRUCT(task := 'Step 4', time := step4),
        STRUCT(task := 'Step 5', time := step5)] AS steps,
  price
FROM s1 EMIT CHANGES;
CREATE STREAM s3 AS SELECT
  `case`,
  igrafx_transposition(steps) AS steps,
  price
FROM s2 EMIT CHANGES;
CREATE STREAM s4 AS SELECT
  `case`,
  steps->task AS activity,
  steps->time AS `timestamp`,
  price
FROM s3 EMIT CHANGES;
The result for the current example is:
| Case | Activity | Timestamp | Price |
|---|---|---|---|
| case 1 | Step 1 | 12/01/2020 | 240 |
| case 1 | Step 2 | 14/01/2020 | 240 |
| case 1 | Step 3 | 15/01/2020 | 240 |
| case 1 | Step 4 | 16/01/2020 | 240 |
| case 1 | Step 5 | 17/01/2020 | 240 |
This can be problematic if, for instance, we then want to sum all the values in the Price column to determine the total price, because the result would not reflect the real total price of each process.
Here, the calculated value would be 5 × 240 = 1200, whereas the real total price of the process is 240.
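As a sketch of the pitfall and of one possible workaround (the table names case_prices and case_prices_fixed are illustrative, not part of the example): a plain SUM counts the price once per generated row, while taking a single value per case, for instance with ksqlDB's built-in EARLIEST_BY_OFFSET aggregate, recovers the real total price.
-- Naive aggregation: price is counted once per generated row,
-- so case 1 yields 5 x 240 = 1200 instead of 240.
CREATE TABLE case_prices AS SELECT
  `case`,
  SUM(price) AS total_price
FROM s4
GROUP BY `case`;

-- Workaround: keep a single price value per case, e.g. with the
-- built-in EARLIEST_BY_OFFSET aggregate (gives 240 for case 1).
CREATE TABLE case_prices_fixed AS SELECT
  `case`,
  EARLIEST_BY_OFFSET(price) AS total_price
FROM s4
GROUP BY `case`;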