ZooKeeper
Since Camel 2.9
The ZooKeeper component allows interaction with a ZooKeeper cluster and exposes the following features to Camel:
- Creation of nodes in any of the ZooKeeper create modes.
- Get and set the data contents of arbitrary cluster nodes (data being set must be convertible to byte[]).
- Create and retrieve the list of child nodes attached to a particular node.
- A distributed RoutePolicy that leverages a leader election coordinated by ZooKeeper to determine if exchanges should get processed.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zookeeper</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
URI format
zookeeper://zookeeper-server[:port][/path][?options]
The path from the URI specifies the node in the ZooKeeper server (a.k.a. znode) that will be the target of the endpoint.
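As a rough illustration of how the parts of the URI format map onto an endpoint string, the following sketch splits a URI with the JDK's java.net.URI class. The class ZooKeeperUriParts is a hypothetical helper written for this example and is not part of the component; it assumes a single server in the URI, since a comma-separated server list does not parse as one host.

```java
import java.net.URI;

/** Hypothetical helper: splits a zookeeper:// URI into server, port, znode path and options. */
final class ZooKeeperUriParts {
    final String host;     // the zookeeper-server part
    final int port;        // -1 when the URI gives no port
    final String znode;    // the path part, i.e. the target znode
    final String options;  // raw query string, or null when absent

    private ZooKeeperUriParts(String host, int port, String znode, String options) {
        this.host = host;
        this.port = port;
        this.znode = znode;
        this.options = options;
    }

    static ZooKeeperUriParts parse(String endpointUri) {
        URI uri = URI.create(endpointUri);
        if (!"zookeeper".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Not a zookeeper URI: " + endpointUri);
        }
        return new ZooKeeperUriParts(uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery());
    }

    public static void main(String[] args) {
        ZooKeeperUriParts parts = parse("zookeeper://localhost:39913/somepath/somenode?create=true");
        // prints: localhost 39913 /somepath/somenode create=true
        System.out.println(parts.host + " " + parts.port + " " + parts.znode + " " + parts.options);
    }
}
```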
Options
The ZooKeeper component supports 2 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
configuration (advanced) | To use a shared ZooKeeperConfiguration | | ZooKeeperConfiguration |
resolvePropertyPlaceholders (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean |
The ZooKeeper endpoint is configured using URI syntax:
zookeeper:serverUrls/path
with the following path and query parameters:
Path Parameters (2 parameters):
Name | Description | Default | Type |
---|---|---|---|
serverUrls | Required. The ZooKeeper server hosts (multiple servers can be separated by commas) | | String |
path | Required. The node in the ZooKeeper server (a.k.a. znode) | | String |
Query Parameters (12 parameters):
Name | Description | Default | Type |
---|---|---|---|
awaitExistence (common) | Deprecated. Not in use. | true | boolean |
listChildren (common) | Whether the children of the node should be listed. | false | boolean |
timeout (common) | The time interval to wait on connection before timing out. | 5000 | int |
backoff (consumer) | The time interval to back off for after an error before retrying. | 5000 | long |
bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
repeat (consumer) | Whether changes to the znode should be 'watched' and repeatedly processed. | false | boolean |
sendEmptyMessageOnDelete (consumer) | Whether an empty message should be sent to the consumer when a znode is deleted. | true | boolean |
exceptionHandler (consumer) | To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored. | | ExceptionHandler |
exchangePattern (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern |
create (producer) | Whether the endpoint should create the node if it does not currently exist. | false | boolean |
createMode (producer) | The create mode that should be used for the newly created node. | EPHEMERAL | String |
synchronous (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |
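The query parameters above are appended to the endpoint URI as name=value pairs. The following is a minimal sketch of assembling such a URI from a map of options; the class ZooKeeperEndpointUri and its build method are hypothetical helpers for this example, not part of the component, and the sketch does no URL-encoding of values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: assembles zookeeper://serverUrls/path?name=value&... */
final class ZooKeeperEndpointUri {

    static String build(String serverUrls, String path, Map<String, Object> options) {
        StringBuilder uri = new StringBuilder("zookeeper://").append(serverUrls);
        if (!path.startsWith("/")) {
            uri.append('/');
        }
        uri.append(path);
        if (!options.isEmpty()) {
            // Options are joined in the iteration order of the map.
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((name, value) -> query.add(name + "=" + value));
            uri.append(query.toString());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("repeat", true);   // keep watching the znode for changes
        options.put("backoff", 2000);  // illustrative value only
        // prints: zookeeper://localhost:39913/somepath/somenode?repeat=true&backoff=2000
        System.out.println(build("localhost:39913", "/somepath/somenode", options));
    }
}
```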
Spring Boot Auto-Configuration
When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zookeeper-starter</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
The component supports 57 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
camel.component.zookeeper.cluster.service.attributes | Custom service attributes. | | Map |
camel.component.zookeeper.cluster.service.auth-info-list | | | List |
camel.component.zookeeper.cluster.service.base-path | | | String |
camel.component.zookeeper.cluster.service.connection-timeout | | | Long |
camel.component.zookeeper.cluster.service.connection-timeout-unit | | | TimeUnit |
camel.component.zookeeper.cluster.service.curator-framework | | | CuratorFramework |
camel.component.zookeeper.cluster.service.enabled | Sets if the zookeeper cluster service should be enabled or not, default is false. | false | Boolean |
camel.component.zookeeper.cluster.service.id | Cluster Service ID | | String |
camel.component.zookeeper.cluster.service.max-close-wait | | | Long |
camel.component.zookeeper.cluster.service.max-close-wait-unit | | | TimeUnit |
camel.component.zookeeper.cluster.service.namespace | | | String |
camel.component.zookeeper.cluster.service.nodes | | | List |
camel.component.zookeeper.cluster.service.order | Service lookup order/priority. | | Integer |
camel.component.zookeeper.cluster.service.reconnect-base-sleep-time | | | Long |
camel.component.zookeeper.cluster.service.reconnect-base-sleep-time-unit | | | TimeUnit |
camel.component.zookeeper.cluster.service.reconnect-max-retries | | | Integer |
camel.component.zookeeper.cluster.service.reconnect-max-sleep-time | | | Long |
camel.component.zookeeper.cluster.service.reconnect-max-sleep-time-unit | | | TimeUnit |
camel.component.zookeeper.cluster.service.retry-policy | | | RetryPolicy |
camel.component.zookeeper.cluster.service.session-timeout | | | Long |
camel.component.zookeeper.cluster.service.session-timeout-unit | | | TimeUnit |
camel.component.zookeeper.configuration.backoff | The time interval to backoff for after an error before retrying. | 5000 | Long |
camel.component.zookeeper.configuration.create | Should the endpoint create the node if it does not currently exist. | false | Boolean |
camel.component.zookeeper.configuration.create-mode | The create mode that should be used for the newly created node | EPHEMERAL | String |
camel.component.zookeeper.configuration.list-children | Whether the children of the node should be listed | false | Boolean |
camel.component.zookeeper.configuration.path | The node in the ZooKeeper server (aka znode) | | String |
camel.component.zookeeper.configuration.repeat | Should changes to the znode be 'watched' and repeatedly processed. | false | Boolean |
camel.component.zookeeper.configuration.send-empty-message-on-delete | Upon the delete of a znode, should an empty message be sent to the consumer | true | Boolean |
camel.component.zookeeper.configuration.servers | The zookeeper server hosts | | List |
camel.component.zookeeper.configuration.timeout | The time interval to wait on connection before timing out. | 5000 | Integer |
camel.component.zookeeper.enabled | Enable zookeeper component | true | Boolean |
camel.component.zookeeper.resolve-property-placeholders | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean |
camel.component.zookeeper.service-registry.attributes | Custom service attributes. | | Map |
camel.component.zookeeper.service-registry.auth-info-list | | | List |
camel.component.zookeeper.service-registry.base-path | | | String |
camel.component.zookeeper.service-registry.connection-timeout | | | Long |
camel.component.zookeeper.service-registry.connection-timeout-unit | | | TimeUnit |
camel.component.zookeeper.service-registry.curator-framework | | | CuratorFramework |
camel.component.zookeeper.service-registry.deregister-services-on-stop | | | Boolean |
camel.component.zookeeper.service-registry.enabled | Sets if the zookeeper service registry should be enabled or not, default is false. | false | Boolean |
camel.component.zookeeper.service-registry.id | Service Registry ID | | String |
camel.component.zookeeper.service-registry.max-close-wait | | | Long |
camel.component.zookeeper.service-registry.max-close-wait-unit | | | TimeUnit |
camel.component.zookeeper.service-registry.namespace | | | String |
camel.component.zookeeper.service-registry.nodes | | | List |
camel.component.zookeeper.service-registry.order | Service lookup order/priority. | | Integer |
camel.component.zookeeper.service-registry.override-service-host | | | Boolean |
camel.component.zookeeper.service-registry.reconnect-base-sleep-time | | | Long |
camel.component.zookeeper.service-registry.reconnect-base-sleep-time-unit | | | TimeUnit |
camel.component.zookeeper.service-registry.reconnect-max-retries | | | Integer |
camel.component.zookeeper.service-registry.reconnect-max-sleep-time | | | Long |
camel.component.zookeeper.service-registry.reconnect-max-sleep-time-unit | | | TimeUnit |
camel.component.zookeeper.service-registry.retry-policy | | | RetryPolicy |
camel.component.zookeeper.service-registry.service-host | | | String |
camel.component.zookeeper.service-registry.session-timeout | | | Long |
camel.component.zookeeper.service-registry.session-timeout-unit | | | TimeUnit |
camel.component.zookeeper.configuration.await-existence | Deprecated Not in use | true | Boolean |
Use cases
Reading from a znode
The following snippet will read the data from the znode /somepath/somenode/, provided that it already exists. The data retrieved will be placed into an exchange and passed on to the rest of the route:
from("zookeeper://localhost:39913/somepath/somenode").to("mock:result");
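If the node does not yet exist, a flag can be supplied to have the endpoint await its creation. The options table above lists this flag as awaitExistence and marks it as deprecated and not in use, so it may have no effect in current versions:
from("zookeeper://localhost:39913/somepath/somenode?awaitExistence=true").to("mock:result");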
Reading from a znode
When data is read due to a WatchedEvent received from the ZooKeeper ensemble, the CamelZookeeperEventType header holds ZooKeeper's EventType value from that WatchedEvent. If the data is read initially (not triggered by a WatchedEvent), the CamelZookeeperEventType header will not be set.
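A processor can use the presence of this header to tell an initial read from a change notification. The following is a minimal sketch of that check against a plain header map; in a route the map would come from the message headers. The class EventTypeHeader is a hypothetical helper for this example, and a String stands in for ZooKeeper's EventType value.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: distinguishes initial reads from reads triggered by a WatchedEvent. */
final class EventTypeHeader {
    /** Header name as given in the text above. */
    static final String EVENT_TYPE = "CamelZookeeperEventType";

    static String describe(Map<String, Object> headers) {
        Object eventType = headers.get(EVENT_TYPE);
        // The header is absent when the data was read initially.
        return eventType == null ? "initial read" : "watched event: " + eventType;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(describe(headers));           // prints: initial read
        headers.put(EVENT_TYPE, "NodeDataChanged");      // stand-in for an EventType value
        System.out.println(describe(headers));           // prints: watched event: NodeDataChanged
    }
}
```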
Writing to a znode
The following snippet will write the payload of the exchange into the znode at /somepath/somenode/, provided that it already exists:
from("direct:write-to-znode")
.to("zookeeper://localhost:39913/somepath/somenode");
For flexibility, the endpoint allows the target znode to be specified dynamically as a message header. If a header keyed by the string CamelZooKeeperNode is present, the value of the header will be used as the path to the znode on the server. For instance, using the same route definition as above, the following code snippet will write the data not to /somepath/somenode but to the path from the header, /somepath/someothernode.
Note: the testPayload must be convertible to byte[], as the data stored in ZooKeeper is byte based.
Object testPayload = ...
template.sendBodyAndHeader("direct:write-to-znode", testPayload, "CamelZooKeeperNode", "/somepath/someothernode");
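Because znode data is byte based, one option is to convert text payloads to byte[] explicitly rather than relying on an implicit conversion. The sketch below shows such a round trip using UTF-8; the class ZnodePayload is a hypothetical helper for this example, and the choice of UTF-8 is an assumption, not something the component mandates.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts text payloads to and from the byte[] form stored in a znode. */
final class ZnodePayload {

    static byte[] toBytes(String text) {
        // An explicit charset avoids depending on the platform default.
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static String fromBytes(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = toBytes("some payload");
        System.out.println(stored.length + " bytes, read back as: " + fromBytes(stored));
    }
}
```

The resulting byte[] can then be passed as the body in the sendBodyAndHeader call shown above.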
To also create the node if it does not exist, the create option should be used:
from("direct:create-and-write-to-znode")
.to("zookeeper://localhost:39913/somepath/somenode?create=true");
Starting with version 2.11, it is also possible to delete a node by setting the header CamelZookeeperOperation to DELETE:
from("direct:delete-znode")
.setHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, constant("DELETE"))
.to("zookeeper://localhost:39913/somepath/somenode");
or equivalently:
<route>
    <from uri="direct:delete-znode" />
    <setHeader headerName="CamelZookeeperOperation">
        <constant>DELETE</constant>
    </setHeader>
    <to uri="zookeeper://localhost:39913/somepath/somenode" />
</route>
ZooKeeper nodes can have different types; they can be 'Ephemeral' or 'Persistent' and 'Sequenced' or 'Unsequenced'. For further information on each type, see the ZooKeeper documentation.
By default endpoints will create unsequenced, ephemeral nodes, but the type can be easily manipulated via a URI config parameter or via a special message header. The values expected for the create mode are simply the names from the CreateMode enumeration:
- PERSISTENT
- PERSISTENT_SEQUENTIAL
- EPHEMERAL
- EPHEMERAL_SEQUENTIAL
For example, to create a persistent znode via the URI config:
from("direct:create-and-write-to-persistent-znode")
.to("zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT");
or using the header CamelZookeeperCreateMode.
Note: the testPayload must be convertible to byte[], as the data stored in ZooKeeper is byte based.
Object testPayload = ...
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", testPayload, "CamelZookeeperCreateMode", "PERSISTENT");
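The four create mode names combine two independent properties: ephemeral or persistent, and sequenced or unsequenced. The sketch below makes that mapping explicit with a local enum that mirrors the names listed above, and falls back to EPHEMERAL when no value is given, matching the documented default. The class CreateModes and its Mode enum are stand-ins written for this example, not ZooKeeper's own CreateMode type, and the strict, case-sensitive parsing is an assumption of the sketch.

```java
/** Hypothetical stand-in mirroring the create mode names; not ZooKeeper's CreateMode class. */
final class CreateModes {

    enum Mode {
        PERSISTENT(false, false),
        PERSISTENT_SEQUENTIAL(false, true),
        EPHEMERAL(true, false),
        EPHEMERAL_SEQUENTIAL(true, true);

        final boolean ephemeral;   // node is tied to the session that created it
        final boolean sequential;  // node name is sequenced

        Mode(boolean ephemeral, boolean sequential) {
            this.ephemeral = ephemeral;
            this.sequential = sequential;
        }
    }

    /** Returns the named mode, or EPHEMERAL when no value is supplied. */
    static Mode parse(String value) {
        if (value == null || value.isEmpty()) {
            return Mode.EPHEMERAL;
        }
        // Throws IllegalArgumentException for names outside the enumeration.
        return Mode.valueOf(value);
    }

    public static void main(String[] args) {
        Mode mode = parse("PERSISTENT_SEQUENTIAL");
        System.out.println(mode + " ephemeral=" + mode.ephemeral + " sequential=" + mode.sequential);
    }
}
```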
ZooKeeper enabled Route policies
Note: this route policy functionality is deprecated; use ZooKeeperClusterService or the camel-zookeeper-master component instead.
ZooKeeper allows for very simple and effective leader election out of the box. This component exploits this election capability in a RoutePolicy to control when and how routes are enabled. This policy would typically be used in fail-over scenarios, to control identical instances of a route across a cluster of Camel-based servers. A very common scenario is a simple 'Master-Slave' setup where there are multiple instances of a route distributed across a cluster but only one of them, that of the master, should be running at a time. If the master fails, a new master should be elected from the available slaves and the route in this new master should be started.
The policy uses a common znode path across all instances of the RoutePolicy that will be involved in the election. Each policy writes its id into this node, and ZooKeeper orders the writes in the order it received them. The policy then reads the listing of the node to find the position of its id; this position is used to determine if the route should be started or not. The policy is configured at startup with the number of route instances that should be started across the cluster, and if its position in the list is less than this value then its route will be started. For a Master-Slave scenario, the policy is configured with 1 route instance and only the first entry in the listing will start its route. All policies watch for updates to the listing, and if the listing changes they recalculate whether their route should be started. For more information on ZooKeeper's leader election capability, see the leader election recipe in the ZooKeeper documentation.
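The position check described above can be sketched in plain Java. This is only an illustration of the decision rule, not the policy's actual implementation: the class ElectionSketch and its method are hypothetical, and the ordered list stands in for the listing that the policy reads from the election znode.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the "position less than enabled count" rule. */
final class ElectionSketch {

    /**
     * @param orderedIds   ids in the order the writes were received
     * @param ownId        the id this policy instance wrote
     * @param enabledCount number of route instances that should run across the cluster
     */
    static boolean shouldStartRoute(List<String> orderedIds, String ownId, int enabledCount) {
        int position = orderedIds.indexOf(ownId);
        // An id missing from the listing never starts its route.
        return position >= 0 && position < enabledCount;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList("node-a", "node-b", "node-c");
        System.out.println(shouldStartRoute(listing, "node-a", 1)); // prints: true
        System.out.println(shouldStartRoute(listing, "node-b", 1)); // prints: false

        // After the master fails, the listing changes and the check is repeated.
        List<String> afterFailure = Arrays.asList("node-b", "node-c");
        System.out.println(shouldStartRoute(afterFailure, "node-b", 1)); // prints: true
    }
}
```

With an enabled count of 1, only the first entry runs its route; a larger count allows several instances to be active at once.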
The following example uses the node /someapp/somepolicy for the election and is set up to start only the top entry in the node listing, i.e. to elect a single master:
ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
from("direct:policy-controlled")
.routePolicy(policy)
.to("mock:controlled");
There are currently 3 policies defined in the component, with different SLAs:
- ZooKeeperRoutePolicy
- CuratorLeaderRoutePolicy (since 2.19)
- MultiMasterCuratorLeaderRoutePolicy (since 2.19)
ZooKeeperRoutePolicy supports multiple active nodes, but its activation kicks in only after a Camel component and its corresponding Consumer have already been started. Depending on your route definitions, this introduces the risk that your component starts consuming events and producing Exchanges before the policy can establish that the node should not be activated.
CuratorLeaderRoutePolicy supports only a single active node, but it is bound to a different CamelContext lifecycle method; this policy kicks in before any route or consumer is started, so you can be sure that no event is processed before the policy takes its decision.
MultiMasterCuratorLeaderRoutePolicy supports multiple active nodes, and it is bound to the same lifecycle method as CuratorLeaderRoutePolicy; this policy kicks in before any route or consumer is started, so you can be sure that no event is processed before the policy takes its decision.