Using Apache Camel and ActiveMQ to Implement Synchronous Request/Response.

Implementing a synchronous request/response pattern with Apache Camel and ActiveMQ is quite a bit easier than you might expect. It has allowed us to leverage our current messaging infrastructure for synchronous exchanges between applications where we otherwise might have needed to create a new web service.

Below is an example of setting up two Camel endpoints which will demonstrate the request/response pattern.

First, configure the connection to the JMS message broker.  In this case, an ActiveMQ broker is created in-process.

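public void startBroker() throws Exception {
    // "context" is a CamelContext field; activeMQComponent is statically imported from ActiveMQComponent.
    context = new DefaultCamelContext();
    context.addComponent("jms-broker", activeMQComponent("vm://localhost?broker.persistent=false"));
}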


Next, set up the producer route.  First, a processor is created that prints the body of the message.  The route is triggered when a file is dropped into the /Users/RobTerpilowski/tmp/in directory, and the file's contents are sent to the robt.test.queue destination.  Once the route has completed, the processor is executed.  What we are hoping to see is that the message has been modified by the consuming endpoint by the time this route completes.  The important piece to note here is the endpoint URL: the exchangePattern=InOut option tells Camel that the route is a synchronous request/response.

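public void buildProducerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {
            Processor processor = (Exchange exchange) ->
                System.out.println("PRODUCER Received response: " + exchange.getIn().getBody(String.class));
            // Route as described above; the URIs are reconstructed from the text, not copied from the original listing.
            from("file:/Users/RobTerpilowski/tmp/in").to("jms-broker:queue:robt.test.queue?exchangePattern=InOut").process(processor);
        }
    });
}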


Next, set up the consumer endpoint.  Again, a processor is created which will be run when the route has completed.  This processor will first print the message that the producer sent.  It will then replace the message with a new message saying that the original message was seen.  This endpoint will listen on the robt.test.queue and route the result to the directory /Users/RobTerpilowski/tmp/out.  When the route has completed, the processor will update the message.  If everything works correctly, the producer endpoint should be able to see the modified message.

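public void buildConsumerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {
            Processor processor = (Exchange exchange) -> {
                System.out.println("CONSUMER received message: " + exchange.getIn().getBody(String.class));
                exchange.getIn().setBody("I Saw it!!! It contained: " + exchange.getIn().getBody(String.class));
            };
            // Route as described above; the URIs are reconstructed from the text, not copied from the original listing.
            from("jms-broker:queue:robt.test.queue").to("file:/Users/RobTerpilowski/tmp/out").process(processor);
        }
    });
}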

So now that the routes are set up, it’s time to send a message.  Saving a file in the specified input directory will kick things off.  The file will contain the text “HelloCamel”.


$ echo "HelloCamel" > /Users/RobTerpilowski/tmp/in/message.txt

The consumer listening on robt.test.queue immediately sees the message arrive and prints the message body.

CONSUMER received message: HelloCamel


The producer endpoint then receives the modified message back, with confirmation that the consumer endpoint did indeed see the message.

PRODUCER Received response: I Saw it!!! It contained: HelloCamel
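
To picture what happens between the two routes: an InOut exchange over JMS is, conceptually, a request that carries a reply destination and a correlation ID, with the sender blocking until the matching reply arrives or a timeout passes.  The plain-Java sketch below models that idea with in-memory queues.  It is not how Camel or ActiveMQ implement it, and every name in it (RequestReplySketch, Message, serveOne, request) is made up for illustration.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of request/reply; not Camel or ActiveMQ code.
final class RequestReplySketch {

    // A message body plus the two pieces of metadata that request/reply relies on.
    static final class Message {
        final String correlationId;
        final String body;
        final BlockingQueue<Message> replyTo; // null on replies

        Message(String correlationId, String body, BlockingQueue<Message> replyTo) {
            this.correlationId = correlationId;
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Stands in for the robt.test.queue destination.
    static final BlockingQueue<Message> REQUEST_QUEUE = new LinkedBlockingQueue<>();

    // Consumer side: take one request, build a reply, send it to the request's reply queue.
    static void serveOne() {
        try {
            Message request = REQUEST_QUEUE.take();
            System.out.println("CONSUMER received message: " + request.body);
            String replyBody = "I Saw it!!! It contained: " + request.body;
            request.replyTo.put(new Message(request.correlationId, replyBody, null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while serving", e);
        }
    }

    // Producer side: send a request, then block until the matching reply or the timeout.
    static String request(String body, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>(); // plays the role of a temporary reply queue
        try {
            REQUEST_QUEUE.put(new Message(correlationId, body, replyQueue));
            Message reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within " + timeoutMillis + " ms");
            }
            if (!correlationId.equals(reply.correlationId)) {
                throw new IllegalStateException("reply does not match the request");
            }
            return reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> consumer = CompletableFuture.runAsync(RequestReplySketch::serveOne);
        System.out.println("PRODUCER Received response: " + request("HelloCamel", 2000));
        consumer.join();
    }
}
```

The timeout branch in request() is the part that matters for the next section: if no reply shows up in time, the caller simply gives up.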


Make Sure Your Server Clocks are in Sync!

As I finished my first test services that would use the request/response pattern, I created an integration test in which they connected to a messaging broker running in-process.  Things looked great, and the services were communicating without any issues.  I then deployed the services to a local WildFly instance pointing at a messaging broker on our staging server.  This time, however, the requests consistently timed out and never made it back from the second service.  I literally spent the entire day deconstructing each service piece by piece to see what was going on.  I then remembered something about clock synchronization in the Camel JMS documentation.  I checked the clock on the VM and the clock on the staging server, and did a facepalm when I saw a four-hour difference.  Lesson learned!
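
For what it's worth, here is a rough way to picture why a clock difference causes timeouts.  As I understand the Camel JMS documentation, a request's time-to-live is derived from the request timeout (20 seconds by default), so the message carries an expiration timestamp computed on the sender's clock, and whoever checks it uses its own clock.  The sketch below is plain Java arithmetic under those assumptions, with made-up names and timestamps, and it assumes the receiving side's clock is the one running ahead.  It is not ActiveMQ's actual expiry code.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of message expiry under clock skew; not ActiveMQ code.
final class ClockSkewSketch {

    // Expiration is computed on the sender's clock: send time plus time-to-live.
    static Instant expiration(Instant senderNow, Duration timeToLive) {
        return senderNow.plus(timeToLive);
    }

    // The receiving side compares the expiration against its own clock.
    static boolean isExpired(Instant expiration, Instant receiverNow) {
        return receiverNow.isAfter(expiration);
    }

    public static void main(String[] args) {
        Instant senderNow = Instant.parse("2015-01-01T08:00:00Z"); // arbitrary example time
        Instant expires = expiration(senderNow, Duration.ofSeconds(20));

        // Clocks agree: the message arrives one second later and is still valid.
        Instant syncedReceiver = senderNow.plusSeconds(1);
        System.out.println("synced clocks, expired?  " + isExpired(expires, syncedReceiver));

        // Receiver clock is four hours ahead: the same message looks long expired.
        Instant skewedReceiver = senderNow.plus(Duration.ofHours(4)).plusSeconds(1);
        System.out.println("4h skew, expired?        " + isExpired(expires, skewedReceiver));
    }
}
```

With a 20-second window and a four-hour offset, every request looks stale the moment it arrives, which matches the consistent timeouts I was seeing.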


twitter: @RobTerpilowski

CamelOne 2012 Summary


I’ve just returned from the CamelOne Open Source Integration Conference in Boston, where I was invited to speak about how Lynden is using ActiveMQ as part of its freight tracking infrastructure.  As expected, there were a number of big announcements, and speakers from all over the world showcased some very interesting projects during the two-day conference.  Here are a few new projects and applications that I found particularly relevant to us here at Lynden.

Fuse Fabric
I thought this was by far the most interesting project announced during the conference.  Fuse Fabric is a distributed configuration, management and provisioning system for Fuse ESB, Fuse Message Broker and the Apache open source integration solutions (ActiveMQ, Camel, CXF, Karaf & ServiceMix).  Basically, it provides a single point where brokers (and other applications) can go to retrieve their configuration.  Under the covers it uses Apache ZooKeeper to store the cluster configuration and node registration.

We are currently only running 1 production message broker, but we have 8 message brokers in our test environment.  Fuse Fabric could greatly reduce the overhead of managing the configuration of those brokers.

Apache ZooKeeper
As always at these conferences, ideas were flying around among the attendees, and I picked up just as many talking to other architects and developers outside the sessions as I did attending them.  I was explaining to someone the challenge we have managing the configurations of our 70+ web applications and services across our production, staging and test environments.  His immediate response was to take a look at Apache ZooKeeper; his team had been using it for the past few months to manage configuration for multiple environments with good success.  As mentioned above, Fuse Fabric makes use of Apache ZooKeeper, and I’m curious to learn whether Fabric can be used beyond the integration apps it mentions (ESB, MQ, etc.) for things such as our web applications and services.

Fuse Management Console
Provides a tool for managing large-scale deployments of Fuse ESB Enterprise and Fuse MQ Enterprise.  A FuseSource subscription is required to install the console, which provides the following benefits:

  • Simplified management – key status metrics and commands are accessible from a unified, centralized console
  • Automatic provisioning – dependencies between components and include files are tracked and managed
  • Profile management – simplify the creation and management of customized brokers by defining configuration profiles
  • Automated updates – updates to a single component are automatically deployed to the appropriate brokers based on the profile
  • Local or Cloud deployment – your integration infrastructure can be deployed and managed locally or in the Cloud

MQTT protocol support in ActiveMQ 5.6
MQTT support has come a little late for Lynden, since we already have a solution for connecting Windows CE devices to ActiveMQ.  MQ Telemetry Transport (MQTT) is a machine-to-machine protocol designed as an extremely lightweight publish/subscribe messaging transport.  ActiveMQ 5.6 and Apollo 1.0 will both support MQTT.

Fuse HQ
Fuse HQ is a SOA management and monitoring system and is available with a FuseSource support subscription.  Fuse HQ takes advantage of the JMX-based reporting capabilities of the Fuse products.  Some of the features Fuse HQ provides are:

Monitor systems – comprehensive cross-stack dashboards make it easy to check the health of systems and services

  • Role-based access controls for managing user visibility
  • Visually review relationships between FuseSource servers and other managed resources
  • Explore and generate reports of real-time and historic logs, configurations, and events
  • Impact analysis and change control based on configuration tracking
  • Collect, chart and view real-time and historic metrics from hardware, network and applications without invasive instrumentation
  • Auto-discover hardware and software attributes including configuration, version numbers, memory, CPU, disk and network devices
  • Distributed monitoring allows for linear scalability of Fuse HQ server

Manage systems – execute control operations to manage the FuseSource environment

  • Role-based access controls for managing user permissions
  • External authentication leverages existing LDAP or Kerberos directories
  • Automate maintenance activities with scheduled controls and conditional responses to alerts
  • High availability allows multiple Fuse HQ servers to assume workloads in the event of a server failure

Intelligent alerts – definable alerts proactively identify problems before they occur

  • Follow-the-sun assignments route alerts to the appropriate person based on the time of day and users’ scheduled availability
  • Integrate traps with existing IT operations suite
  • Define alerts once and apply to large, global resource groups
  • Alerts can be multiconditional, role-based and group-based, and trigger recovery processes
  • Generate alerts on changes in configuration or key attributes of any managed resource

twitter: @RobTerp

Hacking ActiveMQ to Retrieve Connection Statistics

This hack is so awful that I am almost embarrassed to blog about it, but this was the only way to obtain the information we needed from ActiveMQ to display in an MBean in JConsole.

We have written a Java proxy server to allow Windows CE devices running a C# application to connect to our JMS message broker.  The proxy server runs within a GlassFish instance to make managing its lifecycle a bit easier.  For system diagnostics and debugging purposes, we created an MBean, accessible via JMX, so we could get real-time statistics from the proxy through JConsole or VisualVM, such as the number of clients connected, message counts, and the total number of connections open from the proxy to the broker.
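
A statistics MBean of this kind can be quite small.  The sketch below uses only the JDK's javax.management API; the class names, attribute names and the ObjectName are hypothetical, not the ones from our proxy.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical names throughout; this only illustrates the standard MBean pattern.
final class ProxyStatsSketch {

    // Standard MBean convention: the interface is named <ImplementationClass>MBean.
    // Each getter shows up as a read-only attribute in JConsole or VisualVM.
    public interface ProxyStatsMBean {
        int getClientCount();
        long getMessageCount();
    }

    public static final class ProxyStats implements ProxyStatsMBean {
        private final AtomicInteger clients = new AtomicInteger();
        private final AtomicLong messages = new AtomicLong();

        void clientConnected() { clients.incrementAndGet(); }
        void messageForwarded() { messages.incrementAndGet(); }

        @Override public int getClientCount() { return clients.get(); }
        @Override public long getMessageCount() { return messages.get(); }
    }

    // Registers the bean with the platform MBean server so JMX clients can see it.
    static ObjectName register(ProxyStats stats, String name) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName objectName = new ObjectName(name);
            server.registerMBean(stats, objectName);
            return objectName;
        } catch (JMException e) {
            throw new IllegalStateException("could not register MBean " + name, e);
        }
    }

    // Reads an attribute back through the MBean server, the same way a JMX client would.
    static Object readAttribute(ObjectName name, String attribute) {
        try {
            return ManagementFactory.getPlatformMBeanServer().getAttribute(name, attribute);
        } catch (JMException e) {
            throw new IllegalStateException("could not read " + attribute, e);
        }
    }

    public static void main(String[] args) {
        ProxyStats stats = new ProxyStats();
        ObjectName name = register(stats, "com.example.proxy:type=ProxyStats");
        stats.clientConnected();
        stats.messageForwarded();
        stats.messageForwarded();
        System.out.println("ClientCount = " + readAttribute(name, "ClientCount"));
        System.out.println("MessageCount = " + readAttribute(name, "MessageCount"));
    }
}
```

Once registered, the bean appears under its ObjectName in the MBeans tab of JConsole, and each getter is exposed as an attribute.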

Retrieving the number of connections that the proxy had opened to the broker seemed like it *should* be a fairly straightforward task: enable statistics on the ActiveMQConnectionFactory instance by calling setStatsEnabled(true), retrieve the connection statistics from the factory via getStats(), and read the total number of open connections.  However, no matter what we did, the getStats() call always returned null, and for good reason.  Upon inspecting the ActiveMQConnectionFactory source code, we found:

    public StatsImpl getStats() {
        // TODO
        return null;
    }

The ActiveMQConnectionFactory class has a field called factoryStats of type JMSStatsImpl.  The field is protected and there is no getter for it, so we had to resort to drastic measures if we really wanted to know how many connections the proxy had open to the broker.  The code to get the connection information from the factory appears below.

    public int getBrokerConnectionCount() {

        int count = -1;
        try {
            Field factoryStatsField = ActiveMQConnectionFactory.class.getDeclaredField("factoryStats");
            // The field is protected, so it must be made accessible before it can be read.
            factoryStatsField.setAccessible(true);
            JMSStatsImpl factoryStats = (JMSStatsImpl) factoryStatsField.get(connectionFactory);
            count = factoryStats.getConnections().length;
        } catch (Exception ex) {
            logger.error("Attempted to get Broker connection count, but failed due to: " + ex.getMessage());
        }
        return count;
    }

Even though the field is protected, reflection can be used to set the field to “accessible” and then the value of the field can be read directly from the factory object.  This is not guaranteed to work in future releases of ActiveMQ, but since this is only for informational purposes in JConsole this was good enough for what we needed.
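
The same trick in isolation looks roughly like this.  The StatsHolder class and its hiddenStats field are stand-ins invented for this sketch (they are not ActiveMQ types); the reflection calls themselves are standard java.lang.reflect API.

```java
import java.lang.reflect.Field;

// Stand-in types for illustration only; not ActiveMQ classes.
final class ProtectedFieldSketch {

    // Plays the role of a library class that keeps a field protected and offers no getter.
    static class StatsHolder {
        protected int[] hiddenStats = {1, 2, 3};
    }

    // Reads the length of the protected array via reflection, or returns -1 on any failure.
    static int readHiddenCount(Object holder) {
        try {
            Field field = StatsHolder.class.getDeclaredField("hiddenStats");
            // In this single-file sketch the field would be readable without this call.
            // From a class in another package, this is what lets field.get(...) succeed
            // instead of throwing IllegalAccessException.
            field.setAccessible(true);
            int[] stats = (int[]) field.get(holder);
            return stats.length;
        } catch (ReflectiveOperationException | RuntimeException ex) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(readHiddenCount(new StatsHolder()));   // prints 3
        System.out.println(readHiddenCount("not a StatsHolder")); // prints -1
    }
}
```

Returning -1 on failure mirrors the approach above: if a future release renames or removes the field, the statistic just stops working rather than taking the proxy down with it.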

Twitter: @RobTerp