Publish Subscribe MQ messages

Frame format

  • msg id frame
  • data = the data (json format) for the action

The message id

The message id is build from a couple of parts

  • category = string to subscribe to
  • timestamp = str(time()).replace(‘.’,’_’)
  • version = 0_1

example:

  • category = plugin.status
  • timestamp = 1371476851_54
  • version = 0_1

plugin.status.1371476851_54.0_1

The content

The content is a json encoded python dict

The Network

As domogik has a lot of different components that need to publish or subscribe messages a system is required to forward the messages.

The dmg_forwarder was build for this, the forwarder listens on 2 different ports, on for the publisher clients and one for the subscribed clients. So all clients (domogik components) that publish a messages will need to connect to the publisher listing port, the forwarder will then broadcast this message to all clients (domogik components) that are connected on the subscriber port.

The subscribing to certain messages is done on the client side, by using the subscribe socket of zmq.

The publisher

The MQPub class is used to publish messages onto the pub/sub network.

The class needs 2 parameters on init:
  • context = is an instance of zmq.Context()
  • caller_id = is name used to identify the client
For sending (publishing) messages there is a send_event method with 2 parameters:
  • category = used to generate the message id
  • content = a python object (mostly a dict) that well be json encoded to be used as the content of the message

The subscribers

Sync client

The MQSyncSub class is used to subscribe to certain messages on the pub/sub network.

The class needs 3 parameters on init:
  • context = is an instance of zmq.Context()
  • caller_id = is name used to identify the client
  • category_filters = a python list with the messages to subscribe to (string match on the beginning of the message id)

Then there is a blocking method called wait_for_event, this method will block until a message is received that matches one of the category_filters.

aSync client

The MQAsyncSub class is used to subscribe to certain messages on the pub/sub network.

The class needs 3 parameters on init:

  • context = is an instance of zmq.Context()
  • caller_id = is name used to identify the client
  • category_filters = a python list with the messages to subscribe to (string match on the beginning of the message id)

The on_message callback will be called when a message is received that matches one of the category_filters. The on_message callback has 2 parameters:

  • msg_id = is the message id of the message
  • content = a python object (probably a dict) that represent the json decoded message content