Source code for apf.metrics.generic

import abc
import logging
import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    # Override the default method
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()


[docs]class GenericMetricsProducer(abc.ABC): def __init__(self, config): self.config = config self.logger = logging.getLogger(f"alerce.{self.__class__.__name__}") self.logger.info(f"Creating {self.__class__.__name__}")
[docs] @abc.abstractmethod def send_metrics(self, metrics): """Write metrics into a data store or other metrics system. Parameters ---------- metrics : dict Metrics to be written. """ pass