Source code for apf.core.topic_management
import datetime
class GenericTopicStrategy:
def get_topics(self):
pass
class Topic:
def __init__(self, name, date, name_format, date_format):
self.name = name
self.date = date
self.name_format = name_format
self.date_format = date_format
def __str__(self):
return self.name
def __repr__(self):
return self.name
[docs]class DailyTopicStrategy(GenericTopicStrategy):
"""Gives a KafkaConsumer a new topic every day. (For more information check the :class:`apf.consumers.KafkaConsumer`)
Parameters
----------
topic_format : str/list
Topic format with %s for the date field.
date_format : str
Date format code. Must be a valid 1989 C code.
i.e: %Y%m%d = YYYY-mm-dd
change_hour: int
UTC hour to change the topic.
"""
def __init__(
self,
topic_format="ztf_%s_programid1",
date_format="%Y%m%d",
change_hour=22,
retention_days=8,
):
super().__init__()
self.topic_formats = (
topic_format if type(topic_format) is list else [topic_format]
)
self.retention_days = retention_days
self.date_format = date_format
self.change_hour = change_hour
now = datetime.datetime.utcnow()
date = now.strftime(self.date_format)
self.topics = [
Topic(topic % date, now, topic, self.date_format)
for topic in self.topic_formats
]
def _remove_old_topics(self, now):
for topic in self.topics:
delta = now - topic.date
if abs(delta.days) >= self.retention_days:
self.topics = self.topics[-self.retention_days :]
[docs] def get_topics(self):
"""Get list of topics updated to the current date.
Returns
-------
:class:`list`
List of updated topics.
"""
now = datetime.datetime.utcnow()
self._remove_old_topics(now)
if now.hour >= self.change_hour:
now += datetime.timedelta(days=1)
date = now.strftime(self.date_format)
for topic_format in self.topic_formats:
topic = Topic(topic_format % date, now, topic_format, self.date_format)
if not [x for x in self.topics if x.name == topic.name]:
self.topics.append(topic)
return [topic.name for topic in self.topics]