MongoDB
Since Camel 2.10
According to Wikipedia: "NoSQL is a movement promoting a loosely defined class of non-relational data stores that break with a long history of relational databases and ACID guarantees." NoSQL solutions have grown in popularity in the last few years, and heavily used sites and services such as Facebook, LinkedIn and Twitter are known to use them extensively to achieve scalability and agility.
Basically, NoSQL solutions differ from traditional RDBMS (Relational Database Management Systems) in that they don’t use SQL as their query language and generally don’t offer ACID-like transactional behaviour nor relational data. Instead, they are designed around the concept of flexible data structures and schemas (meaning that the traditional concept of a database table with a fixed schema is dropped), extreme scalability on commodity hardware and blazing-fast processing.
MongoDB is a very popular NoSQL solution and the camel-mongodb component integrates Camel with MongoDB allowing you to interact with MongoDB collections both as a producer (performing operations on the collection) and as a consumer (consuming documents from a MongoDB collection).
MongoDB revolves around the concepts of documents (not as in office documents, but rather hierarchical data defined in JSON/BSON) and collections. This component page assumes you are familiar with them. Otherwise, visit http://www.mongodb.org/.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-mongodb</artifactId>
<version>x.y.z</version>
<!-- use the same version as your Camel core version -->
</dependency>
URI format
mongodb:connectionBean?database=databaseName&collection=collectionName&operation=operationName[&moreOptions...]
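As a rough illustration of the URI format above, the following sketch assembles an endpoint URI from a connection bean name and a map of options. MongoDbUriSketch and buildUri are hypothetical names introduced only for this example and are not part of camel-mongodb; the option names are the ones listed on this page. Values are concatenated as-is, so the sketch assumes they need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of camel-mongodb: it only concatenates strings
// following the "mongodb:connectionBean?key=value&..." layout shown above.
class MongoDbUriSketch {

    static String buildUri(String connectionBean, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        String base = "mongodb:" + connectionBean;
        // Without options the URI is just the scheme and the bean name.
        return options.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("database", "flights");
        options.put("collection", "tickets");
        options.put("operation", "findById");
        System.out.println(buildUri("myDb", options));
    }
}
```

A LinkedHashMap is used so that the options appear in insertion order, which keeps the resulting URI easy to compare with the examples on this page.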
MongoDB options
The MongoDB component has no options.
The MongoDB endpoint is configured using URI syntax:
mongodb:connectionBean
with the following path and query parameters:
Path Parameters (1 parameter):
Name | Description | Default | Type |
---|---|---|---|
connectionBean | Required Name of com.mongodb.Mongo to use. | | String |
Query Parameters (23 parameters):
Name | Description | Default | Type |
---|---|---|---|
collection (common) | Sets the name of the MongoDB collection to bind to this endpoint | | String |
collectionIndex (common) | Sets the collection index (JSON FORMAT : field1 : order1, field2 : order2) | | String |
createCollection (common) | Create collection during initialisation if it doesn’t exist. Default is true. | true | boolean |
database (common) | Sets the name of the MongoDB database to target | | String |
operation (common) | Sets the operation this endpoint will execute against MongoDB. For possible values, see MongoDbOperation. | | MongoDbOperation |
outputType (common) | Convert the output of the producer to the selected type: DBObjectList, DBObject or DBCursor. DBObjectList or DBCursor applies to findAll and aggregate. DBObject applies to all other operations. | | MongoDbOutputType |
writeConcern (common) | Set the WriteConcern for write operations on MongoDB using the standard ones. Resolved from the fields of the WriteConcern class by calling the WriteConcern#valueOf(String) method. | ACKNOWLEDGED | WriteConcern |
bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
exceptionHandler (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored. | | ExceptionHandler |
exchangePattern (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern |
cursorRegenerationDelay (advanced) | MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms. | 1000 | long |
dynamicity (advanced) | Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties. Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI. It is disabled by default to boost performance. Enabling it will take a minimal performance hit. | false | boolean |
readPreference (advanced) | Sets a MongoDB ReadPreference on the Mongo connection. Read preferences set directly on the connection will be overridden by this setting. The ReadPreference#valueOf(String) utility method is used to resolve the passed readPreference value. Some examples of possible values are nearest, primary and secondary. | | ReadPreference |
synchronous (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |
writeResultAsHeader (advanced) | In write operations, it determines whether instead of returning WriteResult as the body of the OUT message, we transfer the IN message to the OUT and attach the WriteResult as a header. | false | boolean |
persistentId (tail) | One tail tracking collection can host many trackers for several tailable consumers. To keep them separate, each tracker should have its own unique persistentId. | | String |
persistentTailTracking (tail) | Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts. The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records. | false | boolean |
persistRecords (tail) | Sets the number of tailed records after which the tail tracking data is persisted to MongoDB. | -1 | int |
tailTrackCollection (tail) | Collection where tail tracking information will be persisted. If not specified, MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default. | | String |
tailTrackDb (tail) | Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking database will not vary past endpoint initialisation. | | String |
tailTrackField (tail) | Field where the last tracked value will be placed. If not specified, MongoDbTailTrackingConfig#DEFAULT_FIELD will be used by default. | | String |
tailTrackIncreasingField (tail) | Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every time it is generated. The cursor will be (re)created with a query of type: tailTrackIncreasingField > lastValue (possibly recovered from persistent tail tracking). Can be of type Integer, Date, String, etc. NOTE: No support for dot notation at the current time, so the field should be at the top level of the document. | | String |
tailTrackingStrategy (tail) | Sets the strategy used to extract the increasing field value and to create the query to position the tail cursor. | LITERAL | MongoDBTailTracking Enum |
Spring Boot Auto-Configuration
When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-mongodb-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
The component supports 2 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
camel.component.mongodb.enabled | Enable mongodb component | true | Boolean |
camel.component.mongodb.resolve-property-placeholders | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean |
Configuration of database in Spring XML
The following Spring XML creates a bean defining the connection to a MongoDB instance.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="mongoBean" class="com.mongodb.Mongo">
<constructor-arg name="host" value="${mongodb.host}" />
<constructor-arg name="port" value="${mongodb.port}" />
</bean>
</beans>
Sample route
The following route defined in Spring XML executes the getDbStats operation on a collection.
Get DB stats for specified collection
<route>
<from uri="direct:start" />
<!-- using bean 'mongoBean' defined above -->
<to uri="mongodb:mongoBean?database=${mongodb.database}&amp;collection=${mongodb.collection}&amp;operation=getDbStats" />
<to uri="direct:result" />
</route>
MongoDB operations - producer endpoints
Query operations
findById
This operation retrieves only one element from the collection whose _id field matches the content of the IN message body. The incoming object can be anything that has an equivalent to a BSON type. See http://bsonspec.org//specification and http://www.mongodb.org/display/DOCS/Java+Types.
from("direct:findById")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findById")
.to("mock:resultFindById");
findOneByQuery
Use this operation to retrieve just one element from the collection that matches a MongoDB query. The query object is extracted from the IN message body, i.e. it should be of type DBObject or convertible to DBObject. It can be a JSON String or a Hashmap. See Type conversions for more info.
Example with no query (returns any object of the collection):
from("direct:findOneByQuery")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
.to("mock:resultFindOneByQuery");
Example with a query (returns one matching result):
from("direct:findOneByQuery")
.setBody().constant("{ \"name\": \"Raul Kripalani\" }")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
.to("mock:resultFindOneByQuery");
findAll
The findAll operation returns all documents matching a query. If no query is supplied, all documents contained in the collection are returned. The query object is extracted from the IN message body, i.e. it should be of type DBObject or convertible to DBObject. It can be a JSON String or a Hashmap. See Type conversions for more info.
Example with no query (returns all objects in the collection):
from("direct:findAll")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
.to("mock:resultFindAll");
Example with a query (returns all matching results):
from("direct:findAll")
.setBody().constant("{ \"name\": \"Raul Kripalani\" }")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
.to("mock:resultFindAll");
Paging and efficient retrieval is supported via the following headers:
Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
---|---|---|---|
 | | Discards a given number of elements at the beginning of the cursor. | int/Integer |
 | | Limits the number of elements returned. | int/Integer |
 | MongoDbConstants.BATCH_SIZE | Limits the number of elements returned in one batch. A cursor typically fetches a batch of result objects and stores them locally. If batchSize is positive, it represents the size of each batch of objects retrieved. It can be adjusted to optimize performance and limit data transfer. If batchSize is negative, it will limit the number of objects returned to those that fit within the max batch size limit (usually 4MB), and the cursor will be closed. For example if batchSize is -10, then the server will return a maximum of 10 documents and as many as can fit in 4MB, then close the cursor. Note that this feature is different from limit() in that documents must fit within a maximum size, and it removes the need to send a request to close the cursor server-side. The batch size can be changed even after a cursor is iterated, in which case the setting will apply on the next batch retrieval. | int/Integer |
You can also "stream" the documents returned from the server into your route by including outputType=DBCursor (Camel 2.16+) as an endpoint option which may prove simpler than setting the above headers. This hands your Exchange the DBCursor from the Mongo driver, just as if you were executing the findAll() within the Mongo shell, allowing your route to iterate over the results. By default and without this option, this component will load the documents from the driver’s cursor into a List and return this to your route - which may result in a large number of in-memory objects. Remember, with a DBCursor do not ask for the number of documents matched - see the MongoDB documentation site for details.
Example with option outputType=DBCursor and batch size :
from("direct:findAll")
.setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
.setBody().constant("{ \"name\": \"Raul Kripalani\" }")
.to("mongodb:myDb?database=flights&collection=tickets&operation=findAll&outputType=DBCursor")
.to("mock:resultFindAll");
The findAll operation will also return the following OUT headers to enable you to iterate through result pages if you are using paging:
Header key | Quick constant | Description (extracted from MongoDB API doc) | Data type |
---|---|---|---|
 | | Number of objects matching the query. This does not take limit/skip into consideration. | int/Integer |
 | | Number of objects matching the query. This does not take limit/skip into consideration. | int/Integer |
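The arithmetic behind paging with the skip and limit headers can be sketched in plain Java. The class below is a hypothetical helper that does not call Camel or the MongoDB driver; it only computes the value you would place in the skip header for a given 1-based page number (the limit header would simply carry the page size) and the number of pages needed to cover the total reported by findAll.

```java
// Hypothetical helper, independent of Camel and the MongoDB driver.
class PagingSketch {

    // Number of elements to discard before the requested page (pages start at 1).
    static int skipFor(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        return (page - 1) * pageSize;
    }

    // Number of pages needed to cover totalSize matching documents (rounds up).
    static int pageCount(int totalSize, int pageSize) {
        if (totalSize < 0 || pageSize < 1) {
            throw new IllegalArgumentException("totalSize must be >= 0 and pageSize >= 1");
        }
        return (totalSize + pageSize - 1) / pageSize;
    }
}
```

For instance, with a page size of 10, the third page corresponds to a skip of 20 and a limit of 10.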
count
Returns the total number of objects in a collection, returning a Long as the OUT message body.
The following example will count the number of records in the "dynamicCollectionName" collection. Notice how dynamicity is enabled, and as a result, the operation will not run against the "flights" collection named on the endpoint URI, but against the "dynamicCollectionName" collection.
// from("direct:count").to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true");
Object result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName");
assertTrue("Result is not of type Long", result instanceof Long);
From Camel 2.14 onwards you can provide a com.mongodb.DBObject object in the message body as a query, and the operation will return the number of documents matching these criteria.
DBObject query = ...
Long count = (Long) template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName");
Specifying a fields filter (projection)
Query operations will, by default, return the matching objects in their entirety (with all their fields). If your documents are large and you only require retrieving a subset of their fields, you can specify a field filter in all query operations, simply by setting the relevant DBObject (or type convertible to DBObject, such as a JSON String, Map, etc.) on the CamelMongoDbFieldsFilter header, constant shortcut: MongoDbConstants.FIELDS_FILTER.
Here is an example that uses MongoDB’s BasicDBObjectBuilder to simplify the creation of DBObjects. It retrieves all fields except _id and boringField:
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
DBObject fieldFilter = BasicDBObjectBuilder.start().add("_id", 0).add("boringField", 0).get();
Object result = template.requestBodyAndHeader("direct:findAll", (Object) null, MongoDbConstants.FIELDS_FILTER, fieldFilter);
Specifying a sort clause
There is often a requirement to fetch the min/max record from a collection based on sorting by a particular field. In Mongo the operation is performed using syntax similar to:
db.collection.find().sort({_id: -1}).limit(1)
// or
db.collection.findOne({$query:{},$orderby:{_id:-1}})
In a Camel route the SORT_BY header can be used with the findOneByQuery operation to achieve the same result. If the FIELDS_FILTER header is also specified the operation will return a single field/value pair that can be passed directly to another component (for example, a parameterized MyBatis SELECT query). This example demonstrates fetching the temporally newest document from a collection and reducing the result to a single field, based on the documentTimestamp field:
from("direct:someTriggeringEvent")
.setHeader(MongoDbConstants.SORT_BY).constant("{\"documentTimestamp\": -1}")
.setHeader(MongoDbConstants.FIELDS_FILTER).constant("{\"documentTimestamp\": 1}")
.setBody().constant("{}")
.to("mongodb:myDb?database=local&collection=myDemoCollection&operation=findOneByQuery")
.to("direct:aMyBatisParameterizedSelect");
Create/update operations
insert
Inserts a new object into the MongoDB collection, taken from the IN message body. Type conversion is attempted to turn it into DBObject or a List.
Two modes are supported: single insert and multiple insert. For multiple insert, the endpoint will expect a List, Array or Collections of objects of any type, as long as they are - or can be converted to - `DBObject`. All objects are inserted at once. The endpoint will intelligently decide which backend operation to invoke (single or multiple insert) depending on the input.
Example:
from("direct:insert")
.to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
The operation will return a WriteResult, and depending on the WriteConcern or the value of the invokeGetLastError option, getLastError() would have been called already or not. If you want to access the ultimate result of the write operation, you need to retrieve the CommandResult by calling getLastError() or getCachedLastError() on the WriteResult. Then you can verify the result by calling CommandResult.ok(), CommandResult.getErrorMessage() and/or CommandResult.getException().
Note that the new object’s _id must be unique in the collection. If you don’t specify the value, MongoDB will automatically generate one for you. But if you do specify it and it is not unique, the insert operation will fail (and for Camel to notice, you will need to enable invokeGetLastError or set a WriteConcern that waits for the write result).
This is not a limitation of the component, but it is how things work in MongoDB for higher throughput. If you are using a custom _id, you are expected to ensure at the application level that it is unique (and this is a good practice too).
Since Camel 2.15: OID(s) of the inserted record(s) is stored in the message header under CamelMongoOid key (MongoDbConstants.OID constant). The value stored is org.bson.types.ObjectId for single insert or java.util.List<org.bson.types.ObjectId> if multiple records have been inserted.
save
The save operation is equivalent to an upsert (UPdate, inSERT) operation, where the record will be updated, and if it doesn’t exist, it will be inserted, all in one atomic operation. MongoDB will perform the matching based on the _id field.
Beware that in case of an update, the object is replaced entirely and the usage of MongoDB’s $modifiers is not permitted. Therefore, if you want to manipulate the object if it already exists, you have two options:
- perform a query to retrieve the entire object first along with all its fields (may not be efficient), alter it inside Camel and then save it.
- use the update operation with $modifiers, which will execute the update at the server-side instead. You can enable the upsert flag, in which case if an insert is required, MongoDB will apply the $modifiers to the filter query object and insert the result.
For example:
from("direct:insert")
.to("mongodb:myDb?database=flights&collection=tickets&operation=save");
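The difference between the full-document replacement performed by save and a $set-style update can be sketched with plain maps standing in for documents. This is a simplified simulation under stated assumptions, not MongoDB or camel-mongodb code: SaveVsSetSketch and its methods are hypothetical names, and the "collection" is just a map keyed by _id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a collection, keyed by the document's _id.
class SaveVsSetSketch {

    // save: replace the whole stored document matched by _id, or insert it if absent.
    static void save(Map<Object, Map<String, Object>> collection, Map<String, Object> doc) {
        collection.put(doc.get("_id"), new HashMap<>(doc));
    }

    // $set-style update: change only the listed fields of an existing document.
    static void set(Map<Object, Map<String, Object>> collection, Object id, Map<String, Object> fields) {
        Map<String, Object> existing = collection.get(id);
        if (existing != null) {
            existing.putAll(fields);
        }
    }
}
```

Saving a document that omits a previously stored field drops that field, whereas the $set-style update leaves untouched fields in place. This is why the update operation is the better fit when only part of a document should change.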
update
Update one or multiple records on the collection. Requires a List<DBObject> as the IN message body containing exactly 2 elements:
- Element 1 (index 0) ⇒ filter query ⇒ determines what objects will be affected, same as a typical query object
- Element 2 (index 1) ⇒ update rules ⇒ how matched objects will be updated. All modifier operations from MongoDB are supported.
Multiupdates. By default, MongoDB will only update 1 object even if multiple objects match the filter query. To instruct MongoDB to update all matching records, set the CamelMongoDbMultiUpdate IN message header to true.
A header with key CamelMongoDbRecordsAffected will be returned (MongoDbConstants.RECORDS_AFFECTED constant) with the number of records updated (copied from WriteResult.getN()).
Supports the following IN message headers:
Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
---|---|---|---|
CamelMongoDbMultiUpdate | MongoDbConstants.MULTIUPDATE | If the update should be applied to all objects matching. See http://www.mongodb.org/display/DOCS/Atomic+Operations | boolean/Boolean |
 | | If the database should create the element if it does not exist | boolean/Boolean |
For example, the following will update all records whose filterField field equals true by setting the value of the "scientist" field to "Darwin":
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
DBObject filterField = new BasicDBObject("filterField", true);
DBObject updateObj = new BasicDBObject("$set", new BasicDBObject("scientist", "Darwin"));
Object result = template.requestBodyAndHeader("direct:update", new Object[] {filterField, updateObj}, MongoDbConstants.MULTIUPDATE, true);
Delete operations
remove
Remove matching records from the collection. The IN message body will act as the removal filter query, and is expected to be of type DBObject or a type convertible to it.
The following example will remove all objects whose field 'conditionField' equals true, in the science database, notableScientists collection:
// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove");
DBObject conditionField = new BasicDBObject("conditionField", true);
Object result = template.requestBody("direct:remove", conditionField);
A header with key CamelMongoDbRecordsAffected is returned (MongoDbConstants.RECORDS_AFFECTED constant) with type int, containing the number of records deleted (copied from WriteResult.getN()).
Bulk Write Operations
bulkWrite
Since Camel 2.21
Performs write operations in bulk with controls for order of execution. Requires a List<WriteModel<DBObject>> as the IN message body containing commands for insert, update, and delete operations.
The following example will insert a new scientist "Pierre Curie", update record with id "5" by setting the value of the "scientist" field to "Marie Curie" and delete record with id "3" :
// route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite");
List<WriteModel<DBObject>> bulkOperations = Arrays.asList(
new InsertOneModel<>(new BasicDBObject("scientist", "Pierre Curie")),
new UpdateOneModel<>(new BasicDBObject("_id", "5"),
new BasicDBObject("$set", new BasicDBObject("scientist", "Marie Curie"))),
new DeleteOneModel<>(new BasicDBObject("_id", "3")));
BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
By default, operations are executed in order and interrupted on the first write error without processing any remaining write operations in the list.
To instruct MongoDB to continue to process remaining write operations in the list, set the CamelMongoDbBulkOrdered IN message header to false. Unordered operations may be executed in parallel, so their order of execution is not guaranteed.
Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
---|---|---|---|
CamelMongoDbBulkOrdered | | Perform an ordered or unordered operation execution. Defaults to true. | boolean/Boolean |
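The effect of the ordered flag on error handling can be sketched with a small simulation. This is not MongoDB or camel-mongodb code: BulkOrderSketch is a hypothetical class, each operation is modelled as a BooleanSupplier that returns false on a write error, and the unordered case is run sequentially here for simplicity, whereas a real server may run unordered operations in parallel.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical simulation of ordered vs unordered bulk execution.
class BulkOrderSketch {

    // Runs the operations and returns how many succeeded.
    // Ordered: stop at the first failure. Unordered: keep going after a failure.
    static int run(List<BooleanSupplier> operations, boolean ordered) {
        int succeeded = 0;
        for (BooleanSupplier operation : operations) {
            if (operation.getAsBoolean()) {
                succeeded++;
            } else if (ordered) {
                break; // remaining operations are not processed
            }
        }
        return succeeded;
    }
}
```

With three operations where the second one fails, the ordered run applies only the first, while the unordered run applies the first and the third.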
Other operations
aggregate
Since Camel 2.14
Perform an aggregation with the given pipeline contained in the body. Aggregations can be long and heavy operations. Use with care.
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
from("direct:aggregate")
.setBody().constant("[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]")
.to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate")
.to("mock:resultAggregate");
Supports the following IN message headers:
Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
---|---|---|---|
 | MongoDbConstants.BATCH_SIZE | Sets the number of documents to return per batch. | int/Integer |
 | | Enable aggregation pipeline stages to write data to temporary files. | boolean/Boolean |
Efficient retrieval is supported via outputType=DBCursor.
You can also "stream" the documents returned from the server into your route by including outputType=DBCursor (Camel 2.21+) as an endpoint option which may prove simpler than setting the above headers. This hands your Exchange the DBCursor from the Mongo driver, just as if you were executing the aggregate() within the Mongo shell, allowing your route to iterate over the results. By default and without this option, this component will load the documents from the driver’s cursor into a List and return this to your route - which may result in a large number of in-memory objects. Remember, with a DBCursor do not ask for the number of documents matched - see the MongoDB documentation site for details.
Example with option outputType=DBCursor and batch size:
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
from("direct:aggregate")
.setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
.setBody().constant("[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]")
.to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=DBCursor")
.to("mock:resultAggregate");
getDbStats
Equivalent of running the db.stats() command in the MongoDB shell, which displays useful statistics about the database.
For example:
> db.stats();
{
"db" : "test",
"collections" : 7,
"objects" : 719,
"avgObjSize" : 59.73296244784423,
"dataSize" : 42948,
"storageSize" : 1000058880,
"numExtents" : 9,
"indexes" : 4,
"indexSize" : 32704,
"fileSize" : 1275068416,
"nsSizeMB" : 16,
"ok" : 1
}
Usage example:
// from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats");
Object result = template.requestBody("direct:getDbStats", "irrelevantBody");
assertTrue("Result is not of type DBObject", result instanceof DBObject);
The operation will return a data structure similar to the one displayed in the shell, in the form of a DBObject in the OUT message body.
getColStats
Equivalent of running the db.collection.stats() command in the MongoDB shell, which displays useful statistics about the collection.
For example:
> db.camelTest.stats();
{
"ns" : "test.camelTest",
"count" : 100,
"size" : 5792,
"avgObjSize" : 57.92,
"storageSize" : 20480,
"numExtents" : 2,
"nindexes" : 1,
"lastExtentSize" : 16384,
"paddingFactor" : 1,
"flags" : 1,
"totalIndexSize" : 8176,
"indexSizes" : {
"_id_" : 8176
},
"ok" : 1
}
Usage example:
// from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats");
Object result = template.requestBody("direct:getColStats", "irrelevantBody");
assertTrue("Result is not of type DBObject", result instanceof DBObject);
The operation will return a data structure similar to the one displayed in the shell, in the form of a DBObject in the OUT message body.
command
Since Camel 2.15
Run the body as a command on the database. Useful for admin operations such as getting host information, replication or sharding status.
The collection parameter is not used for this operation.
// route: from("direct:command").to("mongodb:myDb?database=science&operation=command");
DBObject commandBody = new BasicDBObject("hostInfo", "1");
Object result = template.requestBody("direct:command", commandBody);
Dynamic operations
An Exchange can override the endpoint’s fixed operation by setting the CamelMongoDbOperation header, defined by the MongoDbConstants.OPERATION_HEADER constant.
The values supported are determined by the MongoDbOperation enumeration and match the accepted values for the `operation` parameter on the endpoint URI.
For example:
// from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count");
assertTrue("Result is not of type Long", result instanceof Long);
Tailable Cursor Consumer
MongoDB offers a mechanism to instantaneously consume ongoing data from a collection, by keeping the cursor open just like the tail -f command of *nix systems. This mechanism is significantly more efficient than a scheduled poll, due to the fact that the server pushes new data to the client as it becomes available, rather than making the client ping back at scheduled intervals to fetch new data. It also reduces otherwise redundant network traffic.
There is only one requisite to use tailable cursors: the collection must be a "capped collection", meaning that it will only hold N objects, and when the limit is reached, MongoDB flushes old objects in the same order they were originally inserted. For more information, please refer to: http://www.mongodb.org/display/DOCS/Tailable+Cursors.
The Camel MongoDB component implements a tailable cursor consumer, making this feature available for you to use in your Camel routes. As new objects are inserted, MongoDB will push them as DBObjects in natural order to your tailable cursor consumer, which will transform them to an Exchange and trigger your route logic.
How the tailable cursor consumer works
To turn a cursor into a tailable cursor, a few special flags are to be signalled to MongoDB when first generating the cursor. Once created, the cursor will then stay open and will block upon calling the DBCursor.next() method until new data arrives. However, the MongoDB server reserves the right to kill your cursor if new data doesn’t appear after an indeterminate period. If you want to continue consuming new data, you have to regenerate the cursor. And to do so, you will have to remember the position where you left off or else you will start consuming from the top again.
The Camel MongoDB tailable cursor consumer takes care of all these tasks for you. You will just need to provide the key to some field in your data of increasing nature, which will act as a marker to position your cursor every time it is regenerated, e.g. a timestamp, a sequential ID, etc. It can be of any datatype supported by MongoDB. Date, Strings and Integers are found to work well. We call this mechanism "tail tracking" in the context of this component.
The consumer will remember the last value of this field and whenever the cursor is to be regenerated, it will run the query with a filter like: increasingField > lastValue, so that only unread data is consumed.
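The regeneration logic described above can be sketched without MongoDB at all. The simulation below is an illustration under stated assumptions, not the component's implementation: a list of Long values stands in for the increasing field of the records in the capped collection, and the hypothetical TailTrackerSketch class remembers the last value seen and, on each simulated regeneration, returns only values greater than it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory simulation of tail tracking on an increasing field.
class TailTrackerSketch {

    private Long lastValue; // null until the first record has been consumed

    // Simulates regenerating the cursor: applies "increasingField > lastValue"
    // to the given snapshot and advances lastValue past everything returned.
    List<Long> regenerateAndRead(List<Long> collection) {
        Long threshold = lastValue;
        List<Long> unread = new ArrayList<>();
        for (Long value : collection) {
            if (threshold == null || value > threshold) {
                unread.add(value);
                if (lastValue == null || value > lastValue) {
                    lastValue = value;
                }
            }
        }
        return unread;
    }

    Long lastValue() {
        return lastValue;
    }
}
```

Calling regenerateAndRead twice on a growing list returns each value exactly once, which is the property the increasing field is meant to provide.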
Setting the increasing field: Set the key of the increasing field on the endpoint URI tailTrackIncreasingField option. In Camel 2.10, it must be a top-level field in your data, as nested navigation for this field is not yet supported. That is, the "timestamp" field is okay, but "nested.timestamp" will not work. Please open a ticket in the Camel JIRA if you do require support for nested increasing fields.
Cursor regeneration delay: One thing to note is that if new data is not already available upon initialisation, MongoDB will kill the cursor instantly. Since we don’t want to overwhelm the server in this case, a cursorRegenerationDelay option has been introduced (with a default value of 1000ms), which you can modify to suit your needs.
An example:
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
.id("tailableCursorConsumer1")
.autoStartup(false)
.to("mock:test");
The above route will consume from the "flights.cancellations" capped collection, using "departureTime" as the increasing field, with the default cursor regeneration delay of 1000ms.
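To use a non-default regeneration delay, the cursorRegenerationDelay option is appended to the same URI. The helper below is a hypothetical convenience (TailableUriSketch and tailableUri are not part of Camel) that only assembles the endpoint string from the option names used on this page; the delay is passed as a plain number of milliseconds, as in the oplog example further down.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a tailable consumer URI; not part of camel-mongodb. */
class TailableUriSketch {
    static String tailableUri(String connectionBean, String database, String collection,
                              String increasingField, long regenerationDelayMillis) {
        // LinkedHashMap keeps the options in the order they were added
        Map<String, String> options = new LinkedHashMap<>();
        options.put("database", database);
        options.put("collection", collection);
        options.put("tailTrackIncreasingField", increasingField);
        options.put("cursorRegenerationDelay", Long.toString(regenerationDelayMillis));

        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "mongodb:" + connectionBean + "?" + query;
    }
}
```

The resulting string can be passed to from(...) in a RouteBuilder in place of the literal URI shown above.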
Persistent tail tracking
Standard tail tracking is volatile: the last value is only kept in memory. In practice, however, you will need to restart your Camel container every now and then. The last value would then be lost, and your tailable cursor consumer would start consuming from the top again, very likely sending duplicate records into your route.
To overcome this situation, you can enable the persistent tail tracking feature to keep track of the last consumed increasing value in a special collection inside your MongoDB database too. When the consumer initialises again, it will restore the last tracked value and continue as if nothing happened.
The last read value is persisted on two occasions: every time the cursor is regenerated and when the consumer shuts down. We may consider persisting at regular intervals too in the future (flush every 5 seconds) for added robustness if the demand is there. To request this feature, please open a ticket in the Camel JIRA.
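The two persistence points and the restore on start-up can be sketched without a database. In the hypothetical class below, a plain Map plays the role of the tracking collection, keyed by the persistent id; the class and method names are illustrative only and do not mirror the component's internals.

```java
import java.util.Map;

/** Hypothetical stand-in for persistent tail tracking; a Map replaces the tracking collection. */
class PersistentTailTrackerSketch {
    private final Map<String, Object> trackingStore; // persistentId -> last tracked value
    private final String persistentId;
    private Object lastValue;

    PersistentTailTrackerSketch(Map<String, Object> trackingStore, String persistentId) {
        this.trackingStore = trackingStore;
        this.persistentId = persistentId;
        // On initialisation, restore whatever a previous run persisted (may be null)
        this.lastValue = trackingStore.get(persistentId);
    }

    /** Updates the in-memory value only; nothing is written to the store yet. */
    void onDocumentConsumed(Object increasingValue) {
        lastValue = increasingValue;
    }

    /** First persistence point: the cursor is being regenerated. */
    void onCursorRegenerated() {
        persist();
    }

    /** Second persistence point: the consumer is shutting down. */
    void onShutdown() {
        persist();
    }

    private void persist() {
        if (lastValue != null) {
            trackingStore.put(persistentId, lastValue);
        }
    }

    Object getLastValue() {
        return lastValue;
    }
}
```

A value consumed between the two persistence points lives only in memory, which is why an abrupt stop (without a clean shutdown) can still lead to some documents being delivered again.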
Enabling persistent tail tracking
To enable this function, set at least the following options on the endpoint URI:
- persistentTailTracking option to true
- persistentId option to a unique identifier for this consumer, so that the same collection can be reused across many consumers
Additionally, you can set the tailTrackDb, tailTrackCollection and tailTrackField options to customise where the runtime information will be stored. Refer to the endpoint options table at the top of this page for descriptions of each option.
For example, the following route will consume from the "flights.cancellations" capped collection, using "departureTime" as the increasing field, with the default cursor regeneration delay of 1000ms, with persistent tail tracking turned on, persisting under the "cancellationsTracker" id in the "flights.camelTailTracking" collection and storing the last processed value under the "lastTrackingValue" field (camelTailTracking and lastTrackingValue are defaults).
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
"&persistentId=cancellationsTracker")
.id("tailableCursorConsumer2")
.autoStartup(false)
.to("mock:test");
Below is another example identical to the one above, but where the persistent tail tracking runtime information will be stored under the "trackers.camelTrackers" collection, in the "lastProcessedDepartureTime" field:
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
"&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
"&tailTrackField=lastProcessedDepartureTime")
.id("tailableCursorConsumer3")
.autoStartup(false)
.to("mock:test");
Oplog Tail Tracking
The oplog collection tracking feature lets you implement trigger-like functionality in MongoDB. To activate this collection, you first have to set up a replica set. For more information on this topic, see https://docs.mongodb.com/manual/tutorial/deploy-replica-set/.
Below you can find an example of a Java DSL based route demonstrating how you can use the component to track the oplog collection. In this specific case we filter the events that affect the collection customers in the database optlog_test. Note that the tailTrackIncreasingField is a timestamp field ('ts'), which means that you have to set the tailTrackingStrategy parameter to the TIMESTAMP value.
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum;
import org.apache.camel.main.Main;

import java.io.InputStream;

/**
 * For this to work you need to turn on the replica set
 * <p>
 * Commands to create a replica set:
 * <p>
 * rs.initiate( {
 *   _id : "rs0",
 *   members: [ { _id : 0, host : "localhost:27017" } ]
 * })
 * <p>
 * MongoConstants and MongoToInputStreamConverter are application classes
 * that are not shown here. The jsonpath() filter needs camel-jsonpath on the classpath.
 */
public class MongoDbTracker {

    private final String database;
    private final String collection;
    private final String increasingField;
    private MongoDBTailTrackingEnum trackingStrategy;
    private int persistRecords = -1;
    private boolean persistentTailTracking;

    public MongoDbTracker(String database, String collection, String increasingField) {
        this.database = database;
        this.collection = collection;
        this.increasingField = increasingField;
    }

    public static void main(String[] args) throws Exception {
        final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", "oplog.rs", "ts");
        mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP);
        mongoDbTracker.setPersistRecords(5);
        mongoDbTracker.setPersistentTailTracking(true);
        // startRouter() blocks in main.run(), so print the hint first
        System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        // run until you terminate the JVM
        mongoDbTracker.startRouter();
    }

    public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) {
        this.trackingStrategy = trackingStrategy;
    }

    public void setPersistRecords(int persistRecords) {
        this.persistRecords = persistRecords;
    }

    public void setPersistentTailTracking(boolean persistentTailTracking) {
        this.persistentTailTracking = persistentTailTracking;
    }

    void startRouter() throws Exception {
        // create a Main instance
        Main main = new Main();
        main.bind(MongoConstants.CONN_NAME, new MongoClient("localhost", 27017));
        main.addRouteBuilder(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                getContext().getTypeConverterRegistry().addTypeConverter(InputStream.class, BasicDBObject.class,
                        new MongoToInputStreamConverter());
                from("mongodb://" + MongoConstants.CONN_NAME + "?database=" + database
                        + "&collection=" + collection
                        + "&persistentTailTracking=" + persistentTailTracking
                        + "&persistentId=trackerName" + "&tailTrackDb=local"
                        + "&tailTrackCollection=talendTailTracking"
                        + "&tailTrackField=lastTrackingValue"
                        + "&tailTrackIncreasingField=" + increasingField
                        + "&tailTrackingStrategy=" + trackingStrategy.toString()
                        + "&persistRecords=" + persistRecords
                        + "&cursorRegenerationDelay=1000")
                    // keep only oplog entries for the optlog_test.customers namespace
                    .filter().jsonpath("$[?(@.ns=='optlog_test.customers')]")
                    .id("logger")
                    .to("log:logger?level=WARN")
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            Message message = exchange.getIn();
                            System.out.println(message.getBody().toString());
                            exchange.getOut().setBody(message.getBody().toString());
                        }
                    });
            }
        });
        main.run();
    }
}
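The effect of the namespace filter in the route above can be illustrated without Camel or MongoDB. The sketch below is a plain-Java approximation, not the JsonPath evaluation itself: OplogFilterSketch and forNamespace are hypothetical names, and each oplog entry is modelled as a Map with only the "ns" (namespace) and "op" (operation) fields.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical plain-Java approximation of filtering oplog entries by namespace. */
class OplogFilterSketch {

    /** Keeps only the entries whose "ns" field equals the given database.collection namespace. */
    static List<Map<String, Object>> forNamespace(List<Map<String, Object>> entries, String namespace) {
        return entries.stream()
                .filter(entry -> namespace.equals(entry.get("ns")))
                .collect(Collectors.toList());
    }
}
```

Entries for other collections, or entries without an "ns" field, are dropped, so only changes to the watched collection reach the rest of the route.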
Type conversions
The MongoDbBasicConverters type converter included with the camel-mongodb component provides the following conversions:
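Name | From type | To type | How? |
---|---|---|---|
fromMapToDBObject | Map | DBObject | constructs a new BasicDBObject via the new BasicDBObject(Map m) constructor |
fromBasicDBObjectToMap | BasicDBObject | Map | BasicDBObject already implements Map |
fromStringToDBObject | String | DBObject | uses com.mongodb.util.JSON.parse(String s) |
fromAnyObjectToDBObject | Object | DBObject | uses the Jackson library to convert the object to a Map, which is in turn used to initialise a new BasicDBObject |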
This type converter is auto-discovered, so you don’t need to configure anything manually.