OnCompletion
Camel has this concept of a Unit of Work that encompass the
Exchange. The unit of work among others supports
synchronization callbacks that are invoked when the
Exchange is complete. The callback API is defined in
org.apache.camel.spi.Synchronization
and the extended synchronization
org.apache.camel.spi.SynchronizationRouteAware
that have callbacks for
route events.
UnitOfWork API
You can get hold of the org.apache.camel.spi.UnitOfWork
from
org.apache.camel.Exchange
with the method getUnitOfWork()
.
OnCompletion DSL
The OnCompletion EIP supports the following features:
-
level: context or route (route level override global level)
-
triggered either always, only if completed with success, or only if failed
-
onWhen
predicate to only trigger if matched -
mode
to define whether to run either before or after route consumer writes response back to callee (if its InOut) (default AfterConsumer) -
parallelProcessing
whether to run async or sync (use a thread pool or not) (default false)
The onCompletion supports running the completion task in either synchronous or asynchronous mode (using a thread pool) and also whether to run before or after the route consumer is done. The reason is to give more flexibility. For example to specify to run synchronous and before the route consumer is done, which allows to modify the exchange before the consumer writes back any response to the callee. You can use this to for example add customer headers, or send to a log to log the response message, etc.
onCompletion with route scope
The OnCompletion EIP allows you to add custom routes/processors when the original Exchange is complete. Camel spins off a copy of the Exchange and routes it in a separate thread, similar to a Wire Tap. This allows the original thread to continue while the onCompletion route is running concurrently. We chose this model as we did not want the onCompletion route to interfere with the original route.
Multiple onCompletions
You may define multiple onCompletions at both context and route level.
When you define route level onCompletions then any context levels are disabled for that given route.
from("direct:start")
.onCompletion()
// this route is only invoked when the original route is complete as a kind
// of completion callback
.to("log:sync")
.to("mock:sync")
// must use end to denote the end of the onCompletion route
.end()
// here the original route contiues
.process(new MyProcessor())
.to("mock:result");
By default the OnCompletion EIP will be triggered when the
Exchange is complete and regardless if the
Exchange completed with success or with a failure
(such as an Exception was thrown). You can limit the trigger to only
occur onCompleteOnly
or by onFailureOnly
as shown below:
from("direct:start")
// here we qualify onCompletion to only invoke when the exchange failed (exception or FAULT body)
.onCompletion().onFailureOnly()
.to("log:sync")
.to("mock:syncFail")
// must use end to denote the end of the onCompletion route
.end()
.onCompletion().onCompleteOnly()
.to("log:sync")
.to("mock:syncOK")
.end()
// here the original route continues
.process(new MyProcessor())
.to("mock:result");
You can identify if the Exchange is an
OnCompletion Exchange as Camel will add the
property Exchange.ON_COMPLETION
with a boolean value of true
.
Using onCompletion from XML DSL
The onCompletion is defined like this with XML DSL:
<route>
<from uri="direct:start"/>
<!-- this onCompletion block will only be executed when the exchange is done being routed -->
<!-- this callback is always triggered even if the exchange failed -->
<onCompletion>
<!-- so this is a kinda like an after completion callback -->
<to uri="log:sync"/>
<to uri="mock:sync"/>
</onCompletion>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>
And the onCompleteOnly
and onFailureOnly
is defined as a boolean
attribute on the <onCompletion>
tag, so the failure example would be:
<route>
<from uri="direct:start"/>
<!-- this onCompletion block will only be executed when the exchange is done being routed -->
<!-- this callback is only triggered when the exchange failed, as we have onFailure=true -->
<onCompletion onFailureOnly="true">
<to uri="log:sync"/>
<to uri="mock:sync"/>
</onCompletion>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>
onCompletion with global level
This works just like the route level except from the fact that they are defined globally. An example below:
// define a global on completion that is invoked when the exchange is complete
onCompletion().to("log:global").to("mock:sync");
from("direct:start")
.process(new MyProcessor())
.to("mock:result");
And in XML:
<!-- this is a global onCompletion route that is invoke when any exchange is complete
as a kind of after callback -->
<onCompletion>
<to uri="log:global"/>
<to uri="mock:sync"/>
</onCompletion>
<route>
<from uri="direct:start"/>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>
If an OnCompletion is defined in a route, it overrides all global scoped and thus it is only the route scoped that are used. The globally scoped are not in use. |
Using onCompletion with onWhen predicate
As other DSL in Camel you can attach a predicate to the OnCompletion so it only triggers in certain conditions, when the predicate matches. For example to only trigger if the message body contains the word Hello we can do like:
from("direct:start")
.onCompletion().onWhen(body().contains("Hello"))
// this route is only invoked when the original route is complete as a kind
// of completion callback. And also only if the onWhen predicate is true
.to("log:sync")
.to("mock:sync")
// must use end to denote the end of the onCompletion route
.end()
// here the original route contiues
.to("log:original")
.to("mock:result");
Using onCompletion with or without thread pool
To use thread pool then either set a executorService
or set
parallelProcessing
to true.
For example in Java DSL do
onCompletion().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"));
And in XML DSL:
<onCompletion parallelProcessing="true">
<to uri="before"/>
<delay><constant>1000</constant></delay>
<setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>
You can also refer to a specific thread pool
to be used, using the executorServiceRef
option
<onCompletion executorServiceRef="myThreadPool">
<to uri="before"/>
<delay><constant>1000</constant></delay>
<setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>
OnCompletion consumer modes
OnCompletion supports two modes that affects the route consumer:
-
AfterConsumer - Default mode which runs after the consumer is done
-
BeforeConsumer - Runs before the consumer is done, and before the consumer writes back response to the callee
The AfterConsumer mode is the default mode which is the same behavior as in older Camel releases.
The new BeforeConsumer mode is used to run onCompletion before the consumer writes its response back to the callee (if in InOut mode). This allows the onCompletion to modify the Exchange, such as adding special headers, or to log the Exchange as a response logger etc.
For example to always add a "created by" header you
use modeBeforeConsumer()
as shown below:
.onCompletion().modeBeforeConsumer()
.setHeader("createdBy", constant("Someone"))
.end()
And in XML DSL you set the mode attribute to BeforeConsumer:
<onCompletion mode="BeforeConsumer">
<setHeader name="createdBy">
<constant>Someone</constant>
</setHeader>
</onCompletion>