Pipeline Example
Create input and processor chain
[16]:
from logprep.factory import Factory
from logprep.ng.abc.processor import Processor
from logprep.ng.event.log_event import LogEvent
from logprep.ng.event.event_state import EventStateType
from logprep.ng.pipeline import Pipeline
# Three in-memory events in RECEIVED state serve as the pipeline input.
# iter() yields a one-shot iterator: once it has been consumed, re-run this
# cell to replay the events.
input_connector = iter(
    [
        LogEvent(payload, original=b"", state=EventStateType.RECEIVED)
        for payload in (
            {"message": "Log message 1"},
            {"message": "Log message 2"},
            {"user": {"name": "John Doe"}},
        )
    ]
)
def get_processors() -> list[Processor]:
    """Create the example processor chain and set up every processor.

    The chain holds an ``ng_generic_adder`` (named ``processor``) followed by
    an ``ng_pseudonymizer`` (named ``pseudo_this``). Both processors are
    created through ``Factory.create`` before ``setup()`` is called on any
    of them.

    Returns:
        The created processors, generic adder first.
    """
    # Rule with filter "*" that adds the field event.tags.
    generic_adder_config = {
        "processor": {
            "type": "ng_generic_adder",
            "rules": [
                {
                    "filter": "*",
                    "generic_adder": {"add": {"event.tags": "generic added tag"}},
                }
            ],
        }
    }

    # Rule on user.name that maps the whole field via RE_WHOLE_FIELD.
    # NOTE(review): the key and mapping paths are relative; presumably they are
    # resolved against the kernel's working directory -- confirm before moving
    # the notebook.
    pseudonymizer_config = {
        "pseudo_this": {
            "type": "ng_pseudonymizer",
            "pubkey_analyst": "../../../../../examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem",
            "pubkey_depseudo": "../../../../../examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem",
            "regex_mapping": "../../../../../examples/exampledata/rules/pseudonymizer/regex_mapping.yml",
            "hash_salt": "a_secret_tasty_ingredient",
            "outputs": [{"opensearch": "pseudonyms"}],
            "rules": [
                {
                    "filter": "user.name",
                    "pseudonymizer": {
                        "id": "pseudonymizer-1a3c69b2-5d54-4b6b-ab07-c7ddbea7917c",
                        "mapping": {"user.name": "RE_WHOLE_FIELD"},
                    },
                }
            ],
            "max_cached_pseudonyms": 1000000,
        }
    }

    chain = [
        Factory.create(config)
        for config in (generic_adder_config, pseudonymizer_config)
    ]
    # setup() runs only after every processor has been created.
    for member in chain:
        member.setup()
    return chain


processors = get_processors()
Create the pipeline
[17]:
# Build the pipeline from the event iterator and the processors created above.
pipeline = Pipeline(input_connector, processors)
Run the pipeline
[18]:
# Iterate the pipeline and print data, state and extra_data of each event,
# followed by a 40-character separator line.
for event in pipeline:
    print(
        f"Processed event: {event.data}",
        f"Event state: {event.state}",
        f"generated extra_data: {event.extra_data}",
        "-" * 40,
        sep="\n",
    )
Processed event: {'message': 'Log message 1', 'event': {'tags': 'generic added tag'}}
Event state: processed
generated extra_data: []
----------------------------------------
Processed event: {'message': 'Log message 2', 'event': {'tags': 'generic added tag'}}
Event state: processed
generated extra_data: []
----------------------------------------
Processed event: {'user': {'name': '<pseudonym:12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b>'}, 'event': {'tags': 'generic added tag'}}
Event state: processed
generated extra_data: [PseudonymEvent(data={'pseudonym': '12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b', 'origin': 'KeHunzkJWuA27pZ8jicAHPXGoRYc27Ko+uQlQQTS9KT1V84dnApS0tCI3vTRhMawSD+ZTS+HRqt1nIYiKX4B3pVAlghK3PlOzVwxO6Gkktg12GNp++aTW5b7+aIClmHa8IoiBr/Nhg5ld9ctkmndbkm149zohKQlox67rellfEY=:XzZybCLehuWhWJD+JXDOQw==:DU6nasNCIDsMsc86gQWjES7k2Zmv++2gnuaXmX9DzwOjE2B5PY4pTvPJe54hvKn2RqL2IPX1q0cAjr5zWzexNEAKjcrNCyCjQRTUWgTLUhwC/Jx7COQrxjpfMEPOfDwVgdQHHMV7VJ+ErGf80ETFU0GD3jupBA0GyH5OJNr45qB3lVgUfwpHzazhMBQ2IRx2FpVYyymANecfeFjz/inWmxcrr6AueoM7lj4wJhzMVizcHmHEDzqw7Smo4Gv6DV2YGG/7HqpZiCF+ky5A7ukAf3reC3YzzsdCb/y5DH5/NJzPJKcR3Dio3W8TYQw/VP0jd9AwJlKxidrSCh342nLh0Q==:piBPxAWte9b2zMnjppX3uA==:Ht1eEIEr+xg='}, state=processed)]
----------------------------------------