# Source code for apf.producers.generic
from abc import ABC, abstractmethod
import logging
from typing import Union
class GenericProducer(ABC):
    """Generic Producer for Alert Processing Framework.

    Abstract base class: concrete producers must implement :meth:`produce`.
    Instantiating this class directly (or a subclass that does not override
    ``produce``) raises ``TypeError``.

    Parameters
    ----------
    config : optional
        Producer configuration. It is stored unchanged on ``self.config``;
        this base class does not inspect it.
    """

    def __init__(self, config=None):
        # One logger per concrete class, under the "alerce" namespace.
        self.logger = logging.getLogger(f"alerce.{self.__class__.__name__}")
        self.logger.info(f"Creating {self.__class__.__name__}")
        self.config = config
        # No key is configured until set_key_field() is called.
        self._key_field = None

    @property
    def key_field(self):
        """Key most recently set with :meth:`set_key_field`, or ``None``."""
        return self._key_field

    def set_key_field(self, key):
        """Set key used when indexing produced messages.

        Parameters
        ----------
        key
            Value stored as the key field and later returned by
            :attr:`key_field`.
        """
        self._key_field = key

    @abstractmethod
    def produce(self, message=None, **kwargs):
        """Send a message after processing.

        Parameters
        ----------
        message : dict-like
            Message to be sent.
        **kwargs
            Extra keyword arguments; their meaning is defined by the
            concrete producer implementation.
        """