Wire Tap

Wire Tap (from the EIP patterns) allows you to route messages to a separate location while they are being forwarded to the ultimate destination.

Streams

If you Wire Tap a stream message body then you should consider enabling Stream caching to ensure the message body can be read at each endpoint. See more details at Stream caching.
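
To illustrate why this matters, here is a minimal plain-Java sketch with no Camel involved. The class and method names are hypothetical, and Camel's own stream caching is more elaborate. The sketch only shows the underlying problem: a stream body is exhausted after the first read, while a buffered copy can be handed to each consumer as a fresh stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; this is not Camel's stream caching implementation.
class StreamBodySketch {

    // Reads whatever is left in the stream as a UTF-8 string.
    static String readAll(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("Hello".getBytes(StandardCharsets.UTF_8));

        // The first consumer (for example the tapped route) drains the stream.
        String first = readAll(body);
        // The second consumer (for example the main route) finds nothing left.
        String second = readAll(body);
        System.out.println("first=" + first + ", second=" + second);

        // Buffering the bytes once lets every consumer open its own stream.
        byte[] cached = "Hello".getBytes(StandardCharsets.UTF_8);
        String a = readAll(new ByteArrayInputStream(cached));
        String b = readAll(new ByteArrayInputStream(cached));
        System.out.println("a=" + a + ", b=" + b);
    }
}
```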

See the cacheSize option for more details on how much cache to use depending on how many or few unique endpoints are used.

Options

The Wire Tap EIP supports 12 options which are listed below:

| Name | Description | Default | Type |
| --- | --- | --- | --- |
| processorRef | Reference to a Processor to use for creating a new body as the message to use for wire tapping. | | String |
| body | Uses the expression for creating a new body as the message to use for wire tapping. | | ExpressionSubElementDefinition |
| executorServiceRef | Uses a custom thread pool. | | String |
| copy | Uses a copy of the original exchange. | true | Boolean |
| dynamicUri | Whether the URI is dynamic or static. If the URI is dynamic, then the simple language is used to evaluate a dynamic URI to use as the wire-tap destination for each incoming message. This works similarly to the toD EIP pattern. If static, then the URI is used as-is as the wire-tap destination. | true | Boolean |
| onPrepareRef | Uses the Processor when preparing the org.apache.camel.Exchange to be sent. This can be used to deep-clone messages that should be sent, or for any custom logic needed before the exchange is sent. | | String |
| uri | Required. The URI of the endpoint to send to. The URI can be computed dynamically using the org.apache.camel.language.simple.SimpleLanguage expression. | | String |
| pattern | Sets the optional ExchangePattern used to invoke this endpoint. | | ExchangePattern |
| cacheSize | Sets the maximum size used by the org.apache.camel.spi.ProducerCache, which is used to cache and reuse producers when URIs are reused. Beware that using dynamic endpoints affects how well the cache can be utilized. If each dynamic endpoint is unique, then it is best to turn off caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduces memory usage, as otherwise producers and endpoints are stored in memory in the caches. However, if a high proportion of dynamic endpoints have been used before, then it can be beneficial to use the cache to reuse both producers and endpoints, and the cache size can be set accordingly or left at the default size (1000). If there is a mix of unique and previously used dynamic endpoints, then setting a reasonable cache size can help reduce memory usage by avoiding storing too many infrequently used producers. | | Integer |
| ignoreInvalidEndpoint | Ignores the invalid endpoint exception when trying to create a producer with that endpoint. | false | Boolean |
| allowOptimisedComponents | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean |
| autoStartComponents | Whether to auto startup components when toD is starting up. | true | Boolean |
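
The trade-off described for cacheSize can be sketched with a generic bounded least-recently-used map in plain Java. This is an illustration under stated assumptions, not Camel's ProducerCache: the class name, the eviction policy, and the example URIs are all hypothetical. It shows that when every endpoint URI is unique the cache never produces a hit, whereas repeated URIs are served from the cache.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded cache keyed by endpoint URI; not Camel's ProducerCache.
class BoundedCacheSketch {
    private final Map<String, Object> cache;
    int hits;
    int misses;

    BoundedCacheSketch(final int maxSize) {
        // accessOrder=true makes the map evict the least recently used entry.
        this.cache = new LinkedHashMap<String, Object>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Looks up a "producer" for the URI, creating and caching one on a miss.
    Object acquire(String uri) {
        Object producer = cache.get(uri);
        if (producer == null) {
            misses++;
            producer = new Object(); // stands in for an expensive producer
            cache.put(uri, producer);
        } else {
            hits++;
        }
        return producer;
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        BoundedCacheSketch unique = new BoundedCacheSketch(2);
        for (int i = 0; i < 5; i++) {
            unique.acquire("jms:queue:backup-" + i); // every URI is new
        }
        System.out.println("unique: hits=" + unique.hits + ", size=" + unique.size());

        BoundedCacheSketch reused = new BoundedCacheSketch(2);
        for (int i = 0; i < 5; i++) {
            reused.acquire("jms:queue:backup-" + (i % 2)); // two URIs alternate
        }
        System.out.println("reused: hits=" + reused.hits + ", size=" + reused.size());
    }
}
```

With only unique URIs the cache does work without ever paying off, which is the situation in which the option description suggests disabling it.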

WireTap Thread pool

The WireTap uses a thread pool to process the tapped messages. By default, this thread pool uses the settings detailed at Threading Model. In particular, when the pool is exhausted (with all threads utilized), further wiretaps will be executed synchronously by the calling thread. To remedy this, you can configure an explicit thread pool on the Wire Tap with a different rejection policy, a larger worker queue, or more worker threads.
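
The fallback to the calling thread can be reproduced with a plain java.util.concurrent sketch. It assumes a caller-runs rejection policy, matching the behavior described above; the pool sizes and the class name are hypothetical, and no Camel API is involved.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of an exhausted pool falling back to the calling thread.
class CallerRunsSketch {

    // Submits three tasks to a pool with one worker and one queue slot and
    // returns the name of the thread that ran the third (rejected) task.
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            Runnable blocker = () -> {
                try {
                    release.await(); // holds its thread until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the single queue slot
            // Worker and queue are both full, so this runs on the submitting thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("overflow: " + threadOfOverflowTask());
    }
}
```

Both lines print the same thread name, which is why a slow tapped route can end up delaying the main route once the pool is saturated.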

WireTap Node

Camel’s Wire Tap node supports two flavors when tapping an Exchange:

  • With the traditional Wire Tap, Camel will copy the original Exchange and set its Exchange Pattern to InOnly, as we want the tapped Exchange to be sent in a fire and forget style. The tapped Exchange is then sent in a separate thread so it can run in parallel with the original. Beware that only the Exchange is copied - Wire Tap won’t do a deep clone (unless you specify a custom processor via onPrepareRef which does that). So all copies could share objects from the original Exchange.

  • Camel also provides an option of sending a new Exchange allowing you to populate it with new values.
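
The difference between copying only the Exchange and deep-cloning its contents can be sketched in plain Java. A map stands in for the Exchange here; the class, the "body" key, and the helper methods are hypothetical and not part of Camel. The deep copy is the kind of work a custom processor configured via onPrepareRef would have to do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an exchange: a map holding a mutable body object.
class ShallowCopySketch {

    // Typed accessor for the list stored under the "body" key.
    @SuppressWarnings("unchecked")
    static List<String> body(Map<String, Object> exchange) {
        return (List<String>) exchange.get("body");
    }

    // Copies the container only; the body object is shared with the original.
    static Map<String, Object> shallowCopy(Map<String, Object> exchange) {
        return new HashMap<>(exchange);
    }

    // Copies the container and clones the body list, so nothing is shared.
    static Map<String, Object> deepCopy(Map<String, Object> exchange) {
        Map<String, Object> copy = new HashMap<>(exchange);
        copy.put("body", new ArrayList<>(body(exchange)));
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();
        original.put("body", new ArrayList<>(List.of("order-1")));

        Map<String, Object> tappedShallow = shallowCopy(original);
        Map<String, Object> tappedDeep = deepCopy(original);

        // The main route mutates the body after the tap was taken.
        body(original).add("order-2");

        System.out.println("shallow copy sees: " + body(tappedShallow));
        System.out.println("deep copy sees:    " + body(tappedDeep));
    }
}
```

The shallow copy observes the later mutation because it shares the list with the original, while the deep copy does not.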

Sending a Copy (traditional wiretap)

    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                // main route: the original message continues to mock:result
                // while a copy is tapped off to direct:tap
                from("direct:start")
                    .to("log:foo")
                    .wireTap("direct:tap")
                    .to("mock:result");

                // tapped route: processes the copied Exchange in a separate thread
                from("direct:tap")
                    .delay(1000).setBody().constant("Tapped")
                    .to("mock:result", "mock:tap");

                // a second wire tap, with an explicit id assigned to the node
                from("direct:test").wireTap("direct:a").id("wiretap_1").to("mock:a");
                from("direct:a").to("mock:b");
            }
        };
    }

Sending a New Exchange

Camel supports either a processor or an Expression to populate the new Exchange. Using a processor gives you full power over how the Exchange is populated as you can set properties, headers, etc. An Expression can only be used to set the IN body.

The Expression or Processor is pre-populated with a copy of the original Exchange, which allows you to access the original message when you prepare a new Exchange to be sent. You can use the copy option (enabled by default) to indicate whether you want this.

Below is the processor variation, where we disable copy by passing in false to create a new, empty Exchange:

    public void testFireAndForgetUsingProcessor() throws Exception {
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    // copy=false: the tapped Exchange starts empty and is
                    // populated entirely by the processor below
                    .wireTap("direct:foo", false, new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            exchange.getIn().setBody("Bye World");
                            exchange.getIn().setHeader("foo", "bar");
                        }
                    }).to("mock:result");

                // tapped route receiving the new Exchange
                from("direct:foo").to("mock:foo");
            }
        });
    }

Using Dynamic URIs

Wire Tap supports the same dynamic URIs as documented in Message Endpoint. For example, to wire tap to a JMS queue where the header id is part of the queue name:

    from("direct:start")
        .wireTap("jms:queue:backup-${header.id}")
        .to("bean:doSomething");

Sending a New Exchange and Setting Headers in the DSL

If you send a new message using Wire Tap, you could originally only set the message body using an Expression from the DSL, and had to use a Processor if you also needed to set headers. It is also possible to set headers using the DSL.

The following example sends a new message which has:

  • Bye World as message body.

  • A header with key id with the value 123.

  • A header with key date which has current date as value.

Java DSL

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    // tap a new message and send it to direct:tap
                    // the new message should be Bye World with 2 headers
                    .wireTap("direct:tap")
                        // create the new tap message body and headers
                        .newExchangeBody(constant("Bye World"))
                        .newExchangeHeader("id", constant(123))
                        .newExchangeHeader("date", simple("${date:now:yyyyMMdd}"))
                    .end()
                    // here we continue routing the original messages
                    .to("mock:result");

                // this is the tapped route
                from("direct:tap")
                    .to("mock:tap");
            }
        };
    }

Using onPrepare to Execute Custom Logic when Preparing Messages

See details at Multicast.