Asynchronous I/O

One difficulty that arises when coordinating multiple channels is dealing with incomplete data. Due to network delays or other factors, data written by one side may arrive at the reader's side in several chunks with time delays between them. If we insist on reading all of the data then we will wait, and become unresponsive to the other channels in the meantime. If we try to manage partial data and reassemble it ourselves then the I/O handling becomes extremely complicated.

Magma's resolution of this difficulty is called asynchronous I/O. With asynchronous I/O, operations are not performed immediately; instead they are queued to occur on the channel. At some later time they will complete, and this completion can be waited for with WaitForIO. Thus it is possible to manage multiple channels while staying responsive, using a simple interface.

Important: Currently Magma does not support truly asynchronous I/O; instead, asynchronous I/O operations are only guaranteed to advance during a call to WaitForIO with the associated channel in one of the argument lists. This meshes well with common paradigms, but it is important to be aware of it. In particular, queueing an asynchronous write without a following WaitForIO may never perform the write.

The asynchronous I/O intrinsics all start with Async, and are the counterparts of the corresponding synchronous intrinsics (without the Async). After an asynchronous read is performed, the data must be retrieved by using the corresponding synchronous read. An error will arise if the synchronous read operation is not compatible with the corresponding asynchronous read that was queued.
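
For instance, a minimal sketch of the queue-wait-retrieve pattern might look as follows, where S is assumed to be an already-connected channel (such as a socket):
> AsyncRead(S);             // queue the read request; nothing is read yet
> ready := WaitForIO([S]);  // queued operations on S advance during this call
> if #ready gt 0 then
>    data := Read(S);       // retrieve the result with the matching synchronous read
> end if;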

A Note on Files and Pipes

File objects are considered to be "fast" --- it is expected that I/O on them will always complete immediately, without waiting. This is not always true (for instance, with network-mounted filesystems), but it reflects an equivalent assumption in Linux. Consequently, although it is permitted to pass file objects to WaitForIO, doing so is rarely useful, and asynchronous I/O operations on file objects will attempt to complete immediately.

Pipe objects are not properly handled yet, and attempting to perform asynchronous I/O on them will produce an error. When used with WaitForIO, pipes will incorrectly be treated as fast channels, which may cause undesired blocking. These issues will be fixed in an upcoming version of Magma.

Strings

AsyncRead(I : parameters) : IO ->
    Max: RngIntElt                      Default: 0
Queues an asynchronous read request of a string to I. If the parameter Max is set to a positive value then at most that many bytes will be read. Note that the resulting string may contain fewer than Max bytes, depending on the amount of data that becomes available.
AsyncRead(I, n) : IO, RngIntElt ->
Queues an asynchronous read request of an n-byte string to I. Note that the resulting string may contain fewer than n bytes if the input has terminated.
AsyncWrite(I, s) : IO, MonStgElt ->
Queues an asynchronous write request of the string s to I.
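
As an illustrative sketch, queueing a fixed-length read and retrieving it afterwards might look like this, where C is assumed to be an already-open channel and the retrieval uses the matching synchronous Read:
> AsyncRead(C, 16);     // queue a read of (up to) 16 bytes
> _ := WaitForIO([C]);  // allow the queued read to advance
> s := Read(C, 16);     // matching synchronous read retrieves the string
> #s;                   // may be less than 16 if the input terminated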

Byte Sequences

AsyncReadBytes(I : parameters) : IO ->
    Max: RngIntElt                      Default: 0
Queues an asynchronous read request of a byte sequence to I. If the parameter Max is set to a positive value then at most that many bytes will be read. Note that the resulting sequence may contain fewer than Max bytes, depending on the amount of data that becomes available.
AsyncReadBytes(I, n) : IO, RngIntElt ->
Queues an asynchronous read request of an n-byte sequence to I. Note that the resulting sequence may contain fewer than n bytes if the input has terminated.
AsyncWriteBytes(I, S) : IO, SeqEnum ->
Queues an asynchronous write request of the byte sequence S to I.
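
The pattern for byte sequences is analogous; a hedged sketch, in which C is assumed to be an open channel whose peer echoes the data back, might be:
> AsyncWriteBytes(C, [72, 105]);   // queue the write of two bytes
> AsyncReadBytes(C : Max := 2);    // queue a read of the echoed reply
> _ := WaitForIO([C]);             // both queued operations advance here
> reply := ReadBytes(C : Max := 2);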

Objects

See Subsection Objects for more information about I/O with Magma objects and for descriptions of the corresponding synchronous functions.

AsyncReadObject(I) : IO ->
Queues an asynchronous read request of an object to I.
AsyncWriteObject(I, x) : IO, Any ->
Queues an asynchronous write request of x to I.
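
A minimal sketch pairing an asynchronous object write with the read of the peer's reply might look as follows, where C is assumed to be an open channel on which versions have already been exchanged, and the tuple sent is purely illustrative:
> AsyncWriteObject(C, <1, "task">);   // queue the object write
> AsyncReadObject(C);                 // queue the read of the reply object
> _ := WaitForIO([C]);                // queued operations advance here
> reply := ReadObject(C);             // retrieve the object that was read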

Versions

See Subsection Versions for an explanation of versions and for descriptions of the corresponding synchronous functions.

AsyncNotifyVersion(I) : IO ->
Queues an asynchronous version notify request to I.
AsyncExpectVersion(I) : IO ->
Queues an asynchronous read request of a version to I.
AsyncExchangeVersions(I) : IO ->
Queues an asynchronous exchange of versions to I.
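
A sketch of the asynchronous version exchange on a freshly connected channel C, mirroring the synchronous ExchangeVersions used by the worker in the example below:
> AsyncExchangeVersions(C);   // queue the version exchange
> _ := WaitForIO([C]);        // the queued exchange advances during this call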

Example IO_Collatz (H3E18)

We show an implementation of the manager/worker model that uses asynchronous I/O and object transmission, greatly simplifying the I/O loop.

The workload chosen will be computing hailstone sequences. Given an integer n, we iterate the Collatz map until we reach 1; the number of iterations taken and the maximum intermediate value will be returned to the manager. For the sake of simplicity we assume that 1 will always be reached.

The worker side of the computation involves a work function to actually do the work and a simple wrapper around it that interacts with the manager. The work function will take whatever object the manager sends, interpret it, do the computation, and return a single object as the result. Since we have multiple values to return, we combine them into a single tuple. A small random time delay is inserted in each computation for the purposes of this example so that the effects of multiple workers can be seen; real workloads would not include such a delay.

> collatz := function(n)
>    orign := n;
>    niters := 0;
>    maxval := n;
>    while n gt 1 do
>       n := IsEven(n) select n div 2 else 3*n + 1;
>       niters +:= 1;
>       maxval := Max(maxval, n);
>    end while;
>    // delay by 100, 200, or 300 milliseconds:
>    _ := WaitForIO([] : TimeLimit := 100*Random(1,3));
>    return <orign, niters, maxval>;
> end function;
The wrapper for the worker takes a host and port to contact the manager on, and is a simple work-processing loop thereafter. It uses synchronous I/O calls throughout, as the only I/O it could be waiting for involves the server.
> worker := procedure(host, port)
>    S := Socket(host, port);
>    ExchangeVersions(S);
>    while true do
>       ok, data := ReadObjectCheck(S);
>       if not ok then break; end if;
>       result := collatz(data);
>       WriteObject(S, result);
>    end while;
> end procedure;
First we create the communication channel and then immediately exchange versions with the server. This ensures that the program will work even if the manager and workers are using different versions of Magma. Then we loop on the work, despatching it and returning the results; using object I/O avoids any concerns about incomplete I/O operations. We use ReadObjectCheck in order to gracefully exit on EOF.

Now for the manager side of things. The manager will use asynchronous I/O calls in order to remain responsive. For the purposes of this example we will print output about the workers, so we start with a simple function to produce an identifier for each communication channel.

> channelid := func<I | t where _,t := SocketInformation(I)>;
The manager takes a sequence of work items as its argument (in this example, the work items are simply integers). It would also be reasonable for it to take the host and port information that it should listen on (since the workers will need to know this), but we instead let the system select those values and print them out.
> manager := procedure(workleft)
>    server := Socket();
>    host, port := Explode(SocketInformation(server));
>    printf "Manager listening on port %o of %o\n", port, host;
>    busy := {};
>    while #workleft gt 0 or #busy gt 0 do
>       ready := WaitForIO([server] cat Setseq(busy));
>       idle := [];          // workers awaiting work
>       for I in ready do
>          if I eq server then    // new worker has connected
>             W := WaitForConnection(I);
>             printf "New worker %o joined\n", channelid(W);
>             AsyncExchangeVersions(W);
>             Append(~idle, W);
>          else
>             result := ReadObject(I);
>             n, count, maxv := Explode(result);
>             printf "%o: %o reached 1 (max %o) after %o iterations\n",
>                 channelid(I), n, maxv, count;
>             Exclude(~busy, I);
>             Append(~idle, I);
>          end if;
>       end for;
>       for W in idle do
>          if #workleft eq 0 then break; end if;
>          data := workleft[1];
>          Remove(~workleft, 1);
>          AsyncWriteObject(W, data);
>          AsyncReadObject(W);
>          Include(~busy, W);
>       end for;
>    end while;
> end procedure;
First the manager creates the server socket for workers to connect to; it prints out the associated information so that we know what arguments to give to start the workers with.

During the computation, the set busy will keep track of those channels associated with workers that currently have work assigned to them, and the sequence idle will track those workers which are waiting to have work assigned to them. The manager is done only when all work has been assigned, and all assigned work has been completed.

We wait for new workers to connect (on the server socket) and for completed work to be reported by the busy workers. A subtle point in the above is that we used the version of WaitForIO that only waits for read events. This is the typical case --- the manager queues both the write and the read, and the worker will only send results after having read the data. Thus only the completion of the read event is of interest to the manager.

After a new worker joins then version exchange is queued immediately, since we will be sending and receiving objects. The worker is then added to the idle list.

When a worker has completed its task and the result is ready, we perform the synchronous ReadObject to get this result. We then mark the worker as idle (and no longer busy).

Finally, for each idle worker we attempt to (asynchronously) assign remaining work to it. We queue both the write and the following read, and mark the worker as busy.

We now demonstrate the code in action with two workers. First we start the manager in order to get the channel information for the workers to use.

> // manager
> manager([1..10]);
Manager listening on port 46119 of 0.0.0.0
Then we launch the workers (in this case, on the same machine).
> // worker 1
> worker("localhost", 46119);
> // worker 2
> worker("localhost", 46119);
And the manager displays the progress of the computation for us. Note that without the introduced delay the first worker would probably have performed all the computation before we could launch the second one.
New worker <"127.0.0.1", 43352> joined
<"127.0.0.1", 43352>: 1 reached 1 (max 1) after 0 iterations
New worker <"127.0.0.1", 43354> joined
<"127.0.0.1", 43354>: 3 reached 1 (max 16) after 7 iterations
<"127.0.0.1", 43352>: 2 reached 1 (max 2) after 1 iterations
<"127.0.0.1", 43352>: 5 reached 1 (max 16) after 5 iterations
<"127.0.0.1", 43354>: 4 reached 1 (max 4) after 2 iterations
<"127.0.0.1", 43354>: 7 reached 1 (max 52) after 16 iterations
<"127.0.0.1", 43352>: 6 reached 1 (max 16) after 8 iterations
<"127.0.0.1", 43354>: 8 reached 1 (max 8) after 3 iterations
<"127.0.0.1", 43352>: 9 reached 1 (max 52) after 19 iterations
<"127.0.0.1", 43354>: 10 reached 1 (max 16) after 6 iterations