Producers plugins
- class apf.producers.GenericProducer(config=None)
Generic Producer for Alert Processing Framework.
- Attributes:
- key_field
Methods
produce([message]): Send a message after processing.
set_key_field(key): Set the key used when indexing produced messages.
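The interface listed above (a `produce` method, a `set_key_field` method, and a `key_field` attribute) can be illustrated with a small stand-in class. This is only a sketch: `InMemoryProducer`, its `sent` list, and the `oid` and `mag` message fields are hypothetical names chosen for illustration, and the real `GenericProducer` may behave differently.

```python
class InMemoryProducer:
    """Hypothetical stand-in mirroring the documented interface; not part of apf."""

    def __init__(self, config=None):
        self.config = config or {}
        self.key_field = None
        self.sent = []  # (key, message) pairs, kept only for inspection

    def set_key_field(self, key):
        """Remember which message field should be used as the key."""
        self.key_field = key

    def produce(self, message=None):
        """Store the message with its key (None when no key field is set)."""
        message = message or {}
        key = message.get(self.key_field) if self.key_field else None
        self.sent.append((key, message))


producer = InMemoryProducer()
producer.set_key_field("oid")  # "oid" is a made-up field name
producer.produce({"oid": "obj1", "mag": 18.2})
print(producer.sent)
```

Keeping the key lookup inside `produce` means a step only has to call `set_key_field` once, before it starts sending messages.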
- class apf.producers.KafkaProducer(config)
Kafka Single Topic Producer.
- Parameters:
- PARAMS: dict
Parameters passed to confluent_kafka.Producer. The required parameters are:
bootstrap.servers: comma-separated string of broker <host:port> addresses.
- TOPIC: string
Kafka fixed output topic.
Example:
Depending on the step configuration, the producer config can be passed in different ways. The recommended one is passing it in the STEP_CONFIG variable.

    # settings.py
    PRODUCER_CONFIG = {
        "PARAMS": {
            "bootstrap.servers": "kafka1:9092, kafka2:9092",
        },
        "TOPIC": "test_topic",
    }
    STEP_CONFIG = {
        # ...
        "PRODUCER_CONFIG": PRODUCER_CONFIG,
    }

If multiple producers are required, the variable inside STEP_CONFIG can be changed to "PRODUCER1_CONFIG", "PRODUCER2_CONFIG", etc.
- TOPIC_STRATEGY: dict
Use a topic strategy instead of a fixed topic. As with the consumers' topic strategy, the required parameters are:
CLASS: the apf.core.topic_management.GenericTopicStrategy class to be used.
PARAMS: parameters passed to the CLASS object.
Example:
Produce to a topic that changes at 23:00 UTC every day.

    # settings.py
    PRODUCER_CONFIG = {
        # ...
        "TOPIC_STRATEGY": {
            "CLASS": "apf.core.topic_management.DailyTopicStrategy",
            "PARAMS": {
                "topic_format": "test_%s",
                "date_format": "%Y%m%d",
                "change_hour": 23,
            },
        },
    }
    STEP_CONFIG = {
        # ...
        "PRODUCER_CONFIG": PRODUCER_CONFIG,
    }
- SCHEMA: dict
AVRO Output Schema (AVRO Schema Definition)
Example:
    # settings.py
    PRODUCER_CONFIG = {
        # ...
        "SCHEMA": {
            "namespace": "example.avro",
            "type": "record",
            "name": "User",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "favorite_number", "type": ["int", "null"]},
                {"name": "favorite_color", "type": ["string", "null"]},
            ],
        },
    }
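The note under TOPIC about multiple producers can be sketched as a settings fragment. The broker addresses and the topic names `test_topic_a` and `test_topic_b` are placeholders, and how a step looks up each entry is not shown in this page, so treat this as an illustration of the naming pattern only.

```python
# settings.py (sketch; broker addresses and topic names are placeholders)
PRODUCER1_CONFIG = {
    "PARAMS": {"bootstrap.servers": "kafka1:9092,kafka2:9092"},
    "TOPIC": "test_topic_a",
}
PRODUCER2_CONFIG = {
    "PARAMS": {"bootstrap.servers": "kafka1:9092,kafka2:9092"},
    "TOPIC": "test_topic_b",
}
STEP_CONFIG = {
    # One entry per producer, following the PRODUCER<N>_CONFIG pattern.
    "PRODUCER1_CONFIG": PRODUCER1_CONFIG,
    "PRODUCER2_CONFIG": PRODUCER2_CONFIG,
}
```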
- Attributes:
- key_field
Methods
produce([message]): Produce a message to a topic.
set_key_field(key): Set the key used when indexing produced messages.
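To make the TOPIC_STRATEGY example more concrete, the following sketch shows one way a daily topic name could be derived from `topic_format`, `date_format`, and `change_hour`. The function `daily_topic` is hypothetical and is not the `DailyTopicStrategy` implementation. In particular, it assumes that from `change_hour` onward the name rolls over to the next day's date, which this page does not confirm.

```python
from datetime import datetime, timedelta, timezone


def daily_topic(now, topic_format="test_%s", date_format="%Y%m%d", change_hour=23):
    """Return a topic name for the UTC datetime `now`.

    Assumption: at or after `change_hour` the name uses the next day's date.
    """
    if now.hour >= change_hour:
        now = now + timedelta(days=1)
    return topic_format % now.strftime(date_format)


before = datetime(2024, 1, 1, 22, 59, tzinfo=timezone.utc)
after = datetime(2024, 1, 1, 23, 0, tzinfo=timezone.utc)
print(daily_topic(before))  # test_20240101
print(daily_topic(after))   # test_20240102
```

Passing `now` as an argument, rather than reading the clock inside the function, keeps the rollover rule easy to check at the boundary hour.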
- class apf.producers.CSVProducer(config)
CSV File Producer.
Warning
CSVProducer only works for a single-process step. Running it distributed or with multiprocessing can result in issues.
- Parameters:
- FILE_PATH: str
Output CSV File Path.
- Attributes:
- key_field
Methods
produce([message]): Produce a message to a CSV file.
set_key_field(key): Set the key used when indexing produced messages.
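The single-process warning above is easier to see with a sketch of a CSV producer that appends one row per message. `SketchCSVProducer` is a hypothetical stand-in written with the standard `csv` module, not the `CSVProducer` source, and the `oid` and `mag` fields are made up. One plausible source of trouble, shown here, is that the header check and the append are separate steps.

```python
import csv
import os
import tempfile


class SketchCSVProducer:
    """Hypothetical stand-in; not apf.producers.CSVProducer."""

    def __init__(self, config):
        self.path = config["FILE_PATH"]

    def produce(self, message=None):
        message = message or {}
        # Write the header only when the file is new or empty. This check and
        # the append below are not atomic, so two processes could both write
        # a header or interleave rows.
        write_header = not os.path.exists(self.path) or os.path.getsize(self.path) == 0
        with open(self.path, "a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(message.keys()))
            if write_header:
                writer.writeheader()
            writer.writerow(message)


path = os.path.join(tempfile.mkdtemp(), "output.csv")
producer = SketchCSVProducer({"FILE_PATH": path})
producer.produce({"oid": "obj1", "mag": 18.2})
producer.produce({"oid": "obj2", "mag": 17.5})

with open(path, newline="") as f:
    rows = list(csv.DictReader(f))
print(rows)
```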
- class apf.producers.JSONProducer(config)
JSON Producer
This producer writes multiple JSON output files, each containing buffer_size elements.
Every file is created in the FILE_PATH directory and is named producer_output{i}, where i counts how many times the buffer has been filled.
- Parameters:
- FILE_PATH: str
Output JSON File Directory.
- Attributes:
- key_field
Methods
produce([message]): Produce a message to a JSON file.
set_key_field(key): Set the key used when indexing produced messages.
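The buffering behaviour described for JSONProducer can be sketched as follows. `SketchJSONProducer` is a hypothetical stand-in, not the library class. It makes several assumptions this page does not confirm: `buffer_size` is passed as a constructor argument, the counter starts at 0, files get a `.json` suffix, and a partially filled buffer is left unwritten.

```python
import json
import os
import tempfile


class SketchJSONProducer:
    """Hypothetical stand-in; not apf.producers.JSONProducer."""

    def __init__(self, config, buffer_size=2):
        self.directory = config["FILE_PATH"]
        self.buffer_size = buffer_size  # assumed to be a constructor argument
        self.buffer = []
        self.counter = 0  # assumed to start at 0

    def produce(self, message=None):
        self.buffer.append(message)
        if len(self.buffer) >= self.buffer_size:
            self._flush()

    def _flush(self):
        # The ".json" suffix is an assumption; the docs only give the stem.
        name = f"producer_output{self.counter}.json"
        with open(os.path.join(self.directory, name), "w") as f:
            json.dump(self.buffer, f)
        self.buffer = []
        self.counter += 1


out_dir = tempfile.mkdtemp()
producer = SketchJSONProducer({"FILE_PATH": out_dir}, buffer_size=2)
for i in range(5):
    producer.produce({"id": i})

files = sorted(os.listdir(out_dir))
print(files)  # two full files; the fifth message is still buffered
```

With five messages and a buffer of two, only two files are written in this sketch, so a real producer would also need some way to flush the remainder on shutdown.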