{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Usage of ConfluentKafka Output Connector with Event Objects\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following example demonstrates the delivery of events to the ConfluentKafka output connector." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "vscode": { "languageId": "shellscript" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " Container kafka Stopping\n", " Container kafka Stopped\n", " Container kafka Removing\n", " Container kafka Removed\n", " Network compose_kafka Removing\n", " Network compose_kafka Removed\n", " Network compose_kafka Creating\n", " Network compose_kafka Created\n", " Container kafka Creating\n", " Container kafka Created\n", " Container kafka Starting\n", " Container kafka Started\n" ] } ], "source": [ "%%bash\n", "docker compose -f ../../../../../examples/compose/docker-compose.yml down -v \n", "docker compose -f ../../../../../examples/compose/docker-compose.yml up -d kafka\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:33.325385+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:33.325411+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:33.325420+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:33.325426+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:33.325432+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:33.325438+00:00'} to topic consumer\n", 
"DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:33.325444+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:33.325450+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:33.325456+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:33.325462+00:00'} to topic consumer\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 0, offset 10\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 0\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 1\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 2\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 0\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 1\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 2\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 0\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 1\n", "DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 2\n", "INFO:KafkaOutput:Producer flushed successfully. 
0 messages remaining.\n"
     ]
    }
   ],
   "source": [
    "import logging\n",
    "import sys\n",
    "from typing import Iterator\n",
    "\n",
    "from logprep.factory import Factory\n",
    "from logprep.ng.connector.confluent_kafka.output import ConfluentKafkaOutput\n",
    "from logprep.ng.event.event_state import EventStateType\n",
    "from logprep.ng.event.log_event import LogEvent\n",
    "from logprep.util.time import TimeParser\n",
    "\n",
    "# Configure logging so that the connector's log messages are printed below the cell\n",
    "logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)\n",
    "\n",
    "config = {\n",
    "    \"type\": \"ng_confluentkafka_output\",\n",
    "    \"topic\": \"consumer\",\n",
    "    \"flush_timeout\": 300,\n",
    "    \"send_timeout\": 0,\n",
    "    \"kafka_config\": {\"bootstrap.servers\": \"127.0.0.1:9092\"},\n",
    "}\n",
    "\n",
    "confluent_kafka_output: ConfluentKafkaOutput = Factory.create({\"my_kafka\": config})\n",
    "\n",
    "# `events` is a list (not an iterator): the next cell calls len() on it and iterates it several times\n",
    "events: list[LogEvent] = [\n",
    "    LogEvent(\n",
    "        {\"message\": f\"Event {i}\", \"@timestamp\": str(TimeParser.now())},\n",
    "        original=b\"\",\n",
    "        state=EventStateType.PROCESSED,\n",
    "    )\n",
    "    for i in range(10)\n",
    "]\n",
    "\n",
    "# store events in the Confluent Kafka output\n",
    "for event in events:\n",
    "    confluent_kafka_output.store(event)\n",
    "\n",
    "# event goes to state STORED_IN_OUTPUT first and then after callback from librdkafka it will be changed to DELIVERED\n",
    "# assert events[-1].state == EventStateType.STORED_IN_OUTPUT\n",
    "\n",
    "# Shut down the output, which also flushes the producer (see the log output below) so that all events are sent\n",
    "confluent_kafka_output.shut_down()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Events total: 10\n",
      "Events in delivered state: 10\n",
      "Events not delivered: []\n"
     ]
    }
   ],
   "source": [
    "print(f\"Events total: {len(events)}\")\n",
    "print(f\"Events in delivered state: {len([e for e in events if e.state == EventStateType.DELIVERED])}\")\n",
    "\n",
    "print(f\"Events not delivered: {[event for event in events if event.state != EventStateType.DELIVERED]}\")\n",
    "\n",
    "\n",
    "# Verify that all events are delivered\n",
    "for event in events:\n",
    "    assert event.state == EventStateType.DELIVERED, f\"Event {event.data['message']} not delivered | State: {event.state}\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following case demonstrates error handling in the confluent_kafka output.\n",
    "We try to send events to a non-existent topic.\n",
    "This should provoke an error for that unknown topic or partition."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:44.018526+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:44.018555+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:44.018564+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:44.018571+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:44.018577+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:44.018584+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:44.018590+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:44.018596+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:44.018602+00:00'} to topic non_existent_topic\n",
      "DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:44.018608+00:00'} to topic non_existent_topic\n",
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n", "INFO:KafkaOutput:Producer flushed successfully. 
0 messages remaining.\n",
      "Event Event 0 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 1 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 2 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 3 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 4 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 5 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 6 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 7 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 8 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
      "Event Event 9 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n"
     ]
    }
   ],
   "source": [
    "# Fresh events for the error case; a separate name keeps the `events` list of the delivery example intact\n",
    "events_unknown_topic: list[LogEvent] = [\n",
    "    LogEvent(\n",
    "        {\"message\": f\"Event {i}\", \"@timestamp\": str(TimeParser.now())},\n",
    "        original=b\"\",\n",
    "        state=EventStateType.PROCESSED,\n",
    "    )\n",
    "    for i in range(10)\n",
    "]\n",
    "\n",
    "# store events in the Confluent Kafka output, addressing a topic that does not exist\n",
    "for event in events_unknown_topic:\n",
    "    confluent_kafka_output.store_custom(event, \"non_existent_topic\")\n",
    "\n",
    "# Flush the output to ensure all events are sent\n",
    "confluent_kafka_output.flush()\n",
    "\n",
    "# Verify that every event failed and carries exactly one error\n",
    "for event in events_unknown_topic:\n",
    "    assert event.state == EventStateType.FAILED, f\"Event {event.data['message']} did not fail | State: {event.state}\"\n",
    "    assert len(event.errors) == 1, f\"Event {event.data['message']} has {len(event.errors)} errors\"\n",
    "    print(f\"Event {event.data['message']} failed with error: {event.errors[0]}\")"
   ]
  }
 ],
"metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }