Source code for apf.consumers.avro_file
from apf.consumers.generic import GenericConsumer
import fastavro
import os
import glob
[docs]class AVROFileConsumer(GenericConsumer):
"""Consume from a AVRO Files Directory.
**Example:**
.. code-block:: python
#settings.py
CONSUMER_CONFIG = { ...
"DIRECTORY_PATH": "path/to/avro/directory"
}
Parameters
----------
DIRECTORY_PATH: path
AVRO files Directory path location
"""
def __init__(self, config):
super().__init__(config)
def consume(self):
files = glob.glob(os.path.join(self.config["DIRECTORY_PATH"], "*.avro"))
files.sort()
if "consume.messages" in self.config:
num_messages = self.config["consume.messages"]
elif "NUM_MESSAGES" in self.config:
num_messages = self.config["NUM_MESSAGES"]
else:
num_messages = 1
msgs = []
left = len(files)
for file in files:
self.logger.debug(f"Reading File: {file}")
left -= 1
with open(file, "rb") as f:
avro_reader = fastavro.reader(f)
data = None
for read in avro_reader:
data = read
break
if num_messages == 1:
yield data
else:
msgs.append(data)
if len(msgs) == num_messages or left + len(msgs) < num_messages:
return_msgs = msgs.copy()
msgs = []
yield return_msgs
class AVROInfiniteConsumer(GenericConsumer):
"""Consume from a Infinite AVRO Files Directory.
**Example:**
.. code-block:: python
#settings.py
CONSUMER_CONFIG = { ...
"DIRECTORY_PATH": "path/to/avro/directory"
}
Parameters
----------
DIRECTORY_PATH: path
AVRO files Directory path location
"""
def __init__(self, config):
super().__init__(config)
def consume(self):
files = glob.glob(os.path.join(self.config["DIRECTORY_PATH"], "*.avro"))
files.sort()
if "consume.messages" in self.config:
num_messages = self.config["consume.messages"]
elif "NUM_MESSAGES" in self.config:
num_messages = self.config["NUM_MESSAGES"]
else:
num_messages = 1
msgs = []
index = 0
while True:
file = files[index % len(files)]
self.logger.debug(f"Reading File: {file}")
with open(file, "rb") as f:
avro_reader = fastavro.reader(f)
for data in avro_reader:
if num_messages == 1:
yield data
else:
msgs.append(data)
if len(msgs) == num_messages:
return_msgs = msgs.copy()
msgs = []
yield return_msgs