Rollback

Camel recommends supporting the Transactional Client from the EIP patterns using spring transactions.

image

Transaction Oriented Endpoints like JMS support using a transaction for both inbound and outbound message exchanges. Endpoints that support transactions will participate in the current transaction context that they are called from.

Configuration of Redelivery

The redelivery in transacted mode is not handled by Camel but by the backing system (the transaction manager). In such cases you should resort to the backing system how to configure the redelivery.

You should use the SpringRouteBuilder to setup the routes since you will need to setup the spring context with the TransactionTemplates that will define the transaction manager configuration and policies.

For inbound endpoint to be transacted, they normally need to be configured to use a Spring PlatformTransactionManager. In the case of the JMS component, this can be done by looking it up in the spring context.

You first define needed object in the spring configuration.

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

Then you look them up and use them to create the JmsComponent.

PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager");
ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory");
JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
component.getConfiguration().setConcurrentConsumers(1);
ctx.addComponent("activemq", component);

Options

The Rollback EIP supports 3 options which are listed below:

Name Description Default Type

markRollbackOnly

Mark the transaction for rollback only (cannot be overruled to commit)

false

Boolean

markRollbackOnlyLast

Mark only last sub transaction for rollback only. When using sub transactions (if the transaction manager support this)

false

Boolean

message

Message to use in rollback exception

String

Transaction Policies

Outbound endpoints will automatically enlist in the current transaction context. But what if you do not want your outbound endpoint to enlist in the same transaction as your inbound endpoint? The solution is to add a Transaction Policy to the processing route. You first have to define transaction policies that you will be using. The policies use a spring TransactionTemplate under the covers for declaring the transaction demarcation to use. So you will need to add something like the following to your spring xml:

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>

Then in your SpringRouteBuilder, you just need to create new SpringTransactionPolicy objects for each of the templates.

public void configure() {
    ...
    Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED"));
    Policy requirenew = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRES_NEW"));
    ...
}

Once created, you can use the Policy objects in your processing routes:

 // Send to bar in a new transaction
from("activemq:queue:foo").policy(requirenew).to("activemq:queue:bar");
// Send to bar without a transaction.
from("activemq:queue:foo").policy(notsupported).to("activemq:queue:bar");

OSGi Blueprint

If you are using OSGi Blueprint then you most likely have to explicit declare a policy and refer to the policy from the transacted in the route.

<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>

And then refer to "required" from the route:

<route>
  <from uri="activemq:queue:foo"/>
  <transacted ref="required"/>
  <to uri="activemq:queue:bar"/>
</route>

Database Sample

In this sample we want to ensure that two endpoints is under transaction control. These two endpoints inserts data into a database.
The sample is in its full as a unit test.

First of all we setup the usual spring stuff in its configuration file. Here we have defined a DataSource to the HSQLDB and a most importantly the Spring DataSource TransactionManager that is doing the heavy lifting of ensuring our transactional policies. You are of course free to use any of the Spring based TransactionManager, eg. if you are in a full blown J2EE container you could use JTA or the WebLogic or WebSphere specific managers.

As we use the new convention over configuration we do not need to configure a transaction policy bean, so we do not have any PROPAGATION_REQUIRED beans. All the beans needed to be configured is standard Spring beans only, eg. there are no Camel specific configuration at all.https://github.com/apache/camel/blob/master/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/springTransactionalClientDataSourceMinimalConfiguration.xml[springTransactionalClientDataSourceMinimalConfiguration]Then we are ready to define our Camel routes. We have two routes: 1 for success conditions, and 1 for a forced rollback condition.
This is after all based on a unit test. Notice that we mark each route as transacted using the transacted tag.https://github.com/apache/camel/blob/master/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/springTransactionalClientDataSourceMinimalConfiguration.xml[springTransactionalClientDataSourceMinimalConfiguration]That is all that is needed to configure a Camel route as being transacted. Just remember to use the transacted DSL. The rest is standard Spring XML to setup the transaction manager.

JMS Sample

In this sample we want to listen for messages on a queue and process the messages with our business logic java code and send them along. Since its based on a TransactionMinimalConfigurationTest.java the destination is a mock endpoint.

First we configure the standard Spring XML to declare a JMS connection factory, a JMS transaction manager and our ActiveMQ component that we use in our routing.https://github.com/apache/camel/blob/master/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml[TransactionMinimalConfigurationTest.xml]And then we configure our routes. Notice that all we have to do is mark the route as transacted using the transacted tag.https://github.com/apache/camel/blob/master/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml[TransactionMinimalConfigurationTest.xml]

Integration Testing with Spring

An Integration Test here means a test runner class annotated @RunWith(SpringJUnit4ClassRunner.class).

When following the Spring Transactions documentation it is tempting to annotate your integration test with @Transactional then seed your database before firing up the route to be tested and sending a message in. This is incorrect as Spring will have an in-progress transaction, and Camel will wait on this before proceeding, leading to the route timing out.

Instead, remove the @Transactional annotation from the test method and seed the test data within a TransactionTemplate execution which will ensure the data is committed to the database before Camel attempts to pick up and use the transaction manager. A simple example can be found on GitHub.

Spring’s transactional model ensures each transaction is bound to one thread. A Camel route may invoke additional threads which is where the blockage may occur. This is not a fault of Camel but as the programmer you must be aware of the consequences of beginning a transaction in a test thread and expecting a separate thread created by your Camel route to be participate, which it cannot. You can, in your test, mock the parts that cause separate threads to avoid this issue.

Using multiple routes with different propagation behaviors

Suppose you want to route a message through two routes and by which the 2nd route should run in its own transaction. How do you do that? You use propagation behaviors for that where you configure it as follows:

  • The first route use PROPAGATION_REQUIRED

  • The second route use PROPAGATION_REQUIRES_NEW

This is configured in the Spring XML file.https://github.com/apache/camel/blob/master/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/MixedTransactionPropagationTest.xml[MixedTransactionPropagationTest.xml]Then in the routes you use transacted DSL to indicate which of these two propagations it uses.https://github.com/apache/camel/blob/master/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/MixedTransactionPropagationTest.java[MixedTransactionPropagationTest.java]Notice how we have configured the onException in the 2nd route to indicate in case of any exceptions we should handle it and just rollback this transaction. This is done using the markRollbackOnlyLast which tells Camel to only do it for the current transaction and not globally.