Consumer plugins¶
- class apf.consumers.GenericConsumer(config=None)[source]¶
Generic Consumer for Alert Processing Framework.
Parameters are passed through config as a dict of params.
Methods
commit()     Post consume processing.
consume()    Get a message from a data source.
- commit()[source]¶
Post consume processing. Can be a PostgreSQL commit, a Kafka commit, or a custom function to run after an alert is processed.
The value to be committed has to be stored as an instance attribute inside consume so that commit can access it, i.e.
def consume(self):
    self.message = get_message()

def commit(self):
    commit_logic(self.message)
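For illustration, a minimal custom consumer following this pattern could look like the sketch below. The file-based source and the "FILE_PATH" config key are assumptions made for the example, and it assumes the config dict is available as self.config:
import json

from apf.consumers import GenericConsumer


class SingleFileConsumer(GenericConsumer):
    """Hypothetical consumer that reads one JSON message from a file.

    The "FILE_PATH" key is an example parameter, not part of apf itself.
    """

    def consume(self):
        # Assumes GenericConsumer stores the config dict as self.config.
        with open(self.config["FILE_PATH"]) as f:
            # Keep the message as an instance attribute so commit() can reach it.
            self.message = json.load(f)
        return self.message

    def commit(self):
        # Post consume processing for the message stored in consume().
        print("processed message with keys:", list(self.message))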
- class apf.consumers.KafkaConsumer(config)[source]¶
Consume from a Kafka Topic.
By default KafkaConsumer uses a manual commit strategy to avoid data loss on errors. This strategy can be disabled completely by adding "COMMIT": False to the STEP_CONFIG variable in the step's settings.py file. This can be useful when testing a step, since with commits enabled Kafka will not deliver again messages that were already processed.
Example:
#settings.py
STEP_CONFIG = {
    ...
    "COMMIT": False  # Disable commit, useful for testing/debugging.
}
- Parameters:
- TOPICS: list
List of topics to consume.
Example:
Subscribe to a fixed list of topics:
#settings.py
CONSUMER_CONFIG = {
    ...
    "TOPICS": ["topic1", "topic2"]
}
Using confluent_kafka syntax, we can also subscribe to a pattern:
#settings.py
CONSUMER_CONFIG = {
    ...
    "TOPICS": ["^topic*"]
}
More on pattern subscription can be found in the confluent_kafka documentation.
- TOPIC_STRATEGY: dict
Parameters to configure a topic strategy instead of a fixed topic list.
The required parameters are:
CLASS: apf.core.topic_management.GenericTopicStrategy class to be used.
PARAMS: Parameters passed to CLASS object.
Example:
A topic strategy that updates at 23:00 UTC every day:
#settings.py
CONSUMER_CONFIG = {
    ...
    "TOPIC_STRATEGY": {
        "CLASS": "apf.core.topic_management.DailyTopicStrategy",
        "PARAMS": {
            "topic_format": [
                "ztf_%s_programid1",
                "ztf_%s_programid3"
            ],
            "date_format": "%Y%m%d",
            "change_hour": 23,
            "retention_days": 8,
        }
    }
}
- PARAMS: dict
Parameters passed to confluent_kafka.Consumer.
The required parameters are:
bootstrap.servers: comma separated <host:port> str of brokers.
group.id: str with the consumer group name.
Example:
Configure a Kafka Consumer for a secure Kafka cluster:
#settings.py
CONSUMER_CONFIG = {
    ...
    "PARAMS": {
        "bootstrap.servers": "kafka1:9093,kafka2:9093",
        "group.id": "step_group",
        "security.protocol": "SSL",
        "ssl.ca.location": "<ca-cert path>",
        "ssl.keystore.location": "<keystore path>",
        "ssl.keystore.password": "<keystore password>"
    }
}
All supported confluent_kafka parameters can be found in the official library documentation.
Methods
consume([num_messages, timeout])    Consumes num_messages messages from the specified topic.
- consume(num_messages=1, timeout=60)[source]¶
Consumes num_messages messages from the specified topic.
Returns a dictionary or a list, depending on the number of messages consumed: if num_messages > 1 it returns a list of dicts, and if num_messages = 1 it returns a single dict.
- Parameters:
- num_messages: int
Number of messages to be consumed
- timeout: int
Seconds to wait when consuming messages. Raises an exception if the messages do not arrive within the specified time.
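As a rough usage sketch (broker, topic and group names below are placeholders, and in a real step the consume/commit cycle is normally driven by the step framework itself):
from apf.consumers import KafkaConsumer

# Placeholder configuration; adjust topics and brokers to your cluster.
config = {
    "TOPICS": ["topic1"],
    "PARAMS": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "example_group",
    },
}

consumer = KafkaConsumer(config)

# A single message is returned as a dict ...
message = consumer.consume(num_messages=1, timeout=60)

# ... while asking for several messages returns a list of dicts.
batch = consumer.consume(num_messages=10, timeout=60)

# With the default manual commit strategy, commit after processing succeeds
# (in a real step this call is handled by the framework).
consumer.commit()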
- class apf.consumers.CSVConsumer(config)[source]¶
CSV Consumer.
Example:
CSV Consumer configuration example
#settings.py
CONSUMER_CONFIG = {
    ...
    "FILE_PATH": "csv_file_path",
    "OTHER_ARGS": {
        "index_col": "id",
        "sep": ";",
        "header": 0
    }
}
- Parameters:
- FILE_PATH: path
CSV path location
- OTHER_ARGS: dict
Parameters passed to pandas.read_csv() (see the pandas documentation).
Methods
commit()     Post consume processing.
consume()    Get a message from a CSV file.
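A rough usage sketch (the file path is a placeholder, and the way consume() is called follows the method summary above; OTHER_ARGS is forwarded to pandas.read_csv()):
from apf.consumers import CSVConsumer

# Placeholder path; OTHER_ARGS is forwarded to pandas.read_csv().
config = {
    "FILE_PATH": "alerts.csv",
    "OTHER_ARGS": {
        "index_col": "id",
        "sep": ";",
        "header": 0,
    },
}

consumer = CSVConsumer(config)

# Get a message from the CSV file, then run post consume processing.
message = consumer.consume()
consumer.commit()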
- class apf.consumers.JSONConsumer(config)[source]¶
JSON Consumer.
Example:
JSON Consumer configuration example
#settings.py
CONSUMER_CONFIG = {
    ...
    "FILEPATH": "json_file_path"
}
- Parameters:
- FILE_PATH: path
JSON path location
Methods
commit()     Post consume processing.
consume()    Get a message from a JSON file.
- class apf.consumers.AVROFileConsumer(config)[source]¶
Consume from an AVRO files directory.
Example:
#settings.py
CONSUMER_CONFIG = {
    ...
    "DIRECTORY_PATH": "path/to/avro/directory"
}
- Parameters:
- DIRECTORY_PATH: path
AVRO files directory location.
Methods
commit()     Post consume processing.
consume()    Get a message from the AVRO files.