Common (node.js + browser)
These classes and functions are available via both @transitive-sdk/utils
and @transitive-sdk/utils-web
.
DataCache
A class implementing a local data cache, used as a local data store with deduplication detection and update events. While this class is very handy you probably won't need to create instances of it directly. Instead use the mqttSync.data instance which holds the locally stored data subscribed/published from/to MQTTSync. For example on the robot:
// update/publish our status:
mqttSync.data.update('status', {changed: Date.now(), msg: 'OK'});
// subscribe to new user requests (e.g., from UI):
mqttSync.data.subscribePath('+user/request', (request, key, {user}) => {
log.debug(`user ${user} made request`, request);
});
In the cloud or in a web component you would need to use the full topic including org, device, scope, cap-name, and version.
Parameters
data
(optional, default{}
)
filter
Filter the object using path with wildcards
Parameters
path
filterByTopic
Filter the object using topic with wildcards
Parameters
topic
forMatch
For each topic match, invoke the callback with the value, topic, and match just like subscribePath, but on the current data rather than future changes.
Parameters
topic
callback
forPathMatch
For each path match, invoke the callback with the value, topic, and match just like subscribePath
Parameters
path
callback
get
Get sub-value at path, or entire object if none given
Parameters
path
(optional, default[]
)
getByTopic
Get sub-value specified by topic
Parameters
topic
subscribe
Add a callback for all change events.
Parameters
callback
subscribePath
Subscribe to a specific topic only. Callback receives
value, key, matched, tags
.
Parameters
topic
callback
subscribePathFlat
Same as subscribePath but always get all changes in flat form
Parameters
topic
callback
unsubscribe
Remove a callback previously registered using subscribe
.
Parameters
callback
update
Update the value at the given path (array or dot separated string)
Parameters
path
value
tags
updateFromArray
Update the object with the given value at the given path, remove empty;
return the flat changes (see toFlatObject). Add tags
to updates to mark
them somehow based on the context, e.g., so that some subscriptions can choose
to ignore updates with a certain tag.
Parameters
path
value
tags
(optional, default{}
)
updateFromModifier
Update data from a modifier object where keys are topic names to be interpreted as paths, and values are the values to set
Parameters
modifier
tags
updateFromTopic
Set value from the given topic (with or without leading or trailing slash)
Parameters
topic
value
tags
MqttSync
A class that combines DataCache and MQTT to implement a data synchronization feature over the latter. Relies on retained messages in mqtt for persistence.
Parameters
-
options
objectoptions.mqttClient
object An already connected mqtt.js client.options.onChange
function? A function that is called any time there is a change to the shared data. This is not usually used. It's usually better to use the finer grainedMqttSync.data.subscribePath
instead, that allows you to subscribe to changes just on a specific sun-object instead, see DataCache.options.ignoreRetain
boolean? retain all messages, ignorant of the retain flag.options.migrate
array? an array of objects of the form{topic, newVersion, level}
. Only meaningful in the cloud. Instructs MQTTSync to first migrate existing topics to a new version namespace, publishing at the designated level down from the version level. For example:js [{ topic: `/myorg/mydevice/@local/my-cap/+/config`, newVersion: this.version, level: 1 }]
Would migrate any existing data in the capability'sconfig
namespace to the current version of the package, publishing at theconfig/+
level (rather than atomically at the config level itself).options.onReady
function? A function that is called when the MQTTSync client is ready and has completed any requested migrations.options.sliceTopic
number? a number indicating at what level to slice the topic, i.e., only use a suffix. Used in robot-capabilities to slice off the topic prefix (namespaces).options.onHeartbeatGranted
beforeDisconnect
Run all registered hooks before disconnecting
call
Make an RPC request. Example:
mqttSync.call('/mySquare', 11, result => {
log.debug(`Called /mySquare with arg 11 and got ${result}`);
});
Alternative you can omit the callback and use async/await:
const result = await mqttSync.call('/mySquare', 11);
log.debug(`Called /mySquare with arg 11 and got ${result}`);
See the note about namespaces in register
.
Note: It is your responsibility to only call methods that exist (have been registered). Calling a non-existent command just hangs.
Parameters
command
args
callback
(optional, defaultundefined
)
clear
Delete all retained messages in a certain topic prefix, waiting for a mqtt broker heartbeat to collect existing retained. Use with care, never delete topics not owned by us. Harmless within capabilities, which are namespaced already.
options.filter(topic)
: a function that can be provided to further,
programmatically filter the set of topics to clear, e.g., to onlt clear
topics of old versions.
Note: This may not yet work in robot-capabilities, since the subscription prefix and received topic prefix don't match (the device prefix is added to subscription by localMQTT.
Parameters
prefixes
callback
(optional, defaultundefined
)options
(optional, default{}
)
clearThrottle
Clear the set throttling delay.
isPublished
Check whether we are publishing the given topic in a non-atomic way. This is used to determine whether to store the published value or not.
Parameters
topic
isSubscribed
check whether we are subscribed to the given topic
Parameters
topic
migrate
Migrate a list of {topic, newVersion, transform}
. The version number in
topic will be ignored, and all versions' values will be merged, applied in
order, such that the latest version is applied last.
Parameters
list
onReady
(optional, defaultundefined
)
onBeforeDisconnect
Register a new hook to be called before disconnecting
Parameters
fn
publish
Register a listener for path in data. Make sure to populate the data before calling this or set the data all at once afterwards.
With option "atomic" this will always send the whole sub-document, not flat changes. Useful, e.g., for desiredPackages, see https://github.com/chfritz/transitive/issues/85.
Parameters
topic
options
(optional, default{atomic:false}
)
Returns any true if publication added (false, e.g., when already present)
publishAtLevel
Publish all values at the given level of the given object under the given topic (plus sub-key, of course). TODO: Is this OK, or do we need to go through this.publish?
Parameters
topic
value
level
register
Register an RPC request handler. Example:
mqttSync.register('/mySquare', arg => {
log.debug('running /mySquare with args', arg);
return arg * arg;
});
Note that the command topic needs to be in the capabilities namespace like
any other topic. In robot capabilities, as usual, these can start in /
because the local mqtt bridge operated by the robot agent will place all
topics in their respective namespace. In the cloud and on the web you will
need to use the respective namespace, i.e.,
/orgId/deviceId/@scope/capName/capVersion/
.
Async/Await
Yes, you can make the handler async
and use await
inside of it. This
will be handled correctly, i.e., MqttSync will await the result of the
handler before responding to the RPC request client.
Parameters
command
handler
setThrottle
Set delay between processing of publishing queue in milliseconds. This allows you to effectively throttle the rate at which this instance will publish changes. Note that updates to a topic already in the queue will not cause multiple publications. Only the latest value will be published.
Parameters
delay
number? Number of milliseconds to wait between processing of publish queue.
subscribe
Subscribe to the given topic (and all sub-topics). The callback will indicate success/failure, not a message on the topic.
Parameters
topic
callback
(optional, defaultnoop
)
waitForHeartbeatOnce
register a callback for the next heartbeat from the broker
Parameters
callback
clone
Deep-clone the given object. All functionality is lost, just data is kept.
Parameters
obj
decodeJWT
Parse JWT and return the decoded payload (JSON).
Parameters
jwt
dropWildcardIds
reduce wildcards with Ids, such as +sessionId
, to just +
Parameters
x
forMatchIterator
Iterate through the object and invoke callback for each match of path (with named wildcards)
Parameters
obj
path
callback
pathSoFar
(optional, default[]
)matchSoFar
(optional, default{}
)
getLogger
Get a new loglevel logger; call with a name, e.g., module.id
. The returned
logger has methods trace, debug, info, warn, error. See
https://www.npmjs.com/package/loglevel for details.
getRandomId
Generate a random id (base36)
Parameters
bytes
(optional, default6
)
isPrefixOf
prefixArray is a prefix of the array
Parameters
prefixArray
array
isSubTopicOf
sub is a strict sub-topic of parent, and in particular not equal
Parameters
sub
parent
mergeVersions
given an object where the keys are versions, merge this into one object where the latest version of each subfield overwrites any previous
Parameters
versionsObject
subTopic
(optional, defaultundefined
)options
(optional, default{}
)
mqttClearRetained
delete all retained messages in a certain topic prefix, waiting for a given delay to collect existing retained. Use with care, never delete topics not owned by us. Harmless within capabilities, which are namespaced already.
Parameters
mqttClient
prefixes
callback
delay
(optional, default1000
)
parseMQTTTopic
parse an MQTT topic according to our topic schema
Parameters
topic
parseMQTTUsername
parse usernames used in MQTT
Parameters
username