Managing Multiple I/O Objects

Some care is needed when using multiple communication channels at once in order to maintain responsiveness, since any individual I/O operation might take arbitrarily long to complete. (For instance, reading from a channel will not complete until the other end of the channel has written the requisite data.) This is particularly important when implementing distributed parallelism, as processes which are waiting for I/O to occur are not doing useful work.

In particular, consider the manager/worker model of parallelism, where a manager process assigns pieces of work to individual worker processes, collects the results, and assigns new work, repeating until all of the desired work has been completed. If the manager waits for the results from a particular worker (i.e., performs a read operation on the associated channel) then it will do nothing else until that worker has finished its task. Other workers which finish their tasks in the meantime will not be given more work until the first worker is finished, and so may sit idle for a very long time.

One solution to this problem is for the manager not to wait unconditionally on a specific worker, but instead to wait until at least one of the set of workers has results to report, and then process those. In this way the manager never spends time waiting for one worker while another worker has results to report, and thus workers never spend significant time waiting for the manager to receive their results and give them new work to do.

Reinterpreting this approach in terms of I/O channels, the above indicates that we want a way to wait until at least one of a set of channels is ready for I/O to be performed on it. (By "ready" we mean that an I/O operation of the appropriate type will make at least some progress on the channel without further waiting.) This is accomplished with the WaitForIO intrinsic, as described below.
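For instance, the main loop of such a manager might look roughly as follows. This is a schematic sketch only: done, channels, ProcessResult and AssignWork are hypothetical stand-ins for the termination condition, the sequence of channels to the workers, and the application-specific routines.

> // Schematic only: done, channels, ProcessResult and AssignWork are
> // hypothetical stand-ins for application-specific state and routines.
> while not done do
>    ready := WaitForIO(channels);     // block until some worker has output
>    for C in ready do
>       ProcessResult(ReadObject(C));  // collect this worker's results
>       WriteObject(C, AssignWork());  // immediately hand out new work
>    end for;
> end while;

Here the manager blocks only when no worker has anything to report, and each worker is given new work as soon as its results have been read.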

Note that, in principle, an I/O operation may only partially complete before it must wait again. (For instance, network congestion may delay some network packets.) Thus some delays might still arise when using this approach with I/O operations that cannot partially complete (all writes; reads where the byte count is specified; reads of objects). In practice this is rarely a serious issue, but a more robust approach is possible using asynchronous I/O, as described in the next section.

WaitForIO(R : parameters) : SeqEnum -> SeqEnum
    TimeLimit: RngIntElt                Default: ∞
Given a sequence of I/O objects R, returns the sequence of those elements of R which are ready to have read operations performed upon them. If no elements of R are ready then this intrinsic will wait until at least one does become ready, or until the specified time limit has elapsed, whichever comes first. The time limit is measured in milliseconds.

Server sockets will be considered ready for reading when a connection attempt has been made, so that a call to WaitForConnection will return without delay. Other channels will be considered ready for reading if either a pending read request has been fulfilled, or there are no pending read requests and some data is available for reading.
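For example, the following sketch waits up to one second for activity on a server socket or any of a sequence of connected channels; server and clients are assumed to have been created already, much as in the example below.

> // Sketch: server and clients are assumed to exist, as in the example below.
> ready := WaitForIO([server] cat clients : TimeLimit := 1000);
> if #ready eq 0 then
>    print "nothing became ready within one second";
> end if;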

WaitForIO(R, W : parameters) : SeqEnum -> SeqEnum, SeqEnum
    TimeLimit: RngIntElt                Default: ∞
Given sequences of I/O objects R and W, returns two sequences: the sequence of those elements of R which are ready to have read operations performed upon them, and the sequence of those elements of W which are ready to have write operations performed upon them. If no elements of R or W are ready then this intrinsic will wait until at least one does become ready, or until the specified time limit has elapsed, whichever comes first. The time limit is measured in milliseconds.

The conditions for channels to be considered ready for reading are as described for the previous intrinsic. Channels will be considered ready for writing if all pending write requests have been fulfilled and it would be possible to write some data without delay.
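For instance, assuming inputs and outputs are sequences of channels, the following sketch waits at most half a second and reports how many channels are ready in each direction.

> // Sketch: inputs and outputs are assumed sequences of channels.
> R, W := WaitForIO(inputs, outputs : TimeLimit := 500);
> printf "%o channels ready for reading, %o for writing\n", #R, #W;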

Example IO_ChatServer (H3E17)

We demonstrate the use of WaitForIO by writing an extremely simple "chat" server. This will accept connections and read messages from connected channels, forwarding those messages to all connected channels. A simple ID scheme based on port numbers is implemented in order to distinguish senders.
> chatserver := procedure(host, port)
>    server := Socket(: LocalHost := host, LocalPort := port);
>    chatters := {};
>    ids := AssociativeArray();
>    while true do
>       text := "";
>       ready := WaitForIO([server] cat Setseq(chatters));
>       for I in ready do
>          if I eq server then    // new chatter
>             C := WaitForConnection(server);
>             id := Sprintf("[%o]", r[2]) where _,r := SocketInformation(C);
>             text cat:= Sprintf("%o has joined\n", id);
>             ids[C] := id;
>             Include(~chatters, C);
>          else
>             ok, msg := ReadCheck(I);
>             if not ok or IsEof(msg) then    // error or EOF, assume they left
>                text cat:= Sprintf("%o has left\n", ids[I]);
>                Remove(~ids, I);
>                Exclude(~chatters, I);
>             else    // all is well; add message to text to send
>                text cat:= Sprintf("%o: %o", ids[I], msg);
>             end if;
>          end if;
>       end for;
>       if #text gt 0 then
>          for I in chatters do
>             Write(I, text);
>          end for;
>       end if;
>    end while;
> end procedure;
> chatserver("localhost", 7000);
After starting the chat server, we test it (not shown here) using the UNIX telnet program to connect to the given host and port. As desired, messages sent by connected telnet instances are seen by all of the connected telnet instances.

Exercise: Modify the chat server to allow a client to change its reported ID by sending an appropriate message. Suggested syntax: /name new-name. Be careful with newlines.
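One possible shape for the change is sketched below, as a replacement for the body of the final else branch of the server's inner if statement; note that msg retains its trailing newline, which must not become part of the ID.

> // Sketch: possible replacement for the body of the final else branch above.
> if #msg ge 7 and msg[1..6] eq "/name " then
>    // Substring strips the "/name " prefix and the trailing newline.
>    newid := Substring(msg, 7, #msg - 7);
>    text cat:= Sprintf("%o is now known as %o\n", ids[I], newid);
>    ids[I] := newid;
> else
>    text cat:= Sprintf("%o: %o", ids[I], msg);
> end if;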

Further exercise: Disallow changing ID to an already-used ID. If that is attempted, send an error message to the relevant client only.