Skip to content

App Functions SDK for Python - App Function Context API

The context parameter passed to each function/transform provides operations and data associated with each execution of the pipeline.

Let's take a look at its API:

class AppFunctionContext(ABC):
    """
    An abstract base class that defines the interface for an application function context.

    The context is passed to each function/transform of a pipeline and provides the operations
    and data associated with one execution of that pipeline: response data and content type,
    retry (Store and Forward) control, per-execution context storage, the secret provider, the
    logger, and the EdgeX service clients.

    Methods:
        clone() -> 'AppFunctionContext': Clones the context.
        correlation_id() -> str: Gets the correlation ID.
        set_correlation_id(correlation_id: str): Sets the correlation ID.
        input_content_type() -> str: Gets the input content type.
        set_input_content_type(input_content_type: str): Sets the input content type.
        set_response_data(data: bytes): Sets the response data.
        response_data() -> bytes: Gets the response data.
        set_response_content_type(content_type: str): Sets the response content type.
        response_content_type() -> str: Gets the response content type.
        set_retry_data(data: bytes): Sets the retry data.
        retry_data() -> bytes: Gets the retry data.
        trigger_retry_failed_data(): Triggers retry for failed data.
        secret_provider() -> SecretProvider: Gets the secret provider.
        logger() -> 'Logger': Gets the logger.
        pipeline_id() -> str: Gets the pipeline ID.
        add_value(key: str, value: str): Stores a value in the context storage.
        remove_value(key: str): Deletes a value from the context storage.
        get_value(key: str) -> Tuple[str, bool]: Looks up a value in the context storage.
        get_values() -> dict: Gets a copy of all values in the context storage.
        apply_values(str_format: str) -> str: Replaces '{key}' placeholders with stored values.
        event_client() -> EventClientABC: Gets the event client.
        reading_client() -> ReadingClientABC: Gets the reading client.
        command_client() -> CommandClientABC: Gets the command client.
        device_service_client() -> DeviceServiceClientABC: Gets the device service client.
    """

    @abstractmethod
    def clone(self) -> 'AppFunctionContext':
        """
        Clones the context.

        Returns:
            A copy of the context that can be mutated independently of the original.
        """

    @abstractmethod
    def correlation_id(self) -> str:
        """
        Gets the correlation ID, i.e. the ID used to track the EdgeX event through the
        EdgeX framework.

        Returns:
            The correlation ID.
        """

    @abstractmethod
    def set_correlation_id(self, correlation_id: str):
        """
        Sets the correlation ID.

        Args:
            correlation_id: The correlation ID.
        """

    @abstractmethod
    def input_content_type(self) -> str:
        """
        Gets the content type of the data that initiated the pipeline execution.

        Returns:
            The input content type.
        """

    @abstractmethod
    def set_input_content_type(self, input_content_type: str):
        """
        Sets the input content type.

        Args:
            input_content_type: The input content type.
        """

    @abstractmethod
    def set_response_data(self, data: bytes):
        """
        Sets the response data that will be returned to the trigger when pipeline execution
        is complete.

        Args:
            data: The response data.
        """

    @abstractmethod
    def response_data(self) -> bytes:
        """
        Gets the response data that will be returned to the trigger when pipeline execution
        is complete.

        Returns:
            The response data.
        """

    @abstractmethod
    def set_response_content_type(self, content_type: str):
        """
        Sets the content type that will be returned to the trigger when pipeline execution
        is complete.

        Args:
            content_type: The response content type.
        """

    @abstractmethod
    def response_content_type(self) -> str:
        """
        Gets the content type that will be returned to the trigger when pipeline execution
        is complete.

        Returns:
            The response content type.
        """

    @abstractmethod
    def set_retry_data(self, data: bytes):
        """
        Sets the retry data, i.e. the payload to be stored for a later retry based on the
        Store and Forward configuration.

        Args:
            data: The retry data.
        """

    @abstractmethod
    def retry_data(self) -> bytes:
        """
        Gets the retry data.

        Returns:
            The retry data.
        """

    @abstractmethod
    def trigger_retry_failed_data(self):
        """
        Sets the flag that triggers a retry of failed data once the current pipeline
        execution has completed.
        """

    @abstractmethod
    def secret_provider(self) -> SecretProvider:
        """
        Gets the secret provider.

        Returns:
            The secret provider.
        """

    @abstractmethod
    def logger(self) -> 'Logger':
        """
        Gets the logger.

        Returns:
            The logger.
        """

    @abstractmethod
    def pipeline_id(self) -> str:
        """
        Gets the ID of the pipeline currently executing.

        Returns:
            The pipeline ID.
        """

    @abstractmethod
    def add_value(self, key: str, value: str):
        """
        Stores a value in the context storage for access within a pipeline execution.

        Args:
            key: The key under which the value is stored (keys are case-insensitive).
            value: The value to store.
        """

    @abstractmethod
    def remove_value(self, key: str):
        """
        Deletes a value stored in the context at the given key.

        Args:
            key: The key of the value to delete.
        """

    @abstractmethod
    def get_value(self, key: str) -> Tuple[str, bool]:
        """
        Attempts to retrieve a value stored in the context at the given key.

        Args:
            key: The key of the value to retrieve.

        Returns:
            A tuple of the value and a boolean indicating whether the key was found.
            If the key is not found, an empty string and False are returned.
        """

    @abstractmethod
    def get_values(self) -> dict:
        """
        Gets all data stored in the context.

        Returns:
            A read-only copy of all data stored in the context.
        """

    @abstractmethod
    def apply_values(self, str_format: str) -> str:
        """
        Looks in the provided string for placeholders of the form '{any-value-key}' and
        replaces each with the value stored under that key in context storage. Key matching
        is case-insensitive.

        Args:
            str_format: The string containing the placeholders.

        Returns:
            The string with all placeholders replaced.

        Raises:
            An error if any placeholder is not matched to a value in the context.
        """

    @abstractmethod
    def event_client(self) -> EventClientABC:
        """
        Gets the event client used to access Core Data's Event API.

        Returns:
            The event client instance, or None if Core Data is not specified in the
            Clients configuration.
        """

    @abstractmethod
    def reading_client(self) -> ReadingClientABC:
        """
        Gets the reading client used to access Core Data's Reading API.

        Returns:
            The reading client instance, or None if Core Data is not specified in the
            Clients configuration.
        """

    @abstractmethod
    def command_client(self) -> CommandClientABC:
        """
        Gets the command client used to access Core Command's Command API.

        Returns:
            The command client instance, or None if Core Command is not specified in the
            Clients configuration.
        """

    @abstractmethod
    def device_service_client(self) -> DeviceServiceClientABC:
        """
        Gets the device service client used to access Core Metadata's DeviceService API.

        Returns:
            The device service client instance, or None if Core Metadata is not specified
            in the Clients configuration.
        """

