xdev.niodev
Class NIODevice

java.lang.Object
  extended by xdev.Device
      extended by xdev.niodev.NIODevice

public class NIODevice
extends Device

This class is an implementation of xdev based on the Java New I/O package.

Overview

Java New I/O adds non-blocking I/O to the Java language, and this device uses it extensively to provide MPI functionality. Instead of using java.net.Socket directly, niodev uses java.nio.channels.SocketChannel. This device, along with request classes such as xdev.niodev.NIORequest, xdev.niodev.NIOSendRequest, and xdev.niodev.NIORecvRequest, forms the basis of the communication functionality.

Initialization

'niodev' reads a configuration file, which may be placed on a local or shared file system or made accessible through an HTTP server. The device searches this file for its IP@PORT@RANK entry, using the rank provided to the device by the runtime infrastructure. Once this entry is located, the device knows which ports to start its ServerSocketChannels on. The server socket channels are started on port and port+1 and are then registered with the selector to accept connections. Every time a client socket connects to one of these server socket channels, an OP_ACCEPT event is generated. After starting its server sockets, a process connects to every process with a rank lower than its own. For example, with four processes, Process 0 starts two server sockets; Process 1 starts two server sockets and then connects to the server sockets of Process 0. Similarly, after starting their own two server sockets, Process 2 connects to Processes 0 and 1, and Process 3 connects to Processes 0, 1, and 2.
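
The entry lookup and the connection order described above can be sketched roughly as follows. This is an illustration only, not the niodev source: the class and method names (ConfigSketch, findPort, connectTargets) are hypothetical, and the sketch assumes that each IP@PORT@RANK entry sits on its own line and that any other line in the file can be ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of niodev: illustrates the IP@PORT@RANK lookup.
class ConfigSketch {

    // Returns the port of the entry whose rank matches, or -1 if none is found.
    // Assumes one IP@PORT@RANK entry per line; other lines are skipped.
    static int findPort(List<String> lines, int rank) {
        for (String line : lines) {
            String[] parts = line.trim().split("@");
            if (parts.length != 3) {
                continue; // not an IP@PORT@RANK entry
            }
            try {
                if (Integer.parseInt(parts[2]) == rank) {
                    return Integer.parseInt(parts[1]);
                }
            } catch (NumberFormatException e) {
                // malformed entry; skip it
            }
        }
        return -1;
    }

    // A process connects to every process whose rank is lower than its own.
    static List<Integer> connectTargets(int rank) {
        List<Integer> targets = new ArrayList<>();
        for (int r = 0; r < rank; r++) {
            targets.add(r);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Made-up addresses and ports, for demonstration only.
        List<String> lines = Arrays.asList(
            "192.168.0.1@20000@0",
            "192.168.0.2@20000@1",
            "192.168.0.3@20002@2");
        int port = findPort(lines, 2);
        // The device would start its server socket channels on port and port + 1.
        System.out.println("rank 2 listens on " + port + " and " + (port + 1));
        System.out.println("rank 2 connects to ranks " + connectTargets(2));
    }
}
```

With four processes, connectTargets(3) yields ranks 0, 1, and 2, which matches the connection pattern described for Process 3.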

Every time niodev accepts or connects, it puts the java.nio.channels.SocketChannel into one of two java.util.Vector instances, writableChannels (for writing messages) or readableChannels (for reading messages), depending on the server socket port. Note that accepting client requests is done in the selector thread, whereas connecting to a server socket is done in the user thread. This may result in concurrent access to writableChannels and readableChannels, so access to them should be synchronized.

Once all-to-all connectivity has been achieved, which means [writableChannels.size() == N-1] and [readableChannels.size() == N-1], where N is the number of processes, each process needs to send information such as its rank and UUID to every other process. These ranks are the ones read from the configuration file provided by the MPJ runtime infrastructure. Once all the processes have exchanged this information, niodev has worldWritableTable and worldReadableTable, which are instances of java.util.Hashtable. These two hashtables contain UUIDs as keys and SocketChannels as values.

Note that the channels in 'worldWritableTable' are in blocking mode and are used only for writing messages. 'niodev' keeps separate channels for reading and writing because it uses non-blocking reads and blocking writes. Non-blocking writes could hurt the thread safety of niodev or result in very complex code.

These hashtables are later used in the send/recv methods to obtain a reference to the SocketChannel, with the UUID of each process as the key. These UUIDs are contained within the xdev.ProcessID objects. Again, while information is being exchanged, access to worldWritableTable and worldReadableTable should be synchronized. Normally, the user thread sends all the information and then waits for the selector thread to receive similar messages from all the other processes. When the selector thread reads a message, it first looks at the first four bytes, which hold the header, and then adds the received information to the appropriate hashtable. The header value is either INIT_MSG_HEADER_DATA_CHANNEL or INIT_MSG_HEADER_CONTROL_CHANNEL. Once all of this is done, niodev has been initialized.
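
A possible shape for such an initialization message is sketched below. Only the four-byte header and the fact that rank and UUID are exchanged come from the description above. The numeric header values, the 24-byte layout (header, rank, UUID), and the class name InitMessageSketch are assumptions made for illustration and are not the niodev wire format.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical sketch of an initialization message; not the niodev wire format.
class InitMessageSketch {
    // Placeholder values: the real constants are defined by niodev.
    static final int INIT_MSG_HEADER_DATA_CHANNEL = 1;
    static final int INIT_MSG_HEADER_CONTROL_CHANNEL = 2;

    final int header;
    final int rank;
    final UUID uuid;

    InitMessageSketch(int header, int rank, UUID uuid) {
        this.header = header;
        this.rank = rank;
        this.uuid = uuid;
    }

    // Assumed layout: 4-byte header, 4-byte rank, 16-byte UUID.
    ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(24);
        buf.putInt(header);
        buf.putInt(rank);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reads the first four bytes as the header, then the rank and UUID.
    static InitMessageSketch decode(ByteBuffer buf) {
        int header = buf.getInt();
        if (header != INIT_MSG_HEADER_DATA_CHANNEL
                && header != INIT_MSG_HEADER_CONTROL_CHANNEL) {
            throw new IllegalArgumentException("unknown header " + header);
        }
        int rank = buf.getInt();
        long most = buf.getLong();
        long least = buf.getLong();
        return new InitMessageSketch(header, rank, new UUID(most, least));
    }

    public static void main(String[] args) {
        InitMessageSketch sent =
            new InitMessageSketch(INIT_MSG_HEADER_DATA_CHANNEL, 3, UUID.randomUUID());
        InitMessageSketch received = decode(sent.encode());
        System.out.println("rank=" + received.rank
            + " uuidMatches=" + received.uuid.equals(sent.uuid));
    }
}
```

After decoding, the receiver would store the channel on which the message arrived under the decoded UUID in whichever hashtable the header indicates.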

Modes of Send

The MPI specification defines four modes of send operation: standard, buffered, ready, and synchronous. xdev supports two of them, standard and synchronous. Ready send is similar to the standard mode of send, and buffered mode is supported at a higher level along with the MPJ buffering API.

Standard Mode of Send

The standard mode of send uses two communication protocols. The first is 'Eager-Send Protocol' and the second is 'Rendezvous Protocol'.

Eager-Send Protocol

niodev uses the eager-send protocol to communicate small messages. The rationale for using this protocol is to minimize latency for small messages. The protocol assumes that the receiver has buffer space to store a message in case the matching recv has not yet been posted. The eager-send protocol is used for messages of size less than or equal to 128K bytes.

Rendezvous Protocol

niodev uses the rendezvous protocol to communicate large messages. Before a large message is sent, control messages are exchanged to make sure that a matching recv has been posted. This is necessary to avoid an additional copy into a temporary xdev buffer.

Synchronous Mode of Send

The synchronous mode of send uses the rendezvous protocol described above for communication.
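
The choice between the two protocols can be summarized in a few lines. The sketch below is an illustration under stated assumptions: the names ProtocolSketch, Protocol, and choose are hypothetical, and "128K bytes" is taken to mean 128 * 1024 bytes.

```java
// Hypothetical sketch of protocol selection; names are illustrative only.
class ProtocolSketch {
    enum Protocol { EAGER, RENDEZVOUS }

    // Assumption: "128K bytes" means 128 * 1024 bytes.
    static final int EAGER_LIMIT_BYTES = 128 * 1024;

    // Synchronous sends always use rendezvous; standard sends use the
    // eager protocol up to and including the limit, rendezvous above it.
    static Protocol choose(int messageBytes, boolean synchronous) {
        if (synchronous) {
            return Protocol.RENDEZVOUS;
        }
        return messageBytes <= EAGER_LIMIT_BYTES ? Protocol.EAGER : Protocol.RENDEZVOUS;
    }

    public static void main(String[] args) {
        System.out.println(choose(1024, false));            // small standard send
        System.out.println(choose(4 * 1024 * 1024, false)); // large standard send
        System.out.println(choose(1024, true));             // synchronous send
    }
}
```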

User and Selector Threads

During the initialization of xdev, xdev.niodev.NIODevice.init(...) creates a selector thread, which is first used to accept connections. Once all-to-all connectivity has been achieved, the channels (both control and data) register with the selector for read events. This means that whenever a channel receives some data, it generates an OP_READ event, which signals that there is data to read on this channel. Thus, the selector thread is normally used for reading data from the channels. It is also involved when there is a short write: if a thread tries to write a 10K message and succeeds in writing only 5K bytes, the channel registers with the selector for the OP_WRITE event so that writing the message into the SocketChannel can be completed later.
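
The OP_READ mechanism can be tried out in isolation with the standard java.nio API. The sketch below is not niodev code: it uses a java.nio.channels.Pipe in place of the SocketChannels, runs both the writing side and the selecting side in one thread for simplicity, and assumes the message is shorter than 1024 bytes. The class name SelectorReadSketch and the method roundTrip are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Illustrative only: a Pipe stands in for the SocketChannels used by niodev.
class SelectorReadSketch {

    // Writes a short message, waits for OP_READ, and returns what was read.
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // A channel must be non-blocking before it is registered.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Writing side: a blocking write into the channel.
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                // Selecting side: wait up to one second for a ready channel.
                StringBuilder received = new StringBuilder();
                if (selector.select(1000) > 0) {
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove(); // selected keys must be removed by hand
                        if (key.isReadable()) {
                            ByteBuffer in = ByteBuffer.allocate(1024);
                            ((ReadableByteChannel) key.channel()).read(in);
                            in.flip();
                            received.append(StandardCharsets.UTF_8.decode(in));
                        }
                    }
                }
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello from the user thread"));
    }
}
```

In a device like the one described here, the selecting loop would run in its own thread and would also handle OP_ACCEPT and OP_WRITE keys.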

The user thread is invoked when the isend/issend/send/ssend/recv/irecv methods are called. xdev also attempts to support multiple threads, which means there could be multiple user threads making calls to these blocking and non-blocking send/recv methods.

This poses a great programming challenge, because user threads and the selector thread must synchronize before accessing the send/recv queues, which contain pending messages waiting for their data to be written to or read from a channel.
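
One simple way to meet this requirement is to guard a queue with a single lock, as in the sketch below. It is a minimal illustration and not the data structure niodev uses: the class PendingQueueSketch, its methods, and the use of an integer tag as the only matching criterion are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a pending-message queue guarded by a single lock.
class PendingQueueSketch {
    private final Deque<Integer> pendingTags = new ArrayDeque<>();

    // Called by a user thread when a request cannot complete immediately.
    synchronized void add(int tag) {
        pendingTags.addLast(tag);
    }

    // Called by the selector thread; removes the oldest entry with this tag.
    synchronized boolean complete(int tag) {
        return pendingTags.removeFirstOccurrence(tag);
    }

    synchronized int size() {
        return pendingTags.size();
    }

    // Two threads add entries concurrently; returns the final queue size.
    static int addConcurrently(int perThread) {
        PendingQueueSketch queue = new PendingQueueSketch();
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                queue.add(i);
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // Without the synchronized methods, entries could be lost here.
        System.out.println(addConcurrently(1000));
    }
}
```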

Send and Recv Queues

Same Process Communications

There is a special case in which a process sends a message to itself and receives it. In this case, the message is simply copied from the sender's buffer into the receiver's buffer. The complexity arises when wildcards like ANY_SOURCE are used.
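
The two ingredients of this case, wildcard matching and a buffer-to-buffer copy, are sketched below. The sketch is hypothetical: it models sources and tags as plain integers and uses -1 as a placeholder for both wildcards, which is not necessarily how xdev.Device defines ANY_SOURCE and ANY_TAG, and it uses java.nio.ByteBuffer rather than mpjbuf.Buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of same-process delivery; sentinel values are placeholders.
class SelfSendSketch {
    static final int ANY_SOURCE = -1; // placeholder, not the xdev.Device value
    static final int ANY_TAG = -1;    // placeholder, not the xdev.Device value

    // A posted receive matches when each field is equal or is a wildcard.
    static boolean matches(int recvSrc, int recvTag, int msgSrc, int msgTag) {
        return (recvSrc == ANY_SOURCE || recvSrc == msgSrc)
            && (recvTag == ANY_TAG || recvTag == msgTag);
    }

    // Copies the sender's readable bytes into the receiver buffer and returns
    // the number of bytes copied. Throws BufferOverflowException if the
    // receiver buffer is too small.
    static int deliver(ByteBuffer sendBuf, ByteBuffer recvBuf) {
        int n = sendBuf.remaining();
        recvBuf.put(sendBuf.duplicate()); // duplicate() leaves sendBuf's position untouched
        return n;
    }

    public static void main(String[] args) {
        ByteBuffer send = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer recv = ByteBuffer.allocate(8);
        if (matches(ANY_SOURCE, 7, 0, 7)) {
            System.out.println("copied " + deliver(send, recv) + " bytes");
        }
    }
}
```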


Field Summary
 
Fields inherited from class xdev.Device
ANY_SOURCE, ANY_SRC, ANY_TAG, deviceName
 
Constructor Summary
NIODevice()
           
 
Method Summary
 void finish()
          This method shuts down the device.
 int getRecvOverhead()
           
 int getSendOverhead()
           
 ProcessID id()
          Returns the id of this process.
 ProcessID[] init(java.lang.String[] args)
          Initializes niodev.
 Status iprobe(ProcessID srcID, int tag, int context)
          Non-Blocking probe method.
 Request irecv(Buffer buf, ProcessID srcID, int tag, int context, Status status)
          Non-blocking receive method.
 Request isend(Buffer buf, ProcessID dstID, int tag, int context)
          Non-blocking send method
 Request issend(Buffer buf, ProcessID dstID, int tag, int context)
          Non-blocking synchronous send.
 Request peek()
           
 Status probe(ProcessID srcID, int tag, int context)
          Blocking probe method
 Status recv(Buffer buf, ProcessID srcID, int tag, int context)
          Blocking receive method.
 void send(Buffer buf, ProcessID dstID, int tag, int context)
          Blocking send method.
 void ssend(Buffer buf, ProcessID dstID, int tag, int context)
          Blocking synchronous send
 
Methods inherited from class xdev.Device
newInstance
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

NIODevice

public NIODevice()

Method Detail

init

public ProcessID[] init(java.lang.String[] args)
                 throws XDevException
Initializes niodev.

Specified by:
init in class Device
Parameters:
args - Arguments to NIODevice.
Returns:
ProcessID[] An array of ProcessIDs.
Throws:
XDevException - If there is an error initializing the device

id

public ProcessID id()
Returns the id of this process.

Specified by:
id in class Device
Returns:
ProcessID An object containing UUID of the process

getSendOverhead

public int getSendOverhead()
Specified by:
getSendOverhead in class Device

getRecvOverhead

public int getRecvOverhead()
Specified by:
getRecvOverhead in class Device

iprobe

public Status iprobe(ProcessID srcID,
                     int tag,
                     int context)
              throws XDevException
Non-Blocking probe method.

Specified by:
iprobe in class Device
Parameters:
srcID - The sourceID of the sender
tag - The tag of the message
context - The integer specifying the context
Returns:
mpjdev.Status
Throws:
XDevException - If there is an exception. The specific exception depends on the device.

probe

public Status probe(ProcessID srcID,
                    int tag,
                    int context)
             throws XDevException
Blocking probe method

Specified by:
probe in class Device
Parameters:
srcID - The sourceID of the sender
tag - The tag of the message
context - The integer specifying the context
Returns:
mpjdev.Status The status object
Throws:
XDevException - If there is an exception. The specific exception depends on the device.

isend

public Request isend(Buffer buf,
                     ProcessID dstID,
                     int tag,
                     int context)
              throws XDevException
Non-blocking send method

Specified by:
isend in class Device
Parameters:
buf - The mpjbuf.Buffer object containing the data.
dstID - ProcessID of the destination process.
tag - The unique identifier of the message.
context - An integer providing "safe universe" for messages.
Returns:
mpjdev.Request The Request object, which is later used to check the status of the message.
Throws:
XDevException - If there is an exception. The specific exception depends on the device.

issend

public Request issend(Buffer buf,
                      ProcessID dstID,
                      int tag,
                      int context)
               throws XDevException
Non-blocking synchronous send.

Specified by:
issend in class Device
Parameters:
buf - mpjbuf.Buffer object containing the data
dstID - ProcessID of the destination process
tag - An integer representing the tag (id) of the message
context - An integer specifying context.
Returns:
mpjdev.Request Request object that can be used to check the status and/or progress of the communication.
Throws:
XDevException - If there is an exception. The specific exception depends on the device.

send

public void send(Buffer buf,
                 ProcessID dstID,
                 int tag,
                 int context)
          throws XDevException
Blocking send method.

Specified by:
send in class Device
Parameters:
buf - The mpjbuf.Buffer object containing the data.
dstID - ProcessID of the destination
tag - The unique identifier of the message
context - An integer providing "safe universe" for messages.
Throws:
MPJException - If the buffer is null or the destination process ID is invalid.
java.nio.BufferOverflowException
java.nio.ReadOnlyBufferException
java.io.IOException - If some I/O error occurs
XDevException - If there is an exception. The specific exception depends on the device.

ssend

public void ssend(Buffer buf,
                  ProcessID dstID,
                  int tag,
                  int context)
           throws XDevException
Blocking synchronous send

Specified by:
ssend in class Device
Parameters:
buf - mpjbuf.Buffer object containing the data
dstID - ProcessID of the destination process
tag - An integer representing the tag (id) of the message
context - An integer specifying context.
Throws:
XDevException - If there is an exception. The specific exception depends on the device.

peek

public Request peek()
             throws XDevException
Specified by:
peek in class Device
Throws:
XDevException

recv

public Status recv(Buffer buf,
                   ProcessID srcID,
                   int tag,
                   int context)
            throws XDevException
Blocking receive method.

Specified by:
recv in class Device
Parameters:
buf - The mpjbuf.Buffer object where the user wishes to receive the actual message
srcID - The process id of the sending process
tag - The unique identifier of the message
context - An integer specifying context.
Returns:
Status The status object containing the details of recv
Throws:
MPJException - If the buffer is null or the source process ID is invalid
java.io.IOException - If some I/O error occurs
java.lang.IllegalArgumentException
XDevException - If there is an exception. The specific exception depends on the device.

irecv

public Request irecv(Buffer buf,
                     ProcessID srcID,
                     int tag,
                     int context,
                     Status status)
              throws XDevException
Non-blocking receive method.

Specified by:
irecv in class Device
Parameters:
buf - The mpjbuf.Buffer object where the user wishes to receive the actual message
srcID - The process id of the sending process
tag - The unique identifier of the message
context - An integer that provides "safe communication" universe
status - A mpjdev.Status object initialized at mpjdev/MPJ level.
Returns:
mpjdev.Request The Request object, which is later used to check the status of the receive.
Throws:
XDevException - If there is an exception. The specific exception depends on the device.

finish

public void finish()
            throws XDevException
This method shuts down the device.

Specified by:
finish in class Device
Throws:
MPJException
java.io.IOException - If some I/O error occurs
XDevException