iGrafx Sessions Example

Start by running the following command so that queries read each topic from its earliest offset. The STREAMs created below then also process rows that were inserted before they were created or queried:

SET 'auto.offset.reset'='earliest';

Consider the following collection of rows:

timeStamp userID targetApp eventType
2020-06-16T04 1 appli1 Start
2020-06-16T04 1 appli1 event1
2020-06-16T04 1 appli1 event2
2020-06-16T04 2 appli1 Start
2020-06-16T04 2 appli1 event4
2020-06-16T04 2 appli2 Start
2020-06-16T04 2 appli3 event5
2020-06-16T04 1 appli1 event3
2020-06-16T04 1 appli1 ignoreEvent
2020-06-16T04 1 appli1 End
2020-06-16T04 1 appli2 aloneEvent1
2020-06-16T04 1 appli2 aloneEvent2
2020-06-16T04 2 appli2 event6
2020-06-16T04 2 appli2 End
2020-06-16T04 2 appli2 event7
2020-06-16T04 2 appli3 End

The first STREAM to create in ksqlDB is the following. Each row of the collection is stored as one semicolon-separated string in the lines array:

CREATE STREAM s1 (
    lines ARRAY<VARCHAR>
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);

The data can then be inserted:

INSERT INTO s1 (lines) VALUES (ARRAY[
    '2020-06-16T04;1;appli1;Start', 
    '2020-06-16T04;1;appli1;event1', 
    '2020-06-16T04;1;appli1;event2', 
    '2020-06-16T04;2;appli1;Start', 
    '2020-06-16T04;2;appli1;event4', 
    '2020-06-16T04;2;appli2;Start', 
    '2020-06-16T04;2;appli3;event5',
    '2020-06-16T04;1;appli1;event3', 
    '2020-06-16T04;1;appli1;ignoreEvent', 
    '2020-06-16T04;1;appli1;End', 
    '2020-06-16T04;1;appli2;aloneEvent1', 
    '2020-06-16T04;1;appli2;aloneEvent2',  
    '2020-06-16T04;2;appli2;event6', 
    '2020-06-16T04;2;appli2;End',
    '2020-06-16T04;2;appli2;event7', 
    '2020-06-16T04;2;appli3;End']);

Everything is now ready to call the igrafx_sessions UDF.

In this example, the rows are classified with three patterns:

  • rows matching ignorePattern = .*;.*;.*;ignoreEvent are ignored
  • rows matching startSessionPattern = .*;.*;.*;Start mark the start of a new session
  • rows matching endSessionPattern = .*;.*;.*;End mark the end of the current session

The rows follow the format timeStamp;userID;targetApp;eventType, so each of these patterns only tests the eventType field.
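As a sketch of how these three patterns classify rows, the snippet below applies them with Python's re.fullmatch. It assumes that a pattern must match the whole row. The helper classify and the order in which it checks the patterns are hypothetical and only illustrate the idea.

```python
import re

# Patterns copied from the example. Assumption: a pattern must match the
# whole row, which is what re.fullmatch checks.
IGNORE_PATTERN = r".*;.*;.*;ignoreEvent"
START_SESSION_PATTERN = r".*;.*;.*;Start"
END_SESSION_PATTERN = r".*;.*;.*;End"


def classify(row: str) -> str:
    """Return 'ignore', 'start', 'end' or 'event' for one row.

    Hypothetical helper: checking the ignore pattern first is an assumption.
    """
    if re.fullmatch(IGNORE_PATTERN, row):
        return "ignore"
    if re.fullmatch(START_SESSION_PATTERN, row):
        return "start"
    if re.fullmatch(END_SESSION_PATTERN, row):
        return "end"
    return "event"


for row in [
    "2020-06-16T04;1;appli1;Start",
    "2020-06-16T04;1;appli1;ignoreEvent",
    "2020-06-16T04;1;appli1;End",
    "2020-06-16T04;1;appli2;aloneEvent1",
]:
    print(classify(row), row)
```

The four sample rows are classified as start, ignore, end and event. A row such as aloneEvent1 matches none of the patterns and is simply part of whatever session is open.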

The group pattern used in the examples is:

  • groupSessionPattern = .*;(.*);.*;.*, meaning that rows are divided into groups according to the value of the userID column (the captured field), and sessions are built independently within each group.
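A minimal Python sketch of this grouping step follows. It assumes whole-row matching. It also assumes that rows which do not match groupSessionPattern are skipped, because the UDF's behavior for such rows is not described here. The helper group_rows is hypothetical. It keeps groups in order of first appearance and rows in input order, which is the order the results below follow.

```python
import re

GROUP_SESSION_PATTERN = r".*;(.*);.*;.*"  # captures the userID field

LINES = [
    "2020-06-16T04;1;appli1;Start",
    "2020-06-16T04;1;appli1;event1",
    "2020-06-16T04;1;appli1;event2",
    "2020-06-16T04;2;appli1;Start",
    "2020-06-16T04;2;appli1;event4",
    "2020-06-16T04;2;appli2;Start",
    "2020-06-16T04;2;appli3;event5",
    "2020-06-16T04;1;appli1;event3",
    "2020-06-16T04;1;appli1;ignoreEvent",
    "2020-06-16T04;1;appli1;End",
    "2020-06-16T04;1;appli2;aloneEvent1",
    "2020-06-16T04;1;appli2;aloneEvent2",
    "2020-06-16T04;2;appli2;event6",
    "2020-06-16T04;2;appli2;End",
    "2020-06-16T04;2;appli2;event7",
    "2020-06-16T04;2;appli3;End",
]


def group_rows(lines, pattern=GROUP_SESSION_PATTERN):
    """Split rows into groups keyed by the captured values, keeping row order."""
    groups = {}  # dicts preserve insertion order: groups appear as first seen
    for line in lines:
        match = re.fullmatch(pattern, line)
        if match is None:
            continue  # assumption: rows that do not match are skipped
        groups.setdefault(match.groups(), []).append(line)
    return groups


for key, rows in group_rows(LINES).items():
    print(key, len(rows))
```

With the example data this yields two groups of eight rows each, keyed by ('1',) and ('2',).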

The pattern used to build the sessionId from a session's starting row is:

  • sessionIdPattern = .*;(.*);(.*);.*, meaning that the sessionId of a session is computed from the values of the userID and targetApp columns of its starting row.

Depending on the isSessionIdHash value, each sessionId in the results below (sessionId1, sessionId2, sessionId3, sessionId4) is therefore either the concatenation of the userID and targetApp columns or the hashed value of that concatenation. These two columns are used because they are the ones captured by sessionIdPattern = .*;(.*);(.*);.*. The labels are placeholders numbered in order of appearance within each result, so the same label can denote different sessions in different results.
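The sessionId computation can be sketched in Python as follows. Concatenating the captured groups without a separator and using MD5 are both assumptions. This document does not state which separator or hash function igrafx_sessions uses. The helper session_id is hypothetical.

```python
import hashlib
import re

SESSION_ID_PATTERN = r".*;(.*);(.*);.*"  # captures userID and targetApp


def session_id(first_row: str, is_session_id_hash: bool) -> str:
    """Build a session id from the capture groups of SESSION_ID_PATTERN.

    Assumptions: the groups are concatenated with no separator, and MD5 is
    only a stand-in for whatever hash the UDF really uses.
    """
    match = re.fullmatch(SESSION_ID_PATTERN, first_row)
    if match is None:
        raise ValueError(f"row does not match sessionIdPattern: {first_row}")
    raw = "".join(match.groups())
    if is_session_id_hash:
        return hashlib.md5(raw.encode("utf-8")).hexdigest()
    return raw


print(session_id("2020-06-16T04;2;appli2;Start", False))
print(session_id("2020-06-16T04;2;appli2;Start", True))
```

Without hashing, the starting row of the user 2 / appli2 session gives the id 2appli2. With hashing, it gives a fixed-length digest of that string.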

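Numerous combinations of igrafx_sessions parameters are possible. In the calls below, the arguments are, in order: lines, ignorePattern, groupSessionPattern, startSessionPattern, endSessionPattern, sessionIdPattern, isSessionIdHash, isIgnoreIfNoStart and isIgnoreIfNoEnd. isSessionIdHash is always true, and the four combinations of isIgnoreIfNoStart and isIgnoreIfNoEnd are: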

  • isIgnoreIfNoStart = true and isIgnoreIfNoEnd = true
CREATE STREAM s2 AS SELECT 
    igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, true, true) AS sessions 
    FROM s1 EMIT CHANGES;

CREATE STREAM s3 AS SELECT 
    sessions->session_id AS session_id, 
    sessions->line AS session_line 
    FROM s2 EMIT CHANGES;

SELECT session_id, session_line FROM s3 EMIT CHANGES;

The result is:

session_id session_line
sessionId1 2020-06-16T04;1;appli1;Start
sessionId1 2020-06-16T04;1;appli1;event1
sessionId1 2020-06-16T04;1;appli1;event2
sessionId1 2020-06-16T04;1;appli1;event3
sessionId1 2020-06-16T04;1;appli1;End
sessionId2 2020-06-16T04;2;appli2;Start
sessionId2 2020-06-16T04;2;appli3;event5
sessionId2 2020-06-16T04;2;appli2;event6
sessionId2 2020-06-16T04;2;appli2;End

  • isIgnoreIfNoStart = false and isIgnoreIfNoEnd = true
CREATE STREAM s4 AS SELECT 
    igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, false, true) AS sessions 
    FROM s1 EMIT CHANGES;

CREATE STREAM s5 AS SELECT 
    sessions->session_id AS session_id, 
    sessions->line AS session_line 
    FROM s4 EMIT CHANGES;

SELECT session_id, session_line FROM s5 EMIT CHANGES;

The result is:

session_id session_line
sessionId1 2020-06-16T04;1;appli1;Start
sessionId1 2020-06-16T04;1;appli1;event1
sessionId1 2020-06-16T04;1;appli1;event2
sessionId1 2020-06-16T04;1;appli1;event3
sessionId1 2020-06-16T04;1;appli1;End
sessionId2 2020-06-16T04;2;appli2;Start
sessionId2 2020-06-16T04;2;appli3;event5
sessionId2 2020-06-16T04;2;appli2;event6
sessionId2 2020-06-16T04;2;appli2;End
sessionId2 2020-06-16T04;2;appli2;event7
sessionId2 2020-06-16T04;2;appli3;End

  • isIgnoreIfNoStart = true and isIgnoreIfNoEnd = false
CREATE STREAM s6 AS SELECT 
    igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, true, false) AS sessions 
    FROM s1 EMIT CHANGES;

CREATE STREAM s7 AS SELECT 
    sessions->session_id AS session_id, 
    sessions->line AS session_line 
    FROM s6 EMIT CHANGES;

SELECT session_id, session_line FROM s7 EMIT CHANGES;

The result is:

session_id session_line
sessionId1 2020-06-16T04;1;appli1;Start
sessionId1 2020-06-16T04;1;appli1;event1
sessionId1 2020-06-16T04;1;appli1;event2
sessionId1 2020-06-16T04;1;appli1;event3
sessionId1 2020-06-16T04;1;appli1;End
sessionId2 2020-06-16T04;2;appli1;Start
sessionId2 2020-06-16T04;2;appli1;event4
sessionId3 2020-06-16T04;2;appli2;Start
sessionId3 2020-06-16T04;2;appli3;event5
sessionId3 2020-06-16T04;2;appli2;event6
sessionId3 2020-06-16T04;2;appli2;End

  • isIgnoreIfNoStart = false and isIgnoreIfNoEnd = false
CREATE STREAM s8 AS SELECT 
    igrafx_sessions(lines, '.*;.*;.*;ignoreEvent', '.*;(.*);.*;.*', '.*;.*;.*;Start', '.*;.*;.*;End', '.*;(.*);(.*);.*', true, false, false) AS sessions 
    FROM s1 EMIT CHANGES;

CREATE STREAM s9 AS SELECT 
    sessions->session_id AS session_id, 
    sessions->line AS session_line 
    FROM s8 EMIT CHANGES;

SELECT session_id, session_line FROM s9 EMIT CHANGES;

The result is:

session_id session_line
sessionId1 2020-06-16T04;1;appli1;Start
sessionId1 2020-06-16T04;1;appli1;event1
sessionId1 2020-06-16T04;1;appli1;event2
sessionId1 2020-06-16T04;1;appli1;event3
sessionId1 2020-06-16T04;1;appli1;End
sessionId2 2020-06-16T04;1;appli2;aloneEvent1
sessionId2 2020-06-16T04;1;appli2;aloneEvent2
sessionId3 2020-06-16T04;2;appli1;Start
sessionId3 2020-06-16T04;2;appli1;event4
sessionId4 2020-06-16T04;2;appli2;Start
sessionId4 2020-06-16T04;2;appli3;event5
sessionId4 2020-06-16T04;2;appli2;event6
sessionId4 2020-06-16T04;2;appli2;End
sessionId4 2020-06-16T04;2;appli2;event7
sessionId4 2020-06-16T04;2;appli3;End

When a row matches both startSessionPattern and endSessionPattern, it ends the previous session and is treated as the start of a new session. That new session is kept regardless of the isIgnoreIfNoEnd value.
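To make the four results above easier to compare, here is a minimal Python model of the splitting rules as they can be inferred from those results. It is not the UDF's implementation. The model makes these assumptions:

  • patterns must match the whole row
  • a session without a Start row takes its id from its first row, which is consistent with event7 and the appli3 End sharing sessionId2 in the second result
  • ids are plain concatenations unless hashing is requested, with MD5 as a stand-in hash
  • the same-row Start/End case described just above is not modelled

The function name sessions is hypothetical. Its last three arguments mirror isSessionIdHash, isIgnoreIfNoStart and isIgnoreIfNoEnd.

```python
import hashlib
import re

LINES = [
    "2020-06-16T04;1;appli1;Start", "2020-06-16T04;1;appli1;event1",
    "2020-06-16T04;1;appli1;event2", "2020-06-16T04;2;appli1;Start",
    "2020-06-16T04;2;appli1;event4", "2020-06-16T04;2;appli2;Start",
    "2020-06-16T04;2;appli3;event5", "2020-06-16T04;1;appli1;event3",
    "2020-06-16T04;1;appli1;ignoreEvent", "2020-06-16T04;1;appli1;End",
    "2020-06-16T04;1;appli2;aloneEvent1", "2020-06-16T04;1;appli2;aloneEvent2",
    "2020-06-16T04;2;appli2;event6", "2020-06-16T04;2;appli2;End",
    "2020-06-16T04;2;appli2;event7", "2020-06-16T04;2;appli3;End",
]
IGNORE = r".*;.*;.*;ignoreEvent"
GROUP = r".*;(.*);.*;.*"
START = r".*;.*;.*;Start"
END = r".*;.*;.*;End"
SESSION_ID = r".*;(.*);(.*);.*"


def _session_id(row, is_hash):
    # Assumption: id = captured groups of SESSION_ID on the session's first row.
    raw = "".join(re.fullmatch(SESSION_ID, row).groups())
    return hashlib.md5(raw.encode("utf-8")).hexdigest() if is_hash else raw


def sessions(lines, is_session_id_hash, ignore_if_no_start, ignore_if_no_end):
    """Return (session_id, line) pairs, group by group, in input order."""
    groups = {}
    for line in lines:
        if re.fullmatch(IGNORE, line):
            continue  # ignored rows never reach a session
        # Assumption: every remaining row matches GROUP.
        groups.setdefault(re.fullmatch(GROUP, line).groups(), []).append(line)

    result = []
    for rows in groups.values():
        closed = []  # (session rows, has start row, has end row)
        current, has_start = [], False
        for row in rows:
            if re.fullmatch(START, row):
                if current:  # a new Start closes the open session without an End
                    closed.append((current, has_start, False))
                current, has_start = [row], True
                continue
            current.append(row)
            if re.fullmatch(END, row):
                closed.append((current, has_start, True))
                current, has_start = [], False
        if current:  # rows left open at the end of the group have no End
            closed.append((current, has_start, False))

        for session_rows, started, ended in closed:
            if (ignore_if_no_start and not started) or (ignore_if_no_end and not ended):
                continue
            sid = _session_id(session_rows[0], is_session_id_hash)
            result.extend((sid, row) for row in session_rows)
    return result


if __name__ == "__main__":
    for sid, line in sessions(LINES, False, False, False):
        print(sid, line)
```

With hashing disabled, sessions(LINES, False, False, False) returns the rows of the last result. The ids 1appli1, 1appli2, 2appli1 and 2appli2 appear in place of sessionId1 to sessionId4. The other three flag combinations reproduce the other three results in the same way.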