camel-rabbitmq-kafka-connector sink configuration

When using camel-rabbitmq-kafka-connector as a sink, add the following Maven dependency to enable the connector:

<dependency>
  <groupId>org.apache.camel.kafkaconnector</groupId>
  <artifactId>camel-rabbitmq-kafka-connector</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel Kafka connector version -->
</dependency>

To use this sink connector in Kafka Connect, set the following connector.class:

connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector
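As a minimal sketch of how this class name fits into a complete connector configuration, the following properties file combines standard Kafka Connect settings with a few of the options documented on this page. The connector name, topic name, exchange name, and host are illustrative assumptions, not values taken from this documentation.

```properties
# Hypothetical connector instance name
name=CamelRabbitmqSinkConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector
tasks.max=1

# Standard Kafka Connect converters; choose the ones that match your data
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# Kafka topic to read from (assumed name)
topics=mytopic

# The only required connector option: the target RabbitMQ exchange (assumed name)
camel.sink.path.exchangeName=my-exchange

# Broker location (assumed to be a local RabbitMQ on the usual port)
camel.sink.endpoint.hostname=localhost
camel.sink.endpoint.portNumber=5672
```

Only camel.sink.path.exchangeName is marked as required; the other camel.sink.* entries can be dropped if the defaults suit your environment.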

The camel-rabbitmq sink connector supports 101 options, which are listed below.

Name Description Default Required Priority

camel.sink.path.exchangeName

The exchange name determines the exchange to which the produced messages will be sent. In the case of consumers, the exchange name determines the exchange the queue will be bound to.

null

true

HIGH

camel.sink.endpoint.addresses

If this option is set, camel-rabbitmq will try to create the connection based on the addresses option. The addresses value is a string of the form server1:12345, server2:12345

null

false

MEDIUM

camel.sink.endpoint.autoDelete

If it is true, the exchange will be deleted when it is no longer in use

true

false

MEDIUM

camel.sink.endpoint.automaticRecoveryEnabled

Enables connection automatic recovery (uses connection implementation that performs automatic recovery when existing connection has failures)

"true"

false

MEDIUM

camel.sink.endpoint.connectionFactory

To use a custom RabbitMQ connection factory. When this option is set, all connection options (connectionTimeout, requestedChannelMax, …) set on the URI are ignored.

null

false

MEDIUM

camel.sink.endpoint.deadLetterExchange

The name of the dead letter exchange

null

false

MEDIUM

camel.sink.endpoint.deadLetterExchangeType

The type of the dead letter exchange. One of: [direct] [fanout] [headers] [topic]

"direct"

false

MEDIUM

camel.sink.endpoint.deadLetterQueue

The name of the dead letter queue

null

false

MEDIUM

camel.sink.endpoint.deadLetterRoutingKey

The routing key for the dead letter exchange

null

false

MEDIUM

camel.sink.endpoint.declare

If the option is true, Camel declares the exchange and queue name and binds them together. If the option is false, Camel won’t declare the exchange and queue name on the server.

true

false

MEDIUM

camel.sink.endpoint.durable

Whether to declare a durable exchange (the exchange will survive a server restart).

true

false

MEDIUM

camel.sink.endpoint.exchangeType

The exchange type such as direct or topic. One of: [direct] [fanout] [headers] [topic]

"direct"

false

MEDIUM

camel.sink.endpoint.exclusive

Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes.

false

false

MEDIUM

camel.sink.endpoint.hostname

The hostname of the running rabbitmq instance or cluster.

null

false

MEDIUM

camel.sink.endpoint.passive

Passive queues depend on the queue already being available at RabbitMQ.

false

false

MEDIUM

camel.sink.endpoint.portNumber

Port number for the host with the running rabbitmq instance or cluster. Default value is 5672.

null

false

MEDIUM

camel.sink.endpoint.queue

The queue to receive messages from

null

false

MEDIUM

camel.sink.endpoint.routingKey

The routing key to use when binding a consumer queue to the exchange. For producer routing keys, you set the header rabbitmq.ROUTING_KEY.

null

false

MEDIUM

camel.sink.endpoint.skipDlqDeclare

If true, the producer will not declare and bind a dead letter queue. This can be used if you also have a DLQ RabbitMQ consumer and you want to avoid argument clashes between producer and consumer. This option has no effect if a DLQ is not configured (the deadLetterExchange option is not set).

false

false

MEDIUM

camel.sink.endpoint.skipExchangeDeclare

This can be used if we need to declare the queue but not the exchange

false

false

MEDIUM

camel.sink.endpoint.skipQueueBind

If true the queue will not be bound to the exchange after declaring it

false

false

MEDIUM

camel.sink.endpoint.skipQueueDeclare

If true the producer will not declare and bind a queue. This can be used for directing messages via an existing routing key.

false

false

MEDIUM

camel.sink.endpoint.vhost

The vhost for the channel

"/"

false

MEDIUM

camel.sink.endpoint.additionalHeaders

Map of additional headers. These headers will be set only when the 'allowCustomHeaders' is set to true

null

false

MEDIUM

camel.sink.endpoint.additionalProperties

Map of additional properties. These are standard RabbitMQ properties as defined in com.rabbitmq.client.AMQP.BasicProperties. The map keys should be from org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be ignored.

null

false

MEDIUM

camel.sink.endpoint.allowCustomHeaders

Allow passing custom values in headers.

false

false

MEDIUM

camel.sink.endpoint.allowNullHeaders

Allow passing null values in headers.

false

false

MEDIUM

camel.sink.endpoint.bridgeEndpoint

If the bridgeEndpoint is true, the producer will ignore the message header of rabbitmq.EXCHANGE_NAME and rabbitmq.ROUTING_KEY

false

false

MEDIUM

camel.sink.endpoint.channelPoolMaxSize

The maximum number of open channels in the pool.

10

false

MEDIUM

camel.sink.endpoint.channelPoolMaxWait

Set the maximum number of milliseconds to wait for a channel from the pool

1000L

false

MEDIUM

camel.sink.endpoint.guaranteedDeliveries

When true, an exception will be thrown when the message cannot be delivered (basic.return) and the message is marked as mandatory. PublisherAcknowledgement will also be activated in this case. See also publisher acknowledgements - When will messages be confirmed.

false

false

MEDIUM

camel.sink.endpoint.immediate

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed. If the header rabbitmq.IMMEDIATE is present, it overrides this option.

false

false

MEDIUM

camel.sink.endpoint.lazyStartProducer

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

false

MEDIUM

camel.sink.endpoint.mandatory

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message. If the header rabbitmq.MANDATORY is present, it overrides this option.

false

false

MEDIUM

camel.sink.endpoint.publisherAcknowledgements

When true, the message will be published with publisher acknowledgements turned on

false

false

MEDIUM

camel.sink.endpoint.publisherAcknowledgementsTimeout

The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server

null

false

MEDIUM

camel.sink.endpoint.args

Specify arguments for configuring the different RabbitMQ concepts; a different prefix is required for each: Exchange: arg.exchange. Queue: arg.queue. Binding: arg.binding. DLQ: arg.dlq.queue. DLQ binding: arg.dlq.binding. For example, to declare a queue with a message TTL argument: http://localhost:5672/exchange/queue?args=arg.queue.x-message-ttl=60000

null

false

MEDIUM

camel.sink.endpoint.clientProperties

Connection client properties (client info used in negotiating with the server)

null

false

MEDIUM

camel.sink.endpoint.connectionFactoryExceptionHandler

Custom rabbitmq ExceptionHandler for ConnectionFactory

null

false

MEDIUM

camel.sink.endpoint.connectionTimeout

Connection timeout

60000

false

MEDIUM

camel.sink.endpoint.networkRecoveryInterval

Network recovery interval in milliseconds (interval used when recovering from network failure)

"5000"

false

MEDIUM

camel.sink.endpoint.requestedChannelMax

Connection requested channel max (max number of channels offered)

2047

false

MEDIUM

camel.sink.endpoint.requestedFrameMax

Connection requested frame max (max size of frame offered)

0

false

MEDIUM

camel.sink.endpoint.requestedHeartbeat

Connection requested heartbeat (heart-beat in seconds offered)

60

false

MEDIUM

camel.sink.endpoint.requestTimeout

Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)

20000L

false

MEDIUM

camel.sink.endpoint.requestTimeoutCheckerInterval

Set requestTimeoutCheckerInterval for inOut exchange

1000L

false

MEDIUM

camel.sink.endpoint.synchronous

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

false

MEDIUM

camel.sink.endpoint.topologyRecoveryEnabled

Enables connection topology recovery (should topology recovery be performed)

null

false

MEDIUM

camel.sink.endpoint.transferException

When true, and an inOut Exchange fails on the consumer side, the caused Exception is sent back in the response.

false

false

MEDIUM

camel.sink.endpoint.allowMessageBodySerialization

Whether to allow Java serialization of the message body or not. If this value is true, the message body will be serialized on the producer side using Java serialization, if no type converter can handle the message body. On the consumer side, it will deserialize the message body if this value is true and the message contains a CamelSerialize header. Setting this value to true may introduce a security vulnerability as it allows an attacker to attempt to deserialize to a gadget object which could result in a RCE or other security vulnerability.

false

false

MEDIUM

camel.sink.endpoint.password

Password for authenticated access

"guest"

false

MEDIUM

camel.sink.endpoint.sslProtocol

Enables SSL on the connection. Accepted values are true, TLS and SSLv3.

null

false

MEDIUM

camel.sink.endpoint.trustManager

Configure SSL trust manager, SSL should be enabled for this option to be effective

null

false

MEDIUM

camel.sink.endpoint.username

Username in case of authenticated access

"guest"

false

MEDIUM

camel.component.rabbitmq.addresses

If this option is set, camel-rabbitmq will try to create the connection based on the addresses option. The addresses value is a string of the form server1:12345, server2:12345

null

false

MEDIUM

camel.component.rabbitmq.autoDelete

If it is true, the exchange will be deleted when it is no longer in use

true

false

MEDIUM

camel.component.rabbitmq.connectionFactory

To use a custom RabbitMQ connection factory. When this option is set, all connection options (connectionTimeout, requestedChannelMax, …) set on the URI are ignored.

null

false

MEDIUM

camel.component.rabbitmq.deadLetterExchange

The name of the dead letter exchange

null

false

MEDIUM

camel.component.rabbitmq.deadLetterExchangeType

The type of the dead letter exchange. One of: [direct] [fanout] [headers] [topic]

"direct"

false

MEDIUM

camel.component.rabbitmq.deadLetterQueue

The name of the dead letter queue

null

false

MEDIUM

camel.component.rabbitmq.deadLetterRoutingKey

The routing key for the dead letter exchange

null

false

MEDIUM

camel.component.rabbitmq.declare

If the option is true, Camel declares the exchange and queue name and binds them together. If the option is false, Camel won’t declare the exchange and queue name on the server.

true

false

MEDIUM

camel.component.rabbitmq.durable

Whether to declare a durable exchange (the exchange will survive a server restart).

true

false

MEDIUM

camel.component.rabbitmq.exclusive

Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes.

false

false

MEDIUM

camel.component.rabbitmq.hostname

The hostname of the running RabbitMQ instance or cluster.

null

false

MEDIUM

camel.component.rabbitmq.passive

Passive queues depend on the queue already being available at RabbitMQ.

false

false

MEDIUM

camel.component.rabbitmq.portNumber

Port number for the host with the running rabbitmq instance or cluster.

5672

false

MEDIUM

camel.component.rabbitmq.skipExchangeDeclare

This can be used if we need to declare the queue but not the exchange

false

false

MEDIUM

camel.component.rabbitmq.skipQueueBind

If true the queue will not be bound to the exchange after declaring it

false

false

MEDIUM

camel.component.rabbitmq.skipQueueDeclare

If true the producer will not declare and bind a queue. This can be used for directing messages via an existing routing key.

false

false

MEDIUM

camel.component.rabbitmq.vhost

The vhost for the channel

"/"

false

MEDIUM

camel.component.rabbitmq.additionalHeaders

Map of additional headers. These headers will be set only when the 'allowCustomHeaders' is set to true

null

false

MEDIUM

camel.component.rabbitmq.additionalProperties

Map of additional properties. These are standard RabbitMQ properties as defined in com.rabbitmq.client.AMQP.BasicProperties. The map keys should be from org.apache.camel.component.rabbitmq.RabbitMQConstants. Any other keys will be ignored. When the message already contains these headers, they take precedence over these properties.

null

false

MEDIUM

camel.component.rabbitmq.allowNullHeaders

Allow passing null values in headers.

false

false

MEDIUM

camel.component.rabbitmq.channelPoolMaxSize

The maximum number of open channels in the pool.

10

false

MEDIUM

camel.component.rabbitmq.channelPoolMaxWait

Set the maximum number of milliseconds to wait for a channel from the pool

1000L

false

MEDIUM

camel.component.rabbitmq.guaranteedDeliveries

When true, an exception will be thrown when the message cannot be delivered (basic.return) and the message is marked as mandatory. PublisherAcknowledgement will also be activated in this case. See also publisher acknowledgements - When will messages be confirmed.

false

false

MEDIUM

camel.component.rabbitmq.immediate

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed. If the header rabbitmq.IMMEDIATE is present, it overrides this option.

false

false

MEDIUM

camel.component.rabbitmq.lazyStartProducer

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

false

MEDIUM

camel.component.rabbitmq.mandatory

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message. If the header rabbitmq.MANDATORY is present, it overrides this option.

false

false

MEDIUM

camel.component.rabbitmq.publisherAcknowledgements

When true, the message will be published with publisher acknowledgements turned on

false

false

MEDIUM

camel.component.rabbitmq.publisherAcknowledgementsTimeout

The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server

null

false

MEDIUM

camel.component.rabbitmq.args

Specify arguments for configuring the different RabbitMQ concepts; a different prefix is required for each: Exchange: arg.exchange. Queue: arg.queue. Binding: arg.binding. DLQ: arg.dlq.queue. DLQ binding: arg.dlq.binding. For example, to declare a queue with a message TTL argument: http://localhost:5672/exchange/queue?args=arg.queue.x-message-ttl=60000

null

false

MEDIUM

camel.component.rabbitmq.autoDetectConnectionFactory

Whether to auto-detect looking up RabbitMQ connection factory from the registry. When enabled and a single instance of the connection factory is found then it will be used. An explicit connection factory can be configured on the component or endpoint level which takes precedence.

true

false

MEDIUM

camel.component.rabbitmq.automaticRecoveryEnabled

Enables connection automatic recovery (uses connection implementation that performs automatic recovery when connection shutdown is not initiated by the application)

null

false

MEDIUM

camel.component.rabbitmq.autowiredEnabled

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

false

MEDIUM

camel.component.rabbitmq.clientProperties

Connection client properties (client info used in negotiating with the server)

null

false

MEDIUM

camel.component.rabbitmq.connectionFactoryExceptionHandler

Custom rabbitmq ExceptionHandler for ConnectionFactory

null

false

MEDIUM

camel.component.rabbitmq.connectionTimeout

Connection timeout

60000

false

MEDIUM

camel.component.rabbitmq.networkRecoveryInterval

Network recovery interval in milliseconds (interval used when recovering from network failure)

"5000"

false

MEDIUM

camel.component.rabbitmq.requestedChannelMax

Connection requested channel max (max number of channels offered)

2047

false

MEDIUM

camel.component.rabbitmq.requestedFrameMax

Connection requested frame max (max size of frame offered)

0

false

MEDIUM

camel.component.rabbitmq.requestedHeartbeat

Connection requested heartbeat (heart-beat in seconds offered)

60

false

MEDIUM

camel.component.rabbitmq.requestTimeout

Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)

20000L

false

MEDIUM

camel.component.rabbitmq.requestTimeoutCheckerInterval

Set requestTimeoutCheckerInterval for inOut exchange

1000L

false

MEDIUM

camel.component.rabbitmq.topologyRecoveryEnabled

Enables connection topology recovery (should topology recovery be performed)

null

false

MEDIUM

camel.component.rabbitmq.transferException

When true, and an inOut Exchange fails on the consumer side, the caused Exception is sent back in the response.

false

false

MEDIUM

camel.component.rabbitmq.password

Password for authenticated access

"guest"

false

MEDIUM

camel.component.rabbitmq.sslProtocol

Enables SSL on the connection. Accepted values are true, TLS and SSLv3.

null

false

MEDIUM

camel.component.rabbitmq.trustManager

Configure SSL trust manager, SSL should be enabled for this option to be effective

null

false

MEDIUM

camel.component.rabbitmq.username

Username in case of authenticated access

"guest"

false

MEDIUM
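To illustrate how several of the options above might be combined, here is a hedged sketch of the camel.sink.* portion of a connector configuration that targets a remote broker with non-default credentials and turns on publisher acknowledgements. The host name, virtual host, credentials, exchange name, and timeout value are all hypothetical; adjust them to your environment.

```properties
# Target exchange (assumed name) and its type, taken from the allowed values above
camel.sink.path.exchangeName=orders
camel.sink.endpoint.exchangeType=topic

# Broker location and virtual host (assumed values)
camel.sink.endpoint.hostname=rabbit.example.com
camel.sink.endpoint.portNumber=5672
camel.sink.endpoint.vhost=/

# Credentials replacing the "guest" defaults (assumed values)
camel.sink.endpoint.username=app-user
camel.sink.endpoint.password=change-me

# Publish with acknowledgements and wait up to 5 seconds for basic.ack
camel.sink.endpoint.publisherAcknowledgements=true
camel.sink.endpoint.publisherAcknowledgementsTimeout=5000

# Ask the server to return unroutable messages instead of silently dropping them
camel.sink.endpoint.mandatory=true
```

Options with the camel.component.rabbitmq.* prefix configure the same settings at the component level; this sketch sets everything at the endpoint level only.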

The camel-rabbitmq sink connector has no converters out of the box.

The camel-rabbitmq sink connector has no transforms out of the box.

The camel-rabbitmq sink connector has no aggregation strategies out of the box.