Producers plugins

class apf.producers.GenericProducer(config=None)[source]

Generic Producer for Alert Processing Framework.

Attributes:
key_field

Methods

produce([message])

Send a message after processing.

set_key_field(key)

Set key used when indexing produced messages.

abstract produce(message=None, **kwargs)[source]

Send a message after processing.

Parameters:
message: dict-like

Message to be sent.

set_key_field(key)[source]

Set key used when indexing produced messages.
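
As a rough illustration of the interface above, the sketch below subclasses a stand-in base class that mirrors the documented methods (an abstract produce and a set_key_field setter). BaseProducer and ListProducer are hypothetical names used only for this example; they are not part of apf, and the real GenericProducer may differ internally.

```python
from abc import ABC, abstractmethod


class BaseProducer(ABC):
    """Hypothetical stand-in mirroring the documented GenericProducer interface."""

    def __init__(self, config=None):
        self.config = config
        self.key_field = None

    @abstractmethod
    def produce(self, message=None, **kwargs):
        """Send a message after processing."""

    def set_key_field(self, key):
        # Remember which message field is used as the key.
        self.key_field = key


class ListProducer(BaseProducer):
    """Hypothetical producer that keeps (key, message) pairs in memory."""

    def __init__(self, config=None):
        super().__init__(config)
        self.sent = []

    def produce(self, message=None, **kwargs):
        # Look up the key only if a key field has been set.
        key = message.get(self.key_field) if self.key_field else None
        self.sent.append((key, message))


producer = ListProducer()
producer.set_key_field("oid")
producer.produce({"oid": "abc123", "mag": 18.2})
```

Because produce is abstract, the stand-in base class cannot be instantiated directly; only subclasses that implement produce can.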

class apf.producers.KafkaProducer(config)[source]

Kafka Single Topic Producer.

Parameters:
PARAMS: dict

Parameters passed to confluent_kafka.Producer

The required parameters are:

  • bootstrap.servers: comma-separated string of broker <host:port> addresses.

TOPIC: string

Kafka fixed output topic.

Example:

Depending on the step configuration, the producer config can be passed in different ways. The recommended one is passing it in the STEP_CONFIG variable.

#settings.py
PRODUCER_CONFIG = {
    "PARAMS": {
        "bootstrap.servers": "kafka1:9092, kafka2:9092",
    },
    "TOPIC": "test_topic"
}

STEP_CONFIG = { ...
    "PRODUCER_CONFIG": PRODUCER_CONFIG
}

If multiple producers are required, the variable inside STEP_CONFIG can be renamed to “PRODUCER1_CONFIG”, “PRODUCER2_CONFIG”, etc.

TOPIC_STRATEGY: dict

Use a topic strategy instead of a fixed topic. As with the consumers' topic strategy, the required parameters are:

  • CLASS: apf.core.topic_management.GenericTopicStrategy class to be used.

  • PARAMS: Parameters passed to CLASS object.

Example:

Produce to a topic that changes at 23:00 UTC every day.

#settings.py
PRODUCER_CONFIG = { ...
    "TOPIC_STRATEGY": {
        "CLASS": "apf.core.topic_management.DailyTopicStrategy",
        "PARAMS": {
            "topic_format": "test_%s",
            "date_format": "%Y%m%d",
            "change_hour": 23
        }
    }
}

STEP_CONFIG = { ...
    "PRODUCER_CONFIG": PRODUCER_CONFIG
}
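
To make the effect of these parameters concrete, here is a minimal sketch of how a daily topic name could be derived from topic_format, date_format and change_hour. The function daily_topic is hypothetical, and the rollover rule (switch to the next day's topic once the UTC hour reaches change_hour) is an assumption about DailyTopicStrategy, not its actual implementation.

```python
from datetime import datetime, timedelta, timezone


def daily_topic(now, topic_format="test_%s", date_format="%Y%m%d", change_hour=23):
    """Hypothetical helper: build the topic name for a given moment."""
    # Work in UTC, since the change hour is expressed in UTC.
    now = now.astimezone(timezone.utc)
    day = now.date()
    # Assumption: from change_hour onward, messages go to the next day's topic.
    if now.hour >= change_hour:
        day += timedelta(days=1)
    return topic_format % day.strftime(date_format)


before = datetime(2024, 5, 1, 22, 59, tzinfo=timezone.utc)
after = datetime(2024, 5, 1, 23, 0, tzinfo=timezone.utc)
print(daily_topic(before))  # test_20240501
print(daily_topic(after))   # test_20240502
```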

SCHEMA: dict

AVRO Output Schema (AVRO Schema Definition)

Example:

#settings.py
PRODUCER_CONFIG = { ...
    "SCHEMA": {
        "namespace": "example.avro",
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number",  "type": ["int", "null"]},
            {"name": "favorite_color", "type": ["string", "null"]}
        ]
    }
}
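
As an illustration of what this schema accepts, the sketch below checks a plain dict against it. The helpers matches and conforms are hypothetical and handle only the primitive types and unions used in this example; this is a simplified check, not a real Avro validator, and an Avro library should be used for actual validation.

```python
SCHEMA = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ],
}

# Python types accepted for the Avro primitive types used in this schema.
AVRO_TO_PYTHON = {"string": str, "int": int, "null": type(None)}


def matches(value, avro_type):
    # A list denotes a union: the value must match at least one branch.
    if isinstance(avro_type, list):
        return any(matches(value, branch) for branch in avro_type)
    # bool is a subclass of int in Python, but it is not an Avro int.
    if avro_type == "int" and isinstance(value, bool):
        return False
    return isinstance(value, AVRO_TO_PYTHON[avro_type])


def conforms(record, schema):
    # Every declared field must be present and have a matching type.
    return all(
        field["name"] in record and matches(record[field["name"]], field["type"])
        for field in schema["fields"]
    )


good = {"name": "Ana", "favorite_number": 7, "favorite_color": None}
bad = {"name": "Ana", "favorite_number": "seven", "favorite_color": None}
print(conforms(good, SCHEMA))  # True
print(conforms(bad, SCHEMA))   # False
```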

Attributes:
key_field

Methods

produce([message])

Produce Message to a topic.

set_key_field(key)

Set key used when indexing produced messages.

class apf.producers.CSVProducer(config)[source]

CSV File Producer.

Warning

CSVProducer only works in a single-process step. Running it distributed or with multiprocessing can result in issues.

Parameters:
FILE_PATH: str

Output CSV File Path.

Attributes:
key_field

Methods

produce([message])

Produce Message to a CSV File.

set_key_field(key)

Set key used when indexing produced messages.

class apf.producers.JSONProducer(config)[source]

JSON Producer

This producer creates multiple JSON output files according to the buffer size; each file contains buffer_size elements.

Every file is created in the FILE_PATH directory, and each output file is named producer_output{i}, where i counts how many times the buffer has filled.
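
The buffering behaviour described above can be sketched as follows. BufferedJSONWriter is a hypothetical class written for this example, not the JSONProducer implementation; the ".json" file extension and a counter starting at 0 are assumptions.

```python
import json
import os
import tempfile


class BufferedJSONWriter:
    """Hypothetical sketch: write one JSON file each time the buffer fills."""

    def __init__(self, file_path, buffer_size):
        self.file_path = file_path      # output directory
        self.buffer_size = buffer_size  # messages per output file
        self.buffer = []
        self.flushes = 0                # assumed to start at 0

    def produce(self, message):
        self.buffer.append(message)
        if len(self.buffer) == self.buffer_size:
            # Assumption: output files carry a ".json" extension.
            name = f"producer_output{self.flushes}.json"
            with open(os.path.join(self.file_path, name), "w") as fh:
                json.dump(self.buffer, fh)
            self.buffer = []
            self.flushes += 1


out_dir = tempfile.mkdtemp()
writer = BufferedJSONWriter(out_dir, buffer_size=2)
for i in range(5):
    writer.produce({"id": i})
written = sorted(os.listdir(out_dir))
```

With five messages and a buffer size of 2, this sketch writes two files and leaves the fifth message in the buffer; whether the real producer flushes a partially filled buffer is not stated here.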

Parameters:
FILE_PATH: str

Output JSON File Directory.

Attributes:
key_field

Methods

produce([message])

Produce Message to a JSON File.

set_key_field(key)

Set key used when indexing produced messages.