java.lang.Object xdev.Device xdev.niodev.NIODevice
public class NIODevice
This class is the implementation of xdev based on the Java New I/O (NIO) package. Java New I/O adds non-blocking I/O to the Java language, which this device uses extensively to provide MPI functionality. Instead of using java.net.Socket directly, niodev uses java.nio.channels.SocketChannel. This device, along with request classes such as xdev.niodev.NIORequest, xdev.niodev.NIOSendRequest, and xdev.niodev.NIORecvRequest, forms the basis of the communication functionality.
niodev reads a configuration file that can be placed on a local or shared file system, or made accessible through an HTTP server. The device searches this file for its IP@PORT@RANK entry, using the rank provided to the device by the runtime infrastructure. Once this entry is located, the device knows which ports to start its ServerSocketChannel instances on. The server socket channels are started on this port and on port+1, and are then registered with the selector to accept connections. Every time a client socket connects to one of these server socket channels, an OP_ACCEPT event is generated. After starting its server sockets, a process connects to every process whose rank is lower than its own. For example, with four processes, Process 0 starts two server sockets; Process 1 starts two server sockets and then connects to the server sockets of Process 0. Similarly, after starting their two server sockets, Processes 2 and 3 connect to Processes 0 and 1, and to Processes 0, 1, and 2, respectively.
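
The start-up pattern described above can be sketched as follows. This is an illustration only, not niodev source: the class ConnectionPlan and its method names are hypothetical, and the code models just the two rules stated in the text (listen on port and port+1, connect to every lower rank).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of xdev: models which ports a process
// listens on and which peers it connects to during start-up.
class ConnectionPlan {

    // A process listens on its configured port and on port + 1.
    static int[] serverPorts(int configuredPort) {
        return new int[] { configuredPort, configuredPort + 1 };
    }

    // A process connects to every process whose rank is lower than its own.
    static List<Integer> ranksToConnect(int myRank) {
        List<Integer> peers = new ArrayList<>();
        for (int rank = 0; rank < myRank; rank++) {
            peers.add(rank);
        }
        return peers;
    }

    public static void main(String[] args) {
        for (int rank = 0; rank < 4; rank++) {
            System.out.println("Process " + rank + " connects to " + ranksToConnect(rank));
        }
    }
}
```

With four processes, rank 0 connects to nobody and only accepts, while rank 3 connects to ranks 0, 1, and 2, so every pair of processes ends up connected exactly once per server port.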
Every time niodev accepts or connects, it puts the java.nio.channels.SocketChannel into one of two java.util.Vector instances, writableChannels (for writing messages) or readableChannels (for reading messages), depending on the serverSocketPort. Note that accepting client requests is done in the selector thread, while connecting to a server socket is done in the user thread. This may result in concurrent access to writableChannels and readableChannels, so access to them should be synchronized. Once all-to-all connectivity has been achieved, which means [writableChannels.size() == N-1] and [readableChannels.size() == N-1], where N is the number of processes, each process needs to send information such as its rank and UUID to every other process. These ranks are the ones read from the configuration file provided by the MPJ runtime infrastructure. Once all the processes have exchanged this information, niodev has worldWritableTable and worldReadableTable, which are instances of java.util.Hashtable. These two hashtables contain UUIDs as keys and SocketChannels as values. Note that the channels in worldWritableTable are in blocking mode and are used only for writing messages. niodev keeps different channels for reading and writing because it uses non-blocking reads and blocking writes. Non-blocking writes could hurt the thread safety of niodev or result in very complex code.
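
The split between a blocking write channel and a non-blocking read channel can be sketched with the standard java.nio API. This is a minimal sketch under assumptions: the class ChannelModes and its variable names are hypothetical, and the channels are left unconnected because only the mode configuration is being shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical illustration, not niodev source.
class ChannelModes {

    // Returns { writeChannel.isBlocking(), readChannel.isBlocking() }.
    static boolean[] configure() {
        try (Selector selector = Selector.open();
             SocketChannel writeChannel = SocketChannel.open();
             SocketChannel readChannel = SocketChannel.open()) {
            // Writes are issued directly by the calling thread, in blocking mode.
            writeChannel.configureBlocking(true);
            // Reads are driven by the selector, which requires non-blocking mode.
            readChannel.configureBlocking(false);
            readChannel.register(selector, SelectionKey.OP_READ);
            return new boolean[] { writeChannel.isBlocking(), readChannel.isBlocking() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] modes = configure();
        System.out.println("write channel blocking: " + modes[0]);
        System.out.println("read channel blocking: " + modes[1]);
    }
}
```

A channel must be in non-blocking mode before it can be registered with a selector, which is one reason a design with blocking writes needs a separate channel for selector-driven reads.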
These hashtables are later used in the send/recv methods to obtain a reference to a SocketChannel, with the UUID of each process as the key. These UUIDs are contained within the xdev.ProcessID objects. Again, while exchanging information, access to worldWritableTable and worldReadableTable should be synchronized. Normally, the user thread sends all the information and then waits for the selector thread to receive similar messages from all the other processes. When the selector thread reads a message, it first looks at the first four bytes, and based on this header information adds the received information to the appropriate hashtable. The header value is either INIT_MSG_HEADER_DATA_CHANNEL or INIT_MSG_HEADER_CONTROL_CHANNEL. Once all of this is done, niodev has been initialized.
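
The hand-off between the user thread (which waits) and the selector thread (which fills the tables) could look roughly like the sketch below. It is not niodev source: the class PeerTable, its methods, and the use of a type parameter V in place of SocketChannel are assumptions made for illustration. Only the idea of a synchronized, UUID-keyed Hashtable that is complete once it holds N-1 entries comes from the text.

```java
import java.util.Hashtable;
import java.util.UUID;

// Hypothetical sketch: V stands in for SocketChannel.
class PeerTable<V> {
    private final Hashtable<UUID, V> table = new Hashtable<>();

    // Called by the selector thread once a peer's init message has been read.
    synchronized void register(UUID peer, V channel) {
        table.put(peer, channel);
        notifyAll(); // wake a user thread waiting in awaitPeers
    }

    // Called by the user thread: blocks until all N-1 peers are known.
    synchronized void awaitPeers(int nprocs) {
        try {
            while (table.size() < nprocs - 1) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for peers", e);
        }
    }

    synchronized V lookup(UUID peer) {
        return table.get(peer);
    }

    synchronized int size() {
        return table.size();
    }

    public static void main(String[] args) {
        final int nprocs = 4;
        PeerTable<String> writable = new PeerTable<>();
        // Stand-in for the selector thread receiving three init messages.
        Thread selector = new Thread(() -> {
            for (int i = 1; i < nprocs; i++) {
                writable.register(UUID.randomUUID(), "channel-to-peer-" + i);
            }
        });
        selector.start();
        writable.awaitPeers(nprocs);
        System.out.println("peers known: " + writable.size());
    }
}
```

Later send/recv calls would then call something like lookup with the UUID taken from a ProcessID to obtain the channel for that peer.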
The MPI specification defines four modes of send operation: standard, buffered, ready, and synchronous. xdev supports two of them, standard and synchronous send. Ready send is similar to the standard mode of send, and buffered mode is supported at a higher level along with the MPJ buffering API.
The standard mode of send uses two communication protocols. The first is 'Eager-Send Protocol' and the second is 'Rendezvous Protocol'.
niodev uses the eager-send protocol to communicate small messages. The rationale for this protocol is to minimize latency for small messages. It assumes that the receiver has buffer space to store a message when the matching recv has not yet been posted. The eager-send protocol is used for messages of size less than or equal to 128K bytes.
niodev uses the rendezvous protocol to communicate large messages. Before a large message is communicated, control messages are exchanged to make sure that a matching recv has been posted. This is necessary to avoid additional copying into a temporary xdev buffer.
The synchronous mode of send uses the rendezvous protocol described above for communication.
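
The protocol choice described above reduces to a small decision rule, sketched here. The class, enum, and constant names are hypothetical, and the sketch assumes that "128K bytes" means 128 * 1024 bytes.

```java
// Hypothetical illustration of the rule stated in the text, not niodev source.
class ProtocolChooser {
    enum SendMode { STANDARD, SYNCHRONOUS }
    enum Protocol { EAGER, RENDEZVOUS }

    // Assumption: "128K bytes" is read as 128 * 1024 bytes.
    static final int EAGER_LIMIT_BYTES = 128 * 1024;

    static Protocol choose(SendMode mode, int messageBytes) {
        // Synchronous sends always use rendezvous.
        if (mode == SendMode.SYNCHRONOUS) {
            return Protocol.RENDEZVOUS;
        }
        // Standard sends use eager-send up to and including the limit.
        return messageBytes <= EAGER_LIMIT_BYTES ? Protocol.EAGER : Protocol.RENDEZVOUS;
    }

    public static void main(String[] args) {
        System.out.println(choose(SendMode.STANDARD, 1024));
        System.out.println(choose(SendMode.STANDARD, EAGER_LIMIT_BYTES + 1));
        System.out.println(choose(SendMode.SYNCHRONOUS, 1024));
    }
}
```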
During the initialization of xdev, xdev.niodev.NIODevice.init(...) creates a selector thread, which is first used to accept connections. Once all-to-all connectivity has been achieved, the channels (both control and data) register with the selector for OP_READ events. This means that whenever a channel receives some data, it generates an OP_READ event, which signals that there is data to read on this channel. Thus, the selector thread is normally used for reading data from the channels. It is also involved in short writes: if a thread tries to write a 10K message and only succeeds in writing 5K bytes, the channel registers with the selector for an OP_WRITE event, and the remainder of the message is written into the SocketChannel later.
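
The short-write handling mentioned above might be structured as in the following sketch. It is not niodev source: the class ShortWriteSketch and its methods are hypothetical, a java.nio.channels.Pipe stands in for a socket so the example runs without a network, and the channel is assumed to be in non-blocking mode (a requirement for registering with a selector).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;

// Hypothetical illustration, not niodev source.
class ShortWriteSketch {

    // Tries to write the whole message. On a short write, parks the rest of
    // the buffer on an OP_WRITE registration and returns false.
    static <C extends SelectableChannel & WritableByteChannel> boolean writeOrDefer(
            C channel, ByteBuffer message, Selector selector) throws IOException {
        channel.write(message);
        if (!message.hasRemaining()) {
            return true;
        }
        channel.register(selector, SelectionKey.OP_WRITE, message);
        return false;
    }

    // Called when the selector reports OP_WRITE for the key: continues the
    // pending write and clears the write interest once the buffer is drained.
    static boolean resume(SelectionKey key) throws IOException {
        ByteBuffer pending = (ByteBuffer) key.attachment();
        ((WritableByteChannel) key.channel()).write(pending);
        if (pending.hasRemaining()) {
            return false;
        }
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        return true;
    }

    // Returns { smallWriteCompleted, largeWriteCompleted } for a pipe nobody reads.
    static boolean[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.sink().configureBlocking(false);
                boolean small = writeOrDefer(pipe.sink(), ByteBuffer.allocate(10), selector);
                // 16 MiB is assumed to exceed the pipe's capacity.
                boolean large = writeOrDefer(
                        pipe.sink(), ByteBuffer.allocate(16 * 1024 * 1024), selector);
                return new boolean[] { small, large };
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("small write completed: " + result[0]);
        System.out.println("large write completed: " + result[1]);
    }
}
```

Attaching the partially written buffer to the SelectionKey is one way to let whichever thread services the OP_WRITE event pick up where the original write stopped; how niodev itself tracks the pending data is not stated in this page.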
The user thread is the thread that invokes the isend/issend/send/ssend/recv/irecv methods. xdev also attempts to support multiple threads, which means there could be multiple user threads making calls to these blocking and non-blocking send/recv methods.
This poses a significant programming challenge, because user threads and the selector thread must synchronize before accessing the send/recv queues, which contain pending messages that are waiting for data to be written to or read from the channel.
There is a special case when a process sends a message to itself and receives it. In this case, the message is simply copied from the sender buffer into the receiver buffer. The complexity arises when wildcards such as ANY_SOURCE are used.
Field Summary |
---|
Fields inherited from class xdev.Device |
---|
ANY_SOURCE, ANY_SRC, ANY_TAG, deviceName |
Constructor Summary | |
---|---|
NIODevice()
|
Method Summary | |
---|---|
void | finish() This method shuts down the device. |
int | getRecvOverhead() |
int | getSendOverhead() |
ProcessID | id() Returns the id of this process. |
ProcessID[] | init(java.lang.String[] args) Initializes niodev. |
Status | iprobe(ProcessID srcID, int tag, int context) Non-blocking probe method. |
Request | irecv(Buffer buf, ProcessID srcID, int tag, int context, Status status) Non-blocking receive method. |
Request | isend(Buffer buf, ProcessID dstID, int tag, int context) Non-blocking send method. |
Request | issend(Buffer buf, ProcessID dstID, int tag, int context) Non-blocking synchronous send. |
Request | peek() |
Status | probe(ProcessID srcID, int tag, int context) Blocking probe method. |
Status | recv(Buffer buf, ProcessID srcID, int tag, int context) Blocking receive method. |
void | send(Buffer buf, ProcessID dstID, int tag, int context) Blocking send method. |
void | ssend(Buffer buf, ProcessID dstID, int tag, int context) Blocking synchronous send. |
Methods inherited from class xdev.Device |
---|
newInstance |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
---|
public NIODevice()
Method Detail |
---|
public ProcessID[] init(java.lang.String[] args) throws XDevException
init in class Device
args - Arguments to NIODevice.
XDevException - If there is an error initializing the device.
public ProcessID id()
id in class Device
public int getSendOverhead()
getSendOverhead in class Device
public int getRecvOverhead()
getRecvOverhead in class Device
public Status iprobe(ProcessID srcID, int tag, int context) throws XDevException
iprobe in class Device
srcID -
tag -
context -
XDevException - If there is an exception. The specific exception depends on the device.
public Status probe(ProcessID srcID, int tag, int context) throws XDevException
probe in class Device
srcID - The source ID of the sender
tag - The tag of the message
context - The integer specifying the context
XDevException - If there is an exception. The specific exception depends on the device.
public Request isend(Buffer buf, ProcessID dstID, int tag, int context) throws XDevException
isend in class Device
buf - The mpjbuf.Buffer object containing the data.
dstID - ProcessID of the destination process.
tag - The unique identifier of the message.
context - An integer providing "safe universe" for messages.
XDevException - If there is an exception. The specific exception depends on the device.
public Request issend(Buffer buf, ProcessID dstID, int tag, int context) throws XDevException
issend in class Device
buf - mpjbuf.Buffer object containing the data
dstID - ProcessID of the destination process
tag - An integer representing the tag (id) of the message
context - An integer specifying context.
XDevException - If there is an exception. The specific exception depends on the device.
public void send(Buffer buf, ProcessID dstID, int tag, int context) throws XDevException
send in class Device
buf - The mpjbuf.Buffer object containing the data.
dstID - ProcessID of the destination
tag - The unique identifier of the message
context - An integer providing "safe universe" for messages.
MPJException - If the buffer is null or the destination process ID is invalid.
java.nio.BufferOverflowException
java.nio.ReadOnlyBufferException
java.io.IOException - If some I/O error occurs
XDevException - If there is an exception. The specific exception depends on the device.
public void ssend(Buffer buf, ProcessID dstID, int tag, int context) throws XDevException
ssend in class Device
buf - mpjbuf.Buffer object containing the data
dstID - ProcessID of the destination process
tag - An integer representing the tag (id) of the message
context - An integer specifying context.
XDevException - If there is an exception. The specific exception depends on the device.
public Request peek() throws XDevException
peek in class Device
XDevException
public Status recv(Buffer buf, ProcessID srcID, int tag, int context) throws XDevException
recv in class Device
buf - The mpjbuf.Buffer object where the user wishes to receive the actual message
srcID - The process id of the sending process
tag - The unique identifier of the message
context - An integer specifying context.
MPJException - If the buffer is null or the source process ID is invalid
java.io.IOException - If some I/O error occurs
java.lang.IllegalArgumentException
XDevException - If there is an exception. The specific exception depends on the device.
public Request irecv(Buffer buf, ProcessID srcID, int tag, int context, Status status) throws XDevException
irecv in class Device
buf - The mpjbuf.Buffer object where the user wishes to receive the actual message
srcID - The process id of the sending process
tag - The unique identifier of the message
context - An integer that provides a "safe communication" universe
status - An mpjdev.Status object initialized at the mpjdev/MPJ level.
XDevException - If there is an exception. The specific exception depends on the device.
public void finish() throws XDevException
finish in class Device
MPJException
java.io.IOException - If some I/O error occurs
XDevException