Azure storage datalake service
Since Camel 3.8
Both producer and consumer are supported
The Azure storage datalake component is used for storing and retrieving files from the Azure Storage Data Lake Service using the Azure APIs v12.
Prerequisites
You need to have a valid Azure account with Azure storage set up. More information can be found at Azure Documentation Portal.
Maven users will need to add the following dependency to their pom.xml for this component.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-azure-storage-datalake</artifactId>
<version>x.x.x</version>
<!-- use the same version as your camel core version -->
</dependency>
Uri Format
azure-storage-datalake:accountName[/fileSystemName][?options]
In case of a consumer, both accountName and fileSystemName are required. In case of the producer, it depends on the operation being requested.
You can append query options to the URI in the following format, ?option1=value&option2=value&…
For example, in order to download content from the file test.txt located on the filesystem named filesystem in the camelTesting storage account, use the following snippet:
from("azure-storage-datalake:camelTesting/filesystem?fileName=test.txt&accountKey=key").
to("file://fileDirectory");
Uri Options
The Azure storage datalake service component supports 33 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
accountKey (common) | account key for authentication | | String |
clientId (common) | client id for azure account | | String |
clientSecret (common) | client secret for azure account | | String |
clientSecretCredential (common) | client secret credential for authentication | | ClientSecretCredential |
close (common) | Whether or not a file changed event raised indicates completion (true) or modification (false) | | Boolean |
closeStreamAfterRead (common) | check for closing stream after read | | Boolean |
configuration (common) | configuration object for datalake | | DataLakeConfiguration |
dataCount (common) | count number of bytes to download | | Long |
directoryName (common) | directory of the file to be handled in component | | String |
downloadLinkExpiration (common) | download link expiration time | | Long |
expression (common) | expression for queryInputStream | | String |
fileDir (common) | directory of file to do operations in the local system | | String |
fileName (common) | name of file to be handled in component | | String |
fileOffset (common) | offset position in file for different operations | | Long |
maxResults (common) | maximum number of results to show at a time | | Integer |
maxRetryRequests (common) | number of retries to a given request | | int |
openOptions (common) | set open options for creating file | | Set |
path (common) | path in azure datalake for operations | | String |
permission (common) | permission string for the file | | String |
position (common) | This parameter allows the caller to upload data in parallel and control the order in which it is appended to the file. | | Long |
recursive (common) | recursively include all paths | | Boolean |
regex (common) | regular expression for matching file names | | String |
retainUncommitedData (common) | Whether or not uncommitted data is to be retained after the operation | | Boolean |
serviceClient (common) | Autowired datalake service client for azure storage datalake | | DataLakeServiceClient |
sharedKeyCredential (common) | shared key credential for azure datalake gen2 | | StorageSharedKeyCredential |
tenantId (common) | tenant id for azure account | | String |
timeout (common) | Timeout for operation | | Duration |
umask (common) | umask permission for file | | String |
userPrincipalNameReturned (common) | whether or not to use UPN | | Boolean |
bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
lazyStartProducer (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to start up in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean |
operation (producer) | operation to be performed. There are 2 enums and the value can be one of: listFileSystem, listFiles | listFileSystem | DataLakeOperationsDefinition |
autowiredEnabled (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean |
The Azure storage datalake service endpoint is configured using URI syntax:
azure-storage-datalake:accountName/fileSystemName
with the following path and query parameters:
Path Parameters (2 parameters):
Name | Description | Default | Type |
---|---|---|---|
accountName | name of the azure account | | String |
fileSystemName | name of filesystem to be used | | String |
Query Parameters (50 parameters):
Name | Description | Default | Type |
---|---|---|---|
accountKey (common) | account key for authentication | | String |
clientId (common) | client id for azure account | | String |
clientSecret (common) | client secret for azure account | | String |
clientSecretCredential (common) | client secret credential for authentication | | ClientSecretCredential |
close (common) | Whether or not a file changed event raised indicates completion (true) or modification (false) | | Boolean |
closeStreamAfterRead (common) | check for closing stream after read | | Boolean |
dataCount (common) | count number of bytes to download | | Long |
dataLakeServiceClient (common) | service client of datalake | | DataLakeServiceClient |
directoryName (common) | directory of the file to be handled in component | | String |
downloadLinkExpiration (common) | download link expiration time | | Long |
expression (common) | expression for queryInputStream | | String |
fileDir (common) | directory of file to do operations in the local system | | String |
fileName (common) | name of file to be handled in component | | String |
fileOffset (common) | offset position in file for different operations | | Long |
maxResults (common) | maximum number of results to show at a time | | Integer |
maxRetryRequests (common) | number of retries to a given request | | int |
openOptions (common) | set open options for creating file | | Set |
path (common) | path in azure datalake for operations | | String |
permission (common) | permission string for the file | | String |
position (common) | This parameter allows the caller to upload data in parallel and control the order in which it is appended to the file. | | Long |
recursive (common) | recursively include all paths | | Boolean |
regex (common) | regular expression for matching file names | | String |
retainUncommitedData (common) | Whether or not uncommitted data is to be retained after the operation | | Boolean |
serviceClient (common) | Autowired datalake service client for azure storage datalake | | DataLakeServiceClient |
sharedKeyCredential (common) | shared key credential for azure datalake gen2 | | StorageSharedKeyCredential |
tenantId (common) | tenant id for azure account | | String |
timeout (common) | Timeout for operation | | Duration |
umask (common) | umask permission for file | | String |
userPrincipalNameReturned (common) | whether or not to use UPN | | Boolean |
bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
sendEmptyMessageWhenIdle (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean |
exceptionHandler (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored. | | ExceptionHandler |
exchangePattern (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern |
pollStrategy (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling that usually occurs during the poll operation, before an Exchange has been created and is being routed in Camel. | | PollingConsumerPollStrategy |
lazyStartProducer (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to start up in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean |
operation (producer) | operation to be performed. There are 2 enums and the value can be one of: listFileSystem, listFiles | listFileSystem | DataLakeOperationsDefinition |
backoffErrorThreshold (scheduler) | The number of subsequent error polls (failed due to some error) that should happen before the backoffMultiplier should kick in. | | int |
backoffIdleThreshold (scheduler) | The number of subsequent idle polls that should happen before the backoffMultiplier should kick in. | | int |
backoffMultiplier (scheduler) | To let the scheduled polling consumer back off if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt happens again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int |
delay (scheduler) | Milliseconds before the next poll. | 500 | long |
greedy (scheduler) | If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages. | false | boolean |
initialDelay (scheduler) | Milliseconds before the first poll starts. | 1000 | long |
repeatCount (scheduler) | Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever. | 0 | long |
runLoggingLevel (scheduler) | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. There are 6 enums and the value can be one of: TRACE, DEBUG, INFO, WARN, ERROR, OFF | TRACE | LoggingLevel |
scheduledExecutorService (scheduler) | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. | | ScheduledExecutorService |
scheduler (scheduler) | To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler | none | Object |
schedulerProperties (scheduler) | To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. | | Map |
startScheduler (scheduler) | Whether the scheduler should be auto started. | true | boolean |
timeUnit (scheduler) | Time unit for initialDelay and delay options. There are 7 enums and the value can be one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | MILLISECONDS | TimeUnit |
useFixedDelay (scheduler) | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. | true | boolean |
Methods of authentication
In order to use this component, you will have to provide at least one of the following for authentication purposes (a minimal credential setup is sketched after this list):

- Provide accountName and accessKey for your azure account.
- Provide a StorageSharedKeyCredential instance, which can be provided into the sharedKeyCredential option.
- Provide a ClientSecretCredential instance, which can be provided into the clientSecretCredential option.
- Provide accountName, clientId, clientSecret and tenantId for authentication with Azure Active Directory.
- Provide a DataLakeServiceClient instance, which can be provided into the serviceClient option.
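As an illustration, a shared key credential or a client secret credential can be built with the Azure SDK, bound into the Camel registry, and then referenced from the endpoint URI. This is only a minimal sketch; the bean names (sharedKeyCred, clientSecretCred) and the account values are placeholders and not part of the component.

// com.azure.storage.common.StorageSharedKeyCredential
StorageSharedKeyCredential sharedKeyCred =
        new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");

// com.azure.identity.ClientSecretCredentialBuilder (azure-identity)
ClientSecretCredential clientSecretCred = new ClientSecretCredentialBuilder()
        .clientId("yourClientId")
        .clientSecret("yourClientSecret")
        .tenantId("yourTenantId")
        .build();

// bind the credentials into the Camel registry so endpoints can reference them by name
camelContext.getRegistry().bind("sharedKeyCred", sharedKeyCred);
camelContext.getRegistry().bind("clientSecretCred", clientSecretCred);

// reference a registered credential from the endpoint URI
from("direct:start")
    .to("azure-storage-datalake:yourAccountName/filesystem?operation=listPaths&sharedKeyCredential=#sharedKeyCred");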
Usage
Message Headers specified by user
Header | Variable Name | Type | Description |
---|---|---|---|
|
|
|
Defines options available to configure the behavior of a call to listFileSystemsSegment on a DataLakeServiceAsyncClient object. Null may be passed. |
|
|
|
An optional timeout value beyond which a RuntimeException will be raised. |
|
|
|
Specify the producer operation to execute. Different operations allowed are shown below. |
|
|
|
Name of the file system in azure datalake on which operation is to be performed. Please make sure that filesystem name is all lowercase. |
|
|
|
Name of the directory in azure datalake on which operation is to be performed. |
|
|
|
Name of the file in azure datalake on which operation is to be performed. |
|
|
|
The metadata to associate with the file. |
|
|
|
|
|
|
|
This contains values which will restrict the successful operation of a variety of requests to the conditions present. These conditions are entirely optional. |
|
|
|
Defines options available to configure the behavior of a call to listContainersSegment on a DataLakeFileSystemClient object. Null may be passed. |
|
|
|
Path of the file to be used for upload operations. |
|
|
|
Specifies if the call to listContainersSegment should recursively include all paths. |
|
|
|
Specifies the maximum number of blobs to return, including all BlobPrefix elements. |
|
|
|
|
|
|
|
Filter the results to return only those files which match the specified regular expression. |
|
|
|
Directory in which the file is to be downloaded. |
|
|
|
An MD5 hash of the content. The hash is used to verify the integrity of the file during transport. |
|
|
|
This is a representation of a range of bytes on a file, typically used during a download operation. Passing null as a FileRange value will default to the entire range of the file. |
|
|
|
|
|
|
|
Set of OpenOption used to configure how to open or create a file. |
|
|
|
Flag indicating if the file was incrementally copied. |
|
|
|
Set the Expiration time of the download link. |
|
|
|
The position where the data is to be appended. |
|
|
|
By setting lease id, requests will fail if the provided lease does not match the active lease on the file. |
|
|
|
Additional parameters for a set of operations. |
|
|
|
Determines whether or not uncommitted data is to be retained after the operation. |
|
|
|
Whether or not a file changed event raised indicates completion (true) or modification (false). |
|
|
|
The length of the file after all data has been written. |
|
|
|
The query expression on the file. |
|
|
|
Defines the input serialization for a file query request, either FileQueryJsonSerialization or FileQueryDelimitedSerialization. |
|
|
|
Defines the output serialization for a file query request, either FileQueryJsonSerialization or FileQueryDelimitedSerialization. |
|
|
|
Sets error consumer for file query |
|
|
|
Sets progress consumer for file query |
|
|
|
Optional parameters for File Query. |
|
|
|
Sets the permission for file. |
|
|
|
Sets the umask for file. |
Message headers set by the component
Header | Variable Name | Type | Description |
---|---|---|---|
|
|
|
The metadata to associate with the file. |
|
|
|
Non-parsed HTTP headers that can be used by the user. |
|
|
|
Access tier of file. |
|
|
|
Datetime when the access tier of the blob last changed. |
|
|
|
Archive status of file. |
|
|
|
Cache control specified for the file. |
|
|
|
Content disposition specified for the file. |
|
|
|
Content encoding specified for the file. |
|
|
|
Content language specified for the file. |
|
|
|
Content type specified for the file. |
|
|
|
Conclusion time of the last attempted Copy Blob operation where this file was the destination file. |
|
|
|
String identifier for this copy operation. |
|
|
|
Contains the number of bytes copied and the total bytes in the source in the last attempted Copy Blob operation where this file was the destination file. |
|
|
|
URL up to 2 KB in length that specifies the source file or file used in the last attempted Copy Blob operation where this file was the destination file. |
|
|
|
Status of the last copy operation performed on the file. |
|
|
|
|
|
|
|
Creation time of the file. |
|
|
|
The SHA-256 hash of the encryption key used to encrypt the file. |
|
|
|
Flag indicating if the file’s content is encrypted on the server. |
|
|
|
The ETag of the file. |
|
|
|
Size of the file. |
|
|
|
Datetime when the file was last modified. |
|
|
|
Type of lease on the file. |
|
|
|
State of the lease on the file. |
|
|
|
Status of the lease on the file. |
|
|
|
The link that can be used to download the file from datalake. |
Automatic detection of service client
The component is capable of automatically detecting the presence of a DataLakeServiceClient bean in the registry. Hence, if your registry has only one instance of type DataLakeServiceClient, it will be automatically used as the default client. You won't have to explicitly define it as a URI parameter.
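For example, a single client bean can be created with the Azure SDK builder and bound into the registry. This is a sketch only; the endpoint URL, account values and bean name are placeholders.

// com.azure.storage.file.datalake.DataLakeServiceClientBuilder
DataLakeServiceClient client = new DataLakeServiceClientBuilder()
        .endpoint("https://yourAccountName.dfs.core.windows.net")
        .credential(new StorageSharedKeyCredential("yourAccountName", "yourAccessKey"))
        .buildClient();

// with only one bean of type DataLakeServiceClient in the registry, the component
// will pick it up automatically; binding it under a name also allows an explicit
// reference such as serviceClient=#serviceClient
camelContext.getRegistry().bind("serviceClient", client);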
Azure Storage DataLake Producer Operations
The various operations supported by Azure Storage DataLake are as given below:
Operations on Service level
For these operations, the accountName option is required.

Operation | Description |
---|---|
listFileSystem | List all the file systems that are present in the given azure account. |
Operations on File system level
For these operations, the accountName and fileSystemName options are required.

Operation | Description |
---|---|
createFileSystem | Creates a new file system within the storage account |
deleteFileSystem | Deletes the specified file system within the storage account |
listPaths | Returns a list of all the files within the given path in the given file system, with folder structure flattened |
Operations on Directory level
For these operations, the accountName, fileSystemName and directoryName options are required.

Operation | Description |
---|---|
createFile | Creates a new file in the specified directory within the fileSystem |
deleteDirectory | Deletes the specified directory within the file system |
Operations on file level
For these operations, the accountName, fileSystemName and fileName options are required.

Operation | Description |
---|---|
getFile | Get the contents of a file |
downloadToFile | Download the entire file from the file system into a path specified by fileDir. |
downloadLink | Generate a download link for the specified file using Shared Access Signature (SAS). The expiration time to be set for the link can be specified, otherwise 1 hour is taken as default. |
deleteFile | Deletes the specified file. |
appendToFile | Appends the data passed to the specified file in the file system. Flush command is required after append. |
flushToFile | Flushes the data already appended to the specified file. |
openQueryInputStream | Opens an InputStream based on the query passed to the endpoint. For this operation, you must first register the query acceleration feature with your subscription. |
Refer to the examples section below for more details on how to use these operations.
Consumer Examples
To consume a file from the storage datalake into a local file using the file component, you can do the following:
from("azure-storage-datalake":cameltesting/filesystem?fileName=test.txt&accountKey=yourAccountKey").
to("file:/filelocation");
You can also directly write to a file without using the file component. For this, you will need to specify the path in the fileDir option, to save it to your machine.
from("azure-storage-datalake":cameltesting/filesystem?fileName=test.txt&accountKey=yourAccountKey&fileDir=/test/directory").
to("mock:results");
This component also supports a batch consumer, so you can consume multiple files from a file system by specifying the path from where you want to consume the files.
from("azure-storage-datalake":cameltesting/filesystem?accountKey=yourAccountKey&fileDir=/test/directory&path=abc/test").
to("mock:results");
Producer Examples
- listFileSystem
from("direct:start")
.process(exchange -> {
//required headers can be added here
exchange.getIn().setHeader(DataLakeConstants.LIST_FILESYSTEMS_OPTIONS, new ListFileSystemsOptions().setMaxResultsPerPage(10));
})
.to("azure-storage-datalake:cameltesting?operation=listFileSystem&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- createFileSystem
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.FILESYSTEM_NAME, "test1");
})
.to("azure-storage-datalake:cameltesting?operation=createFileSystem&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- deleteFileSystem
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.FILESYSTEM_NAME, "test1");
})
.to("azure-storage-datalake:cameltesting?operation=deleteFileSystem&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- listPaths
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.LIST_PATH_OPTIONS, new ListPathsOptions().setPath("/main"));
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=listPaths&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- getFile
This can be done in two ways. We can either set an OutputStream in the exchange body:
from("direct:start")
.process(exchange -> {
// set an OutputStream to which the file data should be written
exchange.getIn().setBody(outputStream);
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=getFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
Or, if the body is not set, the operation will return an InputStream, given that you have already registered for query acceleration in the azure portal.
from("direct:start")
.to("azure-storage-datalake:cameltesting/filesystem?operation=getFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.process(exchange -> {
InputStream inputStream = exchange.getMessage().getBody(InputStream.class);
System.out.println(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()));
})
.to("mock:results");
- deleteFile
from("direct:start")
.to("azure-storage-datalake:cameltesting/filesystem?operation=deleteFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- downloadToFile
from("direct:start")
.to("azure-storage-datalake:cameltesting/filesystem?operation=downloadToFile&fileName=test.txt&fileDir=/test/mydir&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- downloadLink
from("direct:start")
.to("azure-storage-datalake:cameltesting/filesystem?operation=downloadLink&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.process(exchange -> {
String link = exchange.getMessage().getBody(String.class);
System.out.println(link);
})
.to("mock:results");
- appendToFile
from("direct:start")
.process(exchange -> {
final String data = "test data";
final InputStream inputStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
exchange.getIn().setBody(inputStream);
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=appendToFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- flushToFile
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.POSITION, 0);
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=flushToFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- openQueryInputStream
For this operation, you should have already registered for query acceleration on the azure portal.
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.QUERY_OPTIONS, new FileQueryOptions("SELECT * from BlobStorage"));
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=openQueryInputStream&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- upload
from("direct:start")
.process(exchange -> {
final String data = "test data";
final InputStream inputStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
exchange.getIn().setBody(inputStream);
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=upload&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- uploadFromFile
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.PATH, "test/file.txt");
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=uploadFromFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- createFile
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.DIRECTORY_NAME, "test/file/");
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=createFile&fileName=test.txt&dataLakeServiceClient=#serviceClient")
.to("mock:results");
- deleteDirectory
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(DataLakeConstants.DIRECTORY_NAME, "test/file/");
})
.to("azure-storage-datalake:cameltesting/filesystem?operation=deleteDirectory&dataLakeServiceClient=#serviceClient")
.to("mock:results");
Testing
Please run all the unit tests and integration tests while making changes to the component, as changes or version upgrades can break things. To run all the tests in the component, you will need to obtain the azure accountName and accessKey. After obtaining them, you can run the full tests in this component directory with the following maven command:
mvn verify -PfullTests -Dazure.storage.account.name=<accountName> -Dazure.storage.account.key=<accessKey>
You can also skip the integration tests and run only the basic unit tests by using the command:
mvn test