August 7, 2010

Java NIO server example

Tags: Java, Technical

I was looking to learn how to write a non-blocking IO server in Java, but couldn’t find one that suited my needs online. I found this example, but it didn’t handle my situation. There is also Apache MINA, but it was a little complex for my simple needs. So armed with these examples and a couple of tutorials (here and here), I created my own.

My code is available here. It is an example so feel free to modify it as required. It consists of an abstract non-blocking server and a matching blocking client. Create concrete implementations to use them - code showing example usage is in the tests. Both are designed to be run in their own thread (thus the use of the Runnable interface), and are single threaded - more on the concurrency options later. The client is blocking as it only connects to a single server and runs in its own thread, thus it will still have to wait for messages from the server so there is no benefit to making it non-blocking. The server only handles standard TCP connections. If UDP, SSL or something else is required you will need to add it yourself.

In writing this code I learnt a few things. Apart from the standard API calls to open and manage connections it is useful to know the differing use of selection keys, the tricks of message handling, and threading issues.

The basics of opening and handling a connection are commonly available on the web and repeated in the simplified code block below (with some bits cut out - the full version is in the code download). Start by opening a Selector (a multiplexer for network channels); each channel registered with it is represented by a SelectionKey. Then open a server socket on a specified port. By registering the socket with the selector for OP_ACCEPT events, any incoming connections will be available on the selector. The code below then loops forever waiting for events on the selector. When it gets one, if it is a connection request, it accepts the connection and registers an interest in messages sent on that connection (via the OP_READ registration). If it is a message (key.isReadable()), it needs to be read; that code is left out here and covered further down. The code below is also quite brittle: any error will result in the server completely stopping.

Selector selector = null;
ServerSocketChannel server = null;
try { 
	selector = Selector.open(); 
	server = ServerSocketChannel.open(); 
	server.socket().bind(new InetSocketAddress(port)); 
	server.configureBlocking(false); 
	server.register(selector, SelectionKey.OP_ACCEPT); 
	while (true) {
		selector.select();
		for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) { 
			SelectionKey key = i.next(); 
			i.remove(); 
			if (key.isConnectable()) { 
				((SocketChannel)key.channel()).finishConnect(); 
			} 
			if (key.isAcceptable()) { 
				// accept connection 
				SocketChannel client = server.accept(); 
				client.configureBlocking(false); 
				client.socket().setTcpNoDelay(true); 
				client.register(selector, SelectionKey.OP_READ);
			} 
			if (key.isReadable()) { 
				// ...read messages...
			} 
		}
	}   		
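} catch (Throwable e) { 
	// keep the original exception as the cause so its stack trace is not lost
	throw new RuntimeException("Server failure: "+e.getMessage(), e);
} finally {
	try {
		// either may still be null if opening failed
		if (selector!=null) {
			selector.close();
		}
		if (server!=null) {
			server.socket().close();
			server.close();
		}
		stopped();
	} catch (Exception e) {
		// do nothing - server failed
	}
}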
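
One possible way to make the loop less brittle is to catch failures per key, so that a misbehaving connection is dropped without stopping the whole server. The following is only a sketch under that assumption, not what the code download does; `SafeKeyLoop`, `KeyHandler` and `processSelected` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

public class SafeKeyLoop {
    /** Hypothetical callback standing in for the accept/read logic. */
    public interface KeyHandler {
        void handle(SelectionKey key) throws IOException;
    }

    /**
     * Processes the keys selected by the last select() call.
     * A failure on one key cancels that key and closes only its channel.
     * Returns the number of keys that were dropped this way.
     */
    public static int processSelected(Selector selector, KeyHandler handler) {
        int dropped = 0;
        Iterator<SelectionKey> i = selector.selectedKeys().iterator();
        while (i.hasNext()) {
            SelectionKey key = i.next();
            i.remove();
            try {
                if (key.isValid()) {
                    handler.handle(key);
                }
            } catch (IOException | RuntimeException e) {
                dropped++;
                key.cancel();
                try {
                    key.channel().close();
                } catch (IOException ignored) {
                    // the channel is already unusable
                }
            }
        }
        return dropped;
    }
}
```

The main loop would then call `selector.select()` followed by `processSelected(selector, handler)`, leaving the outer try/catch for failures of the selector itself.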

One thing to note about selection keys is that they do not represent a socket or a client. Instead, a key represents one channel’s registration with one selector. Thus the event to open a connection from a client (the OP_ACCEPT event) arrives on the key of the server socket channel, while the messages sent by that client (the OP_READ events) arrive on the key of the channel returned by accept(). This means that the connection request and the messages from the same client will be on different keys. Don’t try to compare them. The benefit of this is that accept events and read events can be registered with different selectors (the reason to do this is threading - detailed below).
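
The relationship between keys and channels can be checked directly. The sketch below is an illustration only: it assumes a loopback connection on an ephemeral port, and `KeyIdentityDemo` is a hypothetical class name. It also shows `attach()`, which is one way to hang per-connection state on a key.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class KeyIdentityDemo {
    /** Returns true if the accept key and the read key are distinct, each tied to its own channel. */
    public static boolean acceptAndReadKeysDiffer() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // port 0 lets the OS pick a free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            SelectionKey acceptKey = server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000); // wait for the pending connection
                selector.selectedKeys().clear();
                SocketChannel accepted = server.accept();
                if (accepted == null) {
                    throw new IOException("no pending connection");
                }
                accepted.configureBlocking(false);
                SelectionKey readKey = accepted.register(selector, SelectionKey.OP_READ);
                // per-connection state can travel with the key
                readKey.attach("state for " + accepted.getRemoteAddress());

                boolean differ = acceptKey != readKey
                        && acceptKey.channel() == server
                        && readKey.channel() == accepted
                        && accepted.keyFor(selector) == readKey;
                accepted.close();
                return differ;
            }
        }
    }
}
```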

When reading a message there are a number of situations to consider. When reading from the connection the resulting data might not contain all of a message (the remainder arriving at some later time) or it may contain more than one message. Thus some way of denoting the end of a message must be considered. Then the reading code must buffer input data and split it up into messages. Common ways of denoting the end of a message are:

  1. a fixed message size
  2. prefixing the message with the size of the message
  3. a special symbol representing the end of the message
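
As a rough illustration of the three options, the sending side might frame a payload as follows. This is a sketch rather than code from the download: the class name `Framing`, the zero padding, and the big-endian byte order of the length prefix are all assumptions.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class Framing {
    /** 1. Fixed size: pad the payload with zero bytes up to frameSize. */
    public static byte[] fixedSize(byte[] payload, int frameSize) {
        if (payload.length > frameSize) {
            throw new IllegalArgumentException("payload longer than frame");
        }
        return Arrays.copyOf(payload, frameSize);
    }

    /** 2. Length prefix: a 2-byte unsigned big-endian length, then the payload. */
    public static byte[] lengthPrefixed(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload does not fit a 2-byte length");
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        return buf.array();
    }

    /** 3. Delimiter: append an end marker that must not occur inside the payload. */
    public static byte[] delimited(byte[] payload, byte delimiter) {
        for (byte b : payload) {
            if (b == delimiter) {
                throw new IllegalArgumentException("payload contains the delimiter");
            }
        }
        byte[] out = Arrays.copyOf(payload, payload.length + 1);
        out[payload.length] = delimiter;
        return out;
    }
}
```

The trade-offs show up in the checks: a fixed size wastes space on short messages, a length prefix caps the message size, and a delimiter restricts what the payload may contain (or requires escaping).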

My code uses the second method. Each message is prefixed with 2 bytes containing the number of bytes in the rest of the message (thus limiting the message to at most 65535 bytes). Data is read in using ByteBuffers, so it is useful to have a good understanding of how to use them (check the API link for a starting point). The code below reads the data and passes it to the readMessage method to be split up into discrete messages. Note the use of the readBuffer. The default buffer size should be as small as possible without being so small that messages are regularly larger than the buffer. The smaller the buffer, the faster it can be processed. However, if a message received is larger than the buffer, the buffer must be resized to handle it.

private List<ByteBuffer> readIncomingMessage(SelectionKey key) throws IOException { 
	ByteBuffer readBuffer = readBuffers.get(key); 
	if (readBuffer==null) {
		readBuffer = ByteBuffer.allocate(defaultBufferSize); 
		readBuffers.put(key, readBuffer); 
	}
	if (((ReadableByteChannel)key.channel()).read(readBuffer)==-1) {
		throw new IOException("Read on closed key");
	}
	
	readBuffer.flip(); 
	List<ByteBuffer> result = new ArrayList<ByteBuffer>();
	   	
	ByteBuffer msg = readMessage(key, readBuffer);
	while (msg!=null) {
		result.add(msg);
		msg = readMessage(key, readBuffer);
	}
	
 	return result;
}
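
The method above depends on how a ByteBuffer switches between being written to and being read from. A minimal sketch of that position/limit bookkeeping follows (`BufferModes` is a hypothetical name used only for this illustration):

```java
import java.nio.ByteBuffer;

public class BufferModes {
    /** Returns {first byte read, unread bytes before compact, position after compact, limit after compact}. */
    public static int[] demo() {
        ByteBuffer buf = ByteBuffer.allocate(8);        // write mode: position=0, limit=8
        buf.put((byte) 1).put((byte) 2).put((byte) 3);  // position=3
        buf.flip();                                     // read mode: position=0, limit=3
        byte first = buf.get();                         // position=1
        int unread = buf.remaining();                   // limit - position
        buf.compact();                                  // unread bytes move to the front, back in write mode
        return new int[] { first, unread, buf.position(), buf.limit() };
    }
}
```

After `compact()` the position sits just past the unread bytes and the limit is back at the capacity, so the next channel read appends to whatever was left over.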

Here is the code I used to split the buffered data into messages.

private ByteBuffer readMessage(SelectionKey key, ByteBuffer readBuffer) {
	int bytesToRead; 
	if (readBuffer.remaining()>=messageLength.byteLength()) { // must have at least enough bytes to read the size of the message
 		byte[] lengthBytes = new byte[messageLength.byteLength()];
		readBuffer.get(lengthBytes);
		bytesToRead = (int)messageLength.bytesToLength(lengthBytes);
		if ((readBuffer.limit()-readBuffer.position())<bytesToRead) { 
			// Not enough data - prepare for writing again 
			if (readBuffer.limit()==readBuffer.capacity()) {
	    		// message may be longer than buffer => resize buffer to message size
				int oldCapacity = readBuffer.capacity();
				ByteBuffer tmp = ByteBuffer.allocate(bytesToRead+messageLength.byteLength());
				readBuffer.position(0);
				tmp.put(readBuffer);
				readBuffer = tmp;   				
				readBuffer.position(oldCapacity); 
    			readBuffer.limit(readBuffer.capacity()); 
				readBuffers.put(key, readBuffer); 
	    		return null;
	    	} else {
	    		// reset for writing
    			readBuffer.position(readBuffer.limit()); 
    			readBuffer.limit(readBuffer.capacity()); 
    			return null; 
	    	}
		} 
	} else { 
		// Not enough data - prepare for writing again 
		readBuffer.position(readBuffer.limit()); 
		readBuffer.limit(readBuffer.capacity()); 
		return null; 
	} 
	byte[] resultMessage = new byte[bytesToRead];
	readBuffer.get(resultMessage, 0, bytesToRead); 
	// remove read message from buffer
	int remaining = readBuffer.remaining();
	readBuffer.limit(readBuffer.capacity());
	readBuffer.compact();
	readBuffer.position(0);
	readBuffer.limit(remaining);
	return ByteBuffer.wrap(resultMessage);
} 
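
For comparison, the same splitting logic can be written more compactly by peeking at the length and letting `compact()` do the shuffling. This is an alternative sketch, not the code from the download: `FrameDecoder` and `feed` are hypothetical names, the 2-byte length is assumed to be unsigned and big-endian, and input arrives as a byte array rather than from a channel so the example stays self-contained.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FrameDecoder {
    private static final int HEADER = 2; // assumed: 2-byte unsigned big-endian length
    private ByteBuffer buffer = ByteBuffer.allocate(16); // kept in write mode between calls

    /** Appends newly received bytes and returns every message that is now complete. */
    public List<byte[]> feed(byte[] received) {
        ensureRoom(received.length);
        buffer.put(received);
        buffer.flip(); // switch to read mode
        List<byte[]> messages = new ArrayList<>();
        while (buffer.remaining() >= HEADER) {
            // peek at the length without consuming it
            int length = buffer.getShort(buffer.position()) & 0xFFFF;
            if (buffer.remaining() < HEADER + length) {
                break; // only part of the message has arrived
            }
            buffer.position(buffer.position() + HEADER);
            byte[] message = new byte[length];
            buffer.get(message);
            messages.add(message);
        }
        buffer.compact(); // keep leftover bytes, back to write mode
        return messages;
    }

    /** Grows the buffer when the incoming bytes would not fit. */
    private void ensureRoom(int extra) {
        if (buffer.remaining() >= extra) {
            return;
        }
        int needed = buffer.position() + extra;
        ByteBuffer bigger = ByteBuffer.allocate(Math.max(buffer.capacity() * 2, needed));
        buffer.flip();
        bigger.put(buffer);
        buffer = bigger;
    }
}
```

Because the length is only peeked at, a partial message leaves the buffer untouched and the header is simply read again once more data has arrived.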

The example code provided is single threaded - all connections are handled by the same thread. It is possible to use multiple threads. On an individual key, reading and writing can be done by different threads, although only one thread at a time can do either (that is, there can’t be 2 threads both reading at the same time). Similarly only one thread at a time can use a selector. Single threaded code suited my needs, but there seem to be a few ways to handle concurrency. Each of the ways I describe uses a pool of threads handling read events, and one selector and thread handling OP_ACCEPT events.

  1. One selector for each connected client. When an accept event is received, create a new selector and register for read events on that new selector. This selector then has a task to listen and handle read events and it is this task executed by the thread pool. I’m not sure how heavy a selector is on resource usage, so I don’t know how well this would scale.
  2. One selector per thread. When creating executor threads assign each a selector and then assign clients to each selector in some balanced manner. Then each thread handles the read events on its selector. This is the method that MINA uses. The problem comes with how the threads are balanced (MINA uses a round-robin algorithm) - if not careful it is possible to end up with some threads very busy and others doing very little.
  3. All events on one selector. This then requires some careful synchronisation as keys are passed out to the threads to be read while ensuring they are not read by other threads until the current operation is complete. Selectors would need to be very heavy before I could imagine this being the best way.
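
To make the second option a little more concrete, here is a minimal sketch of one selector per thread with round-robin assignment. It is an illustration under assumptions, not MINA's implementation and not part of the download: `SelectorPool`, `Worker`, `assign` and `pick` are hypothetical names, and the bytes read are simply discarded where the message-splitting code would go.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SelectorPool {
    /** One worker owns one selector and is meant to be run by one thread. */
    public static class Worker implements Runnable {
        public final Selector selector;
        private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
        private final ByteBuffer scratch = ByteBuffer.allocate(1024);

        Worker() throws IOException {
            selector = Selector.open();
        }

        /** Called by the accepting thread: queue the channel and wake the worker. */
        public void assign(SelectableChannel channel) {
            pending.add(channel);
            selector.wakeup();
        }

        /** Registers queued channels for read events on this worker's selector. */
        public void registerPending() throws IOException {
            SelectableChannel channel;
            while ((channel = pending.poll()) != null) {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            }
        }

        @Override
        public void run() {
            try {
                while (selector.isOpen()) {
                    selector.select();
                    registerPending();
                    Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                    while (i.hasNext()) {
                        SelectionKey key = i.next();
                        i.remove();
                        if (key.isValid() && key.isReadable()) {
                            read(key);
                        }
                    }
                }
            } catch (IOException | ClosedSelectorException e) {
                // selector failed or was closed: this worker stops
            }
        }

        private void read(SelectionKey key) {
            try {
                scratch.clear();
                if (((ReadableByteChannel) key.channel()).read(scratch) == -1) {
                    key.cancel();
                    key.channel().close();
                }
                // ...otherwise pass the bytes in scratch to the message-splitting code...
            } catch (IOException e) {
                key.cancel(); // drop only the failing connection
            }
        }
    }

    private final Worker[] workers;
    private final AtomicInteger next = new AtomicInteger();

    public SelectorPool(int size) throws IOException {
        workers = new Worker[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Worker();
        }
    }

    /** Round-robin choice of the worker that gets the next accepted client. */
    public Worker pick() {
        return workers[Math.floorMod(next.getAndIncrement(), workers.length)];
    }

    public void close() throws IOException {
        for (Worker worker : workers) {
            worker.selector.close();
        }
    }
}
```

The accepting thread would call `pool.pick().assign(client)` for each accepted channel, with each worker started on its own thread. Queueing the channel and calling `wakeup()` avoids registering from another thread while the worker is inside `select()`. Round-robin balances the number of connections, not their traffic, which is the imbalance risk mentioned above.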

I’ll leave handling concurrency as a task for the interested reader. Good luck with your code. Again, my example is here.

Update 22nd Dec 2011: A reader emailed to point out that the original tests included a bug. Some of the tests used InputStreamReader, which decodes bytes into a character stream, and if the character set doesn’t use all 8 bits then the tests will fail for certain message lengths (when decoding the header length). I have updated the tests in the example to fix this.

Comments: I have removed the commenting system (for privacy concerns), below are the comments that were left on this post before doing so…

Vic Cekvenich @ 2015-01-20 - Hi, this is awesome, I’m using it and it works great. Do you have a suggestion how to make the client synchronous? ie write() return on messageReceived? One way is for me to pass a sequence # on write that server returns onmessage, and then I resolve to a stored callback. Do you have another suggestion?

Charles @ 2015-01-20 - Hi Vic, glad to hear you find the code useful! How to make the client synchronous? I think your suggestion sounds fine. I think the normal technique is to lock the client until an appropriate response is received. If you need multiple synchronous requests then there needs to be some way of identifying to which request a response belongs. A sequence # seems fine here, or a thread id would also work. Don’t forget to include some sort of timeout and associated error handling. I found this http://stackoverflow.com/qu…, it says largely the same thing, but it is always nice when someone agrees with you.
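
A minimal sketch of the sequence-number approach discussed in these comments, with the timeout included. It is an assumption about how it could be done, not part of the example client: `PendingRequests`, `register`, `complete` and `await` are hypothetical names, and putting the sequence number on the wire is left to the caller.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class PendingRequests {
    private final Map<Integer, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger sequence = new AtomicInteger();

    /** Reserves a sequence number; call this before writing the request. */
    public int register() {
        int seq = sequence.incrementAndGet();
        pending.put(seq, new CompletableFuture<>());
        return seq;
    }

    /**
     * Called from the message-received callback with the sequence number
     * echoed by the server. Returns false if nobody is waiting any more.
     */
    public boolean complete(int seq, byte[] response) {
        CompletableFuture<byte[]> future = pending.get(seq);
        return future != null && future.complete(response);
    }

    /** Blocks the calling thread until the response arrives or the timeout expires. */
    public byte[] await(int seq, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        CompletableFuture<byte[]> future = pending.get(seq);
        if (future == null) {
            throw new IllegalArgumentException("unknown sequence " + seq);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(seq); // late responses are ignored from here on
        }
    }
}
```

A synchronous write would then be: `int seq = requests.register();`, send the message tagged with `seq`, and return `requests.await(seq, timeout)`.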