Content Enricher

Camel supports the Content Enricher from the EIP patterns using a Message Translator, an arbitrary Processor in the routing logic, or using the enrich DSL element to enrich the message.

Content enrichment using a Message Translator or a Processor

You can consume a message from one destination, transform it with something like Velocity or XQuery, and then send it on to another destination. For example, using InOnly (one-way messaging):

from("activemq:My.Queue")
    .to("velocity:com/acme/MyResponse.vm")
    .to("activemq:Another.Queue");

If you want to use InOut (request-reply) semantics to process requests on the My.Queue queue on ActiveMQ with a template-generated response, and then send the responses back to the JMSReplyTo destination, you could use this:

from("activemq:My.Queue")
    .to("velocity:com/acme/MyResponse.vm");

Here is a simple example using the DSL directly to transform the message:

from("direct:start")
    .setBody(body().append(" World!"))
    .to("mock:result");

In this example we add our own Processor using explicit Java:

from("direct:start")
    .process(new Processor() {
        public void process(Exchange exchange) {
            Message in = exchange.getIn();
            in.setBody(in.getBody(String.class) + " World!");
        }
    })
    .to("mock:result");

We can use Bean Integration to use any Java method on any bean to act as the transformer:

from("activemq:My.Queue")
    .beanRef("myBeanName", "myMethodName")
    .to("activemq:Another.Queue");

For further examples of this pattern in use, you could look at one of the JUnit tests.

Using Spring XML

<route>
    <from uri="activemq:Input"/>
    <bean ref="myBeanName" method="doTransform"/>
    <to uri="activemq:Output"/>
</route>

Content enrichment using the enrich DSL element

Camel comes with two flavors of content enricher in the DSL:

  • enrich

  • pollEnrich

enrich uses a Producer to obtain the additional data. It is usually used for Request Reply messaging, for instance to invoke an external web service.
pollEnrich, on the other hand, uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance to read a file or download an FTP file.

Camel 2.15 or older - Data from current Exchange not used

In Camel 2.15 and older, neither enrich nor pollEnrich accesses any data from the current Exchange, which means that when polling they cannot use any of the existing headers you may have set on the Exchange. For example, you cannot set a filename in the Exchange.FILE_NAME header and use pollEnrich to consume only that file. For that you must set the filename in the endpoint URI.

Instead of using enrich, you can use a Recipient List with dynamic endpoints and define an AggregationStrategy on the Recipient List, which then works as enrich would.

pollEnrich only accepts one message as the response. This means that if you want to enrich your original message with messages collected from a seda, … component using an aggregation strategy, only one response message will be aggregated with the original message.

From Camel 2.16 onwards both enrich and pollEnrich support dynamic endpoints that use an Expression to compute the uri, which allows the use of data from the current Exchange. In other words, the restriction on using data from the current Exchange described above no longer applies.

Enrich Options

Name Default Value Description

uri

The endpoint uri for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

ref

Refers to the endpoint for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

expression

Camel 2.16: Mandatory. The Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

strategyRef

Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default Camel will use the reply from the external service as the outgoing message. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy; see the Aggregate page for more details.

strategyMethodName

Camel 2.12: This option can be used to explicitly declare the method name to use when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false, the aggregate method is not used if there was no data to enrich. If this option is true, a null value is used as the oldExchange (when there is no data to enrich) when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

aggregateOnException

false

Camel 2.14: If this option is false, the aggregate method is not used if an exception was thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do in the aggregate method if there was an exception, for example to suppress the exception or set a custom message body.

shareUnitOfWork

false

Camel 2.16: Shares the unit of work between the parent exchange and the resource exchange. By default, enrich does not share the unit of work between the parent exchange and the resource exchange, which means the resource exchange has its own individual unit of work. See Splitter for more information and an example.

cacheSize

Camel 2.16: Configures the cache size for the ProducerCache, which caches producers for reuse in the enrich. By default the default cache size of 1000 is used. Setting the value to -1 turns off the cache altogether.

ignoreInvalidEndpoint

false

Camel 2.16: Whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

AggregationStrategy aggregationStrategy = ...

from("direct:start")
    .enrich("direct:resource", aggregationStrategy)
    .to("direct:result");

from("direct:resource") ...

The content enricher (enrich) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). An aggregation strategy is used to combine the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, and the second parameter to the resource exchange. The results from the resource endpoint are stored in the resource exchange’s out-message. Here’s an example template for implementing an aggregation strategy:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response

        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }

        return original;
    }
}

Using this template the original exchange can be of any pattern. The resource exchange created by the enricher is always an in-out exchange.

Using Spring XML

The same example in the Spring DSL:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <enrich strategyRef="aggregationStrategy">
            <constant>direct:resource</constant>
        </enrich>
        <to uri="direct:result"/>
    </route>
    <route>
        <from uri="direct:resource"/>
        ...
    </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />
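The options table notes that, from Camel 2.12 onwards, the bean referenced by strategyRef may be a plain POJO, with strategyMethodName selecting the method to call. The following is a minimal sketch of such a POJO. The class name BodyMerger and the method name merge are hypothetical, and the sketch assumes the two-parameter form described on the Aggregate page, where the first parameter receives the body of the original exchange and the second the body of the resource exchange.

```java
/**
 * Hypothetical POJO intended to be referenced via strategyRef, with
 * strategyMethodName="merge". It has no dependency on the Camel API.
 * In a real project this would typically be a public class in its own file.
 */
class BodyMerger {

    /**
     * Combines the two message bodies.
     *
     * @param originalBody body of the original exchange (assumed first parameter);
     *                     may be null if strategyMethodAllowNull is true
     * @param resourceBody body obtained from the resource (assumed second parameter)
     * @return the merged body
     */
    public String merge(String originalBody, String resourceBody) {
        if (originalBody == null) {
            return resourceBody;          // nothing to merge into
        }
        if (resourceBody == null) {
            return originalBody;          // no data to enrich with
        }
        return originalBody + "," + resourceBody;
    }
}
```

Because the POJO only deals with message bodies, it can be unit tested without starting a CamelContext, for instance by calling new BodyMerger().merge("A", "B") directly.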

Aggregation strategy is optional

The aggregation strategy is optional. If you do not provide it, Camel will, by default, just use the body obtained from the resource.

from("direct:start")
    .enrich("direct:resource")
    .to("direct:result");

In the route above, the message sent to the direct:result endpoint will contain the output from direct:resource, as we do not use any custom aggregation.

And for Spring DSL:

<route>
    <from uri="direct:start"/>
    <enrich>
        <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
</route>

Using dynamic uris

Since Camel 2.16

From Camel 2.16 onwards enrich and pollEnrich support dynamic uris computed from information in the current Exchange. For example, to enrich from an HTTP endpoint where the header with key orderId is used as part of the context-path of the HTTP url:

from("direct:start")
    .enrich().simple("http:myserver/${header.orderId}/order")
    .to("direct:result");

And in XML DSL:

<route>
    <from uri="direct:start"/>
    <enrich>
        <simple>http:myserver/${header.orderId}/order</simple>
    </enrich>
    <to uri="direct:result"/>
</route>
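To make concrete what "computed from the current Exchange" means for the routes above, here is a small stand-alone sketch of the substitution. It is not Camel's Simple language implementation. The class HeaderUriSketch and its resolve method are hypothetical, and the sketch only handles placeholders of the form ${header.name}, with the headers supplied as a plain Map.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of resolving ${header.xxx} placeholders in a uri. */
final class HeaderUriSketch {

    // Matches ${header.someName} and captures "someName".
    private static final Pattern HEADER =
            Pattern.compile("\\$\\{header\\.([A-Za-z0-9_]+)\\}");

    /** Replaces each ${header.xxx} placeholder with the matching header value. */
    static String resolve(String template, Map<String, Object> headers) {
        Matcher matcher = HEADER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // A missing header is rendered as the text "null" in this sketch.
            Object value = headers.get(matcher.group(1));
            matcher.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(value)));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

With a header orderId set to 123, the template http:myserver/${header.orderId}/order resolves to http:myserver/123/order, so each message can be enriched from a different endpoint.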

Content enrichment using pollEnrich

pollEnrich works just like enrich; however, because it uses a Polling Consumer, there are three methods available when polling:

  • receive

  • receiveNoWait

  • receive(timeout)

PollEnrich Options

Name Default Value Description

uri

The endpoint uri for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

ref

Refers to the endpoint for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

expression

Camel 2.16: Mandatory. The Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

strategyRef

Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default Camel will use the reply from the external service as the outgoing message. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy; see the Aggregate page for more details.

strategyMethodName

Camel 2.12: This option can be used to explicitly declare the method name to use when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false, the aggregate method is not used if there was no data to enrich. If this option is true, a null value is used as the oldExchange (when there is no data to enrich) when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

timeout

-1

Timeout in millis when polling from the external service. See below for important details about the timeout.

aggregateOnException

false

Camel 2.14: If this option is false, the aggregate method is not used if an exception was thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do in the aggregate method if there was an exception, for example to suppress the exception or set a custom message body.

cacheSize

Camel 2.16: Configures the cache size for the ConsumerCache, which caches consumers for reuse in the pollEnrich. By default the default cache size of 1000 is used. Setting the value to -1 turns off the cache altogether.

ignoreInvalidEndpoint

false

Camel 2.16: Whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

Good practice to use timeout value

By default Camel uses the receive method, which may block until a message is available. It is therefore recommended to always provide a timeout value, to make it clear that we may wait for a message until the timeout is hit.

If there is no data, the newExchange in the aggregation strategy is null.

You can pass in a timeout value that determines which method to use:

  • if timeout is -1 or another negative number then receive is selected (Important: the receive method may block if there is no message)

  • if timeout is 0 then receiveNoWait is selected

  • otherwise receive(timeout) is selected

The timeout value is in milliseconds.
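The selection rules above can be summarised in a few lines of plain Java. This is only an illustrative sketch of the documented rules, not Camel's internal code; the class PollMethodSelector and the enum PollMethod are hypothetical names.

```java
/** Hypothetical helper that mirrors the documented timeout rules for pollEnrich. */
final class PollMethodSelector {

    /** The three polling methods named in this section. */
    enum PollMethod { RECEIVE, RECEIVE_NO_WAIT, RECEIVE_WITH_TIMEOUT }

    /** Returns the polling method that the given timeout (in milliseconds) selects. */
    static PollMethod select(long timeoutMillis) {
        if (timeoutMillis < 0) {
            return PollMethod.RECEIVE;            // may block until a message arrives
        } else if (timeoutMillis == 0) {
            return PollMethod.RECEIVE_NO_WAIT;    // returns immediately
        }
        return PollMethod.RECEIVE_WITH_TIMEOUT;   // waits at most timeoutMillis
    }
}
```

For example, the default timeout of -1 maps to RECEIVE, which is why an explicit positive timeout is the safer choice when the resource may never produce a message.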

Camel 2.15 or older - Data from current Exchange not used

In Camel 2.15 and older, pollEnrich does not access any data from the current Exchange, which means that when polling it cannot use any of the existing headers you may have set on the Exchange. For example, you cannot set a filename in the Exchange.FILE_NAME header and use pollEnrich to consume only that file. For that you must set the filename in the endpoint URI.

From Camel 2.16 onwards both enrich and pollEnrich support dynamic endpoints that use an Expression to compute the uri, which allows the use of data from the current Exchange. In other words, the restriction described above no longer applies.

Example

In this example we enrich the message by loading the content from the file named inbox/data.txt.

from("direct:start")
    .pollEnrich("file:inbox?fileName=data.txt")
    .to("direct:result");

And in XML DSL you do:

<route>
    <from uri="direct:start"/>
    <pollEnrich>
        <constant>file:inbox?fileName=data.txt</constant>
    </pollEnrich>
    <to uri="direct:result"/>
</route>

If there is no file, the message is empty. We can use a timeout either to wait (potentially forever) until a file exists, or to wait for a certain period.

For example, to wait up to 5 seconds you can do:

<route>
    <from uri="direct:start"/>
    <pollEnrich timeout="5000">
        <constant>file:inbox?fileName=data.txt</constant>
    </pollEnrich>
    <to uri="direct:result"/>
</route>
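The same 5 second timeout can be expressed in the Java DSL, and an aggregation strategy can guard against the case where no file arrives in time. The sketch below assumes Camel 2.x, where AggregationStrategy lives in the org.apache.camel.processor.aggregate package (later major versions moved it). The class names PollEnrichWithTimeoutRoute and KeepOriginalIfNoData are hypothetical.

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;

/** Hypothetical route: poll inbox/data.txt for at most 5 seconds. */
public class PollEnrichWithTimeoutRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:start")
            // timeout > 0, so receive(timeout) is used for polling
            .pollEnrich("file:inbox?fileName=data.txt", 5000, new KeepOriginalIfNoData())
            .to("direct:result");
    }

    /** Keeps the original message untouched when the poll returned no data. */
    static class KeepOriginalIfNoData implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange original, Exchange resource) {
            if (resource == null) {
                // No file was available within the timeout.
                return original;
            }
            // Otherwise use the polled file content as the new body.
            original.getIn().setBody(resource.getIn().getBody());
            return original;
        }
    }
}
```

Handling the null resource exchange explicitly keeps the original body when nothing could be polled, instead of ending up with an empty message.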

Using dynamic uris

Since Camel 2.16

From Camel 2.16 onwards enrich and pollEnrich support dynamic uris computed from information in the current Exchange. For example, to pollEnrich from an endpoint that uses a header to indicate a SEDA queue name:

from("direct:start")
    .pollEnrich()
    .simple("seda:${header.name}")
    .to("direct:result");

And in XML DSL:

<route>
    <from uri="direct:start"/>
    <pollEnrich>
        <simple>seda:${header.name}</simple>
    </pollEnrich>
    <to uri="direct:result"/>
</route>