Kafka Logging Events
The connector can log file-related events to a Kafka topic through its kafkaLoggingEvents.sendInformation and kafkaLoggingEvents.topic properties.
The AVRO schema expected for the logging is the following:
```json
{
  "fields": [
    { "default": null, "name": "EVENT_TYPE", "type": ["null", "string"] },
    { "default": null, "name": "IGRAFX_PROJECT", "type": ["null", "string"] },
    { "default": null, "name": "EVENT_DATE", "type": ["null", "long"] },
    { "default": null, "name": "EVENT_SEQUENCE_ID", "type": ["null", "string"] },
    { "default": null, "name": "PAYLOAD", "type": ["null", "string"] }
  ],
  "name": "IGrafxKafkaLoggingEventsSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
```
An event is composed of:
- an eventType (String): currently either pushFile or issuePushFile
- an igrafxProject (String: UUID): the ID of the iGrafx project the file is sent to (the connector's projectId property)
- an eventDate (Long): the date of the event
- an eventSequenceId (String): the ID of the sequence of events related to a file
- a payload (String: JSON): can contain any information related to a given event type
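As an illustration, a deserialized logging event could be handled like the following Python sketch. The field names come from the AVRO schema above; every concrete value is invented for the example:

```python
import json

# Illustrative logging event shaped after the AVRO schema above.
# Field names come from the schema; all values here are invented.
event = {
    "EVENT_TYPE": "pushFile",
    "IGRAFX_PROJECT": "00000000-0000-0000-0000-000000000000",  # placeholder project UUID
    "EVENT_DATE": 3446454564,
    "EVENT_SEQUENCE_ID": "d41d8cd98f00b204e9800998ecf8427e",  # placeholder MD5 hash
    "PAYLOAD": '{"filename": "filename_example", "date": 3446454564, "lineNumber": 100}',
}

# The payload is itself a JSON string, so it must be decoded separately.
payload = json.loads(event["PAYLOAD"])
print(payload["filename"], payload["lineNumber"])
```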
To create a STREAM manipulating those events in ksqlDB, there are two possibilities:
- Create the following STREAM before sending any event to the Kafka Logging Events topic (this creates the topic with the correct schema):
```sql
CREATE STREAM LOGGING_1 (
  EVENT_TYPE VARCHAR,
  IGRAFX_PROJECT VARCHAR,
  EVENT_DATE BIGINT,
  EVENT_SEQUENCE_ID VARCHAR,
  PAYLOAD VARCHAR
) WITH (
  KAFKA_TOPIC='event_logging_topic_example',
  PARTITIONS=1,
  REPLICAS=1,
  VALUE_FORMAT='AVRO'
);
```
- Create the following STREAM after the first events have been sent to the Kafka Logging Events topic (the topic must already exist with the correct schema):
```sql
CREATE STREAM LOGGING_2 WITH (
  KAFKA_TOPIC='journalisation_connecteur_test',
  VALUE_FORMAT='AVRO'
);
```
For now, the connector generates the following two events:
- pushFile: generated when the connector successfully sent a file
For this event, the payload contains the file name (filename: String), the event date (date: Long), and the number of lines in the file (lineNumber: Int). Here is an example payload for this event:
```json
{
  "filename": "filename_example",
  "date": 3446454564,
  "lineNumber": 100
}
```
- issuePushFile: generated when an issue occurred during the creation or sending of a file
Its payload contains the file name (filename: String), the event date (date: Long), and the exception type (exceptionType: String), i.e. the name of the thrown exception. Here is an example payload for this event:
```json
{
  "filename": "filename_example",
  "date": 3446454564,
  "exceptionType": "com.igrafx.kafka.sink.main.domain.exceptions.SendFileException"
}
```
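Since the payload is a plain JSON string whose fields depend on the event type, a consumer typically dispatches on eventType before reading it. A minimal Python sketch (the helper function is ours, not part of the connector):

```python
import json

def summarize_event(event_type: str, payload_json: str) -> str:
    """Return a one-line summary of a logging event (illustrative helper)."""
    payload = json.loads(payload_json)
    if event_type == "pushFile":
        # pushFile payloads carry filename, date and lineNumber
        return f"{payload['filename']}: {payload['lineNumber']} lines sent"
    if event_type == "issuePushFile":
        # issuePushFile payloads carry filename, date and exceptionType
        return f"{payload['filename']}: failed with {payload['exceptionType']}"
    raise ValueError(f"unknown event type: {event_type}")

print(summarize_event("pushFile", '{"filename": "f.csv", "date": 1, "lineNumber": 100}'))
```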
Table summarizing the meaning of the different event fields:

| eventType | igrafxProject | eventDate | eventSequenceId | payload |
|---|---|---|---|---|
| pushFile | The ID of the iGrafx project | The date at which the file was successfully sent | MD5 hash of a String containing the source topic/partition/offset of the data in the file | filename/date/lineNumber |
| issuePushFile | The ID of the iGrafx project | The date of the issue | MD5 hash of a String containing the source topic/partition/offset of the data that should have been sent in the file | filename/date/exceptionType |
When the sending of the event to Kafka fails, there are two possibilities:
- if the event was an issuePushFile event, the exception stopping the Task is the one that occurred during the creation/sending of the file, prior to the event-sending issue (but the event's exception is still logged)
- if the event was a pushFile event, the exception stopping the Task is the event's exception