Source code for apf.producers.json_prod
import io
import json
import pathlib

from pandas import DataFrame, read_json, concat

from apf.producers.generic import GenericProducer
class JSONProducer(GenericProducer):
    """JSON Producer

    This producer creates multiple output files (json)
    according to the buffer size, where each file contains
    `buffer_size` elements.

    Every file is created in the `FILE_PATH` directory
    and each output file is named producer_output{i}.json where `i` is
    a counter for the times the buffer has completed.

    Parameters
    ----------
    FILE_PATH: :py:class:`str`
        Output JSON File Directory. When this key is missing or falsy,
        :meth:`produce` does nothing.
    buffer_size: :py:class:`int`
        Number of messages accumulated before a file is written.
        Read from the config, defaults to 1.
    """

    def __init__(self, config):
        """Set up an empty buffer and the output file counter.

        Parameters
        ----------
        config: dict-like
            Producer configuration, forwarded to the parent class.
            ``buffer_size`` is read from it (default 1).
        """
        super().__init__(config=config)
        self.buffer = DataFrame()
        self.buffer_size = config.get("buffer_size", 1)
        self.file_counter = 0

    def produce(self, message=None, **kwargs):
        """Produce Message to a JSON File.

        The message is appended to the internal buffer; once the buffer
        holds at least `buffer_size` rows it is written to
        ``producer_output{file_counter}.json`` inside `FILE_PATH`, the
        counter is incremented and the buffer is emptied.

        Parameters
        ----------
        message:
            JSON-serializable payload to store as one buffer row.
        **kwargs:
            Accepted for interface compatibility; not used here.
        """
        if "FILE_PATH" not in self.config or not self.config["FILE_PATH"]:
            return

        # Round-trip through JSON so pandas applies its usual read_json
        # type inference. The text is wrapped in StringIO because passing
        # a literal JSON string to read_json is deprecated in recent pandas.
        serialized_message = read_json(
            io.StringIO(json.dumps([message])), orient="records", typ="frame"
        )
        self.buffer = concat([self.buffer, serialized_message])

        # ">=" rather than "==": still flushes if the buffer ever holds
        # more rows than the threshold, or if buffer_size is below 1.
        if len(self.buffer) >= self.buffer_size:
            output_file = (
                pathlib.Path(self.config["FILE_PATH"])
                / f"producer_output{self.file_counter}.json"
            )
            self.logger.info(f"Buffer: {self.buffer}")
            self.buffer.to_json(
                output_file,
                orient="records",
            )
            self.file_counter += 1
            # Reset to an empty DataFrame (not a list): concat only accepts
            # Series/DataFrame objects, so a list here would make the next
            # produce() call raise TypeError.
            self.buffer = DataFrame()