Built-In Pipeline Functions
All pipeline functions define a type and a factory function which is used to initialize an instance of the type with the required options. The instances returned by these factory functions give access to their appropriate pipeline function pointers when setting up the function pipeline.
Example
NewFilterFor([] {"Device1", "Device2"}).FilterByDeviceName
Batching
Included in the SDK is an in-memory batch function that will hold on to your data before continuing the pipeline. There are three functions provided for batching each with their own strategy.
| Factory Method | Description | 
|---|---|
| NewBatchByTime(timeInterval string) | This function returns a BatchConfiginstance with time being the strategy that is used for determining when to release the batched data and continue the pipeline.timeIntervalis the duration to wait (i.e.10s). The time begins after the first piece of data is received. If no data has been received no data will be sent forward. | 
| NewBatchByCount(batchThreshold int) | This function returns a BatchConfiginstance with count being the strategy that is used for determining when to release the batched data and continue the pipeline.batchThresholdis how many events to hold on to (i.e.25). The count begins after the first piece of data is received and once the threshold is met, the batched data will continue forward and the counter will be reset. | 
| NewBatchByTimeAndCount(timeInterval string, batchThreshold int) | This function returns a BatchConfiginstance with a combination of both time and count being the strategy that is used for determining when to release the batched data and continue the pipeline. Whichever occurs first will trigger the data to continue and be reset. | 
Examples
NewBatchByTime("10s").Batch
NewBatchByCount(10).Batch
NewBatchByTimeAndCount("30s", 10).Batch
| Property | Description | 
|---|---|
| IsEventData | The IsEventDataflag, when true, lets this function know that the data being batched isEventsand to un-marshal the data a[]Eventprior to returning the batched data. | 
| MergeOnSend | The MergeOnSendflag, when true, will merge the[][]bytedata to a single[]byteprior to sending the data to the next function in the pipeline. | 
Batch with IsEventData flag set to true.
batch := NewBatchByTimeAndCount("30s", 10)
batch.IsEventData = true
...
batch.Batch
Batch with MergeOnSend flag set to true.
batch := NewBatchByTimeAndCount("30s", 10)
batch.MergeOnSend = true
...
batch.Batch
Batch
Batch - This pipeline function will apply the selected strategy in your pipeline. By default the batched data returned by this function is [][]byte. This is because this function doesn't need to know the type of the individual items batched. It simply marshals the items to JSON if the data isn't already a []byte.
Warning
Keep memory usage in mind as you determine the thresholds for both time and count. The larger they are the more memory is required and could lead to performance issue.
Compression
There are two compression types included in the SDK that can be added to your pipeline. These transforms return a []byte.
| Factory Method | Description | 
|---|---|
| NewCompression() | This factory function returns a Compressioninstance that is used to access the compression functions. | 
GZIP
CompressWithGZIP  - This pipeline function receives either a string,[]byte, or json.Marshaler type, GZIP compresses the data, converts result to base64 encoded string, which is returned as a []byte to the pipeline.
Example
NewCompression().CompressWithGZIP
ZLIB
CompressWithZLIB - This pipeline function receives either a string,[]byte, or json.Marshaler type, ZLIB compresses the data, converts result to base64 encoded string, which is returned as a []byte to the pipeline.
Example
NewCompression().CompressWithZLIB
Conversion
There are two conversions included in the SDK that can be added to your pipeline. These transforms return a string.
| Factory Method | Description | 
|---|---|
| NewConversion() | This factory function returns a Conversioninstance that is used to access the conversion functions. | 
JSON
TransformToJSON - This pipeline function receives an dtos.Event type and converts it to JSON format and returns the JSON string to the pipeline.
Example
NewConversion().TransformToJSON
XML
TransformToXML  - This pipeline function receives an dtos.Event type, converts it to XML format and returns the XML string to the pipeline. 
Example
NewConversion().TransformToXML
Event
This enables the ability to wrap data into an Event/Reading
| Factory Method | Description | 
|---|---|
| NewEventWrapperSimpleReading(profileName string, deviceName string, resourceName string, valueType string) | This factory function returns an EventWrapperinstance configured to push aSimplereading. TheEventWrapperinstance returned  is used to access core data functions. | 
| NewEventWrapperBinaryReading(profileName string, deviceName string, resourceName string, mediaType string) | This factory function returns an EventWrapperinstance configured to push aBinaryreading. TheEventWrapperinstance returned  is used to access core data functions. | 
| NewEventWrapperObjectReading(profileName string, deviceName string, resourceName string) | This factory function returns an EventWrapperinstance configured to push anObjectreading. TheEventWrapperinstance returned is used to access core data functions. | 
Wrap Into Event
WrapIntoEvent - This pipeline function provides the ability to Wrap data in an Event/Reading. The data passed into this function from the pipeline is wrapped in an EdgeX Event with the Event and Reading metadata specified from the factory function options. The function returns the new EdgeX Event with ID populated.
Example
NewEventWrapperSimpleReading("my-profile", "my-device", "my-resource", "string").Wrap
Data Protection
There are two transforms included in the SDK that can be added to your pipeline for data protection.
AESProtection
| Factory Method | Description | 
|---|---|
| NewAESProtection(secretName string, secretValueKey string) | This function returns a Encryptioninstance initialized with the passed insecretNameandsecretValueKey | 
It requires a 64-byte key from secrets which is split in half, the first half used for encryption, the second for generating the signature.
Encrypt: This pipeline function receives either a string, []byte, or json.Marshaller type and encrypts it using AES256 encryption, signs it with a SHA512 hash and returns a []byte to the pipeline of the following form:
| initialization vector | ciphertext | signing hash | 
|---|---|---|
| 16 bytes | variable bytes | 32 bytes | 
Example
    transforms.NewAESProtection(secretName, secretValueKey).Encrypt(ctx, data)
