Kafka Logging Events

The connector has the possibility via its kafkaLoggingEvents.sendInformation, kafkaLoggingEvents.topic properties to log file related events in a Kafka topic.

The AVRO schema expected for the Logging is the following :

{
  "fields": [
    {
      "default": null,
      "name": "EVENT_TYPE",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "IGRAFX_PROJECT",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "EVENT_DATE",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "EVENT_SEQUENCE_ID",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "PAYLOAD",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "IGrafxKafkaLoggingEventsSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}

An event is composed of :

  • an eventType (String) : currently there are pushFile and issuePushFile
  • a igrafxProject (String : UUID) : corresponds to the iGrafx Project ID to which we want to send (the projectId connector's property)
  • an eventDate (Long) : corresponds to the date of the event
  • an eventSequenceId (String) : corresponds to the ID of the sequence of events related to a file
  • a payload (String : JSON) : can contain any information related to a certain event type

To create a STREAM to manipulate those events in ksqlDB there are two possibilities :

  • Create the following STREAM before sending any event to the Kafka Logging Events topic (creation of the topic with a correct schema) :
CREATE STREAM LOGGING_1 (
    EVENT_TYPE VARCHAR,
    IGRAFX_PROJECT VARCHAR,
    EVENT_DATE BIGINT,
    EVENT_SEQUENCE_ID VARCHAR,
    PAYLOAD VARCHAR
) WITH (
    KAFKA_TOPIC='event_logging_topic_example', 
    PARTITIONS=1, 
    REPLICAS=1, 
    VALUE_FORMAT='AVRO'
);
  • Create the following STREAM after sending the first events to the Kafka Logging Events topic (the topic needs to exist with a correct schema) :
CREATE STREAM LOGGING_2 WITH (
    KAFKA_TOPIC='journalisation_connecteur_test',
    VALUE_FORMAT='AVRO'
);

For now, those 2 events are generated by the connector :

  • pushFile : event generated when the sending of a file by the connector ended successfully

For this event, the information embedded in the payload is : the file name (filename: String), the event date (date: Long), and the number of lines in the file (lineNumber: Int). Here is an example of payload for this event :

{
    "filename": "filename_example",
    "date": 3446454564,
    "lineNumber": 100
}
  • issuePushFile : event generated when there was an issue during the creation/sending of a file

The information embedded in its payload is : the file name (filename: String), the event date (date: Long), and the exception type (exceptionType: String) corresponding to the name of the thrown exception. Here is an example of payload for this event :

{
    "filename": "filename_example",
    "date": 3446454564,
    "exceptionType": "com.igrafx.kafka.sink.main.domain.exceptions.SendFileException"
}

Table summarizing the meaning of the different events fields :

eventType igrafxProject eventDate eventSequenceId payload
pushFile pushFile The ID of the iGrafx project The date for which the file has been successfully sent MD5 hash of a String containing the source topic/partition/offset of the data in the file filename/date/lineNumber
issuePushFile issuePushFile The ID of the iGrafx project The date of the issue MD5 hash of a String containing the source topic/partition/offset of the data that should have been sent in the file filename/date/exceptionType

When the sending of the event to Kafka fails, there are two possibilities :

  • if the event was an issuePushFile event, the exception stopping the Task is the one that occurred during the creation/sending of the file, prior to the event sending issue (but the event's exception is still logged)
  • if the event was a pushFile event, the exception stopping the Task is the event's exception