Source code for apf.consumers.generic
from abc import ABC, abstractmethod
import logging
from typing import Generator, Union
[docs]class GenericConsumer(ABC):
"""Generic Consumer for Alert Processing Framework.
Parameters are passed through *config* as a :py:class:`dict` of params.
"""
def __init__(self, config=None):
self.logger = logging.getLogger(f"alerce.{self.__class__.__name__}")
self.logger.info(f"Creating {self.__class__.__name__}")
self.config = config
[docs] @abstractmethod
def consume(self) -> Generator[Union[list, dict], None, None]:
"""Get a message from a data source
Yields
------
dict
Dictionary like message of an alert.
"""
yield {}
[docs] def commit(self):
"""Post consume processing.
Can be a postgresql, kafka, commit or a custom function to run after an alert is processed.
The commited value has to be stored as a class attribute in consume to be accessed. i.e.
.. code-block:: python
def consume(self):
self.message = get_message()
def commit(self):
commit_logic(self.message)
"""
pass