iGrafx Sessions Example
Start by running the following command so that queries read each topic from its earliest offset. A STREAM then also processes the data inserted before the STREAM was created or displayed:

```sql
SET 'auto.offset.reset'='earliest';
```
Consider the following collection of rows:
| timeStamp | userID | targetApp | eventType |
|---|---|---|---|
| 2020-06-16T04 | 1 | appli1 | Start |
| 2020-06-16T04 | 1 | appli1 | event1 |
| 2020-06-16T04 | 1 | appli1 | event2 |
| 2020-06-16T04 | 2 | appli1 | Start |
| 2020-06-16T04 | 2 | appli1 | event4 |
| 2020-06-16T04 | 2 | appli2 | Start |
| 2020-06-16T04 | 2 | appli3 | event5 |
| 2020-06-16T04 | 1 | appli1 | event3 |
| 2020-06-16T04 | 1 | appli1 | ignoreEvent |
| 2020-06-16T04 | 1 | appli1 | End |
| 2020-06-16T04 | 1 | appli2 | aloneEvent1 |
| 2020-06-16T04 | 1 | appli2 | aloneEvent2 |
| 2020-06-16T04 | 2 | appli2 | event6 |
| 2020-06-16T04 | 2 | appli2 | End |
| 2020-06-16T04 | 2 | appli2 | event7 |
| 2020-06-16T04 | 2 | appli3 | End |
First, create the following STREAM in ksqlDB, where each row is stored as an element of the `lines` array:
```sql
CREATE STREAM s1 (
  lines ARRAY<VARCHAR>
) WITH (
  kafka_topic = 's1',
  partitions = 1,
  value_format = 'avro'
);
```
Then insert the data, with each row encoded as a semicolon-separated string:
```sql
INSERT INTO s1 (lines) VALUES (ARRAY[
  '2020-06-16T04;1;appli1;Start',
  '2020-06-16T04;1;appli1;event1',
  '2020-06-16T04;1;appli1;event2',
  '2020-06-16T04;2;appli1;Start',
  '2020-06-16T04;2;appli1;event4',
  '2020-06-16T04;2;appli2;Start',
  '2020-06-16T04;2;appli3;event5',
  '2020-06-16T04;1;appli1;event3',
  '2020-06-16T04;1;appli1;ignoreEvent',
  '2020-06-16T04;1;appli1;End',
  '2020-06-16T04;1;appli2;aloneEvent1',
  '2020-06-16T04;1;appli2;aloneEvent2',
  '2020-06-16T04;2;appli2;event6',
  '2020-06-16T04;2;appli2;End',
  '2020-06-16T04;2;appli2;event7',
  '2020-06-16T04;2;appli3;End']);
```
Everything is now ready to call the `igrafx_sessions` UDF.
In this example:
- rows matching ignorePattern = `.*;.*;.*;ignoreEvent` are ignored;
- rows matching startSessionPattern = `.*;.*;.*;Start` mark the start of a new session;
- rows matching endSessionPattern = `.*;.*;.*;End` mark the end of the current session.
Furthermore, the rows follow the format `timeStamp;userID;targetApp;eventType`.
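The Python sketch below shows how these three patterns classify individual rows. It is not the UDF itself. It assumes that each pattern must match the whole row, which Python's `re.fullmatch` emulates; with a substring search, `.*;.*;.*;Start` would also accept a hypothetical `...;Started` row. The helper `classify` is hypothetical.

```python
import re

# Patterns from the example. Assumption: the UDF matches each pattern against
# the whole row, which re.fullmatch emulates here.
IGNORE_PATTERN = r".*;.*;.*;ignoreEvent"
START_PATTERN = r".*;.*;.*;Start"
END_PATTERN = r".*;.*;.*;End"


def classify(row: str) -> str:
    """Return 'ignored', 'start', 'end' or 'event' for a row of the example."""
    if re.fullmatch(IGNORE_PATTERN, row):
        return "ignored"
    if re.fullmatch(START_PATTERN, row):
        return "start"
    if re.fullmatch(END_PATTERN, row):
        return "end"
    return "event"


for row in ["2020-06-16T04;1;appli1;Start", "2020-06-16T04;1;appli1;ignoreEvent",
            "2020-06-16T04;1;appli1;End", "2020-06-16T04;1;appli2;aloneEvent1"]:
    # Rows follow timeStamp;userID;targetApp;eventType, so eventType is the 4th field.
    event_type = row.split(";")[3]
    print(event_type, "->", classify(row))
```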
The group pattern used in the examples is `groupSessionPattern = .*;(.*);.*;.*`, meaning that rows are divided into groups according to the value of the `userID` column, which is the only group captured by the pattern.
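Grouping explains why the rows of users 1 and 2 can be interleaved in the input without disturbing each other's sessions. Each user's rows are handled as a separate sequence. The Python sketch below groups rows by the value captured by groupSessionPattern and keeps the original order within each group. It is an illustration under the same full-match assumption. The function `group_rows` is hypothetical, and raising an error on non-matching rows is a choice made only for this sketch.

```python
import re

GROUP_PATTERN = r".*;(.*);.*;.*"  # groupSessionPattern from the example


def group_rows(rows: list[str]) -> dict[str, list[str]]:
    """Group rows by the userID captured by GROUP_PATTERN, preserving row order."""
    groups: dict[str, list[str]] = {}  # dicts keep insertion order
    for row in rows:
        match = re.fullmatch(GROUP_PATTERN, row)
        if match is None:
            # How the UDF handles non-matching rows is not covered by the example.
            raise ValueError(f"row does not match groupSessionPattern: {row!r}")
        groups.setdefault(match.group(1), []).append(row)
    return groups


rows = [
    "2020-06-16T04;1;appli1;Start",
    "2020-06-16T04;2;appli1;Start",
    "2020-06-16T04;1;appli1;event1",
    "2020-06-16T04;2;appli1;event4",
]
for user_id, user_rows in group_rows(rows).items():
    print(user_id, user_rows)
```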
Moreover, the pattern used to create the sessionId of a session starting row is `sessionIdPattern = .*;(.*);(.*);.*`. For a session starting row, the sessionId is therefore computed from the values of the `userID` and `targetApp` columns, the two groups captured by this pattern.
Thus, depending on the value of isSessionIdHash, the placeholders sessionId1, sessionId2, sessionId3 and sessionId4 used in the results below correspond to one of two values. It is either the concatenation of the `userID` and `targetApp` values or the hash of that concatenation. All the examples below pass `true` for isSessionIdHash, the seventh argument of `igrafx_sessions`.
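The Python sketch below builds a session identifier from the groups captured by sessionIdPattern, as described above. Two details are assumptions. The values are concatenated without a separator, and SHA-256 stands in for the hash, since the hash function used by `igrafx_sessions` is not specified here. The function `session_id` is hypothetical.

```python
import hashlib
import re

SESSION_ID_PATTERN = r".*;(.*);(.*);.*"  # sessionIdPattern from the example


def session_id(row: str, is_session_id_hash: bool) -> str:
    """Build a session id from the userID and targetApp captured in `row`.

    SHA-256 is only a stand-in for the UDF's (unspecified) hash function.
    """
    match = re.fullmatch(SESSION_ID_PATTERN, row)
    if match is None:
        raise ValueError(f"row does not match sessionIdPattern: {row!r}")
    concatenated = "".join(match.groups())  # e.g. "1" + "appli1"
    if is_session_id_hash:
        return hashlib.sha256(concatenated.encode("utf-8")).hexdigest()
    return concatenated


print(session_id("2020-06-16T04;1;appli1;Start", False))  # 1appli1
print(session_id("2020-06-16T04;1;appli1;Start", True))
```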
The last two arguments of `igrafx_sessions`, isIgnoreIfNoStart and isIgnoreIfNoEnd, can be combined in four ways:
`isIgnoreIfNoStart = true` and `isIgnoreIfNoEnd = true`
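```sql
-- Arguments: lines, ignorePattern, groupSessionPattern, startSessionPattern,
-- endSessionPattern, sessionIdPattern, isSessionIdHash, isIgnoreIfNoStart, isIgnoreIfNoEnd
CREATE STREAM s2 AS SELECT
  igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, true, true) AS sessions
FROM s1 EMIT CHANGES;
-- Expose each session's id and line as separate columns
CREATE STREAM s3 AS SELECT
  sessions->session_id AS session_id,
  sessions->line AS session_line
FROM s2 EMIT CHANGES;
-- Display the sessions
SELECT session_id, session_line FROM s3 EMIT CHANGES;
```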
Only the sessions that have both a start row and an end row are kept. The result is:
| session_id | session_line |
|---|---|
| sessionId1 | 2020-06-16T04;1;appli1;Start |
| sessionId1 | 2020-06-16T04;1;appli1;event1 |
| sessionId1 | 2020-06-16T04;1;appli1;event2 |
| sessionId1 | 2020-06-16T04;1;appli1;event3 |
| sessionId1 | 2020-06-16T04;1;appli1;End |
| sessionId2 | 2020-06-16T04;2;appli2;Start |
| sessionId2 | 2020-06-16T04;2;appli3;event5 |
| sessionId2 | 2020-06-16T04;2;appli2;event6 |
| sessionId2 | 2020-06-16T04;2;appli2;End |
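The Python sketch below is a simplified model of the session splitting. It was reverse-engineered from the four result tables of this page and is not the `igrafx_sessions` implementation. Within each group, a Start row opens a new session and closes the previous one. An End row closes the current session, and rows that follow an End row open a session without a start. Sessions are then filtered by the two flags. Several points are assumptions: regexes are full-matched, identifiers are left unhashed, and a session's id comes from its first row. A row matching both Start and End is treated as a Start only, which simplifies the case described at the end of this page. `sessions_model` and `PATTERNS` are hypothetical names. With both flags set to `True`, the sketch yields the two sessions of the table above, with `1appli1` and `2appli2` in place of the hashed sessionId1 and sessionId2.

```python
import re
from dataclasses import dataclass, field

# Rows inserted into s1, in insertion order.
LINES = [
    "2020-06-16T04;1;appli1;Start", "2020-06-16T04;1;appli1;event1",
    "2020-06-16T04;1;appli1;event2", "2020-06-16T04;2;appli1;Start",
    "2020-06-16T04;2;appli1;event4", "2020-06-16T04;2;appli2;Start",
    "2020-06-16T04;2;appli3;event5", "2020-06-16T04;1;appli1;event3",
    "2020-06-16T04;1;appli1;ignoreEvent", "2020-06-16T04;1;appli1;End",
    "2020-06-16T04;1;appli2;aloneEvent1", "2020-06-16T04;1;appli2;aloneEvent2",
    "2020-06-16T04;2;appli2;event6", "2020-06-16T04;2;appli2;End",
    "2020-06-16T04;2;appli2;event7", "2020-06-16T04;2;appli3;End",
]
# ignore, group, start, end and sessionId patterns, in argument order.
PATTERNS = (r".*;.*;.*;ignoreEvent", r".*;(.*);.*;.*", r".*;.*;.*;Start",
            r".*;.*;.*;End", r".*;(.*);(.*);.*")


@dataclass
class Session:
    rows: list[str] = field(default_factory=list)
    has_start: bool = False
    has_end: bool = False


def sessions_model(lines, ignore, group, start, end, session_id_pattern,
                   is_ignore_if_no_start, is_ignore_if_no_end):
    """Hypothetical model of igrafx_sessions returning (session_id, line) pairs."""
    groups: dict[str, list[Session]] = {}  # ordered by first appearance
    for line in lines:
        if re.fullmatch(ignore, line):
            continue  # ignored rows belong to no session
        sessions = groups.setdefault(re.fullmatch(group, line).group(1), [])
        is_start = re.fullmatch(start, line) is not None
        # A Start row opens a new session (closing the previous one); so do
        # the first row of a group and any row that follows an End row.
        if is_start or not sessions or sessions[-1].has_end:
            sessions.append(Session(has_start=is_start))
        sessions[-1].rows.append(line)
        # Simplification: a row matching both Start and End counts as a Start only.
        if not is_start and re.fullmatch(end, line):
            sessions[-1].has_end = True

    result = []
    for sessions in groups.values():
        for s in sessions:
            missing_start = is_ignore_if_no_start and not s.has_start
            missing_end = is_ignore_if_no_end and not s.has_end
            if missing_start or missing_end:
                continue
            # Unhashed id: groups captured in the session's first row, concatenated.
            sid = "".join(re.fullmatch(session_id_pattern, s.rows[0]).groups())
            result.extend((sid, row) for row in s.rows)
    return result


if __name__ == "__main__":
    for sid, line in sessions_model(LINES, *PATTERNS, True, True):
        print(sid, line)
```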
`isIgnoreIfNoStart = false` and `isIgnoreIfNoEnd = true`
```sql
CREATE STREAM s4 AS SELECT
  igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, false, true) AS sessions
FROM s1 EMIT CHANGES;
CREATE STREAM s5 AS SELECT
  sessions->session_id AS session_id,
  sessions->line AS session_line
FROM s4 EMIT CHANGES;
SELECT session_id, session_line FROM s5 EMIT CHANGES;
```
Sessions that have an end row but no start row are now also kept. The result is:
| session_id | session_line |
|---|---|
| sessionId1 | 2020-06-16T04;1;appli1;Start |
| sessionId1 | 2020-06-16T04;1;appli1;event1 |
| sessionId1 | 2020-06-16T04;1;appli1;event2 |
| sessionId1 | 2020-06-16T04;1;appli1;event3 |
| sessionId1 | 2020-06-16T04;1;appli1;End |
| sessionId2 | 2020-06-16T04;2;appli2;Start |
| sessionId2 | 2020-06-16T04;2;appli3;event5 |
| sessionId2 | 2020-06-16T04;2;appli2;event6 |
| sessionId2 | 2020-06-16T04;2;appli2;End |
| sessionId2 | 2020-06-16T04;2;appli2;event7 |
| sessionId2 | 2020-06-16T04;2;appli3;End |
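In the table above, the rows `2;appli2;event7` and `2;appli3;End` form a session with an end row but no start row, yet they share sessionId2 with the preceding session. One explanation consistent with all four result tables is that a session without a starting row takes its sessionId from its first row. The Python check below supports this hypothesis. The first row gives the same userID and targetApp as the `2;appli2;Start` row, whereas the End row would give a different id. It applies sessionIdPattern with `re.fullmatch` and skips hashing. The helper `raw_session_id` is hypothetical.

```python
import re

SESSION_ID_PATTERN = r".*;(.*);(.*);.*"  # sessionIdPattern from the example


def raw_session_id(row: str) -> str:
    """Concatenate the values captured by sessionIdPattern (hashing omitted)."""
    match = re.fullmatch(SESSION_ID_PATTERN, row)
    if match is None:
        raise ValueError(f"row does not match sessionIdPattern: {row!r}")
    return "".join(match.groups())


# Row that opens the session labelled sessionId2 in the table above.
opening_start = "2020-06-16T04;2;appli2;Start"
# First and last rows of the session that follows 2020-06-16T04;2;appli2;End.
trailing_first = "2020-06-16T04;2;appli2;event7"
trailing_last = "2020-06-16T04;2;appli3;End"

print(raw_session_id(opening_start))   # 2appli2
print(raw_session_id(trailing_first))  # 2appli2
print(raw_session_id(trailing_last))   # 2appli3
```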
`isIgnoreIfNoStart = true` and `isIgnoreIfNoEnd = false`
```sql
CREATE STREAM s6 AS SELECT
  igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, true, false) AS sessions
FROM s1 EMIT CHANGES;
CREATE STREAM s7 AS SELECT
  sessions->session_id AS session_id,
  sessions->line AS session_line
FROM s6 EMIT CHANGES;
SELECT session_id, session_line FROM s7 EMIT CHANGES;
```
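Sessions that have a start row but no end row are now also kept. One example is the session opened by `2;appli1;Start`, which is closed by the next Start row of the same user. The result is: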
| session_id | session_line |
|---|---|
| sessionId1 | 2020-06-16T04;1;appli1;Start |
| sessionId1 | 2020-06-16T04;1;appli1;event1 |
| sessionId1 | 2020-06-16T04;1;appli1;event2 |
| sessionId1 | 2020-06-16T04;1;appli1;event3 |
| sessionId1 | 2020-06-16T04;1;appli1;End |
| sessionId2 | 2020-06-16T04;2;appli1;Start |
| sessionId2 | 2020-06-16T04;2;appli1;event4 |
| sessionId3 | 2020-06-16T04;2;appli2;Start |
| sessionId3 | 2020-06-16T04;2;appli3;event5 |
| sessionId3 | 2020-06-16T04;2;appli2;event6 |
| sessionId3 | 2020-06-16T04;2;appli2;End |
`isIgnoreIfNoStart = false` and `isIgnoreIfNoEnd = false`
```sql
CREATE STREAM s8 AS SELECT
  igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, false, false) AS sessions
FROM s1 EMIT CHANGES;
CREATE STREAM s9 AS SELECT
  sessions->session_id AS session_id,
  sessions->line AS session_line
FROM s8 EMIT CHANGES;
SELECT session_id, session_line FROM s9 EMIT CHANGES;
```
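All sessions are kept. This includes the one made of `1;appli2;aloneEvent1` and `1;appli2;aloneEvent2`, which has neither a start row nor an end row. The result is: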
| session_id | session_line |
|---|---|
| sessionId1 | 2020-06-16T04;1;appli1;Start |
| sessionId1 | 2020-06-16T04;1;appli1;event1 |
| sessionId1 | 2020-06-16T04;1;appli1;event2 |
| sessionId1 | 2020-06-16T04;1;appli1;event3 |
| sessionId1 | 2020-06-16T04;1;appli1;End |
| sessionId2 | 2020-06-16T04;1;appli2;aloneEvent1 |
| sessionId2 | 2020-06-16T04;1;appli2;aloneEvent2 |
| sessionId3 | 2020-06-16T04;2;appli1;Start |
| sessionId3 | 2020-06-16T04;2;appli1;event4 |
| sessionId4 | 2020-06-16T04;2;appli2;Start |
| sessionId4 | 2020-06-16T04;2;appli3;event5 |
| sessionId4 | 2020-06-16T04;2;appli2;event6 |
| sessionId4 | 2020-06-16T04;2;appli2;End |
| sessionId4 | 2020-06-16T04;2;appli2;event7 |
| sessionId4 | 2020-06-16T04;2;appli3;End |
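Across the four combinations, the example contains five sessions. Each one is kept or dropped depending only on whether it has a start row and an end row. The Python sketch below summarizes this as a decision table. The `(has start, has end)` flags are read from the result tables above. The session labels and the functions `is_kept` and `kept_sessions` are hypothetical and only describe the observed outcomes.

```python
# Hypothetical labels for the five sessions of the example, with
# (has a start row, has an end row) flags read from the result tables.
SESSIONS = {
    "user 1: appli1 Start .. End": (True, True),
    "user 1: aloneEvent1, aloneEvent2": (False, False),
    "user 2: appli1 Start, event4": (True, False),
    "user 2: appli2 Start .. End": (True, True),
    "user 2: event7, appli3 End": (False, True),
}


def is_kept(has_start: bool, has_end: bool,
            is_ignore_if_no_start: bool, is_ignore_if_no_end: bool) -> bool:
    """A session is dropped when it lacks a boundary that a flag requires."""
    return ((has_start or not is_ignore_if_no_start)
            and (has_end or not is_ignore_if_no_end))


def kept_sessions(is_ignore_if_no_start: bool, is_ignore_if_no_end: bool) -> list[str]:
    """Labels of the sessions kept for one combination of the two flags."""
    return [label for label, (has_start, has_end) in SESSIONS.items()
            if is_kept(has_start, has_end, is_ignore_if_no_start, is_ignore_if_no_end)]


for flags in [(True, True), (False, True), (True, False), (False, False)]:
    print(flags, kept_sessions(*flags))
```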
If the same row matches both startSessionPattern and endSessionPattern, it is considered as the start of a new session and also ends the previous session. The new session is then kept regardless of the value of isIgnoreIfNoEnd.