Using Apache Camel and ActiveMQ to Implement Synchronous Request/Response

Implementing a synchronous request/response pattern with Apache Camel and ActiveMQ is quite a bit easier than you might expect. It has allowed us to leverage our existing messaging infrastructure for synchronous exchanges between applications where we might otherwise have needed to create a new web service.

Below is an example of setting up two Camel endpoints which will demonstrate the request/response pattern.

First, configure the connection to the JMS message broker.  In this case, an ActiveMQ broker is created in-process.

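public void startBroker() throws Exception {
    context = new DefaultCamelContext();
    // Register ActiveMQ under the name "jms-broker"; the vm:// transport starts an embedded, non-persistent broker.
    context.addComponent("jms-broker", activeMQComponent("vm://localhost?broker.persistent=false"));
}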


Next, set up the producer route.  First, a processor is created that prints the body of the message.  The route is triggered when a file is dropped into the /Users/RobTerpilowski/tmp/in directory, and the file contents are routed to the robt.test.queue destination.  Once the route has completed, the processor is executed.  What we are hoping to see is that the message has been modified by the consuming endpoint by the time this route completes.  The important piece to note here is the endpoint URL: the exchangePattern=InOut option tells Camel that the route is a synchronous request/response.

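public void buildProducerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {
            Processor processor = (Exchange exchange) -> {
                System.out.println("PRODUCER Received response: " + exchange.getIn().getBody(String.class));
            };
            // Route reconstructed from the description above; the exact endpoint URIs are assumptions.
            from("file:/Users/RobTerpilowski/tmp/in").to("jms-broker:queue:robt.test.queue?exchangePattern=InOut").process(processor);
        }
    });
}
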

Next, set up the consumer endpoint.  Again, a processor is created which will be run when the route has completed.  This processor will first print the message that the producer sent.  It will then replace the message with a new message saying that the original message was seen.  This endpoint will listen on the robt.test.queue and route the result to the directory /Users/RobTerpilowski/tmp/out.  When the route has completed, the processor will update the message.  If everything works correctly, the producer endpoint should be able to see the modified message.

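public void buildConsumerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {
            Processor processor = (Exchange exchange) -> {
                System.out.println("CONSUMER received message: " + exchange.getIn().getBody(String.class));
                exchange.getIn().setBody("I Saw it!!! It contained: " + exchange.getIn().getBody(String.class));
            };
            // Route reconstructed from the description above; the exact endpoint URIs are assumptions.
            from("jms-broker:queue:robt.test.queue").to("file:/Users/RobTerpilowski/tmp/out").process(processor);
        }
    });
}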

So now that the routes are set up, it’s time to send a message.  Saving a file in the specified input directory will kick things off.  The file will contain the text “HelloCamel”.


$ echo "HelloCamel" > /Users/RobTerpilowski/tmp/in/message.txt

The consumer listening on the robt.test.queue immediately sees the message arrive, and prints the message body.

CONSUMER received message: HelloCamel


The producer endpoint then receives the modified message back, with confirmation that the consumer endpoint did indeed see the message.

PRODUCER Received response: I Saw it!!! It contained: HelloCamel
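
For readers curious about what exchangePattern=InOut implies at the messaging level: a JMS request/reply exchange generally works by tagging the request with a correlation ID and a reply destination, then blocking until a reply carrying the same ID arrives or a timeout elapses. The sketch below imitates that handshake with plain in-memory queues standing in for the broker. It is not Camel or ActiveMQ code, and the names RequestReplySketch, Message, serveOneRequest, request and roundTrip, as well as the 2 second timeout, are invented for illustration.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class RequestReplySketch {

    /** Minimal stand-in for a JMS message: a correlation ID plus a text body. */
    static final class Message {
        final String correlationId;
        final String body;

        Message(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // In-memory stand-ins for the request queue and the reply queue.
    private final BlockingQueue<Message> requests = new LinkedBlockingQueue<>();
    private final BlockingQueue<Message> replies = new LinkedBlockingQueue<>();

    /** Consumer side: take one request and answer it, echoing its correlation ID. */
    void serveOneRequest() throws InterruptedException {
        Message request = requests.take();
        System.out.println("CONSUMER received message: " + request.body);
        replies.put(new Message(request.correlationId, "I Saw it!!! It contained: " + request.body));
    }

    /** Producer side: send a request, then wait for the matching reply. Returns null on timeout. */
    String request(String body, long timeoutMillis) throws InterruptedException {
        String correlationId = UUID.randomUUID().toString();
        requests.put(new Message(correlationId, body));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return null;
            }
            Message reply = replies.poll(remaining, TimeUnit.NANOSECONDS);
            if (reply == null) {
                return null;
            }
            if (reply.correlationId.equals(correlationId)) {
                return reply.body;
            }
            // A reply to some other request: ignore it and keep waiting.
        }
    }

    /** Runs one consumer thread and one request; returns the reply body (null on timeout). */
    static String roundTrip(String body) {
        RequestReplySketch broker = new RequestReplySketch();
        Thread consumer = new Thread(() -> {
            try {
                broker.serveOneRequest();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            return broker.request(body, 2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("PRODUCER Received response: " + roundTrip("HelloCamel"));
    }
}
```

The point of the sketch is that the producer blocks with a deadline, so anything that prevents the reply from coming back shows up as a timeout rather than as an error on the consumer side.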


Make Sure Your Server Clocks are in Sync!

As I finished my first test services that would use the request/response pattern, I created an integration test in which they connected to a messaging broker running in-process.  Things looked great, and the services were communicating without any issues.  I then deployed the services to a WildFly instance running locally, pointed at a messaging broker on our staging server.  This time, however, when I started my test the requests consistently timed out and never made it back from the second service.  I literally spent the entire day deconstructing each service piece by piece to see what was going on.  I then remembered something about clock synchronization in the Camel JMS documentation.  I checked both the clock on the VM and the clock on the staging server, and proceeded to do a face palm when I saw there was a four-hour difference.  Lesson learned!
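
Why would a clock offset cause timeouts? The Camel JMS documentation notes that for request/reply the request timeout is, by default, also used as the time to live of the message, which only works when the clocks of the systems involved agree. A JMS expiration is an absolute timestamp computed from the sender's clock, so a broker or receiver whose clock runs ahead by more than the time to live will treat the message as already expired. The sketch below shows just that arithmetic in plain Java. The class and method names are hypothetical, the 20 second time to live is an assumed value, and which machine was ahead in the incident above is not something the sketch tries to establish.

```java
import java.time.Duration;
import java.time.Instant;

final class ClockSkewSketch {

    /** Expiration as the sender computes it: the sender's own clock plus the time to live. */
    static Instant expiration(Instant senderNow, Duration timeToLive) {
        return senderNow.plus(timeToLive);
    }

    /** The receiving side compares the absolute expiration against its own clock. */
    static boolean isExpired(Instant expiration, Instant receiverNow) {
        return receiverNow.isAfter(expiration);
    }

    /**
     * True if a message would already look expired on arrival to a receiver
     * whose clock differs from the sender's by receiverOffset.
     */
    static boolean expiresOnArrival(Duration timeToLive, Duration receiverOffset) {
        Instant senderNow = Instant.parse("2015-01-01T12:00:00Z"); // arbitrary fixed instant
        Instant receiverNow = senderNow.plus(receiverOffset);
        return isExpired(expiration(senderNow, timeToLive), receiverNow);
    }

    public static void main(String[] args) {
        Duration timeToLive = Duration.ofSeconds(20); // assumed value for illustration
        System.out.println("clocks in sync:    expired = " + expiresOnArrival(timeToLive, Duration.ZERO));
        System.out.println("receiver 4h ahead: expired = " + expiresOnArrival(timeToLive, Duration.ofHours(4)));
    }
}
```

With the clocks in sync the message is still valid on arrival; with the receiving side four hours ahead it is expired before anyone can read it, and the requester simply waits until its timeout fires.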


twitter: @RobTerpilowski

CamelOne 2012 Summary


I’ve just returned from the CamelOne Open Source Integration Conference in Boston, where I was invited as a speaker to present how Lynden is using ActiveMQ as part of its freight tracking infrastructure.  As expected, there were a number of big announcements and very interesting projects that speakers from all over the world showcased during the two-day conference.  Here are a few new projects/applications that I found may be particularly useful here at Lynden.

Fuse Fabric
I thought this was by far the most interesting project announced during the conference.  Fuse Fabric is a distributed configuration, management and provisioning system for Fuse ESB, Fuse Message Broker and the Apache open source integration solutions (ActiveMQ, Camel, CXF, Karaf & ServiceMix).  Basically, it provides a single point from which brokers (and other applications) can retrieve their configuration.  Under the covers it makes use of Apache ZooKeeper to store the cluster configuration and node registration.

We are currently only running 1 production message broker, but we have 8 message brokers in our test environment.  Fuse Fabric could greatly reduce the overhead of managing the configuration of those brokers.

Apache ZooKeeper
As always seems to happen at these conferences, ideas were flying around among the attendees, and I picked up just as many new ideas talking to other architects and developers outside the sessions as I did attending them.  I was explaining to someone the challenge we have managing the configurations of our 70+ web applications and services in our production, staging and test environments.  His immediate response was to take a look at Apache ZooKeeper.  They had been using it for the past few months to manage the configuration of multiple environments with good success.  As mentioned above, Fuse Fabric makes use of Apache ZooKeeper, and I’m curious to learn whether Fabric can be used beyond the integration apps it mentions (ESB, MQ, etc.) for things such as our web applications and services.

Fuse Management Console
Provides a tool for managing large-scale deployments of Fuse ESB Enterprise and Fuse MQ Enterprise.  A FuseSource subscription is required to install the console, which provides the following benefits:

  • Simplified management – key status metrics and commands are accessible from a unified, centralized console
  • Automatic provisioning – dependencies between components and include files are tracked and managed
  • Profile management – simplify the creation and management of customized brokers by defining configuration profiles
  • Automated updates – updates to a single component are automatically deployed to the appropriate brokers based on the profile
  • Local or Cloud deployment – your integration infrastructure can be deployed and managed locally or in the Cloud

MQTT protocol support in ActiveMQ 5.6
MQTT support has come a little late for Lynden, since we already have a solution for connecting Windows CE devices to ActiveMQ.  MQ Telemetry Transport (MQTT) is a machine-to-machine protocol designed as an extremely lightweight publish/subscribe messaging transport.  ActiveMQ 5.6 and Apollo 1.0 will both support MQTT.

Fuse HQ
Fuse HQ is a SOA management and monitoring system and is available with a FuseSource support subscription.  Fuse HQ takes advantage of the JMX-based reporting capabilities of the Fuse products.  Some of the features Fuse HQ provides are:

Monitor systems – comprehensive cross-stack dashboards make it easy to check the health of systems and services

  • Role-based access controls for managing user visibility
  • Visually review relationships between FuseSource servers and other managed resources
  • Explore and generate reports of real-time and historic logs, configurations, and events
  • Impact analysis and change control based on configuration tracking
  • Collect, chart and view real-time and historic metrics from hardware, network and applications without invasive instrumentation
  • Auto-discover hardware and software attributes including configuration, version numbers, memory, CPU, disk and network devices
  • Distributed monitoring allows for linear scalability of Fuse HQ server

Manage systems – execute control operations to manage the FuseSource environment

  • Role-based access controls for managing user permissions
  • External authentication leverages existing LDAP or Kerberos directories
  • Automate maintenance activities with scheduled controls and conditional responses to alerts
  • High availability allows multiple Fuse HQ servers to assume workloads in the event of a server failure

Intelligent Alerts – definable alerts proactively identify problems before they occur

  • Follow-the-sun assignments route alerts to the appropriate person based on the time of day and users’ scheduled availability
  • Integrate traps with existing IT operations suite
  • Define alerts once and apply to large, global resource groups
  • Alerts can be multiconditional, role-based and group based, and trigger recovery processes
  • Generate alerts on changes in configuration or key attributes of any managed resource

twitter: @RobTerp

Hacking ActiveMQ to Retrieve Connection Statistics

This hack is so awful that I am almost embarrassed to blog about it, but this was the only way to obtain the information we needed from ActiveMQ to display in an MBean in JConsole.

We have written a Java proxy server to allow Windows CE devices running a C# application to connect to our JMS message broker.  The proxy server runs within a Glassfish instance to make managing its lifecycle a bit easier.  For system diagnostics and debugging purposes we created an MBean to be accessed via JMX so we could get real-time statistics from the proxy through JConsole or VisualVM, such as the number of clients connected, message counts, and the total number of connections open from the proxy to the broker.

Retrieving the number of connections that the proxy had opened to the broker seemed like it *should* be a fairly straightforward task: enable statistics on the ActiveMQConnectionFactory instance by calling setStatsEnabled(true), retrieve the connection statistics from the factory via a call to the getStats() method, and then get the total number of open connections. However, no matter what we did, the getStats() call always returned null, and for good reason. Upon inspecting the ActiveMQConnectionFactory source code we found:

    public StatsImpl getStats() {
        // TODO
        return null;
    }

The ActiveMQConnectionFactory class has a field called factoryStats of type JMSStatsImpl.  The field is protected and there is no getter method to retrieve it, so we had to resort to drastic measures if we really wanted to know how many connections the proxy had open to the broker.  The code to get the connection information from the factory appears below.

    public int getBrokerConnectionCount() {
        int count = -1;
        try {
            // factoryStats is protected and has no getter, so read it reflectively.
            Field factoryStatsField = ActiveMQConnectionFactory.class.getDeclaredField("factoryStats");
            factoryStatsField.setAccessible(true);
            JMSStatsImpl factoryStats = (JMSStatsImpl) factoryStatsField.get(connectionFactory);
            count = factoryStats.getConnections().length;
        } catch (Exception ex) {
            logger.error("Attempted to get Broker connection count, but failed due to: " + ex.getMessage());
        }
        return count;
    }

Even though the field is protected, reflection can be used to set the field to “accessible” and then the value of the field can be read directly from the factory object.  This is not guaranteed to work in future releases of ActiveMQ, but since this is only for informational purposes in JConsole this was good enough for what we needed.
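
For anyone who wants to try the technique without ActiveMQ on the classpath, here is a minimal, self-contained sketch. StatsHolder is a hypothetical stand-in for the factory class, and its connections field is declared private (rather than protected) so that the access check also applies to a class in the same source file. Without the setAccessible(true) call, the get() call throws an IllegalAccessException.

```java
import java.lang.reflect.Field;

final class ReflectionSketch {

    /** Reads the hidden "connections" array reflectively; returns -1 if that fails. */
    static int connectionCount(StatsHolder holder) {
        try {
            Field field = StatsHolder.class.getDeclaredField("connections");
            field.setAccessible(true); // lift the Java access check for this Field object
            String[] connections = (String[]) field.get(holder);
            return connections.length;
        } catch (ReflectiveOperationException ex) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("Open connections: " + connectionCount(new StatsHolder()));
    }
}

/** Hypothetical stand-in for a library class that keeps its statistics out of reach. */
class StatsHolder {
    private final String[] connections = {"conn-1", "conn-2", "conn-3"};
}
```

As with the real hack, this depends on the name of a non-public field, so a rename in the library breaks it at runtime rather than at compile time.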

Twitter: @RobTerp

Coping with unreliable wireless network connections between Windows CE and JMS using Apache MINA

Our freight tracking and cross-docking system consists of handheld barcode scanners running a C# application on Windows CE (written in-house). The devices needed a reliable way to communicate with each other through our backend ActiveMQ Java Message Service infrastructure.  They include Symbol MC9090 handheld scanners, WT4090 wrist-mount scanners, and VC5090 forklift-mounted scanners (all pictured below).

MC 9090 Handheld Device

VC 5090 Forklift Mounted Device

WT 4090 Wristmount Device

There were a number of challenges in integrating these devices, which run .NET software, with a Java messaging server.  The original route we headed down was to use the .NET Message Service API (NMS) for ActiveMQ, with the devices communicating directly with the message broker. The NMS libraries are .NET assemblies that provide access to ActiveMQ via the native OpenWire protocol. This approach, however, proved ineffective for the following reasons.

  • We learned that the next release of the .NET Message Service API (NMS) library would be dropping support for Windows CE. So even if we wanted to continue this route we either couldn’t or we’d need to modify the NMS library ourselves to support our CE clients.
  • NMS/OpenWire failover protocol was problematic on Windows CE.  Even though we were only running 1 production messaging broker, we were using the failover protocol on the client since it provided a way for the client to automatically attempt to reconnect to the messaging broker should the connection go down. (This happens frequently on these devices).
  • Unreliable network connections.  To expand on the point above further, devices are constantly taken in and out of truck trailers where the Wi-Fi connections are spotty.  Connections to the messaging system were not always getting properly cleaned up, and as a result, excessive resources would gradually be consumed over time on the broker which caused instability and occasional crashes on the server side.

In light of the above challenges it was decided to develop a proxy server to handle the communication between the CE devices and the messaging broker and act as a buffer for all the frequent disconnects/reconnects that the devices experienced.  We had initially looked at the REST API provided by ActiveMQ, but due to lack of support for durable subscriptions at the time (it may support them now), we ruled that option out.

Apache MINA and the MINA State Machine Libraries

The Apache MINA libraries provide a way to create network applications in an efficient and scalable way. The framework makes use of Java NIO and features an event-driven API, which allowed for a very elegant design of the proxy server. Even better, the MINA State Machine (which has no official release at this time, but the code can be downloaded and built from the MINA Subversion repository) provided an easy way to manage the various states of the messaging clients (i.e. Connected, Subscribed, Unsubscribed, etc.).

An ASCII-based protocol was used as the communication medium between the clients and the proxy server.  In this manner, we could also use the proxy to connect Java desktop clients, since their connection to the messaging system would be somewhat transient (as users stopped and started the application), although not as transient as the device connections in the field.  The ASCII protocol would also allow us to tie in our legacy BASIC applications, which run on the UniVerse database and application platform.  Existing server-side Java applications would still be allowed to talk directly with the broker because, in theory, their connection to the messaging system should be constant and stable.

We are currently hosting the proxy server within our Glassfish3 application server alongside our other web applications, since it provides an easy way to manage the proxy’s lifecycle. The chart below illustrates the current paths the various applications in our system take to access the message server.
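
To make the idea of tracking per-client state concrete, here is a minimal sketch in plain Java of how the client states mentioned above could change as lines of an ASCII protocol arrive. It does not use the MINA State Machine API. The command names (CONNECT, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT), the extra DISCONNECTED state and the transition rules are assumptions made for illustration, not the actual protocol.

```java
import java.util.Locale;

final class ClientStateSketch {

    enum State { DISCONNECTED, CONNECTED, SUBSCRIBED, UNSUBSCRIBED }

    private State state = State.DISCONNECTED;

    /** Applies one line of the (hypothetical) ASCII protocol and returns the new state. */
    State handle(String line) {
        String command = line.trim().toUpperCase(Locale.ROOT);
        switch (command) {
            case "CONNECT":
                require(state == State.DISCONNECTED, command);
                state = State.CONNECTED;
                break;
            case "SUBSCRIBE":
                require(state == State.CONNECTED || state == State.UNSUBSCRIBED, command);
                state = State.SUBSCRIBED;
                break;
            case "UNSUBSCRIBE":
                require(state == State.SUBSCRIBED, command);
                state = State.UNSUBSCRIBED;
                break;
            case "DISCONNECT":
                require(state != State.DISCONNECTED, command);
                state = State.DISCONNECTED;
                break;
            default:
                throw new IllegalArgumentException("Unknown command: " + command);
        }
        return state;
    }

    /** Rejects a command that is not valid in the current state. */
    private void require(boolean allowed, String command) {
        if (!allowed) {
            throw new IllegalStateException(command + " not allowed in state " + state);
        }
    }

    public static void main(String[] args) {
        ClientStateSketch client = new ClientStateSketch();
        for (String line : new String[] {"CONNECT", "SUBSCRIBE", "UNSUBSCRIBE", "DISCONNECT"}) {
            System.out.println(line + " -> " + client.handle(line));
        }
    }
}
```

Keeping the transitions in one place means a device that drops off the network and reconnects can be returned to a known state, which is the kind of bookkeeping a state machine library takes over.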

Future directions

One obvious weakness of the design illustrated above is its lack of failover support. Even if we were to run multiple message brokers for fault tolerance, the proxy server itself would still be a single point of failure within the system. Support for Java and BASIC clients, as previously mentioned, will also be at the top of our list of additions to make to our messaging solution.

Twitter: @RobTerp