CQ: Designing a JMS Implementation (Part 1)

I am ashamed to admit it, but I started this project a couple of years ago. You could say it is being delivered late and over budget (OK, not over budget, since so far it has cost nothing to produce other than time). A few tantalizing tidbits have been posted here to introduce the software project ("CQ is back on track" and "Reworking and Refactoring CQ"), and this short article is more of the same, but with a bit of substance.

Tonight, basic CQ went live! It can now process multiple sessions in parallel without the issues I had encountered before. I can write new entries, read next messages, and client-confirm these messages now.

Basic design of CQ

CQ is a Java client/server application that implements the JMS standard. My basic requirements are that it provides queuing, that it supports several message types (e.g. ASCII, Unicode, byte array), that it supports auto-confirmation or client confirmation of reads, that a server can support multiple sessions, and that the server can use any of several different databases in the background for persistence. JMS drivers also need to be provided for (of course) Java and .NET.

Future goals include server-to-server queues and, eventually, support for topics. Both of these present very interesting wrinkles from a development standpoint. Multiple sessions spanning a pool of connections would also be desirable. The underpinnings to support this are already in the messaging protocol used by CQ.

CQ is meant to be totally open-source. While a license has not been chosen yet, I will probably strongly consider the Mozilla Public License. Now that CQ has actually emerged from its shell (so to speak), I should really start making this decision and move to create a SourceForge project to release it to the world. This will happen as soon as I test databases other than MS SQL Server. I have already written code to support Oracle and MySQL and maybe, once released to the wild, other enterprising souls will support other popular databases such as Postgres, etc. I think it would also be a good idea (and a fun side-project) to add a filesystem storage plugin to obviate the need for a database engine should this be a preferred installation choice.

Why Java? This is a pretty self-explanatory decision. Java is ubiquitous on most platforms so choosing it over, say .NET, is a no-brainer. Personally, I would like to run CQ on my Linux server in addition to Windows.

Database Design

The database provides the persistence layer for the queue messages. CQ’s database schema is relatively simple and straightforward. As you can easily see below, the central tables are the Message table (used to contain the various pieces of the header metadata) and the MessageBody table (which contains, strangely enough, the body of the message). The Queue table tracks all the queues which in turn are attached to the queue manager in the Manager table. For later use, there are remote manager and topic tables.

Code Design Highlights

The messaging protocol is broken into three distinct layers. The lowest is known as the Socket Messaging layer and is used to establish an initial connection from a client to a server. Above it sit two parallel protocol layers: the Client Messaging layer, which supports a client-server connection, and the Server Messaging layer, which is intended for the future server-server connectivity design. The server-server design will permit a queue manager to be extended over a geographically dispersed area while providing clients with local access to the queues.

One of the neat parts of this codebase is how messages are dispatched from client to server. This takes full advantage of the Object.wait() and Thread.interrupt() support built into Java. Essentially, a client can send a protocol message to the server that might receive a response immediately or following some delay. Additionally, some of the message types can have a timeout that expires. The central class supporting this mechanism is the PendingRequest class:

/* PendingRequest.java
 *
 * Copyright (c) 2010, Chris Laforet Software/Christopher Laforet
 * All Rights Reserved
 *
 * Started: Jul 8, 2010
 * Revision Information: $Date: 2010-08-20 02:28:36 $
 *                       $Revision: 1.6 $
 * 
 */

package com.chrislaforetsoftware.cq.common.request;

import java.util.Date;

import com.chrislaforetsoftware.cq.common.message.ClientServerExchangeMessage;
import com.chrislaforetsoftware.cq.common.util.Timeout;

/** Container for a pending request in or out.  Each request
 * has a specific request ID and a state machine driving it.
 * 
 * @author Christopher Laforet
 */

public class PendingRequest
	{
	private int _sessionID;
	private int _requestID;
	private Thread _requestingThread;
	private ClientServerExchangeMessage _message = null;
	private Timeout _timeout;
	
	/** Creates a holding area for a request going to the other side.
	 * When there is a response, or on timeout, the requesting thread
	 * is wakened from its sleep with an interrupt. 
	 * 
	 * @param SessionID the ID of the session for this request.
	 * @param RequestID the ID of the request itself.
	 * @param RequestingThread the thread making the request.
	 * @param TimeoutMsec the timeout to wait before expiring or 0 for infinite wait.
	 */
	public PendingRequest(int SessionID,int RequestID,Thread RequestingThread,long TimeoutMsec)
		{
		_sessionID = SessionID;
		_requestID = RequestID;
		_requestingThread = RequestingThread;
		if (TimeoutMsec > 0)
			_timeout = Timeout.createTimeout(TimeoutMsec);
		}
	
	
	/** Retrieves the session ID for this parked request.
	 * 
	 * @return the session ID for the request.
	 */
	public int getSessionID()
		{
		return _sessionID;
		}
	
	/** Retrieves the request ID for this parked request.
	 * 
	 * @return the request ID for the request.
	 */
	public int getRequestID()
		{
		return _requestID;
		}
	
	
	/** Checks if the request has expired.
	 * 
	 * @param UtcNow the time to check against.
	 * 
	 * @return true if expired or false.
	 */
	public boolean isExpired(Date UtcNow)
		{
		if (_timeout == null)
			return false;
		
		return _timeout.isExpired(UtcNow);
		}

	
	/** Tickles the requesting thread to wake up after a 
	 * timeout without a message.
	 */
	public void awakenThread()
		{
		awakenThread(null);
		}

	
	/** Tickles the requesting thread to start acting
	 * on a ClientServerMessage (which is a ProtocolMessage).
	 * 
	 * @param Message the ClientServerExchangeMessage received.  This will be null on 
	 * timeout or be a valid message if one was received.
	 */
	public void awakenThread(ClientServerExchangeMessage Message)
		{
		synchronized (this)
			{
			_message = Message;
			_requestingThread.interrupt();
			}
		}
	
	
	/** Retrieves the message that triggered the awakening
	 * which will be null if timed out.
	 * 
	 * @return the message or null.
	 */
	public ClientServerExchangeMessage getReplyMessage()
		{
		return _message;
		}
	}

Handling these items in the ConnectionManager class is a HashMap:

private HashMap<String,PendingRequest> _requestList = new HashMap<String,PendingRequest>();

This hashmap serves a few purposes. First of all, it is a container for the pending requests within the specific connection. It also permits an efficient lookup for a match (as we will see in a moment), since each item is added under a key that combines the session ID and the request ID in the form sessionID:requestID. As responses return, the key can easily locate the waiting thread.
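As a small illustration of why the key is a combined string, here is a sketch of the key scheme. The RequestKeys helper and the demo class are hypothetical names used only for this example; they are not part of CQ. The sketch also shows that an entry stored under the string key can only be removed with that same string key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the "sessionID:requestID" key used by the request map. */
final class RequestKeys {
    private RequestKeys() {}

    static String keyFor(int sessionID, int requestID) {
        // The ':' separator keeps (1, 23) and (12, 3) from colliding as "123".
        return sessionID + ":" + requestID;
    }
}

final class RequestKeysDemo {
    public static void main(String[] args) {
        Map<String, String> pending = new HashMap<>();
        pending.put(RequestKeys.keyFor(1, 23), "request A");
        pending.put(RequestKeys.keyFor(12, 3), "request B");
        System.out.println(pending.size());   // prints 2

        // Map.remove(Object) accepts any object, so this compiles,
        // but an Integer never equals a String key and nothing is removed.
        pending.remove(Integer.valueOf(23));
        System.out.println(pending.size());   // prints 2

        // Removing with the same composite key works.
        pending.remove(RequestKeys.keyFor(1, 23));
        System.out.println(pending.size());   // prints 1
    }
}
```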

When a protocol message is dispatched from the client to the server, it is handled in the ConnectionManager as follows:

/** Transacts a message between the client and the server.
 * A transacted message is one that requires a response.
 * 
 * @param Message the message to transmit.
 * @param TimeoutMsec the number of msec to wait or 0 for indefinite wait.
 * @return the response from the server or null if timed out.
 */
public ClientServerExchangeMessage transactMessage(ClientServerExchangeMessage Message,long TimeoutMsec)
        throws IOException, ConnectionException, TransmitMessageException 
        {
        if (!_isOpen)
                throw new ConnectionException("Connection is closed");

        synchronized (this)
                {
                boolean foundSession = false;
                for (SessionManager sm : _sessionList)
                        {
                        if (sm.getSessionID() == Message.getSessionKey())
                                {
                                foundSession = true;
                                break;
                                }
                        }
                
                if (!foundSession)
                        throw new ConnectionException("Connection does not support session " + Message.getSessionKey());
                }

        // place request into list before sending message...
        PendingRequest request = null;
        synchronized (_requestList)
                {
                request = new PendingRequest(Message.getSessionKey(),Message.getRequestID(),Thread.currentThread(),TimeoutMsec);
                _requestList.put(request.getSessionID() + ":" + request.getRequestID(),request);
                }
        
        try
                {
                _connection.send(Message);
                }
        catch (Exception ee)
                {
                synchronized (_requestList)
                        {
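                        // remove with the same String key used in put(); the bare request ID matches nothing
                        _requestList.remove(request.getSessionID() + ":" + request.getRequestID());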
                        }
                throw new TransmitMessageException("Unable to transmit message to other side: " + ee);
                }
                
        synchronized (request)
                {
                try
                        {
                        request.wait(TimeoutMsec);
                        }
                catch (InterruptedException ie)
                        {
                        // expected: awakenThread() interrupts this thread when a reply or timeout is posted
                        }
                }
        
        synchronized (_requestList)
                {
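                // remove with the same String key used in put(); the bare request ID matches nothing
                _requestList.remove(request.getSessionID() + ":" + request.getRequestID());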
                }

        return request.getReplyMessage();
        }

The key is that the PendingRequest is created and inserted into the request map before anything is sent. The protocol message is then sent to the server, and the calling thread goes to sleep waiting for the response. The ConnectionManager has a thread that looks for incoming messages and dispatches them to the waiting threads. It does this by posting the message to the PendingRequest object and then calling it to awaken the waiting thread, which is parked in wait() on its PendingRequest object (wait() releases the object's lock while the thread sleeps).
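For comparison, the same hand-off can be written with wait() and notifyAll() guarded by a flag, so the thread's interrupt status is not borrowed for signalling. This is only a sketch of that alternative, not CQ's code: ReplySlot, complete() and await() are hypothetical names. Because of the flag, a reply that arrives before the caller starts waiting is not lost.

```java
/** Hypothetical one-shot holder for a reply; an alternative to waking the thread by interrupt. */
final class ReplySlot<T> {
    private T reply;
    private boolean done;

    /** Called by the dispatching thread when a reply arrives. */
    synchronized void complete(T value) {
        reply = value;
        done = true;
        notifyAll();   // wakes any thread blocked in await()
    }

    /**
     * Called by the requesting thread. Waits up to timeoutMsec
     * (0 means wait indefinitely) and returns the reply, or null on timeout.
     */
    synchronized T await(long timeoutMsec) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMsec * 1_000_000L;
        while (!done) {   // loop guards against spurious wakeups
            if (timeoutMsec == 0) {
                wait();
            } else {
                // round up so a small positive remainder never becomes wait(0)
                long remainingMsec = (deadline - System.nanoTime() + 999_999L) / 1_000_000L;
                if (remainingMsec <= 0)
                    break;   // timed out
                wait(remainingMsec);
            }
        }
        return reply;
    }
}
```

A requesting thread would create the slot, register it, send the message, and then call await(timeoutMsec); the reader thread would call complete(message) when the matching reply shows up.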

public void run()
        {
        int SleepPeriodMsec = 1;
        Date nextCheck = new Date(new Date().getTime() + 1000);
        try
                {
                while (true)
                        {
                        try
                                {
                                // determine if there is any pending message...
                                ClientServerExchangeMessage message = null;
                                synchronized (this)
                                        {
                                        message = (ClientServerExchangeMessage)_connection.readMessage();
                                        }
                                
                                if (message == null)
                                        {
                                        Date now = new Date();
                                        if (nextCheck.compareTo(now) <= 0)
                                                {
                                                synchronized (_requestList)
                                                        {
                                                        // walks down the request chain to see if any timeouts need processing
                                                        for (Iterator<String> it = _requestList.keySet().iterator(); it.hasNext(); )
                                                                {
                                                                String toCheck = it.next();
                                                                if (_requestList.get(toCheck).isExpired(now))
                                                                        {
                                                                        _requestList.get(toCheck).awakenThread();
                                                                        it.remove();
                                                                        }
                                                                }
                                                        }
                                                nextCheck = new Date(new Date().getTime() + 1000);
                                                }
                                        else
                                                {
                                                SleepPeriodMsec += 10;
                                                if (SleepPeriodMsec > 750)
                                                        SleepPeriodMsec = 750;
                                                }
                                
                                        Thread.sleep(SleepPeriodMsec);		// sleep on a sliding scale
                                        continue;
                                        }
                                
                                
                                // reset sleep period to default...
                                SleepPeriodMsec = 1;

                                // determine if we have a pending request to dispatch to
                                String key = message.getSessionKey() + ":" + message.getRequestID();
                                PendingRequest request = null;
                                synchronized (_requestList)
                                        {
                                        if (_requestList.containsKey(key))
                                                {
                                                request = _requestList.get(key);
                                                _requestList.remove(key);
                                                }
                                        }
                                
                                if (request == null)
                                        Logger.log("ConnectionManager.run(): Unable to find a pending request for session:request of " + key);
                                else
                                        request.awakenThread(message);
                                }
                        catch (IOException ioe)
                                {
                                Logger.log("ConnectionManager.run(): Caught an IOException reading from connection: " + ioe);
                                }
                        catch (InterruptedException ie)
                                {
                                throw ie;
                                }
                        catch (Exception ee)
                                {
                                Logger.log("ConnectionManager.run(): Caught an Exception reading from connection: " + ee);
                                }
                        }
                }
        catch (InterruptedException ie)
                {
                // exit has been requested....
                }
        }

Of course, this code does have a problem which I have not been able to overcome yet. I will discuss this below in the Pending Problems section.

Another cool part of this application is the way it uses a plugin-type architecture for handling the persistence layer. This basic overloading of the Data Access Layer (DAL) drivers can be seen in the following screenshot from Eclipse:

The plugin architecture also extends (currently) into the .NET configuration tool and will do so in the Java version when it is developed.

Pending Problems and Immediate Coding Needs

The biggest problem I have is with the busy-waiting loop in the ConnectionManager’s run() method. Unfortunately, I cannot think of an easy way to deal with it. I try to use an incremental sleep algorithm but, ultimately, it is less than ideal. I am open to any good ideas to resolve this non-scalable issue.
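One direction that might be worth exploring, offered as a sketch and not as a tested fix for CQ, is to let the reader thread block on the read instead of polling, and to let each requesting thread enforce its own timeout. The sketch assumes the connection can offer a blocking read, which the article does not say; a BlockingQueue stands in for it here. Reply, BlockingDispatcher, register() and await() are all hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a reply message read from the connection. */
final class Reply {
    final String key;    // "sessionID:requestID"
    final String body;

    Reply(String key, String body) {
        this.key = key;
        this.body = body;
    }
}

final class BlockingDispatcher implements Runnable {
    // Assumption: stands in for a connection whose read blocks until a message arrives.
    final BlockingQueue<Reply> incoming = new LinkedBlockingQueue<>();
    private final ConcurrentHashMap<String, CompletableFuture<Reply>> pending =
            new ConcurrentHashMap<>();

    /** Registers a request before it is sent, so an early reply is not lost. */
    CompletableFuture<Reply> register(String key) {
        CompletableFuture<Reply> future = new CompletableFuture<>();
        pending.put(key, future);
        return future;
    }

    /** Waits for the reply (0 means wait indefinitely); returns null on timeout. */
    Reply await(String key, CompletableFuture<Reply> future, long timeoutMsec)
            throws InterruptedException {
        try {
            if (timeoutMsec == 0)
                return future.get();
            return future.get(timeoutMsec, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } finally {
            pending.remove(key);   // always unregister, whether answered or timed out
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                Reply reply = incoming.take();   // blocks; no sleeping or polling
                CompletableFuture<Reply> future = pending.remove(reply.key);
                if (future != null)
                    future.complete(reply);      // wakes the waiting requester
            }
        } catch (InterruptedException e) {
            // exit has been requested
        }
    }
}
```

With this shape there is no timeout sweep in the reader thread at all, since each caller's timed get() handles its own expiry. Whether it fits CQ depends on whether the socket layer can block on reads without holding up writers.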

I also need to determine what to do about the configuration tool. I initially created a Windows Forms app in .NET as a quick solution to the problem, but I really need to work on a native Java app. My question is: should I build it with Swing or with Eclipse’s SWT library? I need to make this decision pretty quickly!

I also need to finish the JMS driver code for CQ in Java and develop a reasonable .NET facsimile of it. One thing that I do like with IBM’s MQ is that the .NET drivers follow the Java JMS spec as closely as possible, and I should strive to do the same with CQ.

Finally, I need to enforce message expiration in the database. This logic needs to be added to the server code to ensure that the contract for expirable messages is in place.
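The expiration check itself could look something like the following. This is a sketch under assumptions, not CQ's server code: the class and method names are hypothetical. It follows the JMS convention that an expiration value of 0 means the message never expires, and that any other value is an absolute time in milliseconds derived from the send time plus the time-to-live.

```java
/** Hypothetical helper for computing and checking message expiration. */
final class MessageExpiration {
    private MessageExpiration() {}

    /** Returns the absolute expiration time, or 0 if the message never expires. */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** True if the message has an expiration and that time has been reached. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }
}
```

The server could apply a check like isExpired() when a read is requested, and could also purge expired rows from the database periodically; which of the two CQ will use is still open.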


1 Response to CQ: Designing a JMS Implementation (Part 1)

  1. archsentient says:

    Very interesting implementation. A solution would result in an infinitely scalable multi-agent data transfer model which would trend with increased processing in parallel as opposed to calculations per second.

    I’ve pondered this concept quite a bit myself, and we aren’t the only ones^. Multi-agent AI constructs zinging along light speed require this messaging technology for paravirtualization.

    ^ Beautiful Architecture
    by Diomidis Spinellis; Georgios Gousios
    Publisher: O’Reilly Media, Inc.
    Pub Date: January 29, 2009
    Print ISBN-13: 978-0-596-51798-4