Note
The Algorithm used with app-service-configurable configuration to access this transform is AES256
Reading data protected with this function is a multi step process:
- base64 decode (for languages other than go - example code assumes hex encoding)
- extract hash from payload (last 32 bytes)
- validate hash - if this step fails decryption should not be attempted
- decrypt ciphertext + remove padding
Signing Hash Validation
def hash(cipher_hex, key):
    # Extract the 32 bytes of the Hash signature from the end of the cipher_hex
    extract_hash = cipher_hex[-64:]
    # last 32 bytes of the 64 byte key used by the encrypt function (2 hex digits per byte)
    private_key = key[-64:]
    # IV & ciphertext
    content = cipher_hex[:-64]
    hash_text = hmac.new(key=bytes.fromhex(private_key), msg=(bytes.fromhex(content) + bytearray(8)), digestmod='SHA512')
    # Calculated tag is only the the first 32 bytes of the resulting SHA512
    calculated_hash = hash_text.hexdigest()[:64]
    if extract_hash == calculated_hash:
        return "true"
    else:
        return "false", extract_hash, calculated_hash
If the signing hash can be validated, the message is OK to decrypt
Payload Decryption
def decrypt(cipher_hex, key):
    # first 32 bytes of the 64 byte key used by the encrypt function (2 hex digits per byte)
    private_key = bytes.fromhex(key[:64])
    # Extract the cipher text (remaining bytes in the middle)
    cipher_text = cipher_hex[32:]
    cipher_text = bytes.fromhex(cipher_text[:-64])
    # Extract the 16 bytes of initial vector from the beginning of the data
    iv = bytes.fromhex(cipher_hex[:32])
    # Decrypt
    cipher = AES.new(private_key, AES.MODE_CBC, iv)
    plain_pad = cipher.decrypt(cipher_text)
    unpadded = Padding.unpad(plain_pad, AES.block_size)
    return unpadded.decode('utf-8')
