Creating a new apf step

apf was created to simplify the development of an stream processing pipeline.

To illustrate how the creation of a pipeline step was intended we have the following diagram.

_images/apf-flow.png

This tutorial will guide developers to create an example step from the installation of the framework until building and running the docker image locally.

1. Installing apf

To install apf run

pip install apf_base

This will install the package and a command line script.

apf [--help] command

2. Creating base step

apf comes with a code generation tool to create a base for a new step.

To create this base run

apf new-step example_step

This command will create the following file tree

example_test/
├── example_test/
│   ├── __init__.py
│   └── step.py
├── scripts/
│   ├── run_multiprocess.py
│   └── run_step.py
├── tests/
├── Dockerfile
├── requirements.txt
└── settings.py

The step will be a python package called example_test, inside the package there is a step.py with the step logic.

3. Coding the step

In example_test/step.py we will code the step logic, it can be as simple as printing the message or a more complex logic. For each new message the execute() method is called with a python dict with the message itself.

#example_test/step.py
def execute(self,message):
  ################################
  #   Here comes the Step Logic  #
  ################################

  pass

For this example we will just log the message changing the execution code to

#example_test/step.py
def execute(self,message):
  # Logging the message
  self.logger.info(message)

Here self.logger is the default logger (logging.Logger) from apf.core.GenericStep.

Then we can go to scripts/run_step.py. The basic run_step.py comes with the following

#scripts/run_step.py
step = ExampleTest(config=STEP_CONFIG,level=level)
step.start()

But you can pass any argument you need to define in the Step’s constructor.

An alternate step initialization could look like this:

#scripts/run_step.py
step = ExampleTest(
            consumer=KafkaConsumer,
            producer=KafkaProducer,
            metrics_sender=KafkaMetricsProducer,
            config=STEP_CONFIG,
            level=level
)
step.start()

This can be useful for tests as well, since you can pass a mock class and do not need to rely on settings, that have more boilerplate.

Starting at APF 2.0.0 the consumers, producers and metrics manager are configured only through the config dictionary passed to the Step class.

4. Configuring the step

After coding the step and modifying the script, the step must be configured.

There are 2 files needed to configure a step.

1- settings.py:

This file contains all the configuration passed to the consumers, producers and plugins. Having it separately from the main script make it easier to change configurations from run to run.

For good practice having environmental variables as parameters is better than hard-coding them to the settings file, and comes very handy when deploying the same dockerized step with different configurations.

The basic settings.py comes with the following

# settings.py
# LOGGING_DEBUG = True

CONSUMER_CONFIG = {}
PRODUCER_CONFIG = {}
METRICS_CONFIG = {}
PROMETHEUS = False

## Step Configuration
STEP_CONFIG = {
    "CONSUMER_CONFIG": CONSUMER_CONFIG,
    "PRODUCER_CONFIG": PRODUCER_CONFIG,
    "METRICS_CONFIG": METRICS_CONFIG,
    "PROMETHEUS": PROMETHEUS,
}

We will test our step with a CSVConsumer

#settings.py
CONSUMER_CONFIG = {
  "CLASS": "apf.consumers.CSVConsumer",
  "FILE_PATH": "https://raw.githubusercontent.com/alercebroker/APF/develop/docs/source/_static/example/detections.csv",
  "OTHER_ARGS": {
      "index_col": "oid"
  }
}

2- requirements.txt

The default requirements file for any python package, for good practice having the package with a specific version is better than using the latest one.

The basic requirements.txt comes with the current apf version as a required package

#requirements.txt
apf==<version>

By default the apf package is already on the requirements file, so for this tutorial we will skip this step.

5. Running the step locally

The step can be executed with

python scripts/run_step.py

To run the step dockerized, first we need to build the step

docker build -t example_step .
docker run --rm --name example_step example_step

Note

Try using another Consumer configure it and run it locally to check it works. For example a CSVConsumer or a JSONConsumer