Response Data

set_response_data

set_response_data(data: bytes)

This API sets the response data that will be returned to the trigger when pipeline execution is complete.

response_data

response_data() -> bytes

This API returns the data that will be returned to the trigger when pipeline execution is complete.

set_response_content_type

set_response_content_type(content_type: str)

This API sets the content type that will be returned to the trigger when pipeline execution is complete.

response_content_type

response_content_type() -> str

This API returns the content type that will be returned to the trigger when pipeline execution is complete.

Clients

logger

logger() -> 'Logger'

Returns a Logger to leverage logging libraries/service utilized throughout the EdgeX framework. The SDK has initialized everything, so it can be used to log TRACE, DEBUG, WARN, INFO, and ERROR messages as appropriate.

Example - Logger

# ctx is the AppFunctionContext passed to the pipeline function.
ctx.logger().info("Hello World")
# err is assumed to be an error obtained earlier in the function; adapt to your code.
ctx.logger().error(f"Some error occurred: {err}")

event_client

event_client() -> EventClientABC

Returns an EventClient to leverage Core Data's Event API. See interface definition for more details. This client is useful for querying events. Note if Core Data is not specified in the Clients configuration, this will return None.

reading_client

reading_client() -> ReadingClientABC

Returns a ReadingClient to leverage Core Data's Reading API. See interface definition for more details. This client is useful for querying readings. Note if Core Data is not specified in the Clients configuration, this will return None.

command_client

command_client() -> CommandClientABC

Returns a CommandClient to leverage Core Command's Command API. See interface definition for more details. Useful for sending commands to devices. Note if Core Command is not specified in the Clients configuration, this will return None.

device_service_client

device_service_client() -> DeviceServiceClientABC

