Testing the Camel Kafka Connector

Running The Tests

The most basic way of running the tests is to do so as part of the build. The command line for that is:

mvn -DskipIntegrationTests=false clean verify

To also run the slow integration tests, use:

mvn -DskipIntegrationTests=false -Denable.slow.tests=true clean verify

The test code in the project is rather flexible and allows a range of options regarding the environment, tools, and so on. For example, to use an external (remote) Kafka broker:

mvn -Dkafka.bootstrap.servers=host1:port -Dkafka.instance.type=remote -DskipIntegrationTests=false clean verify

Running The Tests Remotely

Kafka is not the only infrastructure that can be run remotely: other external services can be used for the tests as well. To do so, set the properties for the services you want to run remotely. This causes the tests to skip launching the local containers and to use the existing remote instances instead. At the moment, the following properties can be set for remote testing:

  • kafka.instance.type

    • kafka.bootstrap.servers

  • aws-service.instance.type

    • access.key: AWS access key (mandatory for remote testing)

    • secret.key: AWS secret key (mandatory for remote testing)

    • aws.region: AWS region (optional)

    • aws.host: AWS host (optional)

  • aws-service.kinesis.instance.type

    • access.key: AWS access key (mandatory for remote testing)

    • secret.key: AWS secret key (mandatory for remote testing)

    • aws.region: AWS region (optional)

    • aws.host: AWS host (optional)

  • azure.instance.type

    • azure.storage.queue.host

    • azure.storage.queue.port

    • azure.storage.queue.account.name

    • azure.storage.queue.account.key

  • elasticsearch.instance.type

    • elasticsearch.host

    • elasticsearch.port

  • cassandra.instance.type

    • cassandra.host

    • cassandra.cql3.port

  • jdbc.instance.type

    • jdbc.connection.url

  • jms-service.instance.type

    • jms.broker.address

  • hdfs.instance.type

    • hdfs.host

    • hdfs.port

  • mongodb.instance.type

    • mongodb.url

It is possible to use a properties file to set these. To do so, use -Dtest.properties=/path/to/file.properties.
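For example, a hypothetical properties file for testing against a remote Kafka broker and a remote JMS broker could look like the following (the host names, ports, and broker URL format are illustrative):

kafka.instance.type=remote
kafka.bootstrap.servers=host1:9092
jms-service.instance.type=remote
jms.broker.address=tcp://host2:61616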

Manual Tests

A few manual tests can be enabled and executed once the accounts and environments used by those services are adequately configured. Because the setup is specific to the nature of each service, please consult the comments on each of those test cases for the details of their setup.

The following manual or semi-automated tests are available. Each can be enabled with the flag listed below, along with other flags that are specific to that service:

  • it.test.salesforce.enable

  • it.test.slack.enable

Salesforce Test

Before running the tests, be sure to:

  • Run the Salesforce CLI container:

$ docker run --rm --name salesforce-cli -it -v /path/to/sfdx:/root/.sfdx salesforce/salesforcedx

  • Within the container, use the following command to log in:

$ sfdx force:auth:device:login -s -d -i <client ID>

  • Now you should be able to run all integration tests, including the Salesforce tests, with:

$ mvn -DskipIntegrationTests=false -Dit.test.salesforce.enable=true clean verify

Performance Tests

There is also a reference test implementation for checking performance and resource usage in some situations. The current implementation, in perf-tests-rabbitmq, can be run using:

mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true clean verify

Additional JVM settings, such as those required to use the Async Profiler, can be passed using the jvm.user.settings property. For example:

mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true -Djvm.user.settings="-agentpath:/path/to/asyncProfiler.so=start,file=/path/to/profile.svg" clean verify

The duration of the test can be adjusted with the rabbitmq.test.duration property, which takes the test duration in minutes.
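For example, to run the performance test for 10 minutes:

mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true -Drabbitmq.test.duration=10 clean verify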

Writing New Tests

The Camel Kafka Connector tests aim to replicate, as closely as possible, the scenarios in which the connectors will be run. To achieve that, the code tries to replicate the infrastructure on which the connectors run. This means that every integration test launches a Kafka and Zookeeper instance, the connector under test, and the infrastructure required for the connector to run.

The lifecycle of the Camel and Kafka environments is handled by the test framework and is already implemented. Therefore, new tests usually only have to provide two things: the infrastructure required for the connector to run and the test logic.

Simulating the Test Infrastructure

When implementing a new integration test, the first step is to identify how to simulate the sink or source infrastructure needed to run the test. In general, the integration tests leverage the features provided by the TestContainers project and use container images to simulate the environments.

The test code abstracts the provisioning of test environments behind service classes (e.g.: JMSService, JDBCService, etc). The purpose of the service class is to abstract both the service type (e.g.: Kafka, Strimzi, etc) and the location of the service (e.g.: remote, local, etc). This provides the flexibility to test the connector under different circumstances (e.g.: using a remote JMS broker versus a local JMS broker running in a container managed by TestContainers). It also makes it easier to hit edge cases and to try different operating scenarios (e.g.: higher latency, slow backends, etc).

JUnit 5 manages the lifecycle of the services; therefore, each service must be a JUnit 5 compliant extension. The exact extension point that a service must implement is specific to each service. The JUnit 5 documentation is the reference for the available extension points.

In general, the services should aim to minimize test execution time and resource usage. As such, the BeforeAllCallback and AfterAllCallback extension points should be preferred whenever possible, because they allow the infrastructure instance to remain static throughout the test execution.

Instantiation of clients for the service classes can be handled in the service classes whenever it is sensible to do so. For example, when handling credentials or different communication protocols that are determined by the service, it might make sense to abstract that logic away from the test code.
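As a minimal sketch of these conventions, consider a hypothetical service that manages a MongoDB container via TestContainers, implements the BeforeAllCallback and AfterAllCallback extension points, and abstracts the connection details away from the test code. The class name, image tag, and helper method are illustrative, not part of the project API:

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public class LocalMongoDBService implements BeforeAllCallback, AfterAllCallback {
    // Container managed by TestContainers; the image tag is illustrative
    private final GenericContainer<?> container =
            new GenericContainer<>(DockerImageName.parse("mongo:4.4")).withExposedPorts(27017);

    @Override
    public void beforeAll(ExtensionContext context) {
        // Started once before all tests, keeping the infrastructure
        // static throughout the test execution
        container.start();
    }

    @Override
    public void afterAll(ExtensionContext context) {
        container.stop();
    }

    // Connection details are abstracted away from the test code
    public String getConnectionUrl() {
        return "mongodb://" + container.getHost() + ":" + container.getMappedPort(27017);
    }
}

In a test class, such a service would typically be registered with the JUnit 5 @RegisterExtension annotation so that the framework drives its lifecycle.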

A specialized service factory class is responsible for creating the service according to runtime parameters and/or other test scenario constraints. When a service allows different service types or locations to be selected, this should be done via command line properties (-D<property.name>=<value>). For example, when allowing a service to choose between running as a local container or as a remote instance, a property in the format <name>.instance.type should be handled. Additional runtime parameters used in different scenarios should be handled as <name>.<parameter>.
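For illustration, a hypothetical factory for the JMS service could select the implementation based on the jms-service.instance.type property. The concrete service classes and the "local" default used here are assumptions, not the project's actual types:

// Sketch of a service factory driven by command line properties
public final class JMSServiceFactory {
    private JMSServiceFactory() {
    }

    public static JMSService createService() {
        // <name>.instance.type selects between a local container and a remote instance
        String instanceType = System.getProperty("jms-service.instance.type", "local");
        switch (instanceType) {
            case "remote":
                // Additional runtime parameters follow the <name>.<parameter> format
                return new RemoteJMSService(System.getProperty("jms.broker.address"));
            case "local":
                return new LocalContainerJMSService();
            default:
                throw new UnsupportedOperationException("Invalid instance type: " + instanceType);
        }
    }
}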

When a container image is not available via TestContainers, tests can provide their own implementation using officially available images. The license must be compatible with Apache 2.0. If an official image is not available, a Dockerfile to build the service can be provided. The Dockerfile should try to minimize the container size and resource usage whenever possible.

Writing the Test Logic

There are two important pieces required to write the test logic. The first is a property factory that creates the connector-specific properties. It stores information such as the sink or source queue, the connector classes, the converters, and connector-specific parameters.

The property factories should specialize either the SinkConnectorPropertyFactory class or the SourceConnectorPropertyFactory class. The base classes provide the common methods applicable to all situations. The property factory should provide a static method called basic. This method should create the most basic connector property factory for the connector to run. If needed, the CamelMongoDBPropertyFactory can be used as a reference.

The connector property factory should, ideally, also provide a method that allows setting up the sink or source URLs. Even though configuring via URL is not encouraged for regular usage of the connector, it is a good way to spot differences between a connector configured via properties and one configured via URL. The connector config classes for each connector (i.e.: those classes whose names end with ConnectorConfig) provide the list of supported configurations.
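A self-contained sketch of these two conventions follows. The base class below is a stand-in for the project's SourceConnectorPropertyFactory, and its API, the connector class name, and the camel.source.url property key are illustrative assumptions rather than the project's actual code:

import java.util.HashMap;
import java.util.Map;

// Stand-in for the project's SourceConnectorPropertyFactory base class
abstract class StandInSourceConnectorPropertyFactory {
    protected final Map<String, String> properties = new HashMap<>();

    public Map<String, String> getProperties() {
        return properties;
    }
}

public final class MySourcePropertyFactory extends StandInSourceConnectorPropertyFactory {

    private MySourcePropertyFactory() {
        // Connector class, converters and other connector-specific parameters
        properties.put("connector.class", "org.apache.camel.kafkaconnector.my.CamelMySourceConnector");
        properties.put("tasks.max", "1");
    }

    // The static "basic" method: the most basic connector property factory
    // for the connector to run
    public static MySourcePropertyFactory basic() {
        return new MySourcePropertyFactory();
    }

    // Optional URL-based configuration, useful for spotting differences
    // between property-based and URL-based configuration
    public MySourcePropertyFactory withSourceUrl(String url) {
        properties.put("camel.source.url", url);
        return this;
    }
}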

The test class should specialize the AbstractKafkaTest class. Every specialization of that class must provide a getConnectorsInTest method that returns an array with the names of the connectors being tested. During the test execution, the contents of this array are used to traverse the project directory and find the connector classes. This is required because Kafka Connect handles the classpath separately and loads the connector classes through the plugin.path setting. Check the comments on the pluginPaths methods of the PluginPathHelper for additional details.
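A minimal sketch of such a test class is shown below. It assumes the project's test harness (AbstractKafkaTest) is on the classpath; the connector name and the exact method visibility are assumptions:

public class CamelMyConnectorSourceIT extends AbstractKafkaTest {

    @Override
    protected String[] getConnectorsInTest() {
        // The contents of this array are used to traverse the project directory
        // and find the connector classes that Kafka Connect loads through the
        // plugin.path setting
        return new String[] {"camel-my-kafka-connector"};
    }
}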

As a general rule, tests should ensure as much isolation between the test classpath and the runtime classpath as possible. Not only does this allow us to catch possible classpath issues, it also catches conflicts between connector dependencies and Kafka Connect’s own runtime libraries.

The test code should avoid printing data to stdout. Instead, the default project logger should be used. Every connector module has its own log file. Tests for new connectors should ensure that the relevant information is logged.
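For instance, assuming the project uses SLF4J (as Camel projects typically do), the test logic would log through a logger rather than System.out:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CamelMyConnectorSourceIT {
    private static final Logger LOG = LoggerFactory.getLogger(CamelMyConnectorSourceIT.class);

    private void checkRecord(String record) {
        // Written to the connector module's own log file instead of stdout
        LOG.debug("Received record: {}", record);
    }
}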