MQTT

Since Camel 2.10

The mqtt: component is used for communicating with MQTT-compliant message brokers, such as Apache ActiveMQ or Mosquitto.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-mqtt</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

mqtt://name[?options]

Where name is the name you want to assign the component.
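As a rough illustration of how the name and the options combine into a single endpoint URI, the following sketch assembles such a string in plain Java. The class MqttUriSketch and its build method are hypothetical helpers and not part of Camel; the option names used (host, publishTopicName) come from the option tables on this page. The sketch appends values as-is and does no URL encoding, so it assumes values that need none.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Camel): assembles an endpoint URI of the
// form mqtt://name[?options] from a logical name and a map of options.
final class MqttUriSketch {

    static String build(String name, Map<String, String> options) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("name is required");
        }
        String base = "mqtt://" + name;
        if (options.isEmpty()) {
            return base;
        }
        // Join the options as key=value pairs separated by '&'.
        // Values are appended as-is; this sketch does no URL encoding.
        StringJoiner query = new StringJoiner("&", "?", "");
        options.forEach((key, value) -> query.add(key + "=" + value));
        return base + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("host", "tcp://127.0.0.1:1883");
        options.put("publishTopicName", "camel/mqtt/test");
        System.out.println(build("cheese", options));
    }
}
```

A LinkedHashMap is used so that the options appear in the URI in the order they were added, which keeps the result easy to read and compare.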

Options

The MQTT component supports 4 options, which are listed below.

Name Description Default Type

host (common)

The URI of the MQTT broker to connect to. This component also supports SSL, e.g. ssl://127.0.0.1:8883

String

userName (security)

Username to be used for authentication against the MQTT broker

String

password (security)

Password to be used for authentication against the MQTT broker

String

resolvePropertyPlaceholders (advanced)

Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.

true

boolean

The MQTT endpoint is configured using URI syntax:

mqtt:name

with the following path and query parameters:

Path Parameters (1 parameter):

Name Description Default Type

name

Required A logical name to use which is not the topic name.

String

Query Parameters (39 parameters):

Name Description Default Type

blockingExecutor (common)

SSL connections perform blocking operations against an internal thread pool unless you call the setBlockingExecutor method to configure the executor they will use instead.

Executor

byDefaultRetain (common)

The default retain policy to be used on messages sent to the MQTT broker

false

boolean

cleanSession (common)

Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.

false

boolean

clientId (common)

Sets the client ID of the session. This is what an MQTT server uses to identify a session when setCleanSession(false) is being used. The ID must be 23 characters or fewer. Defaults to an auto-generated ID (based on your socket address, port and timestamp).

String

connectAttemptsMax (common)

The maximum number of reconnect attempts before an error is reported back to the client on the first attempt by the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to -1.

-1

long

connectWaitInSeconds (common)

Delay in seconds the Component will wait for a connection to be established to the MQTT broker

10

int

disconnectWaitInSeconds (common)

The number of seconds the Component will wait for a valid disconnect on stop() from the MQTT broker

5

int

dispatchQueue (common)

A HawtDispatch dispatch queue is used to synchronize access to the connection. If an explicit queue is not configured via the setDispatchQueue method, then a new queue will be created for the connection. Setting an explicit queue might be handy if you want multiple connections to share the same queue for synchronization.

DispatchQueue

host (common)

The URI of the MQTT broker to connect to. This component also supports SSL, e.g. ssl://127.0.0.1:8883

tcp://127.0.0.1:1883

URI

keepAlive (common)

Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout.

short

localAddress (common)

The local InetAddress and port to use

URI

maxReadRate (common)

Sets the maximum bytes per second that this transport will receive data at. This setting throttles reads so that the rate is not exceeded. Defaults to 0 which disables throttling.

int

maxWriteRate (common)

Sets the maximum bytes per second that this transport will send data at. This setting throttles writes so that the rate is not exceeded. Defaults to 0 which disables throttling.

int

mqttQosPropertyName (common)

The property name to look for on an Exchange for an individual published message. If this is set (to one of AtMostOnce, AtLeastOnce or ExactlyOnce), then that QoS will be set on the message sent to the MQTT message broker.

MQTTQos

String

mqttRetainPropertyName (common)

The property name to look for on an Exchange for an individual published message. If this is set (expects a Boolean value) - then the retain property will be set on the message sent to the MQTT message broker.

MQTTRetain

String

mqttTopicPropertyName (common)

The property name to look for on an Exchange for the topic to publish to.

MQTTTopicPropertyName

String

publishTopicName (common)

The default Topic to publish messages on

camel/mqtt/test

String

qualityOfService (common)

Quality of service level to use for topics.

AtLeastOnce

String

receiveBufferSize (common)

Sets the size of the internal socket receive buffer. Defaults to 65536 (64k)

65536

int

reconnectAttemptsMax (common)

The maximum number of reconnect attempts before an error is reported back to the client after a server connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1.

-1

long

reconnectBackOffMultiplier (common)

The exponential backoff multiplier to be used between reconnect attempts. Set to 1 to disable exponential backoff. Defaults to 2.

2.0

double

reconnectDelay (common)

How long to wait in ms before the first reconnect attempt. Defaults to 10.

10

long

reconnectDelayMax (common)

The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000.

30000

long

sendBufferSize (common)

Sets the size of the internal socket send buffer. Defaults to 65536 (64k)

65536

int

sendWaitInSeconds (common)

The maximum time the Component will wait for a receipt from the MQTT broker to acknowledge a published message before throwing an exception

5

int

sslContext (common)

To configure security using SSLContext configuration

SSLContext

subscribeTopicName (common)

Deprecated. These are set on the Endpoint, together with properties inherited from MQTT.

String

subscribeTopicNames (common)

A comma-delimited list of Topics to subscribe to for messages. Note that each item of this list can contain MQTT wildcards (+ and/or #), in order to subscribe to topics matching a certain pattern within a hierarchy. For example, + is a wildcard for all topics at a level within the hierarchy, so if a broker has topics topics/one and topics/two, then topics/+ can be used to subscribe to both. A caveat to consider here is that if the broker adds topics/three, the route would also begin to receive messages from that topic.

String

trafficClass (common)

Sets traffic class or type-of-service octet in the IP header for packets sent from the transport. Defaults to 8 which means the traffic should be optimized for throughput.

8

int

version (common)

Set to 3.1.1 to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.

3.1

String

willMessage (common)

The Will message to send. Defaults to a zero length message.

String

willQos (common)

Sets the quality of service to use for the Will message. Defaults to AT_MOST_ONCE.

AtMostOnce

QoS

willRetain (common)

Set to true if you want the Will to be published with the retain option.

boolean

willTopic (common)

If set the server will publish the client’s Will message to the specified topics if the client has an unexpected disconnection.

String

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored.

false

boolean

exceptionHandler (consumer)

To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored.

ExceptionHandler

exchangePattern (consumer)

Sets the exchange pattern when the consumer creates an exchange.

ExchangePattern

lazySessionCreation (producer)

Sessions can be lazily created to avoid exceptions, if the remote server is not up and running when the Camel producer is started.

true

boolean

synchronous (advanced)

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

boolean
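The reconnect options in the table above (reconnectDelay, reconnectBackOffMultiplier and reconnectDelayMax) describe a capped exponential backoff. The sketch below shows how such a schedule is conventionally computed, assuming the formula min(reconnectDelay * multiplier^attempt, reconnectDelayMax). That formula is an assumption made for illustration: the underlying MQTT client library may combine the values differently, so the numbers show the general shape of the schedule and not the exact delays. The class and method names are hypothetical.

```java
// Illustration only: a conventional capped exponential backoff schedule.
// The formula is an assumption; the real client may compute delays differently.
final class ReconnectBackoffSketch {

    // Returns the assumed wait in milliseconds before the given reconnect
    // attempt (0-based), capped at reconnectDelayMax.
    static long delayMillis(int attempt, long reconnectDelay,
                            double multiplier, long reconnectDelayMax) {
        double delay = reconnectDelay * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, (double) reconnectDelayMax);
    }

    public static void main(String[] args) {
        // Defaults from the option table: 10 ms delay, multiplier 2.0, 30000 ms cap.
        for (int attempt = 0; attempt <= 12; attempt++) {
            System.out.println("attempt " + attempt + ": "
                    + delayMillis(attempt, 10L, 2.0, 30000L) + " ms");
        }
    }
}
```

With a multiplier of 1 the delay stays constant at reconnectDelay, which matches the note that setting reconnectBackOffMultiplier to 1 disables exponential backoff.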

Spring Boot Auto-Configuration

When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-mqtt-starter</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

The component supports 5 options, which are listed below.

Name Description Default Type

camel.component.mqtt.enabled

Enable mqtt component

true

Boolean

camel.component.mqtt.host

The URI of the MQTT broker to connect to. This component also supports SSL, e.g. ssl://127.0.0.1:8883

String

camel.component.mqtt.password

Password to be used for authentication against the MQTT broker

String

camel.component.mqtt.resolve-property-placeholders

Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.

true

Boolean

camel.component.mqtt.user-name

Username to be used for authentication against the MQTT broker

String

Samples

Sending messages:

from("direct:foo").to("mqtt:cheese?publishTopicName=test.mqtt.topic");

Consuming messages:

from("mqtt:bar?subscribeTopicName=test.mqtt.topic").transform(body().convertToString()).to("mock:result");
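When a consumer subscribes with MQTT wildcards through subscribeTopicNames, it helps to know which topics a given filter will match. In MQTT, + matches exactly one level of the topic hierarchy and # matches all remaining levels. The sketch below implements that rule in plain Java for illustration. The class TopicFilterSketch is hypothetical; in practice the broker performs this matching, not Camel or your route. The sketch is simplified: it does not validate the filter and ignores the special handling of topics that start with $.

```java
// Illustration only: simplified MQTT topic-filter matching.
// In a real deployment the broker decides which topics a subscription matches.
final class TopicFilterSketch {

    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: matches everything from here on.
                return true;
            }
            if (i >= topicLevels.length) {
                // The filter has more levels than the topic.
                return false;
            }
            // '+' matches any single level; otherwise levels must be equal.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without '#', filter and topic must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("topics/+", "topics/one"));
        System.out.println(matches("topics/+", "topics/one/deep"));
        System.out.println(matches("topics/#", "topics/one/deep"));
    }
}
```

This also shows the caveat about newly added topics: a filter such as topics/+ matches topics/three as soon as the broker has that topic, without any change to the route.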

Endpoints

Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.

From an Endpoint you can use the following methods