Returns a DeviceServiceClient to leverage Core Metadata's DeviceService API. See interface definition for more details. Useful for querying information about Device Services. Note if Core Metadata is not specified in the Clients configuration, this will return None.

device_profile_client

device_profile_client() -> DeviceProfileClientABC

Returns a DeviceProfileClient to leverage Core Metadata's DeviceProfile API. See interface definition for more details. Useful for querying information about Device Profiles and is used by the get_device_resource helper function below. Note if Core Metadata is not specified in the Clients configuration, this will return None.

device_client

device_client() -> DeviceClientABC

Returns a DeviceClient to leverage Core Metadata's Device API. See interface definition for more details. Useful for querying information about Devices. Note if Core Metadata is not specified in the Clients configuration, this will return None.

Note about Clients

Each of the clients above is only initialized if the Clients section of the configuration contains an entry for the service associated with the Client API. If it isn't in the configuration, the client will be None. Your code must check for None to avoid a runtime error in case it is missing from the configuration. Only add the clients to your configuration that your Application Service will actually be using. The following is an example Clients section of a configuration.yaml with all supported clients specified:

Example - Client Configuration Section

# Only list the services your Application Service actually uses; a client whose
# service has no entry here is returned as None by the context API.
Clients:
  # Core Data: enables event_client() and reading_client().
  core-data:
    Protocol: http
    Host: localhost
    Port: 59880

  # Core Command: enables command_client().
  core-command:
    Protocol: http
    Host: localhost
    Port: 59882

Note

Core Metadata client is required and provided by the App Services Common Configuration, so it is not included in the above example.

Context Storage

The context API exposes a dict-like interface that can be used to store custom data specific to a given pipeline execution. This data is persisted for retry if needed. Currently only strings are supported, and keys are treated as case-insensitive.

The following values are seeded into the Context Storage when an Event is received:

  • Profile Name (key to retrieve value is profilename)
  • Device Name (key to retrieve value is devicename)
  • Source Name (key to retrieve value is sourcename)
  • Received Topic (key to retrieve value is receivedtopic)

Note

Received Topic is only available when the message was received from the EdgeX MessageBus or External MQTT triggers.

Storage can be accessed using the following methods:

add_value

add_value(key: str, value: str)

This API stores a value for access within a pipeline execution

remove_value

remove_value(key: str)

This API deletes a value stored in the context at the given key

get_value

get_value(key: str) -> Tuple[str, bool]

This API attempts to retrieve a value stored in the context at the given key. The return value is a tuple containing the value and a boolean indicating if the key was found in the context. If the given key is not found, an empty string and False will be returned.

get_values

get_values() -> dict

This API returns a read-only copy of all data stored in the context

apply_values

apply_values(str_format: str) -> str

This API will replace placeholders of the form {context-key-name} with the value found in the context with key context-key-name. Note that key matching is case-insensitive. An error will be raised if any placeholders in the provided string do NOT have a corresponding entry in the context storage dict.

Secrets

secret_provider

secret_provider() -> SecretProvider

This API returns a reference to the SecretProvider instance, which provides the following APIs:

class SecretProvider(ABC):
    """
    An abstract base class that defines the interface for a secret provider.

    This class provides an interface for storing and retrieving secrets, checking the last update
    time of secrets, listing secret names, checking the existence of secrets, and registering or
    deregistering secret update callbacks.

    Methods:
        store_secrets(secret_name: str, secrets: Secrets): Stores secrets.
        get_secrets(secret_name: str, *secret_keys: str) -> Secrets: Retrieves secrets.
        secrets_last_updated() -> datetime: Checks the last update time of secrets.
        list_secret_names() -> List[str]: Lists secret names.
        has_secret(secret_name: str) -> bool: Checks the existence of a secret.
        register_secret_update_callback(secret_name: str, callback: Callable[[str], None]):
            Registers a secret update callback.
        deregister_secret_update_callback(secret_name: str): Deregisters a secret update callback.
    """

    @abstractmethod
    def store_secrets(self, secret_name: str, secrets: Secrets):
        """
        Stores secrets under the given secret name.

        Args:
            secret_name: The name of the secret.
            secrets: The secrets to be stored.
        """

    @abstractmethod
    def get_secrets(self, secret_name: str, *secret_keys: str) -> Secrets:
        """
        Retrieves secrets stored under the given secret name.

        Args:
            secret_name: The name of the secret.
            secret_keys: The keys of the secrets to be retrieved.

        Returns:
            The retrieved secrets.
        """

    @abstractmethod
    def secrets_last_updated(self) -> datetime:
        """
        Checks the last update time of secrets.

        Returns:
            The last update time of secrets.
        """

    @abstractmethod
    def list_secret_names(self) -> List[str]:
        """
        Lists secret names.

        Returns:
            A list of secret names.
        """

    @abstractmethod
    def has_secret(self, secret_name: str) -> bool:
        """
        Checks whether a secret with the given name exists.

        Args:
            secret_name: The name of the secret.

        Returns:
            True if the secret exists, False otherwise.
        """

    @abstractmethod
    def register_secret_update_callback(self, secret_name: str, callback: Callable[[str], None]):
        """
        Registers a secret update callback for the given secret name.

        Args:
            secret_name: The name of the secret.
            callback: The callback to be registered. It takes a single str argument
                (presumably the secret name; confirm against the implementation).
        """

    @abstractmethod
    def deregister_secret_update_callback(self, secret_name: str):
        """
        Deregisters the secret update callback for the given secret name.

        Args:
            secret_name: The name of the secret.
        """

