Source code for apf.consumers.csv
from apf.consumers.generic import GenericConsumer
import pandas as pd
[docs]class CSVConsumer(GenericConsumer):
"""CSV Consumer.
**Example:**
CSV Consumer configuration example
.. code-block:: python
#settings.py
CONSUMER_CONFIG = { ...
"FILE_PATH": "csv_file_path",
"OTHER_ARGS": {
"index_col": "id",
"sep": ";",
"header": 0
}
}
Parameters
----------
FILE_PATH: path
CSV path location
OTHER_ARGS: dict
Parameters passed to :func:`pandas.read_csv`
(reference `pandas documentation <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html>`_)
"""
def __init__(self, config):
super().__init__(config)
path = self.config.get("FILE_PATH", None)
if path is None:
raise Exception("FILE_PATH variable not set")
def consume(self):
"""Get a message from a csv file
Yields
------
dict
Dictionary like message of an alert.
"""
df = pd.read_csv(self.config["FILE_PATH"], **self.config.get("OTHER_ARGS", {}))
self.len = len(df)
for index, row in df.iterrows():
yield row.to_dict()