Export
There are two export functions included in the SDK that can be added to your pipeline.
HTTP Export
| Factory Method | Description | 
|---|---|
| NewHTTPSender(url string, mimeType string, persistOnError bool) | This factory function returns a HTTPSenderinstance initialized with the passed in url, mime type and persistOnError values. | 
| NewHTTPSenderWithSecretHeader(url string, mimeType string, persistOnError bool, headerName string, secretName string, secretValueKey string) | This factory function returns a HTTPSenderinstance similar to the above function however will set up theHTTPSenderto add a header  to the HTTP request using theheaderNamefor the field name and thesecretNameandsecretValueKeyto pull the header field value from the Secret Store. | 
| NewHTTPSenderWithOptions(options HTTPSenderOptions) | This factory function returns a HTTPSenderusing the passed inoptionsto configure it. | 
// HTTPSenderOptions contains all options available to the sender
type HTTPSenderOptions struct {
    // URL of destination
    URL string
    // MimeType to send to destination
    MimeType string
    // PersistOnError enables use of store & forward loop if true
    PersistOnError bool
    // HTTPHeaderName to use for passing configured secret
    HTTPHeaderName string
    // SecretName to search for configured secret
    SecretName string
    // SecretValueKey is the key for configured secret data
    SecretValueKey  string
    // URLFormatter specifies custom formatting behavior to be applied to configured URL.
    // If nothing specified, default behavior is to attempt to replace placeholders in the
    // form '{some-context-key}' with the values found in the context storage.
    URLFormatter StringValuesFormatter
    // ContinueOnSendError allows execution of subsequent chained senders after errors if true
    ContinueOnSendError bool
    // ReturnInputData enables chaining multiple HTTP senders if true
    ReturnInputData bool
}
HTTP POST
HTTPPost - This pipeline function receives either a string, []byte, or json.Marshaler type from the previous function in the pipeline and posts it to the configured endpoint and returns the HTTP response. If no previous function exists, then the event that triggered the pipeline, marshaled to json, will be used. If the post fails and persistOnError=true and Store and Forward is enabled, the data will be stored for later retry. See Store and Forward for more details. If ReturnInputData=true  the function will return the data that it received instead of the HTTP response. This allows the following function in the pipeline to be another HTTP Export which receives the same data but is configured to send to a different endpoint. When chaining for multiple HTTP Exports you need to decide how to handle errors. Do you want to stop execution of the pipeline or continue so that the next HTTP Export function can attempt to export to its endpoint. This is where ContinueOnSendError comes in. If set to true the error is logged and the function returns the received data for the next function to use. ContinueOnSendError=true can only be used when ReturnInputData=true and cannot be use when PersistOnError=true.
Example
POST 
NewHTTPSender("https://myendpoint.com","application/json",false).HTTPPost 
PUT 
NewHTTPSender("https://myendpoint.com","application/json",false).HTTPPut 
POST with secure header NewHTTPSenderWithSecretHeader("https://myendpoint.com","application/json",false,"Authentication","/jwt","AuthToken").HTTPPost
** PUT with secure header** NewHTTPSenderWithSecretHeader("https://myendpoint.com","application/json",false,"Authentication","/jwt","AuthToken").HTTPPPut
HTTP PUT
HTTPPut - This pipeline function operates the same as HTTPPost but uses the PUT method rather than POST. 
URL Formatting
The configured URL is dynamically formatted prior to the POST/PUT request. The default formatter (used if URLFormatter is nil) simply replaces any placeholder text, {key-name}, in the configured URL with matching values from the new Context Storage. An error will occur if a specified placeholder does not exist in the Context Storage. See the Context Storage documentation for more details on seeded values and storing your own values.
The URLFormatter option allows you to override the default formatter with your own custom URL formatting scheme.
Example
Export the Events to  different endpoints base on their device name            
Url="http://myhost.com/edgex-events/{devicename}" 
MQTT Export
| Factory Method | Description | 
|---|---|
| NewMQTTSecretSender(mqttConfig MQTTSecretConfig, persistOnError bool) | This factory function returns a MQTTSecretSenderinstance initialized with the options specified in theMQTTSecretConfigandpersistOnError. | 
| NewMQTTSecretSenderWithTopicFormatter(mqttConfig MQTTSecretConfig, persistOnError bool, topicFormatter StringValuesFormatter) | This factory function returns a MQTTSecretSenderinstance initialized with the options specified in theMQTTSecretConfig,persistOnErrorandtopicFormatter. See Topic Formatting below for more details. | 
  type MQTTSecretConfig struct {
    // BrokerAddress should be set to the complete broker address i.e. mqtts://mosquitto:8883/mybroker
    BrokerAddress string
    // ClientId to connect with the broker with.
    ClientId string
    // The name of the secret in secret provider to retrieve your secrets
    SecretName string
    // AutoReconnect indicated whether or not to retry connection if disconnected
    AutoReconnect bool
    // KeepAlive is the interval duration between client sending keepalive ping to broker
    KeepAlive string
    // ConnectTimeout is the duration for timing out on connecting to the broker
    ConnectTimeout string
    // Topic that you wish to publish to
    Topic string
    // QoS for MQTT Connection
    QoS byte
    // Retain setting for MQTT Connection
    Retain bool
    // SkipCertVerify
    SkipCertVerify bool
    // AuthMode indicates what to use when connecting to the broker. 
    // Options are "none", "cacert" , "usernamepassword", "clientcert".
    // If a CA Cert exists in the SecretName data then it will be used for 
    // all modes except "none". 
    AuthMode string
  }