Store and Forward

The APIs in this section are related to the Store and Forward capability. See the Store and Forward section for more details.

set_retry_data

set_retry_data(data: bytes)

This method can be used to store data for later retry. This is useful when creating a custom export function that needs to retry on failure. The payload data will be stored for later retry based on Store and Forward configuration. When the retry is triggered, the function pipeline will be re-executed starting with the function that called this API. That function will be passed the stored data, so it is important that all transformations occur in functions prior to the export function. The Context will also be restored to the state when the function called this API. See Store and Forward for more details.

Note

Store and Forward must be enabled when calling this API, otherwise the data is ignored.

trigger_retry_failed_data

trigger_retry_failed_data()

This method sets the flag to trigger retry of failed data once the current pipeline execution has completed. This method should only be called when the export of data was successful, which indicates that the recipient is accepting data. This allows the failed data to be retried as soon as the recipient is back on-line rather than waiting for the configured retry interval to expire.

Note

Store and Forward must be enabled and failed data must be present, otherwise the call to this API is ignored.

Miscellaneous

clone

clone() -> AppFunctionContext

This method returns a copy of the context that can be mutated independently where appropriate. This can be useful when running operations that take AppFunctionContext in parallel.

correlation_id

correlation_id() -> str

This API returns the ID used to track the EdgeX event through the entire EdgeX framework.

pipeline_id

pipeline_id() -> str

This API returns the ID of the pipeline currently executing. Useful when logging messages from pipeline functions so that the messages contain the ID of the pipeline that executed the pipeline function.

input_content_type

input_content_type() -> str

This API returns the content type of the data that initiated the pipeline execution. Only useful when the TargetType for the pipeline is []byte, otherwise the data will be the type specified by TargetType.

get_device_resource

get_device_resource(profile_name: str, resource_name: str) -> DeviceResource

This API retrieves the DeviceResource for the given profile / resource name. Results are cached to minimize HTTP traffic to core-metadata.

metrics_manager

metrics_manager() -> MetricsManager

This API returns the Metrics Manager used to register various metric types, such as counter, gauge, gaugeFloat64, or timer.

Note

Note that counter, gauge, and timer are metric types as implemented in PyFormance, and gaugeFloat64 is an extended metric type as implemented by app_functions_sdk_py.

from pyformance import meters
from app_functions_sdk_py.interfaces import AppFunctionContext

def register_my_counter_metric(ctx: AppFunctionContext):
    """Register a counter metric named "MyCounter" with the context's metrics manager.

    Args:
        ctx: The context whose metrics manager receives the new counter.
    """
    # Build the metric and its tags first, then hand them to the manager in one call.
    name, counter, tags = "MyCounter", meters.Counter(""), {"Tag1": "Value1"}
    manager = ctx.metrics_manager()
    manager.register(name, counter, tags)

publish

publish(data: Any, content_type: str)

This API pushes data to the EdgeX MessageBus using the configured topic and raises an error if the EdgeX MessageBus is disabled in configuration.

publish_with_topic

publish_with_topic(topic: str, data: Any, content_type: str)

This API pushes data to the EdgeX MessageBus using a given topic and raises an error if the EdgeX MessageBus is disabled in configuration.