Source code for apf.producers.kafka

from apf.producers.generic import GenericProducer
from confluent_kafka import Producer
import fastavro
import io
import importlib


[docs]class KafkaProducer(GenericProducer): """Kafka Single Topic Producer. Parameters ---------- PARAMS: dict Parameters passed to :class:`confluent_kafka.Producer` The required parameters are: - *bootstrap.servers*: comma separated <host:port> :class:`string` to brokers. TOPIC: string Kafka fixed output topic. *Example:* Depending on the step configuration the producer config can be passsed in different ways, the recommended one is passing it on the `STEP_CONFIG` variable. .. code-block:: python #settings.py PRODUCER_CONFIG = { "PARAMS": { "bootstrap.servers": "kafka1:9092, kafka2:9092", }, "TOPIC": "test_topic" } STEP_CONFIG = { ... "PRODUCER_CONFIG": PRODUCER_CONFIG } If multiple producers are required, the varible inside `STEP_CONFIG` can be changed to "PRODUCER1_CONFIG", "PRODUCER2_CONFIG", etc. TOPIC_STRATEGY: dict Using a topic strategy instead of a fixed topic. Similar to the consumers topic strategy, the required parameters are: - *CLASS*: `apf.core.topic_management.GenericTopicStrategy` class to be used. - *PARAMS*: Parameters passed to *CLASS* object. **Example:** Produce to a topic that updates on 23 hours UTC every day. .. code-block:: python #settings.py PRODUCER_CONFIG = { ... "TOPIC_STRATEGY": { "CLASS": "apf.core.topic_management.DailyTopicStrategy", "PARAMS": { "topic_format": "test_%s", "date_format": "%Y%m%d", "change_hour": 23 } } } STEP_CONFIG = { ... "PRODUCER_CONFIG": PRODUCER_CONFIG } SCHEMA: dict AVRO Output Schema `(AVRO Schema Definition) <https://avro.apache.org/docs/current/gettingstartedpython.html#Defining+a+schema>`_ **Example:** .. code-block:: python #settings.py PRODUCER_CONFIG = { ... "SCHEMA": { "namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] } } """ def __init__(self, config): super().__init__(config=config) self.producer = Producer(self.config["PARAMS"]) self.schema = self.config["SCHEMA"] self.schema = fastavro.parse_schema(self.schema) self.dynamic_topic = False if self.config.get("TOPIC"): self.logger.info(f'Producing to {self.config["TOPIC"]}') self.topic = ( self.config["TOPIC"] if type(self.config["TOPIC"]) is list else [self.config["TOPIC"]] ) elif self.config.get("TOPIC_STRATEGY"): self.dynamic_topic = True module_name, class_name = self.config["TOPIC_STRATEGY"]["CLASS"].rsplit( ".", 1 ) TopicStrategy = getattr(importlib.import_module(module_name), class_name) self.topic_strategy = TopicStrategy( **self.config["TOPIC_STRATEGY"]["PARAMS"] ) self.topic = self.topic_strategy.get_topics() self.logger.info(f'Using {self.config["TOPIC_STRATEGY"]}') self.logger.info(f"Producing to {self.topic}") def _serialize_message(self, message): out = io.BytesIO() fastavro.writer(out, self.schema, [message]) return out.getvalue() def produce(self, message=None, **kwargs): """Produce Message to a topic. Parameters ---------- message: dict | None The value of the message to be produced. Should match the schema defined in the config["SCHEMA"] **kwargs: dict Any other keyword argument will be passed as **kwargs to the produce method of the producer. Do not specify the key argument, as it will be duplicated. NOTE: To set a key for the message, use the set_key_field(key_field) method prior to consuming The producer will have a key_field attribute that will be either None, or some specified value, and will use that for each produce() call. """ key = None if message: key = message[self.key_field] if self.key_field else None message = self._serialize_message(message) if self.dynamic_topic: self.topic = self.topic_strategy.get_topics() for topic in self.topic: try: self.producer.produce(topic, value=message, key=key, **kwargs) self.producer.poll(0) except BufferError as e: self.logger.info(f"Error producing message: {e}") self.logger.info("Calling poll to empty queue and producing again") self.producer.flush() self.producer.produce(topic, value=message, key=key, **kwargs) def __del__(self): self.logger.info("Waiting to produce last messages") self.producer.flush()
class KafkaSchemalessProducer(KafkaProducer): def _serialize_message(self, message): out = io.BytesIO() fastavro.schemaless_writer(out, self.schema, message, strict=True) return out.getvalue()