Common (node.js + browser)

These classes and functions are available via both @transitive-sdk/utils and @transitive-sdk/utils-web.

DataCache

A class implementing a local data cache, used as a local data store with deduplication detection and update events. While this class is very handy, you probably won't need to create instances of it directly. Instead, use the mqttSync.data instance, which holds the locally stored data subscribed/published from/to MQTTSync. For example, on the robot:

// update/publish our status:
mqttSync.data.update('status', {changed: Date.now(), msg: 'OK'});
// subscribe to new user requests (e.g., from UI):
mqttSync.data.subscribePath('+user/request', (request, key, {user}) => {
  log.debug(`user ${user} made request`, request);
});

In the cloud or in a web component you would need to use the full topic including org, device, scope, cap-name, and version.

Parameters

  • data (optional, default {})

filter

Filter the object using path with wildcards

Parameters
  • path

filterByTopic

Filter the object using topic with wildcards

Parameters
  • topic

forMatch

For each topic match, invoke the callback with the value, path, and match just like subscribePath, but on the current data rather than future changes.

Parameters
  • topic
  • callback

forPathMatch

For each path match, invoke the callback with the value, path, and match just like subscribePath

Parameters
  • path
  • callback

get

Get sub-value at path, or entire object if none given

Parameters
  • path (optional, default [])

getByTopic

Get sub-value specified by topic

Parameters
  • topic

subscribe

Add a callback for all change events.

Parameters
  • callback

subscribePath

Subscribe to a specific topic only. Callback receives value, key, matched, tags.

Parameters
  • topic
  • callback

subscribePathFlat

Same as subscribePath but always get all changes in flat form

Parameters
  • topic
  • callback

unsubscribe

Remove a callback previously registered using subscribe.

Parameters
  • callback

update

Update the value at the given path (array or dot separated string)

Parameters
  • path
  • value
  • tags

updateFromArray

Update the object with the given value at the given path, remove empty; return the flat changes (see toFlatObject). Add tags to updates to mark them somehow based on the context, e.g., so that some subscriptions can choose to ignore updates with a certain tag.

Parameters
  • path
  • value
  • tags (optional, default {})

updateFromModifier

Update data from a modifier object where keys are topic names to be interpreted as paths, and values are the values to set

Parameters
  • modifier
  • tags

updateFromTopic

Set value from the given topic (with or without leading or trailing slash)

Parameters
  • topic
  • value
  • tags

MqttSync

A class that combines DataCache and MQTT to implement a data synchronization feature over the latter. Relies on retained messages in mqtt for persistence.

Parameters

  • options object

    • options.mqttClient object An already connected mqtt.js client.
    • options.onChange function? A function that is called any time there is a change to the shared data. This is not usually needed; it is usually better to use the finer-grained MqttSync.data.subscribePath, which lets you subscribe to changes on just a specific sub-object, see DataCache.
    • options.ignoreRetain boolean? Retain all messages, regardless of the retain flag.
    • options.migrate array? An array of objects of the form {topic, newVersion, level}. Only meaningful in the cloud. Instructs MQTTSync to first migrate existing topics to a new version namespace, publishing at the designated level down from the version level. For example, [{topic: `/myorg/mydevice/@local/my-cap/+/config`, newVersion: this.version, level: 1}] would migrate any existing data in the capability's config namespace to the current version of the package, publishing at the config/+ level (rather than atomically at the config level itself).
    • options.onReady function? A function that is called when the MQTTSync client is ready and has completed any requested migrations.
    • options.sliceTopic number? a number indicating at what level to slice the topic, i.e., only use a suffix. Used in robot-capabilities to slice off the topic prefix (namespaces).
    • options.onHeartbeatGranted

beforeDisconnect

Run all registered hooks before disconnecting

call

Make an RPC request. Example:

mqttSync.call('/mySquare', 11, result => {
  log.debug(`Called /mySquare with arg 11 and got ${result}`);
});

Alternatively, you can omit the callback and use async/await:

const result = await mqttSync.call('/mySquare', 11);
log.debug(`Called /mySquare with arg 11 and got ${result}`);

See the note about namespaces in register.

Note: It is your responsibility to only call methods that exist (have been registered). Calling a non-existent command just hangs.

Parameters
  • command
  • args
  • callback (optional, default undefined)

clear

Delete all retained messages in a certain topic prefix, waiting for a mqtt broker heartbeat to collect existing retained. Use with care, never delete topics not owned by us. Harmless within capabilities, which are namespaced already.

options.filter(topic): a function that can be provided to further, programmatically filter the set of topics to clear, e.g., to only clear topics of old versions.

Note: This may not yet work in robot-capabilities, since the subscription prefix and received topic prefix don't match (the device prefix is added to the subscription by localMQTT).

Parameters
  • prefixes
  • callback (optional, default undefined)
  • options (optional, default {})

clearThrottle

Clear the set throttling delay.

isPublished

Check whether we are publishing the given topic in a non-atomic way. This is used to determine whether to store the published value or not.

Parameters
  • topic

isSubscribed

Check whether we are subscribed to the given topic.

Parameters
  • topic

migrate

Migrate a list of {topic, newVersion, transform}. The version number in topic will be ignored, and all versions' values will be merged, applied in order, such that the latest version is applied last. topic may include wildcards in the part before the version number but not after.

Example:

mqttSync.migrate([{topic: '/+/dId/@scope/capname/+/b', newVersion: '1.2.0'}]);

Parameters
  • list
  • onReady (optional, default undefined)

onBeforeDisconnect

Register a new hook to be called before disconnecting

Parameters
  • fn

publish

Register a listener for path in data. Make sure to populate the data before calling this or set the data all at once afterwards.

With option "atomic" this will always send the whole sub-document, not flat changes. Useful, e.g., for desiredPackages, see https://github.com/chfritz/transitive/issues/85.

Parameters
  • topic
  • options (optional, default {atomic:false})

Returns any: true if the publication was added (false, e.g., when it was already present)

publishAtLevel

Publish all values at the given level of the given object under the given topic (plus sub-key, of course). TODO: Is this OK, or do we need to go through this.publish?

Parameters
  • topic
  • value
  • level

register

Register an RPC request handler. Example:

mqttSync.register('/mySquare', arg => {
  log.debug('running /mySquare with args', arg);
  return arg * arg;
});

Note that the command topic needs to be in the capabilities namespace like any other topic. In robot capabilities, as usual, these can start in / because the local mqtt bridge operated by the robot agent will place all topics in their respective namespace. In the cloud and on the web you will need to use the respective namespace, i.e., /orgId/deviceId/@scope/capName/capVersion/.

Async/Await

Yes, you can make the handler async and use await inside of it. This will be handled correctly, i.e., MqttSync will await the result of the handler before responding to the RPC request client.

Parameters
  • command
  • handler
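
To illustrate how an async handler's result can be awaited before the caller gets its answer, here is a minimal in-memory sketch. LocalRpc is a hypothetical stand-in written for this example; it is not MqttSync, involves no MQTT, and only mirrors the register/call pairing described above.

```javascript
// Hypothetical stand-in for the register/call pairing (NOT MqttSync, no MQTT).
class LocalRpc {
  constructor() {
    this.handlers = new Map(); // command -> handler function
  }

  // Remember the handler for a command.
  register(command, handler) {
    this.handlers.set(command, handler);
  }

  // Invoke the handler and wait for its result, whether or not it is async.
  async call(command, args) {
    const handler = this.handlers.get(command);
    // Assumption of this sketch: reject on unknown commands instead of hanging.
    if (!handler) throw new Error(`no handler registered for ${command}`);
    return await handler(args);
  }
}

const rpc = new LocalRpc();

// An async handler: the caller only receives the value once the promise resolves.
rpc.register('/mySquare', async (arg) => {
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulate slow work
  return arg * arg;
});

rpc.call('/mySquare', 11).then((result) => {
  console.log('result of /mySquare with arg 11:', result);
});
```

With the real API you would pass the async handler to mqttSync.register in the same way; only the transport differs.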

setThrottle

Set delay between processing of publishing queue in milliseconds. This allows you to effectively throttle the rate at which this instance will publish changes. Note that updates to a topic already in the queue will not cause multiple publications. Only the latest value will be published.

Parameters
  • delay number? Number of milliseconds to wait between processing of publish queue.
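
The coalescing behavior described above (only the latest value per queued topic is published) can be sketched with a Map keyed by topic. CoalescingQueue and its method names are hypothetical and written for this example; this is not the MqttSync implementation.

```javascript
// Hypothetical sketch of a publish queue that keeps one pending value per topic.
class CoalescingQueue {
  constructor(publishFn) {
    this.publishFn = publishFn; // called as publishFn(topic, value)
    this.pending = new Map();   // topic -> latest value waiting to be sent
  }

  // Queue a value; a later enqueue for the same topic overwrites the earlier one.
  enqueue(topic, value) {
    this.pending.set(topic, value);
  }

  // Send everything that is pending, then empty the queue.
  flush() {
    for (const [topic, value] of this.pending) {
      this.publishFn(topic, value);
    }
    this.pending.clear();
  }

  // Flush every `delay` milliseconds; returns a function that stops the timer.
  start(delay) {
    const timer = setInterval(() => this.flush(), delay);
    return () => clearInterval(timer);
  }
}
```

Calling enqueue twice for the same topic between two flushes results in a single call to publishFn with the second value.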

subscribe

Subscribe to the given topic (and all sub-topics). The callback will indicate success/failure, not a message on the topic.

Parameters
  • topic
  • callback (optional, default noop)

waitForHeartbeatOnce

register a callback for the next heartbeat from the broker

Parameters
  • callback

clone

Deep-clone the given object. All functionality is lost, just data is kept.

Parameters

  • obj

decodeJWT

Parse JWT and return the decoded payload (JSON).

Parameters

  • jwt
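
For readers unfamiliar with the JWT layout, the following Node.js sketch shows what decoding a payload involves. decodeJwtPayload is a hypothetical helper for illustration, not the library's code; it relies on Node's Buffer (so it is not browser code as written) and it does not verify the signature.

```javascript
// Sketch only: decodes the payload segment, does NOT verify the signature.
const decodeJwtPayload = (jwt) => {
  const parts = jwt.split('.');
  if (parts.length !== 3) {
    throw new Error('expected a token of the form header.payload.signature');
  }
  // JWT segments are base64url encoded; Node.js Buffer supports 'base64url'.
  const json = Buffer.from(parts[1], 'base64url').toString('utf8');
  return JSON.parse(json);
};

// Build an unsigned demo token (made-up payload) to show the round trip.
const encodeSegment = (obj) =>
  Buffer.from(JSON.stringify(obj)).toString('base64url');
const demoToken = [
  encodeSegment({alg: 'none'}),
  encodeSegment({id: 'user1', device: 'd_123'}),
  'signature-placeholder',
].join('.');

console.log(decodeJwtPayload(demoToken));
```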

dropWildcardIds

reduce wildcards with Ids, such as +sessionId, to just +

Parameters

  • x

forMatchIterator

Iterate through the object and invoke callback for each match of path (with named wildcards)

Parameters

  • obj
  • path
  • callback
  • pathSoFar (optional, default [])
  • matchSoFar (optional, default {})

getLogger

Get a new loglevel logger; call with a name, e.g., module.id. The returned logger has methods trace, debug, info, warn, error. See https://www.npmjs.com/package/loglevel for details.

getRandomId

Generate a random id (base36)

Parameters

  • bytes (optional, default 6)

isPrefixOf

prefixArray is a prefix of the array

Parameters

  • prefixArray
  • array

isSubTopicOf

sub is a strict sub-topic of parent, and in particular not equal

Parameters

  • sub
  • parent
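
The two predicates above (isPrefixOf and isSubTopicOf) can be sketched as follows. The function names are hypothetical, the comparison is literal (no wildcard handling), and treating an array as a prefix of itself is an assumption of this sketch rather than documented behavior.

```javascript
// Sketch: literal prefix check on arrays (an array counts as its own prefix here).
const isArrayPrefix = (prefixArray, array) =>
  prefixArray.length <= array.length &&
  prefixArray.every((item, i) => item === array[i]);

// Split a slash-separated topic into its non-empty levels.
const splitTopic = (topic) => topic.split('/').filter((part) => part.length > 0);

// Sketch: sub is strictly below parent, so equal topics do not count.
const isStrictSubTopic = (sub, parent) => {
  const subPath = splitTopic(sub);
  const parentPath = splitTopic(parent);
  return subPath.length > parentPath.length && isArrayPrefix(parentPath, subPath);
};
```

Comparing level by level rather than with a string prefix test matters: '/a/bc' starts with the string '/a/b' but is not a sub-topic of it.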

mergeVersions

given an object where the keys are versions, merge this into one object where the latest version of each subfield overwrites any previous

Parameters

  • versionsObject
  • subTopic (optional, default undefined)
  • options (optional, default {})
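
A minimal sketch of the merge described above, assuming plain numeric x.y.z version keys and ignoring the subTopic and options parameters. All names here are hypothetical; this is not the library's implementation.

```javascript
// Order version strings numerically, so that 1.10.0 sorts after 1.2.0.
const byVersion = (a, b) => {
  const pa = a.split('.').map(Number);
  const pb = b.split('.').map(Number);
  for (let i = 0; i < 3; i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (diff !== 0) return diff;
  }
  return 0;
};

const isPlainObject = (x) =>
  x !== null && typeof x === 'object' && !Array.isArray(x);

// Recursively copy source into target; leaf values in source win.
const deepMerge = (target, source) => {
  for (const [key, value] of Object.entries(source)) {
    if (isPlainObject(value)) {
      if (!isPlainObject(target[key])) target[key] = {};
      deepMerge(target[key], value);
    } else {
      target[key] = value;
    }
  }
  return target;
};

// Apply versions from oldest to newest so the latest version is applied last.
const mergeAllVersions = (versionsObject) =>
  Object.keys(versionsObject)
    .sort(byVersion)
    .reduce((merged, version) => deepMerge(merged, versionsObject[version]), {});
```

For example, merging {'1.2.0': {a: {x: 1, y: 2}}, '1.10.0': {a: {x: 3}}} keeps y from 1.2.0 and takes x from 1.10.0.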

mqttClearRetained

delete all retained messages in a certain topic prefix, waiting for a given delay to collect existing retained. Use with care, never delete topics not owned by us. Harmless within capabilities, which are namespaced already.

Parameters

  • mqttClient
  • prefixes
  • callback
  • delay (optional, default 1000)

parseMQTTTopic

parse an MQTT topic according to our topic schema

Parameters

  • topic

parseMQTTUsername

parse usernames used in MQTT

Parameters

  • username

pathToTopic

convert a path array to mqtt topic; reduces +id identifiers to +

Parameters

  • pathArray
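
As a rough illustration of the path-to-topic conversion, the sketch below joins a path array into a slash-separated topic and reduces named wildcards such as +sessionId to +. The helper names, the leading slash, and the absence of any character escaping are assumptions of this sketch.

```javascript
// Sketch: join path elements into a topic; '+name' collapses to the MQTT wildcard '+'.
const toTopic = (pathArray) =>
  '/' + pathArray
    .map((part) => String(part)) // numeric keys are allowed in paths
    .map((part) => (part.startsWith('+') ? '+' : part))
    .join('/');

// Sketch of the reverse direction: split a topic and drop empty levels,
// so leading and trailing slashes are ignored.
const toPath = (topic) => topic.split('/').filter((part) => part.length > 0);

console.log(toTopic(['a', '+sessionId', 'b']));
console.log(toPath('/a/b/'));
```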

selectFromObject

Given an object and a path with wildcards (* and +), modify the object to only contain elements matched by the path, e.g., {a: {b: 1, c: 2}, d: 2} and ['a','+'] would give {a: {b: 1, c: 2}}

Parameters

  • obj object The object to select from
  • path array An array specifying the path to select, potentially containing mqtt wildcards ('+').
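
The selection in the example above can be sketched recursively. Note two deliberate differences from the description, both assumptions of this sketch: selectByPath (a hypothetical name) returns a new object instead of modifying its input, and it only handles + wildcards (optionally named), not *.

```javascript
// Sketch: return a new object containing only the parts of obj matched by path.
// Returns undefined when nothing matches.
const selectByPath = (obj, path) => {
  if (path.length === 0) return obj; // end of path: keep whatever is here
  if (obj === null || typeof obj !== 'object') return undefined;

  const [head, ...rest] = path;
  const hasKey = Object.prototype.hasOwnProperty.call(obj, head);
  // A wildcard visits every key; a literal visits just that key, if present.
  const keys = head.startsWith('+') ? Object.keys(obj) : (hasKey ? [head] : []);

  const result = {};
  for (const key of keys) {
    const selected = selectByPath(obj[key], rest);
    if (selected !== undefined) result[key] = selected;
  }
  return Object.keys(result).length > 0 ? result : undefined;
};

console.log(selectByPath({a: {b: 1, c: 2}, d: 2}, ['a', '+']));
```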

setFromPath

Like _.set but without arrays. This allows using numbers as keys.

Parameters

  • obj
  • path
  • value

toFlatObject

Given an object, return a new flat object of topic+value pairs, e.g.:

{a: {b: 1, c: 2}, d: 3} → {'/a/b': 1, '/a/c': 2, '/d': 3}

Note: not idempotent!

{'/a/b': 1, '/a/c': 2, d: 3} → {'%2Fa%2Fb': 1, '%2Fa%2Fc': 2, '/d': 3}

Parameters

  • obj
  • prefix (optional, default [])
  • rtv (optional, default {})
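
The basic flattening step can be sketched as a short recursion. flatten is a hypothetical helper, not the library function: it treats arrays as leaf values and does not escape slashes inside keys, so it reproduces only the first example above, not the encoded one.

```javascript
// Sketch: collect every leaf value under its slash-separated topic.
const flatten = (obj, prefix = [], result = {}) => {
  for (const [key, value] of Object.entries(obj)) {
    const path = [...prefix, key];
    const isNested =
      value !== null && typeof value === 'object' && !Array.isArray(value);
    if (isNested) {
      flatten(value, path, result); // descend into sub-objects
    } else {
      result['/' + path.join('/')] = value; // leaf: record topic -> value
    }
  }
  return result;
};

console.log(flatten({a: {b: 1, c: 2}, d: 3}));
```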

topicMatch

match a slash-separated topic with a selector using +XYZ for (named) wildcards. Return the matching result.

Parameters

  • selector
  • topic
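
To make named wildcards concrete, here is a simplified matcher. matchTopic is hypothetical, and its return convention (an object of captured names on success, null otherwise) is an assumption; the sketch also requires selector and topic to have the same number of levels and does not support #.

```javascript
// Sketch: match a topic against a selector with (named) '+' wildcards.
// Returns an object of captured wildcard values, or null if there is no match.
const matchTopic = (selector, topic) => {
  const split = (t) => t.split('/').filter((part) => part.length > 0);
  const selectorParts = split(selector);
  const topicParts = split(topic);
  if (selectorParts.length !== topicParts.length) return null;

  const matched = {};
  for (let i = 0; i < selectorParts.length; i++) {
    const level = selectorParts[i];
    if (level.startsWith('+')) {
      // '+user' captures this level as 'user'; a bare '+' captures nothing.
      if (level.length > 1) matched[level.slice(1)] = topicParts[i];
    } else if (level !== topicParts[i]) {
      return null;
    }
  }
  return matched;
};

console.log(matchTopic('/+user/request', '/alice/request'));
```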

topicToPath

convert topic to path array

Parameters

  • topic

tryJSONParse

Try parsing JSON, return null if unsuccessful

Parameters

  • string

unset

Unset the topic in that obj, and clean up parent if empty, recursively. Return the path to the removed node.

Parameters

  • obj
  • path
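
The recursive clean-up can be illustrated as follows. unsetPath is a hypothetical helper; in particular, returning the path of the topmost node that was removed (or null if nothing was removed) is this sketch's reading of "the path to the removed node", and it never removes the root object itself.

```javascript
// Sketch: delete the value at path, then prune parents that became empty.
const unsetPath = (obj, path) => {
  const nodes = [obj]; // nodes[i] is the container that holds key path[i]
  for (let i = 0; i < path.length - 1; i++) {
    const next = nodes[i][path[i]];
    if (next === null || typeof next !== 'object') return null; // path not present
    nodes.push(next);
  }

  let depth = path.length - 1;
  if (depth < 0 || !(path[depth] in nodes[depth])) return null;
  delete nodes[depth][path[depth]];

  // Walk back up while the container we just emptied has no keys left.
  while (depth > 0 && Object.keys(nodes[depth]).length === 0) {
    depth--;
    delete nodes[depth][path[depth]];
  }
  return path.slice(0, depth + 1);
};
```

For {a: {b: {c: 1}}, d: 2} and the path ['a', 'b', 'c'], the whole a branch becomes empty and is removed, leaving {d: 2}.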

updateObject

Given a modifier {"a/b/c": "xyz"} update the object obj such that obj.a.b.c = "xyz".

Parameters

  • obj
  • modifier
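
A minimal sketch of applying such a modifier, assuming slash-separated keys as in the example above. applyModifier is a hypothetical helper, not the library function; it creates intermediate objects as needed and overwrites non-object values that are in the way.

```javascript
// Sketch: set each value at the nested location named by its slash-separated key.
const applyModifier = (obj, modifier) => {
  for (const [topic, value] of Object.entries(modifier)) {
    const path = topic.split('/').filter((part) => part.length > 0);
    if (path.length === 0) continue; // ignore empty keys in this sketch

    let node = obj;
    for (const key of path.slice(0, -1)) {
      // Create (or replace) intermediate levels that are not objects.
      if (node[key] === null || typeof node[key] !== 'object') node[key] = {};
      node = node[key];
    }
    node[path[path.length - 1]] = value;
  }
  return obj;
};

const example = applyModifier({a: {keep: true}}, {'a/b/c': 'xyz'});
console.log(example.a.b.c, example.a.keep);
```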

versionCompare

Compare two version strings. Return -1 if a is lower than b, 0 if they are equal, and 1 otherwise. If either is not a complete version, e.g., 2.0, interpret it as a range and use its minimum version for the comparison. Hence, 2.0 < 2.0.1.

Parameters

  • a
  • b
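
The comparison rule can be sketched for plain numeric versions by padding missing components with zero, which is what "use its minimum version" amounts to for inputs like 2.0. compareVersions is a hypothetical helper; it does not handle pre-release tags or range operators.

```javascript
// Sketch: compare dotted numeric versions; missing components count as 0,
// so '2.0' is compared as 2.0.0.
const compareVersions = (a, b) => {
  const parse = (version) => version.split('.').map(Number);
  const pa = parse(a);
  const pb = parse(b);
  for (let i = 0; i < 3; i++) {
    const x = pa[i] ?? 0;
    const y = pb[i] ?? 0;
    if (x !== y) return x < y ? -1 : 1;
  }
  return 0;
};

console.log(compareVersions('2.0', '2.0.1'));    // -1
console.log(compareVersions('1.10.0', '1.9.0')); // 1
```

Comparing component by component as numbers avoids the string-comparison pitfall where '1.10.0' would sort before '1.9.0'.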

visit

Reusable visitor pattern: iteratively visits all nodes in the tree described by object, where childField indicates the child-of predicate.

Parameters

  • object
  • childField
  • visitor

visitAncestor

Given an object and a path, visit each ancestor of the path

Parameters

  • object
  • path
  • visitor
  • prefix (optional, default [])

wait

Wait for delay ms, for use in async functions.

Parameters

  • delay
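
A promise-based delay like this is typically a one-liner around setTimeout. The sketch below defines its own pause helper (an assumed equivalent, not the library's code) and uses it in a hypothetical pollUntil function to show the intended usage inside async functions.

```javascript
// Sketch: resolve after `delay` milliseconds.
const pause = (delay) => new Promise((resolve) => setTimeout(resolve, delay));

// Example use: check a condition repeatedly, pausing between attempts.
const pollUntil = async (check, {attempts = 5, delay = 100} = {}) => {
  for (let i = 0; i < attempts; i++) {
    if (await check()) return true;
    await pause(delay);
  }
  return false; // condition never became true
};
```

Because the pause is awaited, the loop does not block the event loop between attempts.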