Using Apache Camel and ActiveMQ to Implement Synchronous Request/Response.

Implementing a synchronous request/response pattern with Apache Camel and ActiveMQ is quite a bit easier than you might expect. It has allowed us to use our existing messaging infrastructure for synchronous exchanges between applications where we otherwise might have needed to create a new web service.

Below is an example of setting up two Camel endpoints which will demonstrate the request/response pattern.

First, configure the connection to the JMS message broker.  In this case, an ActiveMQ broker is created in-process.


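// "context" is a CamelContext field of the enclosing class.
// activeMQComponent(...) is a static import from
// org.apache.activemq.camel.component.ActiveMQComponent.
public void startBroker() throws Exception {
    context = new DefaultCamelContext();
    // The vm:// transport starts an embedded, non-persistent broker in this JVM.
    context.addComponent("jms-broker", activeMQComponent("vm://localhost?broker.persistent=false"));
    buildProducerRoute(context);
    buildConsumerRoute(context);
    context.start();
}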

 

Next, set up the producer route.  First a processor is created, which will print the body of the message.  The route is triggered when a file is dropped into the /Users/RobTerpilowski/tmp/in directory, and the file's contents are sent to the robt.test.queue destination.  Once the reply has come back from the queue, the processor is executed.  What we are hoping to see is that the message has been modified by the consuming endpoint by the time the processor runs.  The important piece to note here is the URI:
jms-broker:queue:robt.test.queue?exchangePattern=InOut
exchangePattern=InOut tells Camel that the exchange with this endpoint is a synchronous request/response.


public void buildProducerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Prints whatever body the exchange holds once the reply has arrived.
            Processor processor = (Exchange exchange) -> {
                System.out.println("PRODUCER Received response: " + exchange.getIn().getBody(String.class));
            };
            from("file:///Users/RobTerpilowski/tmp/in")
                // InOut makes this step wait for the consumer's reply.
                .to("jms-broker:queue:robt.test.queue?exchangePattern=InOut")
                .process(processor);
        }
    });
}

 

Next, set up the consumer endpoint.  Again, a processor is created, which runs as the last step of the route.  This processor first prints the message that the producer sent, then replaces it with a new message saying that the original message was seen.  The route listens on robt.test.queue and writes the incoming message to the directory /Users/RobTerpilowski/tmp/out.  The processor then updates the message body, and that updated body is what goes back as the reply.  If everything works correctly, the producer endpoint should be able to see the modified message.


public void buildConsumerRoute(CamelContext context) throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Prints the request, then replaces the body; the new body is sent back as the reply.
            Processor processor = (Exchange exchange) -> {
                System.out.println("CONSUMER received message: " + exchange.getIn().getBody(String.class));
                exchange.getIn().setBody("I Saw it!!! It contained: " + exchange.getIn().getBody(String.class));
            };
            from("jms-broker:queue:robt.test.queue")
                // The original request body is written to the out directory first.
                .to("file:///Users/RobTerpilowski/tmp/out")
                .process(processor);
        }
    });
}

So now that the routes are set up, it’s time to send a message.  Saving a file in the specified input directory will kick things off.  The file will contain the text “HelloCamel”.

 

$ echo "HelloCamel" > /Users/RobTerpilowski/tmp/in/message.txt

The consumer listening on robt.test.queue immediately sees the message arrive and prints the message body.

CONSUMER received message: HelloCamel

 

The producer endpoint then receives the modified message back, with confirmation that the consumer endpoint did indeed see the message.

PRODUCER Received response: I Saw it!!! It contained: HelloCamel
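
For readers curious about what a synchronous exchange over a queue involves, here is a rough, stdlib-only sketch of the general request/reply idea: the requester attaches a reply destination and a correlation id to the request (JMS has the JMSReplyTo and JMSCorrelationID headers for this), then blocks until a matching reply arrives or a timeout passes. This is a simulation with in-memory queues, not Camel's or ActiveMQ's actual implementation; the class RequestReplySketch, its Message type, and its method names are all made up for illustration.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: in-memory queues stand in for the broker.
class RequestReplySketch {

    // Minimal stand-in for a JMS message: a body plus the two headers request/reply relies on.
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo;

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Stand-in for the robt.test.queue destination.
    static final BlockingQueue<Message> REQUEST_QUEUE = new LinkedBlockingQueue<>();

    // Consumer side: take one request, build a reply, send it to the request's reply queue.
    static void serveOneRequest() throws InterruptedException {
        Message request = REQUEST_QUEUE.take();
        String replyBody = "I Saw it!!! It contained: " + request.body;
        request.replyTo.put(new Message(replyBody, request.correlationId, null));
    }

    // Producer side: send a request and block until the matching reply arrives.
    // Returns null if no matching reply shows up within the timeout.
    static String request(String body, long timeoutMillis) throws InterruptedException {
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        String correlationId = UUID.randomUUID().toString();
        REQUEST_QUEUE.put(new Message(body, correlationId, replyQueue));

        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return null;
            }
            Message reply = replyQueue.poll(remaining, TimeUnit.NANOSECONDS);
            if (reply == null) {
                return null; // timed out waiting
            }
            if (correlationId.equals(reply.correlationId)) {
                return reply.body;
            }
            // Otherwise the reply belongs to another request; keep waiting.
        }
    }

    public static void main(String[] args) throws Exception {
        Thread consumer = new Thread(() -> {
            try {
                serveOneRequest();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        System.out.println("PRODUCER Received response: " + request("HelloCamel", 2000));
        consumer.join();
    }
}
```

In the Camel routes above, all of this bookkeeping is hidden behind the exchangePattern=InOut option, which is a large part of why the pattern is so little work to set up.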

 

Make Sure Your Server Clocks are in Sync!

When I finished my first test services that used the request/response pattern, I created an integration test in which they connected to a messaging broker running in-process.  Things looked great, and the services were communicating without any issues.  I then deployed the services to a Wildfly instance running locally, pointed at a messaging broker on our staging server.  This time when I started my test, the requests were consistently timing out and never making it back from the second service.  I literally spent the entire day deconstructing each service piece by piece to see what was going on.  I then remembered something about clock synchronization in the Camel JMS documentation.  I checked both the clock on the VM and the clock on the staging server, and proceeded to do a face palm when I saw there was a four-hour difference.  Lesson learned!
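
To see why a clock difference like that can make every request time out, consider how message expiration works in general: the sender stamps the message with an expiration time computed from its own clock plus a time to live, and the receiving side compares that stamp against its own clock. The sketch below is plain Java arithmetic, not Camel or ActiveMQ code. The 20-second time to live is an assumption for illustration, as is the direction of the skew (receiver ahead of sender); the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of expiration checks across two unsynchronized clocks.
class ClockSkewSketch {

    // Expiration as stamped by the sender: its own clock plus the time to live.
    static Instant expiration(Instant senderNow, Duration timeToLive) {
        return senderNow.plus(timeToLive);
    }

    // The receiver's view: the message is expired once its own clock reaches the stamp.
    static boolean isExpired(Instant expiration, Instant receiverNow) {
        return !receiverNow.isBefore(expiration);
    }

    public static void main(String[] args) {
        Instant senderNow = Instant.parse("2015-01-01T12:00:00Z");
        Duration timeToLive = Duration.ofSeconds(20); // assumed value
        Instant expiresAt = expiration(senderNow, timeToLive);

        // Clocks in sync: the message arrives 5 ms later and is still valid.
        System.out.println(isExpired(expiresAt, senderNow.plusMillis(5)));

        // Receiver's clock four hours ahead: the same message looks long expired on arrival.
        Instant skewedReceiverNow = senderNow.plus(Duration.ofHours(4)).plusMillis(5);
        System.out.println(isExpired(expiresAt, skewedReceiverNow));
    }
}
```

With the clocks in sync the first check prints false; with the four-hour skew the second prints true, so a message of this kind would be discarded before anyone could reply to it.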

 

twitter: @RobTerpilowski
