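Pipeline Example

This notebook feeds three hand-made `LogEvent`s through an ng `Pipeline` made of a generic adder and a pseudonymizer, then prints each processed event, its state and any extra data it produced.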

  1. Create the input and the processor chain

[16]:
from logprep.factory import Factory
from logprep.ng.abc.processor import Processor
from logprep.ng.event.log_event import LogEvent
from logprep.ng.event.event_state import EventStateType
from logprep.ng.pipeline import Pipeline


# Three hand-made events stand in for a real input connector. Each one starts
# in the RECEIVED state with an empty raw payload (original=b"").
# iter() makes this a one-shot iterator: once consumed, re-run this cell to get
# the events back.
input_connector = iter(
    [
        LogEvent(event_data, original=b"", state=EventStateType.RECEIVED)
        for event_data in (
            {"message": "Log message 1"},
            {"message": "Log message 2"},
            {"user": {"name": "John Doe"}},
        )
    ]
)

def get_processors(
    pseudonymizer_dir: str = "../../../../../examples/exampledata/rules/pseudonymizer",
) -> list[Processor]:
    """Build the example processor chain and call ``setup()`` on each processor.

    The chain contains:

    * an ``ng_generic_adder`` whose single rule (filter ``"*"``) adds
      ``event.tags = "generic added tag"`` to every event, and
    * an ``ng_pseudonymizer`` whose single rule pseudonymizes ``user.name``.

    Parameters
    ----------
    pseudonymizer_dir:
        Directory containing the pseudonymizer example files
        ``example_analyst_pub.pem``, ``example_depseudo_pub.pem`` and
        ``regex_mapping.yml`` (no trailing slash). The default is a relative
        path. It is presumably resolved against the kernel's working
        directory, so it only works when the notebook runs from its own
        folder inside the logprep repository. TODO confirm.

    Returns
    -------
    list[Processor]
        The processors, in the order listed above, each already set up.
    """
    processors = [
        Factory.create(
            {
                "processor": {
                    "type": "ng_generic_adder",
                    "rules": [
                        {
                            "filter": "*",
                            "generic_adder": {"add": {"event.tags": "generic added tag"}},
                        }
                    ],
                }
            }
        ),
        Factory.create(
            {
                "pseudo_this": {
                    "type": "ng_pseudonymizer",
                    "pubkey_analyst": f"{pseudonymizer_dir}/example_analyst_pub.pem",
                    "pubkey_depseudo": f"{pseudonymizer_dir}/example_depseudo_pub.pem",
                    "regex_mapping": f"{pseudonymizer_dir}/regex_mapping.yml",
                    # Example value only. Never reuse a published salt for real data.
                    "hash_salt": "a_secret_tasty_ingredient",
                    "outputs": [{"opensearch": "pseudonyms"}],
                    "rules": [
                        {
                            "filter": "user.name",
                            "pseudonymizer": {
                                "id": "pseudonymizer-1a3c69b2-5d54-4b6b-ab07-c7ddbea7917c",
                                "mapping": {"user.name": "RE_WHOLE_FIELD"},
                            },
                        }
                    ],
                    "max_cached_pseudonyms": 1000000,
                }
            }
        ),
    ]
    # Factory.create only instantiates the processors; setup() has to be called
    # explicitly before they are handed to the pipeline.
    for processor in processors:
        processor.setup()
    return processors

processors = get_processors()

  2. Create the pipeline

[17]:

# Wire the event source and the processor chain together.
# input_connector is a one-shot iterator (built with iter() in the first cell).
# Presumably Pipeline pulls events from it lazily, so re-run the first cell
# before re-creating and re-running the pipeline. TODO confirm against Pipeline.
pipeline = Pipeline(input_connector, processors)
  3. Run the pipeline

[18]:
# Pull each processed event out of the pipeline and show its payload, its
# state, and any extra events produced alongside it (e.g. pseudonym events).
for event in pipeline:
    print(
        f"Processed event: {event.data}",
        f"Event state: {event.state}",
        f"generated extra_data: {event.extra_data}",
        "-" * 40,
        sep="\n",
    )
Processed event: {'message': 'Log message 1', 'event': {'tags': 'generic added tag'}}
Event state: processed
generated extra_data: []
----------------------------------------
Processed event: {'message': 'Log message 2', 'event': {'tags': 'generic added tag'}}
Event state: processed
generated extra_data: []
----------------------------------------
Processed event: {'user': {'name': '<pseudonym:12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b>'}, 'event': {'tags': 'generic added tag'}}
Event state: processed
generated extra_data: [PseudonymEvent(data={'pseudonym': '12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b', 'origin': 'KeHunzkJWuA27pZ8jicAHPXGoRYc27Ko+uQlQQTS9KT1V84dnApS0tCI3vTRhMawSD+ZTS+HRqt1nIYiKX4B3pVAlghK3PlOzVwxO6Gkktg12GNp++aTW5b7+aIClmHa8IoiBr/Nhg5ld9ctkmndbkm149zohKQlox67rellfEY=:XzZybCLehuWhWJD+JXDOQw==:DU6nasNCIDsMsc86gQWjES7k2Zmv++2gnuaXmX9DzwOjE2B5PY4pTvPJe54hvKn2RqL2IPX1q0cAjr5zWzexNEAKjcrNCyCjQRTUWgTLUhwC/Jx7COQrxjpfMEPOfDwVgdQHHMV7VJ+ErGf80ETFU0GD3jupBA0GyH5OJNr45qB3lVgUfwpHzazhMBQ2IRx2FpVYyymANecfeFjz/inWmxcrr6AueoM7lj4wJhzMVizcHmHEDzqw7Smo4Gv6DV2YGG/7HqpZiCF+ky5A7ukAf3reC3YzzsdCb/y5DH5/NJzPJKcR3Dio3W8TYQw/VP0jd9AwJlKxidrSCh342nLh0Q==:piBPxAWte9b2zMnjppX3uA==:Ht1eEIEr+xg='}, state=processed)]
----------------------------------------