Secrets in the Secret Store may be located at any SecretName however they must have some or all the follow keys at the specified in the secret data:
- username- username to connect to the broker
- password- password used to connect to the broker
- clientkey- client private key in PEM format
- clientcert- client cert in PEM format
- cacert- ca cert in PEM format
The AuthMode setting you choose depends on what secret values above are used. For example, if "none" is specified as auth mode all keys will be ignored. Similarly, if AuthMode is set to "clientcert" username and password will be ignored.
Topic Formatting
The configured Topic is dynamically formatted prior to publishing . The default formatter (used if topicFormatter is nil) simply replaces any placeholder text, {key-name}, in the configured Topic with matching values from the new Context Storage. An error will occur if a specified placeholder does not exist in the Context Storage. See the Context Storage documentation for more details on seeded values and storing your own values.
The topicFormatter option allows you to override the default formatter with your own custom topic formatting scheme.
Filtering
There are four basic types of filtering included in the SDK to add to your pipeline. There is also an option to Filter Out specific items. These provided filter functions return a type of dtos.Event. If filtering results in no remaining data, the pipeline execution for that pass is terminated. If no values are provided for filtering, then data flows through unfiltered.
| Factory Method | Description | 
|---|---|
| NewFilterFor([]string filterValues) | This factory function returns a Filterinstance initialized with the passed in filter values withFilterOutset tofalse. ThisFilterinstance is used to access the following filter functions that will operate using the specified filter values. | 
| NewFilterOut([]string filterValues) | This factory function returns a Filterinstance initialized with the passed in filter values withFilterOutset totrue. ThisFilterinstance is used to access the following filter functions that will operate using the specified filter values. | 
type Filter struct {
    // Holds the values to be filtered
    FilterValues []string
    // Determines if items in FilterValues should be filtered out. If set to true all items found in the filter will be removed. If set to false all items found in the filter will be returned. If FilterValues is empty then all items will be returned.
    FilterOut    bool
}
By Profile Name
FilterByProfileName - This pipeline function will filter the event data down to Events that either have (For) or don't have (Out) the specified profiles names.  
Example
NewFilterFor([] {"Profile1", "Profile2"}).FilterByProfileName
By Device Name
FilterByDeviceName - This pipeline function will filter the event data down to Events that either have (For) or don't have (Out) the specified device names.  
Example
NewFilterFor([] {"Device1", "Device2"}).FilterByDeviceName
By Source Name
FilterBySourceName - This pipeline function will filter the event data down to Events that either have (For) or don't have (Out) the specified source names.  Source name is either the resource name or command name responsible for the Event creation.
Example
NewFilterFor([] {"Source1", "Source2"}).FilterBySourceName
By Resource Name
FilterByResourceName - This pipeline function will filter the Event's reading data down to Readings that either have (For) or don't have (Out) the specified resource names.  If the result of filtering is zero Readings remaining, the function terminates pipeline execution.
Example
NewFilterFor([] {"Resource1", "Resource2"}).FilterByResourceName
JSON Logic
| Factory Method | Description | 
|---|---|
| NewJSONLogic(rule string) | This factory function returns a JSONLogicinstance initialized with the passed in JSON rule. The rule passed in should be a JSON string conforming to the specification here: http://jsonlogic.com/operations.html. | 
Evaluate
Evaluate - This is the pipeline function that will be used in the pipeline to apply the JSON rule to data coming in on the pipeline. If the condition of your rule is met, then the pipeline will continue and the data will continue to flow to the next function in the pipeline. If the condition of your rule is NOT met, then pipeline execution stops. 
Example
NewJSONLogic("{ \"in\" : [{ \"var\" : \"device\" }, 
              [\"Random-Integer-Device\",\"Random-Float-Device\"] ] }").Evaluate
Note
Only operations that return true or false are supported. See http://jsonlogic.com/operations.html# for the complete list of operations paying attention to return values. Any operator that returns manipulated data is currently not supported. For more advanced scenarios checkout LF Edge eKuiper.
Tip
Leverage http://jsonlogic.com/play.html to get your rule right before implementing in code. JSON can be a bit tricky to get right in code with all the escaped double quotes.
Response Data
There is one response data function included in the SDK that can be added to your pipeline.
| Factory Method | Description | 
|---|---|
| NewResponseData() | This factory function returns a ResponseDatainstance that is used to access the following pipeline function below. | 
Content Type
ResponseContentType - This property is used to set the content-type of the response.
Example
responseData := NewResponseData()
responseData.ResponseContentType = "application/json"
Set Response Data
SetResponseData - This pipeline function receives either a string,[]byte, or json.Marshaler type from the previous function in the pipeline and sets it as the response data that the pipeline returns to the configured trigger. If configured to use theEdgeXMessageBustrigger, the data will be published back to the EdgeX MessageBus as determined by the configuration. Similar, if  configured to use theExternalMQTT trigger, the data will be published back to the external MQTT Broker as determined by the configuration. If configured to use HTTP trigger the data is returned as the HTTP response. 
Note
Calling SetResponseData() and SetResponseContentType() from the Context API in a custom function can be used in place of adding this function to your pipeline.
Tags
There is one Tags transform included in the SDK that can be added to your pipeline.
| Factory Method | Description | 
|---|---|
| NewTags(tags map[string]interface{}) Tags | This factory function returns a Tagsinstance initialized with the passed in collection of generic tag key/value pairs. ThisTagsinstance is used to access the following Tags function that will use the specified collection of tag key/value pairs. This allows for generic complex types for the Tag values. | 
Add Tags
AddTags - This pipeline function receives an Edgex Event type and adds the collection of specified tags to the Event's Tags collection.
Example
var myTags = map[string]interface{}{
    "MyValue" : 123,
    "GatewayId": "HoustonStore000123",
    "Coordinates": map[string]float32 {
       "Latitude": 29.630771,
       "Longitude": "-95.377603",
    },
}
NewGenericTags(myTags).AddTags
MetricsProcessor
MetricsProcessor contains configuration and functions for processing the new dtos.Metrics type. 
| Factory Method | Description | 
|---|---|
| NewMetricsProcessor(additionalTags map[string]interface{}) (*MetricsProcessor, error) | This factory function returns a `MetricsProcessorinstance initialized with the passed in collection ofadditionalTags(name/value pairs). ThisMetricsProcessorinstance is used to access the following functions that will process a dtos.Metric instance. TheadditionalTagsare added as metric tags to the processed data. An error will be returned if any of theadditionalTagshave an invalid name. Currently must be non-blank. | 
ToLineProtocol
ToLineProtocol - This pipeline function will transform the received dtos.Metric to a Line Protocol formatted string. See https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/ for details on the Line Protocol syntax.
Note
When ToLineProtocol is the first function in the functions pipeline, the TargetType for the service must be set to &dtos.Metric{}. See Target Type section for details on setting the service's TargetType. The Trigger configuration must also be set so  SubscribeTopics="edgex/telemetry/#" in order to receive the dtos.Metric data from other services. See the new App Service Configurable metrics-influxdb profile for an example.
Example
mp, err := NewMetricsProcessor(map[string]string{"MyTag":"MyTagValue"})
if err != nil {
    ... handle error
}
...
mp.ToLineProtocol
Warning
Any service using the MetricsProcessor needs to disable its own Telemetry reporting to avoid circular data generation from processing. To do this set the servicesWriteable.Telemetry configuration to:
[Writable.Telemetry]
Interval = "0s" # Don't report any metrics as that would be cyclic processing.