Core components¶
- class apf.core.step.GenericStep(consumer: ~typing.Type[~apf.consumers.generic.GenericConsumer] = <class 'apf.core.step.DefaultConsumer'>, producer: ~typing.Type[~apf.producers.generic.GenericProducer] = <class 'apf.core.step.DefaultProducer'>, metrics_sender: ~typing.Type[~apf.metrics.generic.GenericMetricsProducer] = <class 'apf.core.step.DefaultMetricsProducer'>, config: dict = {}, prometheus_metrics: ~apf.metrics.prometheus.prometheus.PrometheusMetrics = <apf.metrics.prometheus.prometheus.DefaultPrometheusMetrics object>)[source]¶
Generic Step for apf.
- Parameters:
- consumer : GenericConsumer
An object of type GenericConsumer.
- level : logging.LEVEL
Logging level; has to be a logging.LEVEL constant.
Setting LOGGING_DEBUG = True in settings.py sets the step’s global logging level to debug:
    # settings.py
    LOGGING_DEBUG = True
- **step_args : dict
Additional parameters for the step.
- Attributes:
- consumer_config
- metrics_config
- metrics_producer_params
- producer_config
Methods

- execute(messages): Execute the logic of the step.
- get_extra_metrics(message): Generate extra metrics from the EXTRA_METRICS metrics configuration.
- get_value(message, params): Get values from a message and process them to create a new metric.
- post_execute(result): Override this method to perform additional operations on the processed data coming from the apf.core.step.GenericStep.execute() method.
- post_produce(): Override this method to perform operations after data has been produced by the producer.
- pre_consume(): Override this method to perform operations before the first message arrives.
- pre_execute(messages): Override this method to perform operations on each batch of messages consumed.
- pre_produce(result): Override this method to perform additional operations on the processed data coming from the apf.core.step.GenericStep.post_execute() method.
- produce(result): Produces messages using the configured producer class.
- send_metrics(**metrics): Send metrics with a metrics producer.
- set_producer_key_field(key_field): Set the producer key, used in producer.produce(message, key=message[key_field]).
- start(): Start the step.
- tear_down(): Override this method to perform operations after the consumer has stopped consuming data.
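The methods above are lifecycle hooks invoked in a fixed order around each consumed batch. The following plain-Python sketch is not apf's implementation; it only illustrates when each hook runs during start(), under the assumption that the hooks chain their return values as described in the detailed docs below.

```python
# Plain-Python sketch of the GenericStep lifecycle order (NOT apf's code).
# Each hook just records its name so the call order is visible.

class SketchStep:
    def __init__(self):
        self.calls = []  # records hook invocations, for illustration only

    def pre_consume(self):        self.calls.append("pre_consume")
    def pre_execute(self, msgs):  self.calls.append("pre_execute"); return msgs
    def execute(self, msgs):      self.calls.append("execute"); return msgs
    def post_execute(self, res):  self.calls.append("post_execute"); return res
    def pre_produce(self, res):   self.calls.append("pre_produce"); return res
    def produce(self, res):       self.calls.append("produce")
    def post_produce(self):       self.calls.append("post_produce")
    def tear_down(self):          self.calls.append("tear_down")

    def start(self, batches):
        # pre_consume runs once, before the first message arrives
        self.pre_consume()
        for batch in batches:
            result = self.execute(self.pre_execute(batch))
            result = self.pre_produce(self.post_execute(result))
            self.produce(result)
            self.post_produce()
        # tear_down runs once, after the consumer stops consuming
        self.tear_down()

step = SketchStep()
step.start([[{"oid": "ZTF1"}]])
```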
- abstract execute(messages: List[dict]) → Iterable[Dict[str, Any]] | Dict[str, Any][source]¶
Execute the logic of the step. This method has to be implemented by the subclass.
- Parameters:
- messages : dict, list
Dict-like message to be processed, or a list of dict-like messages.
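A minimal implementation consistent with this contract takes a list of dict-like messages and returns the processed messages. The field names below (oid, mag, magnitude_ok) are hypothetical, and the function is shown standalone rather than as a GenericStep subclass so it runs without apf installed.

```python
from typing import Any, Dict, List

def execute(messages: List[dict]) -> List[Dict[str, Any]]:
    # Hypothetical step logic: tag each alert with a derived field.
    out = []
    for msg in messages:
        processed = dict(msg)  # do not mutate the consumed message
        processed["magnitude_ok"] = msg.get("mag", 99.0) < 20.0
        out.append(processed)
    return out
```

In a real step this body would live in a subclass of apf.core.step.GenericStep, with the same signature.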
- get_extra_metrics(message)[source]¶
Generate extra metrics from the EXTRA_METRICS metrics configuration.
- Parameters:
- message : dict, list
Dict-like message to be processed, or a list of dict-like messages.
- Returns:
- dict
Dictionary with extra metrics from the messages.
- get_value(message, params)[source]¶
Get values from a message and process them to create a new metric.
- Parameters:
- message : dict
Dict-like message to be processed.
- params : str, dict
String of the value key, or a dict with the following:
- ‘key’: str
Required; has to be a key present in the message.
- ‘alias’: str
New key returned; this can be used to standardize some message keys.
- ‘format’: callable
Function to be called on the message value.
- Returns:
- new_key, value
Aliased key and processed value.
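The key/alias/format behavior described above can be sketched in plain Python as follows. This is an illustrative re-implementation, not apf's actual code, and the field names in the usage line are hypothetical.

```python
def get_value(message, params):
    # Sketch of the behavior documented above (NOT apf's implementation):
    # params is either a plain key string, or a dict with a required 'key'
    # plus optional 'alias' and 'format' entries.
    if isinstance(params, str):
        return params, message[params]
    key = params["key"]                  # required, must be in the message
    value = message[key]
    if "format" in params:
        value = params["format"](value)  # callable applied to the raw value
    new_key = params.get("alias", key)   # alias renames the returned key
    return new_key, value

# Hypothetical usage: rename 'candid' to 'candidate_id' and stringify it.
pair = get_value({"candid": 12345},
                 {"key": "candid", "alias": "candidate_id", "format": str})
```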
- post_execute(result: Iterable[Dict[str, Any]] | Dict[str, Any])[source]¶
Override this method to perform additional operations on the processed data coming from the apf.core.step.GenericStep.execute() method.
Typically used for post-processing, parsing, output formatting, etc.
- post_produce()[source]¶
Override this method to perform operations after data has been produced by the producer.
You can use this lifecycle method to perform cleanup, send additional metrics, notifications, etc.
- pre_execute(messages: List[dict])[source]¶
Override this method to perform operations on each batch of messages consumed.
Typically this method is used for pre processing operations such as parsing, formatting and overall preparation for the execute method that handles all the complex logic applied to the messages.
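A pre_execute override of this kind might clean and normalize the batch before the heavy logic runs. The validation rules and field names below are hypothetical, and the function is shown standalone so it runs without apf installed.

```python
def pre_execute(messages):
    # Hypothetical pre-processing: drop malformed alerts and normalize a
    # field before execute() applies the step's main logic.
    cleaned = []
    for msg in messages:
        if "oid" not in msg:
            continue  # skip messages missing the required identifier
        msg = dict(msg)
        msg["oid"] = str(msg["oid"]).strip()  # normalize whitespace
        cleaned.append(msg)
    return cleaned
```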
- pre_produce(result: Iterable[Dict[str, Any]] | Dict[str, Any])[source]¶
Override this method to perform additional operations on the processed data coming from the apf.core.step.GenericStep.post_execute() method.
Typically used to format the output data as described in the step producer’s schema.
- produce(result: Iterable[Dict[str, Any]] | Dict[str, Any])[source]¶
Produces messages using the configured producer class.
- Parameters:
- result: dict | list[dict]
The result of the step’s execution. This parameter can be an iterable or a single message, where the message should be a dictionary that matches the output schema of the step.
- NOTE: If you want to produce with a key, call the set_producer_key_field(key_field) method somewhere in the lifecycle of the step prior to the produce stage.
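The keyed-produce behavior can be sketched as below. RecordingProducer is a stand-in written for this example only, not an apf class; it just records what the real producer would send when key=message[key_field] is applied.

```python
class RecordingProducer:
    # Stand-in for a producer (illustration only): records what it would send.
    def __init__(self):
        self.sent = []

    def produce(self, message, key=None):
        self.sent.append((key, message))

def produce_with_key(producer, messages, key_field=None):
    # Sketch of the note above: when a key field has been set, each message
    # is produced with key=message[key_field]; otherwise with no key.
    for msg in messages:
        key = msg[key_field] if key_field is not None else None
        producer.produce(msg, key=key)

producer = RecordingProducer()
produce_with_key(producer, [{"oid": "ZTF1", "mag": 18.5}], key_field="oid")
```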
- send_metrics(**metrics)[source]¶
Send Metrics with a metrics producer.
For this method to work the METRICS_CONFIG variable has to be set in the STEP_CONFIG variable.
Example:
Send the compute time for an object.
    # example_step/step.py
    self.send_metrics(compute_time=compute_time, oid=oid)
For this to work we need to declare:
    # settings.py
    STEP_CONFIG = {
        ...
        "METRICS_CONFIG": {  # Can be an empty dictionary
            "CLASS": "apf.metrics.KafkaMetricsProducer",
            "PARAMS": {  # params for the apf.metrics.KafkaMetricsProducer
                "PARAMS": {
                    # This producer uses confluent_kafka.Producer, so here we
                    # provide arguments for that class, like bootstrap.servers
                    "bootstrap.servers": "kafka1:9092",
                },
                "TOPIC": "metrics_topic",  # the topic to store the metrics
            },
        },
    }
- Parameters:
- **metrics : dict-like
Parameters sent to the Kafka topic as a message.
- set_producer_key_field(key_field: str)[source]¶
Set the producer key, used in producer.produce(message, key=message[key_field])
- Parameters:
- key_field : str
The message key whose value will be used as the producer key.
- tear_down()[source]¶
Override this method to perform operations after the consumer has stopped consuming data.
This method is called only once after processing messages and right before the start method ends.
You can use this lifecycle method to perform cleanup, send additional metrics, notifications, etc.
Topic Management¶
- class apf.core.topic_management.DailyTopicStrategy(topic_format='ztf_%s_programid1', date_format='%Y%m%d', change_hour=22, retention_days=8)[source]¶
Gives a KafkaConsumer a new topic every day. (For more information check apf.consumers.KafkaConsumer.)
- Parameters:
- topic_format : str, list
Topic format with %s for the date field.
- date_format : str
Date format code. Must be a valid 1989 C standard (strftime) format code,
e.g. %Y%m%d produces dates like 20240131.
- change_hour : int
UTC hour at which to change the topic.
Methods
Get list of topics updated to the current date.
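The parameters above can be combined roughly as in the sketch below. This is not apf's implementation; the exact roll-over and retention semantics here are assumptions, shown only to illustrate how topic_format, date_format, change_hour and retention_days interact.

```python
from datetime import datetime, timedelta, timezone

def daily_topics(now, topic_format="ztf_%s_programid1",
                 date_format="%Y%m%d", change_hour=22, retention_days=8):
    # Sketch of a daily-topic strategy (NOT apf's code, semantics assumed):
    # after change_hour UTC the "current" topic rolls over to the next date,
    # and the most recent retention_days topics are returned for the consumer.
    if now.hour >= change_hour:
        now = now + timedelta(days=1)  # roll over to tomorrow's topic early
    dates = [now - timedelta(days=d) for d in range(retention_days)]
    return [topic_format % date.strftime(date_format) for date in dates]

# Hypothetical usage: at 23:00 UTC on 2024-01-31 the current topic has
# already rolled over to the 2024-02-01 date.
topics = daily_topics(datetime(2024, 1, 31, 23, 0, tzinfo=timezone.utc))
```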