{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "3ea6bd8d",
   "metadata": {},
   "source": [
    "## Pipeline Example\n",
    "\n",
    "1. Define the input events and create the processor chain"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "c2f1dd6d",
   "metadata": {},
   "outputs": [],
   "source": [
    "from logprep.factory import Factory\n",
    "from logprep.ng.abc.processor import Processor\n",
    "from logprep.ng.event.log_event import LogEvent\n",
    "from logprep.ng.event.event_state import EventStateType\n",
    "from logprep.ng.pipeline import Pipeline\n",
    "\n",
    "# Directory holding the example keys and regex mapping used by the pseudonymizer.\n",
    "# NOTE: the path is relative, so it depends on the kernel's working directory.\n",
    "PSEUDONYMIZER_EXAMPLE_DIR = \"../../../../../examples/exampledata/rules/pseudonymizer\"\n",
    "\n",
    "\n",
    "def create_input_events() -> list[LogEvent]:\n",
    "    \"\"\"Return three freshly created example events in the RECEIVED state.\n",
    "\n",
    "    New event objects are built on every call, so a pipeline can be re-created\n",
    "    without reusing events that an earlier run already handled.\n",
    "    \"\"\"\n",
    "    return [\n",
    "        LogEvent({\"message\": \"Log message 1\"}, original=b\"\", state=EventStateType.RECEIVED),\n",
    "        LogEvent({\"message\": \"Log message 2\"}, original=b\"\", state=EventStateType.RECEIVED),\n",
    "        LogEvent({\"user\": {\"name\": \"John Doe\"}}, original=b\"\", state=EventStateType.RECEIVED),\n",
    "    ]\n",
    "\n",
    "\n",
    "def get_processors() -> list[Processor]:\n",
    "    \"\"\"Create the example processors with ``Factory.create`` and call ``setup()`` on each.\n",
    "\n",
    "    Returns:\n",
    "        A list holding a generic adder followed by a pseudonymizer.\n",
    "    \"\"\"\n",
    "    processors = [\n",
    "        Factory.create(\n",
    "            {\n",
    "                \"processor\": {\n",
    "                    \"type\": \"ng_generic_adder\",\n",
    "                    \"rules\": [\n",
    "                        {\n",
    "                            \"filter\": \"*\",\n",
    "                            \"generic_adder\": {\"add\": {\"event.tags\": \"generic added tag\"}},\n",
    "                        }\n",
    "                    ],\n",
    "                }\n",
    "            }\n",
    "        ),\n",
    "        Factory.create(\n",
    "            {\n",
    "                \"pseudo_this\": {\n",
    "                    \"type\": \"ng_pseudonymizer\",\n",
    "                    \"pubkey_analyst\": f\"{PSEUDONYMIZER_EXAMPLE_DIR}/example_analyst_pub.pem\",\n",
    "                    \"pubkey_depseudo\": f\"{PSEUDONYMIZER_EXAMPLE_DIR}/example_depseudo_pub.pem\",\n",
    "                    \"regex_mapping\": f\"{PSEUDONYMIZER_EXAMPLE_DIR}/regex_mapping.yml\",\n",
    "                    \"hash_salt\": \"a_secret_tasty_ingredient\",\n",
    "                    \"outputs\": [{\"opensearch\": \"pseudonyms\"}],\n",
    "                    \"rules\": [\n",
    "                        {\n",
    "                            \"filter\": \"user.name\",\n",
    "                            \"pseudonymizer\": {\n",
    "                                \"id\": \"pseudonymizer-1a3c69b2-5d54-4b6b-ab07-c7ddbea7917c\",\n",
    "                                \"mapping\": {\"user.name\": \"RE_WHOLE_FIELD\"},\n",
    "                            },\n",
    "                        }\n",
    "                    ],\n",
    "                    \"max_cached_pseudonyms\": 1000000,\n",
    "                }\n",
    "            }\n",
    "        ),\n",
    "    ]\n",
    "    for processor in processors:\n",
    "        processor.setup()\n",
    "    return processors\n",
    "\n",
    "\n",
    "processors = get_processors()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ec064e0b",
   "metadata": {},
   "source": [
    "2. Create the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "454c2e8d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# The input iterator is single-use, so it is built together with the pipeline.\n",
    "# Re-running this cell hands the pipeline a fresh, unconsumed input.\n",
    "input_connector = iter(create_input_events())\n",
    "pipeline = Pipeline(input_connector, processors)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "47780fd6",
   "metadata": {},
   "source": [
    "3. Run the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "90dd7cfe",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Processed event: {'message': 'Log message 1', 'event': {'tags': 'generic added tag'}}\n",
      "Event state: processed\n",
      "generated extra_data: []\n",
      "----------------------------------------\n",
      "Processed event: {'message': 'Log message 2', 'event': {'tags': 'generic added tag'}}\n",
      "Event state: processed\n",
      "generated extra_data: []\n",
      "----------------------------------------\n",
      "Processed event: {'user': {'name': ''}, 'event': {'tags': 'generic added tag'}}\n",
      "Event state: processed\n",
      "generated extra_data: [PseudonymEvent(data={'pseudonym': '12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b', 'origin': 'KeHunzkJWuA27pZ8jicAHPXGoRYc27Ko+uQlQQTS9KT1V84dnApS0tCI3vTRhMawSD+ZTS+HRqt1nIYiKX4B3pVAlghK3PlOzVwxO6Gkktg12GNp++aTW5b7+aIClmHa8IoiBr/Nhg5ld9ctkmndbkm149zohKQlox67rellfEY=:XzZybCLehuWhWJD+JXDOQw==:DU6nasNCIDsMsc86gQWjES7k2Zmv++2gnuaXmX9DzwOjE2B5PY4pTvPJe54hvKn2RqL2IPX1q0cAjr5zWzexNEAKjcrNCyCjQRTUWgTLUhwC/Jx7COQrxjpfMEPOfDwVgdQHHMV7VJ+ErGf80ETFU0GD3jupBA0GyH5OJNr45qB3lVgUfwpHzazhMBQ2IRx2FpVYyymANecfeFjz/inWmxcrr6AueoM7lj4wJhzMVizcHmHEDzqw7Smo4Gv6DV2YGG/7HqpZiCF+ky5A7ukAf3reC3YzzsdCb/y5DH5/NJzPJKcR3Dio3W8TYQw/VP0jd9AwJlKxidrSCh342nLh0Q==:piBPxAWte9b2zMnjppX3uA==:Ht1eEIEr+xg='}, state=processed)]\n",
      "----------------------------------------\n"
     ]
    }
   ],
   "source": [
    "# Iterating the pipeline drives the events through the processors one by one.\n",
    "for event in pipeline:\n",
    "    print(f\"Processed event: {event.data}\")\n",
    "    print(f\"Event state: {event.state}\")\n",
    "    print(f\"generated extra_data: {event.extra_data}\")\n",
    "    print(\"-\" * 40)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}