Using Apache Camel and ActiveMQ to Implement Synchronous Request/Response

Implementing a synchronous request/response pattern with Apache Camel and ActiveMQ is quite a bit easier than you might expect, and it has allowed us to leverage our existing messaging infrastructure to facilitate synchronous exchanges between applications where we otherwise may have needed to create a new web service.

Below is an example of setting up two Camel endpoints which will demonstrate the request/response pattern.

First, configure the connection to the JMS message broker.  In this case, an ActiveMQ broker is created in-process.


public void startBroker() throws Exception {
    context = new DefaultCamelContext();
    // activeMQComponent(..) is statically imported from
    // org.apache.activemq.camel.component.ActiveMQComponent
    context.addComponent("jms-broker", activeMQComponent("vm://localhost?broker.persistent=false"));
    buildProducerRoute(context);
    buildConsumerRoute(context);
    context.start();
}

 

Next, set up the producer route.  First a processor is created, which will print the body of the message.  The route is triggered when a file is dropped into the /Users/RobTerpilowski/tmp/in directory, and the file's contents are routed to the robt.test.queue destination.  Once the JMS step returns its reply, the processor runs.  What we are hoping to see is that the message has been modified by the consuming endpoint by the time this route completes.  The important piece to note here is the endpoint URI:
jms-broker:queue:robt.test.queue?exchangePattern=InOut
exchangePattern=InOut tells Camel that the route is a synchronous request/response.


public void buildProducerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor processor = (Exchange exchange) -> {
                System.out.println("PRODUCER Received response: " + exchange.getIn().getBody(String.class));
            };
            from("file:///Users/RobTerpilowski/tmp/in")
                    .to("jms-broker:queue:robt.test.queue?exchangePattern=InOut")
                    .process(processor);
        }
    });
}

 

Next, set up the consumer endpoint.  Again, a processor is created, this time running as the final step of the route.  The processor first prints the message that the producer sent, then replaces the body with a new message saying that the original message was seen.  This endpoint listens on robt.test.queue and routes the result to the directory /Users/RobTerpilowski/tmp/out.  After the file has been written, the processor updates the message, and because the exchange is InOut, that updated body is sent back as the reply.  If everything works correctly, the producer endpoint should see the modified message.


public void buildConsumerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor processor = (Exchange exchange) -> {
                System.out.println("CONSUMER received message: " + exchange.getIn().getBody(String.class));
                exchange.getIn().setBody("I Saw it!!! It contained: " + exchange.getIn().getBody(String.class));
            };
            from("jms-broker:queue:robt.test.queue")
                    .to("file:///Users/RobTerpilowski/tmp/out")
                    .process(processor);
        }
    });
}

So now that the routes are set up, it’s time to send a message.  Saving a file in the specified input directory will kick things off.  The file will contain the text “HelloCamel”.

 

$ echo "HelloCamel" > /Users/RobTerpilowski/tmp/in/message.txt

The consumer listening on the robt.test.queue immediately sees the message arrive, and prints the message body.

CONSUMER received message: HelloCamel

 

The producer endpoint then receives the modified message back, with confirmation that the consumer endpoint did indeed see the message.

PRODUCER Received response: I Saw it!!! It contained: HelloCamel

 

Make Sure Your Server Clocks are in Sync!

As I finished my first test services that would utilize the request/response pattern, I created an integration test in which they connected to a messaging broker running in-process.  Things looked great, and the services were communicating without any issues.  I then deployed the services to a Wildfly instance running locally, pointing at a messaging broker on our staging server.  This time when I started my test, the requests consistently timed out and never made it back from the second service.  I spent the entire day deconstructing each service piece by piece to see what was going on, and then remembered something about clock synchronization in the Camel JMS documentation: for request/reply over JMS, Camel stamps a time-to-live on the request based on the sender's clock, so a large skew can expire a message before the consumer ever sees it.  I checked both the clock on the VM and the clock on the staging server and proceeded to do a face palm when I saw there was a four-hour difference.  Lesson learned!
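If keeping every clock in sync is not practical, the camel-jms component has endpoint options that sidestep the problem.  Below is a minimal sketch (mine, not part of the original example) of the producer route from above with those options spelled out; requestTimeout and disableTimeToLive are standard camel-jms options, and the component and queue names are the ones used earlier.

import org.apache.camel.builder.RouteBuilder;

public class ClockSafeProducerRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file:///Users/RobTerpilowski/tmp/in")
                // requestTimeout: how long the producer waits for a reply (default 20000 ms).
                // disableTimeToLive=true: do not stamp a time-to-live on the request message,
                // so a skewed broker or consumer clock cannot expire it before it is consumed.
                .to("jms-broker:queue:robt.test.queue?exchangePattern=InOut"
                        + "&requestTimeout=20000&disableTimeToLive=true")
                .process(exchange -> System.out.println("PRODUCER Received response: "
                        + exchange.getIn().getBody(String.class)));
    }
}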

 

twitter: @RobTerpilowski

CamelOne 2012 Summary


I’ve just returned from the CamelOne Open Source Integration Conference in Boston, where I was invited as a speaker to present how Lynden is using ActiveMQ as part of its freight-tracking infrastructure.  As expected, there were a number of big announcements, and speakers from all over the world showcased very interesting projects during the 2-day conference.  Here are a few new projects and applications that may be particularly useful here at Lynden.


Fuse Fabric
I thought this was by far the most interesting project announced during the conference.  Fuse Fabric is a distributed configuration, management and provisioning system for Fuse ESB, Fuse Message Broker and the Apache open source integration solutions (ActiveMQ, Camel, CXF, Karaf & ServiceMix).  It essentially provides a single point from which brokers (and other applications) can retrieve their configuration information.  Under the covers it uses Apache ZooKeeper to store the cluster configuration and node registration.

We are currently running only one production message broker, but we have eight message brokers in our test environment.  Fuse Fabric could greatly reduce the overhead of managing the configuration of those brokers.

Apache ZooKeeper
As always seems to happen at these conferences, ideas were flying around among the attendees, and I tend to come away with just as many new ideas from talking to other architects and developers outside the sessions as I do from attending the sessions themselves.  I was explaining to someone the challenge we have managing the configurations of our 70+ web applications and services across our production, staging and test environments.  His immediate response was to take a look at Apache ZooKeeper; his team had been using it for the past few months to manage configuration across multiple environments with good success.  As mentioned above, Fuse Fabric makes use of Apache ZooKeeper, and I’m curious to learn whether Fabric can be used beyond the integration apps it mentions (ESB, MQ, etc.) for things such as our web applications and services.
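To make the idea concrete, here is a minimal sketch (my own, not something presented at the conference) of an application pulling its configuration from ZooKeeper at startup using the plain ZooKeeper Java client.  The ensemble address and the znode path are hypothetical placeholders.

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperConfigLookup {

    public static void main(String[] args) throws Exception {
        // Hypothetical ensemble address; a no-op watcher keeps the example short.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        // Each environment/application pair could store its settings in its own znode,
        // e.g. /config/<environment>/<application>, so one ensemble serves prod, staging and test.
        Stat stat = new Stat();
        byte[] data = zk.getData("/config/staging/some-webapp", false, stat);
        System.out.println("Config (version " + stat.getVersion() + "): " + new String(data, "UTF-8"));

        zk.close();
    }
}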

Fuse Management Console
Provides a tool for managing large-scale deployments of Fuse ESB Enterprise and Fuse MQ Enterprise.  A FuseSource subscription is required to install the console, which provides the following benefits:

  • Simplified management: key status metrics and commands are accessible from a unified, centralized console
  • Automatic provisioning: dependencies between components and include files are tracked and managed
  • Profile management: simplify the creation and management of customized brokers by defining configuration profiles
  • Automated updates: updates to a single component are automatically deployed to the appropriate brokers based on the profile
  • Local or Cloud deployment: your integration infrastructure can be deployed and managed locally or in the Cloud

MQTT protocol support in ActiveMQ 5.6
MQTT protocol support has come a little late for Lynden, since we already have a solution for connecting Windows CE devices to ActiveMQ.  MQ Telemetry Transport (MQTT) is a machine-to-machine protocol designed as an extremely lightweight publish/subscribe messaging transport.  ActiveMQ 5.6 and Apollo 1.0 will both support MQTT.
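For illustration only, here is a minimal sketch of publishing a message over MQTT using the Eclipse Paho Java client.  The broker URL, client id, topic name and payload are all hypothetical, and the sketch assumes a broker with an MQTT transport connector listening on the default port 1883.

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttPublishSketch {

    public static void main(String[] args) throws Exception {
        // Connect to a broker that has an MQTT transport connector enabled.
        MqttClient client = new MqttClient("tcp://localhost:1883", "handheld-device-01");
        client.connect();

        // Publish a small payload with QoS 1 (at-least-once delivery).
        MqttMessage message = new MqttMessage("HelloMQTT".getBytes());
        message.setQos(1);
        client.publish("freight/scans", message);

        client.disconnect();
    }
}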

Fuse HQ
Fuse HQ is a SOA management and monitoring system and is available with a FuseSource support subscription.  Fuse HQ takes advantage of the JMX-based reporting capabilities of the Fuse products.  Some of the features Fuse HQ provides are:

Monitor systems: a comprehensive cross-stack dashboard makes it easy to check the health of systems and services

  • Role-based access controls for managing user visibility
  • Visually review relationships between FuseSource servers and other managed resources
  • Explore and generate reports of real-time and historic logs, configurations, and events
  • Impact analysis and change control based on configuration tracking
  • Collect, chart and view real-time and historic metrics from hardware, network and applications without invasive instrumentation
  • Auto-discover hardware and software attributes including configuration, version numbers, memory, CPU, disk and network devices
  • Distributed monitoring allows for linear scalability of Fuse HQ server

Manage systems: execute control operations to manage the FuseSource environment

  • Role-based access controls for managing user permissions
  • External authentication leverages existing LDAP or Kerberos directories
  • Automate maintenance activities with scheduled controls and conditional responses to alerts
  • High availability allows multiple Fuse HQ servers to assume workloads in the event of a server failure

Intelligent Alerts: definable alerts proactively identify problems before they occur

  • Follow-the-sun assignments route alerts to the appropriate person based on the time of day and users’ scheduled availability
  • Integrate traps with existing IT operations suite
  • Define alerts once and apply to large, global resource groups
  • Alerts can be multi-conditional, role-based and group-based, and can trigger recovery processes
  • Generate alerts on changes in configuration or key attributes of any managed resource

twitter: @RobTerp

Monitoring and Managing your Java EE application with JMX

I have recently become a big fan of the Java Management eXtensions (JMX) framework for monitoring and managing our server-side applications that are installed on Glassfish 3.  Setting an application up to be monitored via JMX with either VisualVM or JConsole (both available as part of the JDK) is a relatively easy and straightforward process, so much so that I don’t know why we didn’t start using JMX earlier.

We have an application that runs within Glassfish and acts as a proxy from our handheld wireless devices in our warehouses to our ActiveMQ messaging server.  We wanted a way to monitor the health of this application in real time and looked at using JMX for the task.

The process is fairly painless.  Start by creating an interface with “get” methods for the data that should be available through JMX.  The interface name should be suffixed with “MXBean” if any of the interface’s methods will need to return values other than primitives.  In the example below, getClients() returns a Set of MessagingProxyClientMXBean objects.  These MXBeans dive further into device-specific info and will also appear as a tree structure in VisualVM (more about these beans later).  Below is the interface for exposing data related to our messaging proxy server.

public interface ProxyServerInfoMXBean {

    public int getConnectedClientCount();

    public String getProxyVersion();

    public int getLastMinuteMessageCount();

    public int getLastHourMessageCount();

    public int getLast24HourMessageCount();

    public int getSubscriberConnectionCount();

    public int getPublisherConnectionCount();

    public int getBrokerConnectionCount();

    public Set<MessagingProxyClientMXBean> getClients();

    public String getBuildDate();

}

As you may expect, the implementation of the interface for the proxy’s MXBean is pretty straightforward as well.

public class ProxyServerInfo implements ProxyServerInfoMXBean {

    protected Map<String, IClientContext> clientMap = new HashMap<String, IClientContext>();
    protected Logger logger;
    protected MessageCounter messageCounter;

    public ProxyServerInfo(Map<String, IClientContext> clientMap) {
        this.clientMap = clientMap;
        logger = Logger.getLogger( getClass() );
        messageCounter = MessageCounter.getInstance();
    }

    @Override
    public int getConnectedClientCount() {
        return clientMap.size();
    }

    @Override
    public Set<MessagingProxyClientMXBean> getClients() {
        Set<MessagingProxyClientMXBean> beanSet = new HashSet<MessagingProxyClientMXBean>();
        for (IClientContext context : clientMap.values()) {
            beanSet.add(new MessagingProxyClient(context));
        }

        return beanSet;
    }

    @Override
    public int getLast24HourMessageCount() {
        return messageCounter.getLast24HourCount();
    }

    @Override
    public int getLastHourMessageCount() {
        return messageCounter.getLastHourMessageCount();
    }

    @Override
    public int getLastMinuteMessageCount() {
        return messageCounter.getLastMinuteMessageCount();
    }

    @Override
    public String getProxyVersion() {
        return Server.getInstance().getVersion();
    }

    @Override
    public String getBuildDate() {
        return Server.getInstance().getBuildDate();
    }

    @Override
    public int getBrokerConnectionCount() {
        return Server.getInstance().getBrokerConnectionCount();
    }

    @Override
    public int getPublisherConnectionCount() {
        return Server.getInstance().getPublisherConnectionCount();
    }

    @Override
    public int getSubscriberConnectionCount() {
        return Server.getInstance().getSubscriberConnectionCount();
    }

}

The final piece needed to get the MXBean to appear in JConsole is to connect to the platform MBean server and register the MXBean.  In the code snippet below, a unique nameString is defined, which determines the bean’s location in JConsole’s tree hierarchy.

protected MBeanServer mbs = null;
protected ProxyServerInfo proxyServerMBean;
protected String nameString = "com.lynden.messaging:type=MessagingProxyClients";

protected ObjectName mBeanName;

public void connect() {
    mbs = ManagementFactory.getPlatformMBeanServer();
    proxyServerMBean = new ProxyServerInfo(clientMap);
    try {
        mBeanName = new ObjectName(nameString);
        try {
            mbs.unregisterMBean(mBeanName);
        } catch (Exception ex) {
            // There may not already be a bean there to unregister.
        }
        mbs.registerMBean(proxyServerMBean, mBeanName);
    } catch (Exception ex) {
        throw new IllegalStateException(ex);
    }
}

After launching VisualVM and connecting to Glassfish’s JMX port, the “com.lynden.messaging” node appears, showing the MessagingProxyClients info that was defined in the class and interface above.

[Screenshot: the com.lynden.messaging node and its attributes in VisualVM]


Nesting MXBeans

As previously mentioned, MXBeans can be nested, which in this case can be used to display information about the individual clients that are connected to the proxy server.

Just like in the previous example, an MXBean interface can be created to represent the various pieces of data about the client that we want available in JMX, along with an implementation of the interface (not shown).
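As a rough illustration of what that client-level interface could look like, here is a hypothetical sketch.  Only removeClient() (shown later) and the use of a client id are implied by the rest of the post; the other attribute names are invented for the example.

public interface MessagingProxyClientMXBean {

    // Identifier used when building the bean's ObjectName (see the registration code below).
    public String getClientId();

    // Illustrative, invented attributes a per-client bean might expose.
    public String getRemoteAddress();

    public long getConnectedTimestamp();

    // Not a JavaBean-style getter, so JMX exposes it as an operation (see "JMX Operations" below).
    public void removeClient();

}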

Registering the MXBean for the client is similar to the process above for registering the server’s own MXBean, except for the “clientNameString” variable shown below, which uses type=MessagingProxyClients,name=<some-client-id>.  Once that is in place, any new clients that connect to the proxy automatically appear in JConsole without needing any sort of refresh.

String clientNameString = "com.lynden.messaging:type=MessagingProxyClients,name=";

MessagingProxyClient mpc = new MessagingProxyClient(client);
ObjectName clientName = new ObjectName(clientNameString + client.getClientId()); 
mbs.registerMBean(mpc, clientName); 
clientObjectList.add(clientNameString + client.getClientId());

When clients disconnect from the proxy, the client’s MXBeans are deregistered from JMX in a similar manner.

String clientNameString="com.lynden.messaging:type=MessagingProxyClients,name=";
mbs.unregisterMBean(new ObjectName(clientNameString + client.getClientId()));
clientObjectList.remove(clientNameString + client.getClientId());

Below is a screenshot from VisualVM of an expanded proxy server node, displaying all the connected client instances.  The right side of the window shows the various properties that are available from the client’s MXBean.

[Screenshot: the expanded MessagingProxyClients node in VisualVM with per-client attributes on the right]


JMX Operations

One final piece of JMX functionality that can be useful is the ability to execute methods on the MXBeans (called operations) from VisualVM/JConsole.  To make a method available via JMX, it only needs to be declared in the MXBean interface and implementation; it does not need to conform to the standard JavaBean “getter” format.  In our client’s MXBean, in addition to the getter methods, the bean also contained a removeClient() method which forces the client’s connection to the proxy to close.  Below is a snippet from the client MXBean’s implementation.

@Override
public void removeClient() {
     client.setFlaggedForDisposal(true);
}

Below is a screenshot from VisualVM showing that the method can be invoked from the bean’s “Operations” tab by clicking the “removeClient” button.

[Screenshot: the bean's "Operations" tab in VisualVM with the "removeClient" button]

With this information available from our production servers via JMX, we now have a very useful tool for debugging issues when they arise in production, one that gives us greater insight than the application’s log files alone.

twitter: @RobTerp

Hacking ActiveMQ to Retrieve Connection Statistics

This hack is so awful that I am almost embarrassed to blog about it, but this was the only way to obtain the information we needed from ActiveMQ to display in an MBean in JConsole.

We have written a Java proxy server to allow Windows CE devices running a C# application to connect to our JMS message broker.  The proxy server runs within a Glassfish instance to make managing its lifecycle a bit easier.  For system diagnostics and debugging purposes we created an MBean to be accessed via JMX so we could get real-time statistics from the proxy through JConsole or VisualVM, such as the number of clients connected, message counts, and the total number of connections open from the proxy to the broker.

Getting the number of connections that the proxy had opened to the broker seemed like it *should* be a fairly straightforward task: enable statistics on the ActiveMQConnectionFactory instance by calling setStatsEnabled(true), retrieve the connection statistics from the factory via a call to the getStats() method, and then get the total number of open connections.  However, no matter what we did, the getStats() call always returned null, and for good reason; upon inspecting the ActiveMQConnectionFactory source code we found:

public StatsImpl getStats() {
    // TODO
    return null;
}

In the ActiveMQConnectionFactory class there is a field called factoryStats of type JMSStatsImpl.  The field is protected and there are no getter methods to retrieve it, so we had to resort to drastic measures if we really wanted to know how many connections the proxy had open to the broker.  The code to get the connection information from the factory appears below.

    @Override
    public int getBrokerConnectionCount() {

        int count = -1;
        try {
            Field factoryStatsField = ActiveMQConnectionFactory.class.getDeclaredField("factoryStats");
            factoryStatsField.setAccessible(true);
            JMSStatsImpl factoryStats = (JMSStatsImpl) factoryStatsField.get(connectionFactory);
            count = factoryStats.getConnections().length;
        } catch (Exception ex) {
            logger.error("Attempted to get Broker connection count, but failed due to: " + ex.getMessage());
        }
        return count;
    }

Even though the field is protected, reflection can be used to set it to “accessible” and then its value can be read directly from the factory object.  This is not guaranteed to work in future releases of ActiveMQ, but since this is only for informational purposes in JConsole, it was good enough for what we needed.

Twitter: @RobTerp