Basic iGrafx Connector Example

This example shows how to send data from Kafka, through the iGrafx Connector, to the Process360 Live Process Mining platform.

Make sure the Docker Compose stack is up and running before starting this example.

First of all, create a new iGrafx project within your workgroup.

Then run the following command so that transformations applied to a STREAM also cover data that was inserted before the STREAM was created or displayed:

SET 'auto.offset.reset'='earliest';

Next, set up the ksqlDB STREAM that corresponds to the source topic of the connector. The connector will receive data from the topic associated with this STREAM:

CREATE STREAM IGRAFX_AVRO (
    dataArray ARRAY<STRUCT<columnID INT, text VARCHAR, quote BOOLEAN>>
) WITH (
    kafka_topic = 'igrafx_avro',
    partitions = 1,
    value_format = 'AVRO'
);

Then create the STREAM associated with the Kafka topic that will receive the Logging Events of the connector:

CREATE STREAM LOGGING (
    EVENT_TYPE VARCHAR,
    IGRAFX_PROJECT VARCHAR,
    EVENT_DATE BIGINT,
    EVENT_SEQUENCE_ID VARCHAR,
    PAYLOAD VARCHAR
) WITH (
    KAFKA_TOPIC='logging_connector_test',
    PARTITIONS=1,
    REPLICAS=1,
    VALUE_FORMAT='AVRO',
    VALUE_AVRO_SCHEMA_FULL_NAME='com.igrafx.IGrafxKafkaLoggingEventsSchema'
);

It is now time to instantiate the connector.

In the following command, replace the values of the api.url, api.authUrl, workGroupId, workGroupKey, and projectId properties. The first four values can be found on the iGrafx Mining platform, in the OpenAPI section of the Workgroup settings.

Note that the following command works as written in the ksqlDB CLI. To run it in Kafka-UI, remove the doubled escape character from csv.escape and csv.endOfLine:

````aiignore
'csv.endOfLine' = '\n',
'csv.escape' = '\',
````

Here is the command to instantiate the connector:

CREATE SINK CONNECTOR IGrafxConnectorCMLogging WITH (
'connector.class' = 'com.igrafx.kafka.sink.aggregationmain.domain.IGrafxAggregationSinkConnector',
'tasks.max' = '1',
'topics' = 'igrafx_avro',
'api.url' = '???',
'api.authUrl' = '???',
'workGroupId' = '???',
'workGroupKey' = '???',
'projectId' = '???',
'csv.encoding' = 'UTF-8',
'csv.separator' = ',',
'csv.quote' = '"',
'csv.fieldsNumber' = '9',
'csv.header' = 'true',
'csv.defaultTextValue' = 'null',
'retentionTimeInDay' = '100',
'columnMapping.create' = 'true',
'columnMapping.caseIdColumnIndex' = '0',
'columnMapping.activityColumnIndex' = '1',
'columnMapping.timeInformationList' = '{2;dd/MM/yy HH:mm},{3;dd/MM/yy HH:mm}',
'columnMapping.dimensionsInformationList' = '[{"columnIndex": 4, "name": "Country", "isCaseScope": true, "aggregation": "FIRST"},{"columnIndex": 5, "name": "Region", "isCaseScope": false},{"columnIndex": 6, "name": "City", "isCaseScope": false}]',
'columnMapping.metricsInformationList' = '[{"columnIndex": 7, "name": "Price", "unit": "Euros", "isCaseScope": true, "aggregation": "MIN"},{"columnIndex": 8, "name": "DepartmentNumber", "isCaseScope": false}]',
'csv.endOfLine' = '\\n',
'csv.escape' = '\\',
'csv.comment' = '#',
'kafkaLoggingEvents.isLogging' = 'true',
'kafkaLoggingEvents.topic' = 'logging_connector_test',
'threshold.elementNumber' = '20',
'threshold.valuePattern' = '',
'threshold.timeoutInSeconds' = '30',
'bootstrap.servers' = 'broker:29092',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);

In this setup, the connector will attempt to aggregate 20 basic events as defined by the threshold.elementNumber property. However, it will send the aggregation to the iGrafx Mining Platform even if fewer events are available, after 30 seconds have passed since the last transmission, based on the threshold.timeoutInSeconds property. This means that if 20 events are not accumulated within 30 seconds, it will still send the available events.
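
The interplay between threshold.elementNumber and threshold.timeoutInSeconds can be sketched with a small Python simulation. This is an illustrative model only, not the connector's implementation: the function name simulate_batches is hypothetical, and the model assumes that the timer restarts after each transmission or elapsed timeout.

```python
from typing import List, Tuple


def simulate_batches(
    arrival_times: List[float],
    max_elements: int = 20,        # mirrors threshold.elementNumber
    timeout_seconds: float = 30.0  # mirrors threshold.timeoutInSeconds
) -> List[Tuple[float, int]]:
    """Return (send_time, batch_size) pairs for events arriving at the given times (in seconds)."""
    batches: List[Tuple[float, int]] = []
    pending = 0
    last_send = 0.0  # the simulation starts at t = 0 (assumption of this model)
    for t in sorted(arrival_times):
        # Fire every timeout that elapses before this event arrives.
        while t - last_send >= timeout_seconds:
            last_send += timeout_seconds
            if pending:
                batches.append((last_send, pending))
                pending = 0
        pending += 1
        # Send immediately once the element threshold is reached.
        if pending == max_elements:
            batches.append((t, pending))
            pending = 0
            last_send = t
    # Whatever is left is sent when the next timeout elapses.
    if pending:
        batches.append((last_send + timeout_seconds, pending))
    return batches


# 51 events arriving together, as in this example
print([size for _, size in simulate_batches([0.0] * 51)])  # [20, 20, 11]
```

In this model, 51 events arriving together produce two full batches immediately and a third batch of 11 events once the timeout elapses.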

Additionally, the connector will create a Column Mapping for the iGrafx Project and will log file-related events in the logging_connector_test Kafka topic (configured by the kafkaLoggingEvents.topic property).

To disable these features:

- Set columnMapping.create to false if you do not want the connector to create a Column Mapping for the iGrafx Project.
- Set kafkaLoggingEvents.isLogging to false if you do not want the connector to log events in a Kafka topic.

Important: Currently, if the project associated with the connector already has a column mapping and columnMapping.create is set to true, the connector will enter a FAILED state. Ensure that columnMapping.create is set to false if the target project already has a column mapping.
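
As a quick sanity check on the example values, the column indexes referenced in the connector command should cover every column announced by csv.fieldsNumber exactly once. The Python sketch below copies the property values from the command and verifies this. The helper name mapped_indexes is hypothetical, and the check says nothing about the validation the connector itself performs.

```python
import json
import re

# Values copied from the CREATE SINK CONNECTOR command of this example
CSV_FIELDS_NUMBER = 9
CASE_ID_INDEX = 0
ACTIVITY_INDEX = 1
TIME_INFORMATION = "{2;dd/MM/yy HH:mm},{3;dd/MM/yy HH:mm}"
DIMENSIONS = (
    '[{"columnIndex": 4, "name": "Country", "isCaseScope": true, "aggregation": "FIRST"},'
    '{"columnIndex": 5, "name": "Region", "isCaseScope": false},'
    '{"columnIndex": 6, "name": "City", "isCaseScope": false}]'
)
METRICS = (
    '[{"columnIndex": 7, "name": "Price", "unit": "Euros", "isCaseScope": true, "aggregation": "MIN"},'
    '{"columnIndex": 8, "name": "DepartmentNumber", "isCaseScope": false}]'
)


def mapped_indexes() -> list:
    """Collect every column index referenced by the column mapping properties."""
    # Each time entry looks like {index;format}; keep only the index.
    time_indexes = [int(i) for i in re.findall(r"\{(\d+);", TIME_INFORMATION)]
    dimension_indexes = [d["columnIndex"] for d in json.loads(DIMENSIONS)]
    metric_indexes = [m["columnIndex"] for m in json.loads(METRICS)]
    return [CASE_ID_INDEX, ACTIVITY_INDEX, *time_indexes, *dimension_indexes, *metric_indexes]


indexes = mapped_indexes()
# Every index from 0 to csv.fieldsNumber - 1 is mapped exactly once.
assert sorted(indexes) == list(range(CSV_FIELDS_NUMBER))
print(sorted(indexes))
```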

To add process events to the igrafx_avro Kafka topic, use the following command (in this example, there are 51 events):

``` INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'A', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:05', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:10', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region1', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'B', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:15', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:16', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region2', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '16', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'C', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:16', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:17', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region3', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '56', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'D', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:26', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:27', quote := false), STRUCT(columnId := 4, 
text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region4', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '47', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'E', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:29', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region5', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '20', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'F', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:29', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 4, text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region6', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'G', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:32', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region7', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '60', quote := false), STRUCT(columnId := 8, text := '40', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES 
(ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'H', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:32', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:33', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region8', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '45', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'I', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:33', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:34', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region9', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'J', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:34', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:35', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region10', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '19', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'K', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:35', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:36', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), 
STRUCT(columnId := 5, text := 'Region11', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'L', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:36', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:37', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region12', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '16', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'M', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:37', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:38', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region13', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'N', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:38', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:39', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region14', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := 
'3', quote := false), STRUCT(columnId := 1, text := 'O', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:39', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:40', quote := false), STRUCT(columnId := 4, text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region15', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '84', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'P', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:40', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:41', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region16', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '74', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'Q', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:41', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:42', quote := false), STRUCT(columnId := 4, text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region17', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '75', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'R', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:42', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:43', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region18', quote := 
false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '60', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'S', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:43', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:44', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region19', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'T', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:44', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:45', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region20', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'U', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:45', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:46', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region21', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 
'V', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:46', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:47', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region22', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'W', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:47', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:48', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region23', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'X', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:48', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:49', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region24', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'Y', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:49', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:50', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region25', quote := false), STRUCT(columnId := 6, text := 'Madrid', 
quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '3', quote := false), STRUCT(columnId := 1, text := 'Z', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:50', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:51', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region26', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'A', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:05', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:10', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region27', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'B', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:15', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:16', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region28', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'C', quote := true), STRUCT(columnId := 2, text := 
'10/10/10 08:16', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:17', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region29', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'D', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:26', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:27', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region30', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'E', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:29', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 4, text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region31', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'F', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:29', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region32', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '80', 
quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'G', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:31', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:32', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region33', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'H', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:32', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:33', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region34', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'I', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:33', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:34', quote := false), STRUCT(columnId := 4, text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region35', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'J', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:34', quote := false), STRUCT(columnId := 
3, text := '10/10/10 08:35', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region36', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'K', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:35', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:36', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region37', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'L', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:36', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:37', quote := false), STRUCT(columnId := 4, text := 'Canada', quote := false), STRUCT(columnId := 5, text := 'Region38', quote := false), STRUCT(columnId := 6, text := 'Ottawa', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'M', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:37', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:38', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', 
quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'N', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:38', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:39', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '60', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'O', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:39', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:40', quote := false), STRUCT(columnId := 4, text := 'USA', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Washington', quote := false), STRUCT(columnId := 7, text := '70', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'P', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:40', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:41', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'Q', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:41', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:42', quote := false), 
STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'R', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:42', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:43', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '20', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'S', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:43', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:44', quote := false), STRUCT(columnId := 4, text := 'Italy', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Rome', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'T', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:44', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:45', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '40', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO 
(dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'U', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:45', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:46', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'V', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:46', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:47', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'X', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:47', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:49', quote := false), STRUCT(columnId := 4, text := 'Germany', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Berlin', quote := false), STRUCT(columnId := 7, text := '30', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'Y', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:49', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:50', quote := false), STRUCT(columnId := 4, text := 'Spain', quote := 
false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Madrid', quote := false), STRUCT(columnId := 7, text := '10', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]); INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[STRUCT(columnId := 0, text := '5', quote := false), STRUCT(columnId := 1, text := 'Z', quote := true), STRUCT(columnId := 2, text := '10/10/10 08:50', quote := false), STRUCT(columnId := 3, text := '10/10/10 08:51', quote := false), STRUCT(columnId := 4, text := 'France', quote := false), STRUCT(columnId := 5, text := 'Region39', quote := false), STRUCT(columnId := 6, text := 'Paris', quote := false), STRUCT(columnId := 7, text := '50', quote := false), STRUCT(columnId := 8, text := '10', quote := false)]);
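
Since every INSERT statement above has the same shape, statements like these can be generated rather than typed by hand. The following Python sketch is one way to do it. build_insert and its quoted_columns parameter are hypothetical helpers, not part of the connector or of ksqlDB, and the quote doubling assumes ksqlDB's usual rule for escaping single quotes in string literals.

```python
from typing import Sequence


def build_insert(values: Sequence[str], quoted_columns: Sequence[int] = (1,)) -> str:
    """Build one INSERT statement for the IGRAFX_AVRO stream from a row of column values.

    The position of each value in `values` becomes its columnId. Columns listed in
    `quoted_columns` get quote := true (in this example, only the activity column).
    """
    structs = []
    for column_id, text in enumerate(values):
        escaped = text.replace("'", "''")  # assumption: a single quote is escaped by doubling it
        quote = "true" if column_id in quoted_columns else "false"
        structs.append(
            f"STRUCT(columnId := {column_id}, text := '{escaped}', quote := {quote})"
        )
    return f"INSERT INTO IGRAFX_AVRO (dataArray) VALUES (ARRAY[{', '.join(structs)}]);"


# First event of this example: case 3, activity A
row = ["3", "A", "10/10/10 08:05", "10/10/10 08:10", "Germany", "Region1", "Berlin", "10", "10"]
print(build_insert(row))
```

The printed statement can be pasted into the ksqlDB CLI like the hand-written ones.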

The connector will process this data and create three CSV files to send to the iGrafx Mining Platform. It will generate two files with 20 events each and, 30 seconds later, a final file containing 11 events. These files should then be added to the project on the platform.

You can check the Logging Events sent by the connector with the command:

SELECT * FROM LOGGING EMIT CHANGES;


Finally, you can drop the connector with the command:

DROP CONNECTOR IGrafxConnectorCMLogging;