App Functions SDK - Store and Forward
The Store and Forward capability allows for export functions to persist data on failure and for the export of the data to be retried at a later time.
Note
The order the data exported via this retry mechanism is not guaranteed to be the same order in which the data was initial received from Core Data
Configuration
Writable.StoreAndForward
allows enabling, setting the interval between retries and the max number of retries. If running with Configuration Provider, these settings can be changed on the fly via Consul without having to restart the service.
Example - Store and Forward configuration
Writable:
StoreAndForward:
Enabled: false
RetryInterval: "5m"
MaxRetryCount: 10
Note
RetryInterval should be at least 1 second (e.g. '1s') or greater. If a value less than 1 second is specified, 1 second will be used. Endless retries will occur when MaxRetryCount is set to 0. If MaxRetryCount is set to less than 0, a default of 1 retry will be used.
Database configuration section describes which database type to use and the information required to connect to the database. This section is required if Store and Forward is enabled and is provided in Common Configuration.
Example - Database configuration
Database:
Type: "postgres"
Host: "localhost"
Port: 5432
Timeout: "5s"
How it works
When an export function encounters an error sending data it can call SetRetryData(payload []byte)
on the AppFunctionContext
. This will store the data for later retry. If the Application Service is stopped and then restarted while stored data hasn't been successfully exported, the export retry will resume once the service is up and running again.
Note
It is important that export functions return an error and stop pipeline execution after the call to SetRetryData
. See HTTPPost function in SDK as an example
When the RetryInterval
expires, the function pipeline will be re-executed starting with the export function that saved the data. The saved data will be passed to the export function which can then attempt to resend the data.
Note
The export function will receive the data as it was stored, so it is important that any transformation of the data occur in functions prior to the export function. The export function should only export the data that it receives.
One of three outcomes can occur after the export retried has completed.
-
Export retry was successful
In this case, the stored data is removed from the database and the execution of the pipeline functions after the export function, if any, continues.
-
Export retry fails and retry count
has not been
exceededIn this case, the stored data is updated in the database with the incremented retry count
-
Export retry fails and retry count
has been
exceededIn this case, the stored data is removed from the database and never retried again.
Note
Changing Writable.Pipeline.ExecutionOrder will invalidate all currently stored data and result in it all being removed from the database on the next retry. This is because the position of the export function can no longer be guaranteed and no way to ensure it is properly executed on the retry.
Custom Storage
The default backing store is PostgreSQL. Custom implementations of the StoreClient
interface can be provided if PostgreSQL does not meet your requirements.
type StoreClient interface {
// Store persists a stored object to the data store and returns the assigned UUID.
Store(o StoredObject) (id string, err error)
// RetrieveFromStore gets an object from the data store.
RetrieveFromStore(appServiceKey string) (objects []StoredObject, err error)
// Update replaces the data currently in the store with the provided data.
Update(o StoredObject) error
// RemoveFromStore removes an object from the data store.
RemoveFromStore(o StoredObject) error
// Disconnect ends the connection.
Disconnect() error
}
service.RegisterCustomStoreFactory("jetstream", func(cfg interfaces.DatabaseInfo, cred config.Credentials) (interfaces.StoreClient, error) {
conn, err := nats.Connect(fmt.Sprintf("nats://%s:%d", cfg.Host, cfg.Port))
if err != nil {
return nil, err
}
js, err := conn.JetStream()
if err != nil {
return nil, err
}
kv, err := js.KeyValue(serviceKey)
if err != nil {
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: serviceKey})
}
return &JetstreamStore{
conn: conn,
serviceKey: serviceKey,
kv: kv,
}, err
})
and configured using the registered name in the Database
section:
Example - Database configuration
Database:
Type: "jetstream"
Host: "broker"
Port: 4222
Timeout: "5s"