Wire Tap
Wire Tap (from the EIP patterns) allows you to route messages to a separate location while they are being forwarded to the ultimate destination.
Streams
If you Wire Tap a stream message body then you should consider enabling Stream caching to ensure the message body can be read at each endpoint. See more details at Stream caching.
See the cacheSize option for more details on how much cache to use depending on how many or few unique endpoints are used.
Options
The Wire Tap EIP supports 12 options which are listed below:
Name | Description | Default | Type |
---|---|---|---|
processorRef | Reference to a Processor to use for creating a new body as the message to use for wire tapping. | | String |
body | Uses the expression for creating a new body as the message to use for wire tapping. | | ExpressionSubElementDefinition |
executorServiceRef | Uses a custom thread pool. | | String |
copy | Uses a copy of the original exchange. | true | Boolean |
dynamicUri | Whether the uri is dynamic or static. If the uri is dynamic, the simple language is used to evaluate a dynamic uri to use as the wire-tap destination for each incoming message. This works similarly to the toD EIP pattern. If static, the uri is used as-is as the wire-tap destination. | true | Boolean |
onPrepareRef | Uses the Processor when preparing the org.apache.camel.Exchange to be sent. This can be used to deep-clone messages that should be sent, or for any custom logic needed before the exchange is sent. | | String |
uri | Required. The uri of the endpoint to send to. The uri can be dynamically computed using the org.apache.camel.language.simple.SimpleLanguage expression. | | String |
pattern | Sets the optional ExchangePattern used to invoke this endpoint. | | ExchangePattern |
cacheSize | Sets the maximum size used by the org.apache.camel.spi.ProducerCache, which is used to cache and reuse producers when using this Wire Tap and uris are reused. Beware that using dynamic endpoints affects how well the cache can be utilized. If each dynamic endpoint is unique, it is best to turn off caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduces memory usage, as otherwise producers/endpoints are stored in memory in the caches. However, if a high proportion of the dynamic endpoints have been used before, it can be beneficial to use the cache to reuse both producers and endpoints, and the cache size can be set accordingly or left at the default size (1000). If there is a mix of unique and previously used dynamic endpoints, setting a reasonable cache size can help reduce memory usage by avoiding storing too many infrequently used producers. | | Integer |
ignoreInvalidEndpoint | Ignore the invalid endpoint exception when trying to create a producer with that endpoint. | false | Boolean |
allowOptimisedComponents | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean |
autoStartComponents | Whether to auto startup components when toD is starting up. | true | Boolean |
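The cacheSize trade-off can be illustrated with a small plain-Java sketch of a bounded cache keyed by endpoint uri. This is not Camel's ProducerCache implementation; the class name BoundedCacheSketch and the least-recently-used eviction order are assumptions made only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: a bounded cache keyed by endpoint uri.
// Assumes least-recently-used eviction; Camel's real cache may differ.
class BoundedCacheSketch<V> extends LinkedHashMap<String, V> {
    private final int maxSize;

    public BoundedCacheSketch(int maxSize) {
        // accessOrder = true: get() moves an entry to the "most recent" end
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        // Evict the least recently used entry once the limit is exceeded
        return size() > maxSize;
    }

    public static void main(String[] args) {
        BoundedCacheSketch<String> cache = new BoundedCacheSketch<>(2);
        cache.put("jms:queue:backup-1", "producer-1");
        cache.put("jms:queue:backup-2", "producer-2");
        cache.get("jms:queue:backup-1");               // reused uri stays cached
        cache.put("jms:queue:backup-3", "producer-3"); // evicts backup-2
        System.out.println(cache.keySet());
    }
}
```

If every uri is unique, each entry in such a cache is evicted before it is ever reused, which is the situation in which the option description suggests disabling the cache with -1.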
WireTap Thread pool
The WireTap uses a thread pool to process the tapped messages. This thread pool will by default use the settings detailed at Threading Model. In particular, when the pool is exhausted (with all threads utilized), further wiretaps will be executed synchronously by the calling thread. To remedy this, you can configure an explicit thread pool on the Wire Tap having either a different rejection policy, a larger worker queue, or more worker threads.
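The "executed synchronously by the calling thread" behaviour can be reproduced with the JDK alone. The following is a minimal sketch, assuming a deliberately tiny pool (one worker, a queue of one) and the JDK's CallerRunsPolicy; the class and method names are hypothetical and no Camel classes are involved.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration: what happens when a caller-runs pool is exhausted.
class CallerRunsSketch {

    /** Returns the name of the thread that ran the task submitted to a full pool. */
    public static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until released
            pool.execute(() -> {
                workerBusy.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(workerBusy);
            // Task 2 fills the single queue slot
            pool.execute(() -> { });
            // Task 3 is rejected, so CallerRunsPolicy runs it on this thread
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return overflowThread.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```

A larger queue, more worker threads, or a different rejection policy changes which thread ends up running the overflow task, which is why an explicit thread pool on the Wire Tap is the suggested remedy.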
WireTap Node
Camel’s Wire Tap node supports two flavors when tapping an Exchange:
- With the traditional Wire Tap, Camel will copy the original Exchange and set its Exchange Pattern to InOnly, as we want the tapped Exchange to be sent in a fire and forget style. The tapped Exchange is then sent in a separate thread so it can run in parallel with the original. Beware that only the Exchange is copied - Wire Tap won’t do a deep clone (unless you specify a custom processor via onPrepareRef which does that). So all copies could share objects from the original Exchange.
- Camel also provides an option of sending a new Exchange allowing you to populate it with new values.
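The difference between copying only the Exchange and deep-cloning it can be sketched with plain Java collections. The map below merely stands in for an exchange with mutable values; it is not Camel's Exchange class, and the names ShallowCopySketch, shallowCopy and deepCopy are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: a map of mutable lists stands in for an exchange.
class ShallowCopySketch {

    /** New map, but every value is the same object as in the original. */
    public static Map<String, List<String>> shallowCopy(Map<String, List<String>> original) {
        return new HashMap<>(original);
    }

    /** New map whose list values are duplicated too, so nothing is shared. */
    public static Map<String, List<String>> deepCopy(Map<String, List<String>> original) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : original.entrySet()) {
            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, List<String>> original = new HashMap<>();
        original.put("body", new ArrayList<>(Arrays.asList("Hello")));

        // A change made through the shallow copy is visible in the original
        shallowCopy(original).get("body").add("changed by tap");
        System.out.println("after shallow copy: " + original.get("body"));

        // A change made through the deep copy is not
        deepCopy(original).get("body").add("isolated");
        System.out.println("after deep copy:    " + original.get("body"));
    }
}
```

A processor supplied through onPrepareRef would play the role of deepCopy here, duplicating whichever mutable objects the tapped route might modify.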
Sending a Copy (traditional wiretap)
protected RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
        public void configure() {
            // START SNIPPET: e1
            from("direct:start")
                .to("log:foo")
                .wireTap("direct:tap")
                .to("mock:result");
            // END SNIPPET: e1

            from("direct:tap")
                .delay(1000).setBody().constant("Tapped")
                .to("mock:result", "mock:tap");

            from("direct:test").wireTap("direct:a").id("wiretap_1").to("mock:a");
            from("direct:a").to("mock:b");
        }
    };
}
Sending a New Exchange
Camel supports either a Processor or an Expression to populate the new Exchange. Using a Processor gives you full power over how the Exchange is populated, as you can set properties, headers, etc. An Expression can only be used to set the IN body.
The Expression or Processor is pre-populated with a copy of the original Exchange, which allows you to access the original message when you prepare a new Exchange to be sent. You can use the copy option (enabled by default) to indicate whether you want this.
Below is the Processor variation, where we disable copy by passing in false to create a new, empty Exchange:
public void testFireAndForgetUsingProcessor() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // START SNIPPET: e1
            from("direct:start")
                .wireTap("direct:foo", false, new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getIn().setBody("Bye World");
                        exchange.getIn().setHeader("foo", "bar");
                    }
                }).to("mock:result");

            from("direct:foo").to("mock:foo");
            // END SNIPPET: e1
        }
    });
}
Using Dynamic URIs
Wire Tap supports the same dynamic URIs as documented in Message Endpoint. For example, to wire tap to a JMS queue where the header ID is part of the queue name:
from("direct:start")
    .wireTap("jms:queue:backup-${header.id}")
    .to("bean:doSomething");
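To make the per-message evaluation concrete, here is a minimal plain-Java sketch that substitutes ${header.NAME} placeholders in a uri template. It is not Camel's simple language, which supports far more than header lookups; the class DynamicUriSketch and its resolve method are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration: resolving a dynamic uri from message headers.
class DynamicUriSketch {

    // Matches placeholders of the form ${header.NAME}
    private static final Pattern HEADER_PLACEHOLDER =
            Pattern.compile("\\$\\{header\\.([^}]+)\\}");

    /** Replaces each ${header.NAME} in the template with the matching header value. */
    public static String resolve(String uriTemplate, Map<String, Object> headers) {
        Matcher matcher = HEADER_PLACEHOLDER.matcher(uriTemplate);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            // A missing header becomes the text "null" in this sketch
            String value = String.valueOf(headers.get(matcher.group(1)));
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Collections.singletonMap("id", (Object) 42);
        System.out.println(resolve("jms:queue:backup-${header.id}", headers));
    }
}
```

Because the resolved uri depends on the header value, each distinct id yields a distinct endpoint, which is why the cacheSize option matters for dynamic wire taps.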
Sending a New exchange and Set Headers in DSL
When you send a new message using Wire Tap, an Expression from the DSL could previously only set the message body; to set headers as well, you had to use a Processor. It is now also possible to set headers using the DSL.
The following example sends a new message which has:
- Bye World as message body.
- A header with key id with the value 123.
- A header with key date which has the current date as value.
Java DSL
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // START SNIPPET: e1
            from("direct:start")
                // tap a new message and send it to direct:tap
                // the new message should be Bye World with 2 headers
                .wireTap("direct:tap")
                    // create the new tap message body and headers
                    .newExchangeBody(constant("Bye World"))
                    .newExchangeHeader("id", constant(123))
                    .newExchangeHeader("date", simple("${date:now:yyyyMMdd}"))
                .end()
                // here we continue routing the original messages
                .to("mock:result");

            // this is the tapped route
            from("direct:tap")
                .to("mock:tap");
            // END SNIPPET: e1
        }
    };
}
Using onPrepare to Execute Custom Logic when Preparing Messages
See details at Multicast.