Core components

class apf.core.step.GenericStep(consumer: ~typing.Type[~apf.consumers.generic.GenericConsumer] = <class 'apf.core.step.DefaultConsumer'>, producer: ~typing.Type[~apf.producers.generic.GenericProducer] = <class 'apf.core.step.DefaultProducer'>, metrics_sender: ~typing.Type[~apf.metrics.generic.GenericMetricsProducer] = <class 'apf.core.step.DefaultMetricsProducer'>, config: dict = {}, prometheus_metrics: ~apf.metrics.prometheus.prometheus.PrometheusMetrics = <apf.metrics.prometheus.prometheus.DefaultPrometheusMetrics object>)[source]

Generic Step for apf.

Parameters:
consumer : GenericConsumer

An object of type GenericConsumer.

level : logging.level

Logging level; must be a logging.LEVEL constant.

Adding LOGGING_DEBUG to settings.py sets the step’s global logging level to debug.

#settings.py
LOGGING_DEBUG = True
**step_args : dict

Additional parameters for the step.

Attributes:
consumer_config
metrics_config
metrics_producer_params
producer_config

Methods

execute(messages)

Execute the logic of the step.

get_extra_metrics(message)

Generate extra metrics from the EXTRA_METRICS metrics configuration.

get_value(message, params)

Get values from a message and process them to create a new metric.

post_execute(result)

Override this method to perform additional operations on the processed data coming from apf.core.step.GenericStep.execute() method.

post_produce()

Override this method to perform operations after data has been produced by the producer.

pre_consume()

Override this method to perform operations before the first message arrives.

pre_execute(messages)

Override this method to perform operations on each batch of messages consumed.

pre_produce(result)

Override this method to perform additional operations on the processed data coming from apf.core.step.GenericStep.post_execute() method.

produce(result)

Produces messages using the configured producer class.

send_metrics(**metrics)

Send Metrics with a metrics producer.

set_producer_key_field(key_field)

Set the producer key, used in producer.produce(message, key=message[key_field])

tear_down()

Override this method to perform operations after the consumer has stopped consuming data.

start

abstract execute(messages: List[dict]) → Iterable[Dict[str, Any]] | Dict[str, Any][source]

Execute the logic of the step. This method has to be implemented by the instanced class.

Parameters:
messages : dict, list

Dict-like message to be processed or list of dict-like messages
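The abstract method above can be sketched with a minimal subclass. This is a hedged illustration: the class name MyStep and the message fields ("oid", "magnitude") are assumptions for the example, not part of apf; a real step would inherit from apf.core.step.GenericStep and be configured with a consumer and producer.

```python
# Sketch of a concrete execute() implementation. MyStep and the
# message fields are illustrative assumptions, not apf internals.
from typing import Any, Dict, List

class MyStep:  # in a real step: class MyStep(apf.core.step.GenericStep)
    def execute(self, messages: List[dict]) -> List[Dict[str, Any]]:
        # Transform each consumed message into an output message.
        results = []
        for msg in messages:
            results.append({
                "oid": msg["oid"],
                "corrected_magnitude": msg["magnitude"] - 0.5,
            })
        return results
```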

get_extra_metrics(message)[source]

Generate extra metrics from the EXTRA_METRICS metrics configuration.

Parameters:
message : dict, list

Dict-like message to be processed or list of dict-like messages

Returns:
dict

Dictionary with extra metrics from the messages.

get_value(message, params)[source]

Get values from a message and process them to create a new metric.

Parameters:
message : dict

Dict-like message to be processed

params : str, dict

String of the value key or dict with the following:

  • ‘key’: str

    Required; must be a key present in the message.

  • ‘alias’: str

    New key returned; can be used to standardize message keys.

  • ‘format’: callable

    Function to be called on the message value.

Returns:
new_key, value

Aliased key and processed value.
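The two accepted forms of params can be illustrated with a small reimplementation that mirrors the documented behavior; this is a sketch for clarity, not the apf source, and the message fields ("candid", "magpsf") are assumptions.

```python
# Illustrative reimplementation of get_value's documented behavior:
# params is either a plain key string, or a dict with 'key' and the
# optional 'alias' and 'format' entries.
def get_value(message, params):
    if isinstance(params, str):
        return params, message[params]
    key = params["key"]          # required; must be in the message
    value = message[key]
    if "format" in params:
        value = params["format"](value)   # callable applied to the value
    return params.get("alias", key), value

msg = {"candid": 123, "magpsf": 17.5}
get_value(msg, "candid")  # plain-string form
get_value(msg, {"key": "magpsf", "alias": "magnitude", "format": round})
```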

post_execute(result: Iterable[Dict[str, Any]] | Dict[str, Any])[source]

Override this method to perform additional operations on the processed data coming from apf.core.step.GenericStep.execute() method.

Typically used to do post processing, parsing, output formatting, etc.

post_produce()[source]

Override this method to perform operations after data has been produced by the producer.

You can use this lifecycle method to perform cleanup, send additional metrics, notifications, etc.

pre_consume()[source]

Override this method to perform operations before the first message arrives.

pre_execute(messages: List[dict])[source]

Override this method to perform operations on each batch of messages consumed.

Typically this method is used for pre processing operations such as parsing, formatting and overall preparation for the execute method that handles all the complex logic applied to the messages.
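A typical pre-processing override can be sketched as follows; the "payload" field name and JSON encoding are assumptions for this example, and a real step would inherit from GenericStep.

```python
# Hypothetical pre_execute override that decodes raw JSON payloads
# before execute runs. The "payload" field is an assumed message key.
import json

class ParsingStepSketch:  # in practice: inherits from GenericStep
    def pre_execute(self, messages):
        # Parse each raw payload into a dict for execute to consume.
        return [json.loads(m["payload"]) for m in messages]
```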

pre_produce(result: Iterable[Dict[str, Any]] | Dict[str, Any])[source]

Override this method to perform additional operations on the processed data coming from apf.core.step.GenericStep.post_execute() method.

Typically used to format the data output as described in the step producer’s schema.

produce(result: Iterable[Dict[str, Any]] | Dict[str, Any])[source]

Produces messages using the configured producer class.

Parameters:
result: dict | list[dict]

The result of the step’s execution. This parameter can be an iterable or a single message, where the message should be a dictionary that matches the output schema of the step.

NOTE: To produce with a key, call set_producer_key_field(key_field) somewhere in the step’s lifecycle before the produce stage.

send_metrics(**metrics)[source]

Send Metrics with a metrics producer.

For this method to work, the METRICS_CONFIG variable has to be set inside the STEP_CONFIG variable.

Example:

Send the compute time for an object.

#example_step/step.py
self.send_metrics(compute_time=compute_time, oid=oid)

For this to work, we need to declare:

#settings.py
#settings.py
STEP_CONFIG = {...
    "METRICS_CONFIG": {  # can be an empty dictionary
        "CLASS": "apf.metrics.KafkaMetricsProducer",
        "PARAMS": {  # params for the apf.metrics.KafkaMetricsProducer
            "PARAMS": {
                # this producer uses confluent_kafka.Producer, so here we provide
                # arguments for that class, like bootstrap.servers
                "bootstrap.servers": "kafka1:9092",
            },
            "TOPIC": "metrics_topic",  # the topic to store the metrics
        },
    },
}
Parameters:
**metrics : dict-like

Parameters sent to the Kafka topic as a message.

set_producer_key_field(key_field: str)[source]

Set the producer key, used in producer.produce(message, key=message[key_field])

Parameters:
key_field : str

The key of the message whose value will be used as the key for the producer.

tear_down()[source]

Override this method to perform operations after the consumer has stopped consuming data.

This method is called only once after processing messages and right before the start method ends.

You can use this lifecycle method to perform cleanup, send additional metrics, notifications, etc.

Topic Management

class apf.core.topic_management.DailyTopicStrategy(topic_format='ztf_%s_programid1', date_format='%Y%m%d', change_hour=22, retention_days=8)[source]

Gives a KafkaConsumer a new topic every day (for more information, see apf.consumers.KafkaConsumer).

Parameters:
topic_format : str or list

Topic format with %s for the date field.

date_format : str

Date format code. Must be a valid C89 strftime format code, e.g. %Y%m%d produces dates like 20240101.

change_hour : int

UTC hour to change the topic.

Methods

get_topics()

Get list of topics updated to the current date.

get_topics()[source]

Get list of topics updated to the current date.

Returns:
list

List of updated topics.
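The naming behavior described above can be sketched with a small reimplementation: substitute the current UTC date (formatted with date_format) into topic_format, rolling over to the next day once change_hour has passed. This mirrors the documented behavior for illustration only; it is not the apf source.

```python
# Illustrative sketch of DailyTopicStrategy's topic naming logic.
from datetime import datetime, timedelta, timezone

def current_topic(topic_format="ztf_%s_programid1",
                  date_format="%Y%m%d", change_hour=22):
    now = datetime.now(timezone.utc)
    if now.hour >= change_hour:
        now += timedelta(days=1)  # switch to tomorrow's topic early
    return topic_format % now.strftime(date_format)
```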