The low-level features necessary to perform distributed parallelism in Magma are sockets and I/O multiplexing; these have been in place for a considerable while. However, using them effectively was often challenging and fiddly, especially when trying to be robust in the face of network errors. Amongst other issues, communicating anything but the simplest data meant needing to take into account the possibilities of fragmentation of that data and managing the reassembly explicitly.
Previously two key features for simplifying distributed communication have been introduced: object transmission and asynchronous I/O. Together, these remove most or all of the complexity in the communication component of a distributed parallel computation. The range of objects that support transmission in this way continues to be expanded as part of our ongoing efforts.
The next step up is to simplify the process of implementing distributed parallel computations themselves. There are several ways this might be done; for the V2.26 release we concentrate on the manager/worker model, with the goal of making the implementation of such a system as easy as possible. Other models are under consideration, but are left for the future.
In this model of distributed parallelism, there is one special process designated the manager, and one or more other processes designated workers. The manager has a list of data defining work that needs to be done. Workers contact the manager to register their availability to do work, then settle into a cycle of performing tasks received from the manager and reporting the results. The manager allocates tasks to workers and collates the results, then returns them to the user when the computation is finished. It also needs to manage the pool of workers as a whole, handling the addition and removal of workers from this pool as necessary.
We focus on a particular way of using this model. In this approach, a contact socket is set up for the manager, and a list defining work tasks is given to it. Then workers are started on various machines and told to contact the manager (using the known socket information). Once all of the work has been completed the workers exit and the manager reports the results.
This particular approach is well-suited to the situation where the entire computation is distributed, with a clearly defined set of work that is known up-front. For instance, suppose one wanted to compute some data for each of a large number of objects and then save the results to a file for later integration into a database; this would naturally be handled by setting up a manager with the list of those objects and workers that know how to compute that data for a single object.
We have found this approach to be a good fit for many of our needs, and the implementation of the manager/worker model to be a powerful tool for this. However, it is not always the best approach to take; consequently, we also show details of an implementation so that it can be adjusted to meet different circumstances.
At the simplest level, the interface consists of two intrinsics: DistributedManager and DistributedWorker.
A user wishing to use distributed parallelism first creates a socket for the manager to use and then calls DistributedManager. Then workers are started on the desired machines, with each worker calling DistributedWorker. The manager will allocate tasks to each worker and collect the results. When all tasks are complete the DistributedManager returns the collated results, and each call to DistributedWorker exits.
The arguments for DistributedManager are a previously-created server socket for communication purposes and a sequence (or list) of work items. Each work item will be sent to a worker. The return value is the sequence of results computed by the workers, in the order of the corresponding work items. There is also a version that takes a Process for cases where it is not desirable to generate all inputs at once.
The parameters are explained later in the sections on Alternative Results Handling and Task Group Management.
The arguments to DistributedWorker are the host and port information for the manager's socket, and a function that takes a work item as input and returns the appropriate result.
For example purposes we will use a setup with four machines, named circle, triangle, square, and star. The manager will run on circle, while one worker will run on each of triangle, square, and star. We will use port 10000 on circle for communication.
The workload in question will be some very basic information about the Collatz map. This map acts on a positive integer n to produce another positive integer, as follows: If n is even, then the result is n/2; if n is odd, then the result is 3n + 1. It is widely believed that repeated iteration of the Collatz map will always eventually produce the number 1, regardless of the original starting number, but a proof has yet to be found.
Workers will be given a number n and return a tuple containing the original value of n, the number of iterations of the Collatz map required to reach 1, and the largest value that was encountered in the process. The manager will compute this data for each of the first 10 integers.
The worker code is the following, which we put in a file "collatz_worker.m" to make it easier to spawn workers. We will use this worker code unchanged in the basic examples.
host := "circle";
port := 10000;

// "sleeps" for the given number of milliseconds
procedure sleep_ms(ms)
    _ := WaitForIO([] : TimeLimit := ms);
end procedure;

collatz := func<n | IsEven(n) select n div 2 else 3*n + 1>;

function collatz_info(n)
    k := n;
    niters := 0;
    maxval := k;
    while k gt 1 do
        k := collatz(k);
        niters +:= 1;
        maxval := Max(maxval, k);
    end while;
    sleep_ms(100*Random(1, 3)); // random delay for example purposes only
    return <n, niters, maxval>;
end function;

DistributedWorker(host, port, collatz_info);
quit;

The implementation is straightforward except for the call to sleep_ms; that line causes the work function to delay for one of 100ms, 200ms, or 300ms. This is done purely for example purposes, as the workload is so small that the first worker would likely accept and compute the data for all tasks before the second worker could even be started. Obviously actual workloads should not have such a delay.
The manager session (from a Magma launched on circle) might look like this:
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10]);
Now this Magma is waiting for workers. We start them on the other machines with the following command:
magma collatz_worker.m
A short while after doing the above, the workers on each machine exit and the call to DistributedManager returns:
[ <1, 0, 1>, <2, 1, 2>, <3, 7, 16>, <4, 2, 4>, <5, 5, 16>, <6, 8, 16>, <7, 16, 52>, <8, 3, 8>, <9, 19, 52>, <10, 6, 16> ]
Two verbose flags are available for the manager's use. These provide a little information about the workers, and can be useful in order to track down issues, to reassure yourself that things are actually happening, or to log partial progress in case of a crash.
Setting this verbose flag to 1 will cause a message to be printed each time a worker joins or leaves. This message includes the worker's local host and port.

Setting this verbose flag to 2 will additionally cause a message to be printed each time a worker is assigned a task or reports the results of an assigned task. The task data (and result) will also be shown.
Setting this verbose flag to 1 will add timestamps to the other verbose output.
> SetVerbose("ManagerWorker", 2);
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10]);
New worker joined from triangle:14342
Worker triangle:14342 assigned task 1
Worker triangle:14342 completed task 1 with result <1, 0, 1>
Group 1 complete, terminating 0 other workers
Worker triangle:14342 assigned task 2
New worker joined from square:12711
Worker square:12711 assigned task 3
Worker triangle:14342 completed task 2 with result <2, 1, 2>
Group 2 complete, terminating 0 other workers
Worker triangle:14342 assigned task 4
Worker triangle:14342 completed task 3 with result <3, 7, 16>
Group 3 complete, terminating 0 other workers
Worker square:12711 assigned task 5
Worker triangle:14342 completed task 4 with result <4, 2, 4>
Group 4 complete, terminating 0 other workers
Worker triangle:14342 assigned task 6
New worker joined from star:17338
Worker star:17338 assigned task 7
Worker star:17338 completed task 7 with result <7, 16, 52>
Group 7 complete, terminating 0 other workers
Worker star:17338 assigned task 8
Worker square:12711 completed task 5 with result <5, 5, 16>
Group 5 complete, terminating 0 other workers
Worker square:12711 assigned task 9
Worker triangle:14342 completed task 6 with result <6, 8, 16>
Group 6 complete, terminating 0 other workers
Worker triangle:14342 assigned task 10
Worker square:12711 completed task 9 with result <9, 19, 52>
Group 9 complete, terminating 0 other workers
Worker star:17338 completed task 8 with result <8, 3, 8>
Group 8 complete, terminating 0 other workers
Worker triangle:14342 completed task 10 with result <10, 6, 16>
Group 10 complete, terminating 0 other workers
[ <1, 0, 1>, <2, 1, 2>, <3, 7, 16>, <4, 2, 4>, <5, 5, 16>, <6, 8, 16>, <7, 16, 52>, <8, 3, 8>, <9, 19, 52>, <10, 6, 16> ]

Here workers are identified by their host and local port number. The "task n" components are not the task number, but the task data; in this example they happen to be the same. The meaning of the groups and worker termination messages is explained later in the section on group tasks.
The simple approach described above works for many cases, but there will be other situations where it does not suffice. One common example is when the results are incompatible elements that cannot be put into the same sequence. To handle such situations, DistributedManager has three parameters related to results that may be set.
var initial_results: Any	Default: [ ]
var update_results: UserProgram	Default:
var process_results: UserProgram	Default:

The parameter initial_results provides the initial value to use for the results of the routine. If not set, this defaults to the empty sequence. If it is instead set to the empty list ([* *]) then the results will be returned in a list rather than a sequence.
The parameter update_results is a procedure that modifies the results value to incorporate the result that has been computed for a particular input. It must have one of two forms:
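The forms themselves are not reproduced here; as an illustration only, the three-argument form used by the examples below is a procedure along these lines (the argument names and body are merely suggestive):

update_results := procedure(~results, result, index)
    // results: the accumulated results value, modified in place
    // result:  the result computed by a worker for one work item
    // index:   the position of that work item in the input sequence
    results[index] := result; // for example, record it at its position
end procedure;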
These parameters are all optional, except that update_results must be provided if initial_results is set to something other than the empty sequence or the empty list.
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] : initial_results := [* *]);
[* <1, 0, 1>, <2, 1, 2>, <3, 7, 16>, <4, 2, 4>, <5, 5, 16>, <6, 8, 16>, <7, 16, 52>, <8, 3, 8>, <9, 19, 52>, <10, 6, 16> *]
> initial_results_maxval := < 0, 0 >;
> update_results_maxval := procedure(~results, result, index)
>     n, niters, maxval := Explode(result);
>     assert index eq n;
>     if results[1] lt maxval then
>         results := < maxval, n >;
>     end if;
> end procedure;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
>     initial_results := initial_results_maxval,
>     update_results := update_results_maxval
> );
< 52, 7 >

Of course, for such a small example we could have just examined the results at the end to compute this, but for more interesting workloads it may be undesirable to keep large outputs around when only a small amount of data is wanted.
> initial_results_niters := < 0, 0 >;
> update_results_niters := procedure(~results, result, index)
>     n, niters, maxval := Explode(result);
>     assert index eq n;
>     results := < results[1] + niters, results[2] + 1 >;
> end procedure;
> process_results_niters := procedure(~results)
>     results := results[1] / results[2];
> end procedure;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
>     initial_results := initial_results_niters,
>     update_results := update_results_niters,
>     process_results := process_results_niters
> );
67/10
While the options already presented work well for simple situations, there are a few common patterns that arise sufficiently often that special handling has been provided for them.
One such pattern is based on trying several different approaches for an item, and taking whichever result finishes first. This is particularly useful when you have a set of heuristics about ways to solve the problem, but also want to cater for the cases where those heuristics are not correct. Indeed, it can be a powerful tool to aid in the development of such heuristics. A subcategory of this pattern is the ability to try a task with a small bound, and then retry it with a higher bound if unsuccessful.
Another common approach is to split the work for a particular input into several components, such as localisations at certain primes, and then combine all of the factors together to produce the desired result for that item. While this could be handled in the existing framework by making separate items for each local component, doing so can make the code less clear.
In order to handle the above usage patterns, and others, the notion of task groups is introduced. With task groups, each input item can be split into multiple tasks that are all associated with the original item. The results of these subtasks can be combined, and once a satisfactory result is computed any remaining workers still assigned to the no-longer-necessary tasks of that group are caused to exit. (This requires a means of restarting such workers, which we describe below.)
There are three further parameters to DistributedManager that provide functionality for task groups.
var group_tasks: UserProgram	Default:
var update_group: UserProgram	Default:
var incomplete_group: UserProgram	Default:

The parameter group_tasks is a function that takes an input item and returns the initial data defining the associated task group. It must have the following form:
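The form is not reproduced here; judging from the examples below, it is a function returning the initial group result together with the sequence of initial tasks, roughly as follows (the body is purely illustrative):

group_tasks := function(item)
    // Return the initial group result and the initial tasks for the
    // group associated with this input item. Here, purely for
    // illustration, each item gives rise to two tasks.
    return 0, [ <item, 1>, <item, 2> ];
end function;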
The parameter update_group is a function that will take the result of a task and use it to adjust the group result (in a similar manner to how update_results updates the overall results). It must have the following form:
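Again judging from the examples below, a sketch of this form (with suggestive argument names) is:

update_group := function(item, task, tresult, gresult)
    // item:    the original input item for this group
    // task:    the task whose result is being reported
    // tresult: the result the worker computed for that task
    // gresult: the group result accumulated so far
    // Return the new group result, a sequence of further tasks to add
    // to the group (possibly empty), and whether the group is now done.
    return gresult, [], false;
end function;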
The parameter incomplete_group is a function used to handle groups which ran out of associated tasks without reaching a definitive conclusion, i.e., for which the third value returned by update_group was always false. It must be a function with the following form:
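A sketch of this form, matching its use in the final example below, is:

incomplete_group := function(item, gresult)
    // item:    the original input item for this group
    // gresult: the group result accumulated when the tasks ran out
    // Return the value to record as the final result for this item.
    return gresult;
end function;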
Note that update_group may produce further tasks to try; this allows first trying a simple approach, and then spawning more complicated (or time-consuming) approaches if that is unsuccessful.
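For instance, an update_group along the following lines might first try a cheap bound and escalate only on failure; this is only a sketch, assuming (hypothetically) that each task is a pair <input, bound> and that the work function returns false when it fails within the given bound:

update_group_bounds := function(item, task, tresult, gresult)
    input, bound := Explode(task);
    if tresult cmpne false then
        return tresult, [], true;                     // success: group is done
    elif bound lt 10^6 then
        return gresult, [ <input, 10*bound> ], false; // retry with larger bound
    else
        return gresult, [], false;                    // give up on this task
    end if;
end function;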
All of these parameters are optional.
> group_tasks_hundred := function(n)
>     return < 0, <0, 0> >, [ 100*n - 99 .. 100*n ];
> end function;
>
> update_group_maxval := function(item, task, tresult, gresult)
>     n, niters, maxval := Explode(tresult);
>     gcount, gtuple := Explode(gresult);
>     if gtuple[1] lt maxval then
>         gtuple := < maxval, n >;
>     end if;
>     gcount +:= 1;
>     done := gcount eq 100;
>     gresult := done select gtuple else < gcount, gtuple >;
>     return gresult, [], done;
> end function;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
>     group_tasks := group_tasks_hundred,
>     update_group := update_group_maxval
> );
[ <9232, 27>, <9232, 103>, <13120, 255>, <13120, 383>, <39364, 447>, <39364, 511>, <41524, 639>, <250504, 703>, <190996, 871>, <250504, 937> ]

Note: If running this example, we recommend adjusting the millisecond delay in the worker function down by using a multiplier of 10 instead of 100.
A key point to note in this example is that our partial group result includes an extra value (gcount in the code) that is not in the final result. This is so we can tell when the computation is actually complete; since some tasks finish earlier than others, we cannot just check the task data in order to decide whether the group computation is complete or not. Of course, we then have to remove it from the returned group result if the computation is complete. This is done in the second last line of that function where we conditionally assemble the new group result.
We could have left the count in and removed it with a suitable update_results routine (or even with process_results), but in this instance it felt tidier to do it in the group update function.
> group_tasks_hundred := function(n)
>     return < 0, 0 >, [ 100*n - 99 ];
> end function;
>
> update_group_maxval := function(item, task, tresult, gresult)
>     n, niters, maxval := Explode(tresult);
>     assert task eq n;
>     if gresult[1] lt maxval then
>         gresult := < maxval, n >;
>     end if;
>     done := IsDivisibleBy(n, 100);
>     tasks := done select [] else [ task + 1 ];
>     return gresult, tasks, done;
> end function;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
>     group_tasks := group_tasks_hundred,
>     update_group := update_group_maxval
> );
[ <9232, 27>, <9232, 103>, <13120, 255>, <13120, 383>, <39364, 447>, <39364, 511>, <41524, 639>, <250504, 703>, <190996, 871>, <250504, 937> ]

Here there is only ever one task in the group at a time; when that task completes it either spawns the next one (by returning the increased task number in a sequence) or finishes if this was the last task in the group. In this scenario we can use the task data instead of needing a gcount value or similar, because we have guaranteed the order in which tasks within the group run.
However, note that by using this approach we have also greatly reduced the amount of parallelism available --- there is at most one task associated with a group at any time, so if we had more workers than task groups some of the workers would be idle. Still, it is sometimes useful to do this; an example might be a search for "small" points on a curve where the nature of the two-dimensional search can make this better than an incremental approach.
The result of a group could become known while there are still workers processing (no longer needed) tasks in that group. When that happens, it would clearly be desirable if the manager could tell those workers to stop those tasks and work on new ones instead. Unfortunately, this is not straightforward; the worker might be deep in the throes of a long computation that cannot be interrupted, or at least not without causing the worker process to exit.
Instead, a less sophisticated approach is used: The manager will close its communication port with such workers, and when they finally try to report a result they will fail and so exit. Whatever spawned the worker originally can then detect this abnormal exit and spawn a new worker to replace it.
A variation of the DistributedWorker intrinsic is provided for these situations:
This procedure behaves the same as DistributedWorker, except that it expects that it may be unexpectedly terminated and uses exit status codes to communicate with the calling environment. Additionally, on non-Windows platforms, a monitored worker will notice when the communication channel closes and exit immediately, instead of having to finish whatever computation it is working on.

The special exit status of 5 is used to indicate that a worker should not be respawned. The recommended procedure for spawning monitored workers, then, is to have a loop that creates a worker and waits for it to exit. The loop terminates if the exit status is 5, otherwise it continues. Using a standard UNIX shell, the following code would do:
while :
do
    magma collatz_worker.m
    [ $? -eq 5 ] && break
done
> group_tasks_hundred := function(n)
>     return < 0, 0 >, [ 100*n - 99 .. 100*n ];
> end function;
>
> update_group_maxval := function(item, task, tresult, gresult)
>     n, niters, maxval := Explode(tresult);
>     if maxval ge 6000*item then
>         return < maxval, n >, [], true;
>     else
>         return gresult, [], false;
>     end if;
> end function;
>
> incomplete_group_maxval := function(item, gresult)
>     return false;
> end function;
>
> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> DistributedManager(socket, [1..10] :
>     initial_results := [* *],
>     group_tasks := group_tasks_hundred,
>     update_group := update_group_maxval,
>     incomplete_group := incomplete_group_maxval
> );
[* <9232, 27>, false, false, false, <39364, 447>, <39364, 511>, false, <250504, 703>, <190996, 871>, <250504, 937> *]
> delete socket; // ensure that workers terminate

Note the use of the incomplete group handling here; in particular, how using it enables the group updating code to be quite simple. We could also have used this approach in the original advanced example and thereby avoided the need to track counts, but for expository purposes chose not to.
Also note the explicit deletion of the socket at the end --- this ensures that the workers will terminate promptly. It would be possible to keep the socket around and re-use it for multiple runs instead, as long as the workers were doing the same kind of work.
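For instance, assuming the same pool of workers is still available, a single socket might serve several consecutive runs along these lines (a sketch only):

> socket := Socket(: LocalHost := "circle", LocalPort := 10000);
> results1 := DistributedManager(socket, [1..10]);
> results2 := DistributedManager(socket, [11..20]);
> delete socket; // now allow the workers to terminate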
We show a stand-alone implementation of the manager/worker model for a particular workload. This will demonstrate the usage of both the asynchronous I/O and object transmission facilities provided by Magma. For more complete details about these facilities, see Chapter INPUT AND OUTPUT, particularly the section on asynchronous I/O (Section Asynchronous I/O).
Of course, the DistributedManager and DistributedWorker intrinsics are more convenient to use (and more powerful) than the example shown here. However, this example will serve to illustrate the relevant features so that users can implement alternative models of distributed parallelism as needed.
The foundational concepts and the associated intrinsics are:
This routine exchanges information about the version of the object transmission protocol understood by each side of the channel. This allows newer versions of Magma to still communicate correctly with older versions, using a commonly understood format. Each side of the communication channel should call this before performing any I/O on the channel.
Reads Magma Object Format data from I and returns the corresponding Magma object.
Writes the Magma object x to I, using the Magma Object Format.
Queues an asynchronous read request of a Magma object to I.
Queues an asynchronous write request of x to I.
TimeLimit: RngIntElt Default: ∞
Given a sequence of I/O objects R, returns the sequence of those elements of R which are ready to have read operations performed upon them. If no elements of R are ready then this intrinsic will wait until at least one does become ready, or until the specified time limit has elapsed, whichever comes first. The time limit is measured in milliseconds.

Server sockets will be considered ready for reading when a connection attempt has been made (and so a call to WaitForConnection will return without delay). Other channels will be considered ready for reading if either a pending (asynchronous) read request has been fulfilled, or there are no pending read requests and some data is available for reading.
TimeLimit: RngIntElt Default: ∞
Given sequences of I/O objects R and W, returns two sequences: the sequence of those elements of R which are ready to have read operations performed upon them, and the sequence of those elements of W which are ready to have write operations performed upon them. If no elements of R or W are ready then this intrinsic will wait until at least one does become ready, or until the specified time limit has elapsed, whichever comes first. The time limit is measured in milliseconds.

The conditions for a channel to be considered ready for reading are as described for the previous intrinsic. A channel will be considered ready for writing if all pending (asynchronous) write requests have been fulfilled and it would be possible to write some data without delay.
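As a minimal illustration of the intended usage (this is only a sketch, assuming C is an already-connected channel such as a socket and data is the object to send), the typical pattern is to queue the requests and then wait:

AsyncWriteObject(C, data);   // queue the write of the work item
AsyncReadObject(C);          // queue the read of the eventual reply
ready := WaitForIO([C]);     // returns once the queued read has completed
if C in ready then
    result := ReadObject(C); // safe: the requested object has arrived
end if;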
The worker side of the computation involves a work function to actually do the work and a simple wrapper around it that interacts with the manager. The work function will take whatever object the manager sends, interpret it, do the computation, and return a single object as the result. Since we have multiple values to return, we combine them into a single tuple. A small random time delay is inserted in each computation for the purposes of this example so that the effects of multiple workers can be seen; real workloads would not include such a delay.
> collatz := function(n)
>     orign := n;
>     niters := 0;
>     maxval := n;
>     while n gt 1 do
>         n := IsEven(n) select n div 2 else 3*n + 1;
>         niters +:= 1;
>         maxval := Max(maxval, n);
>     end while;
>     // delay by 100, 200, or 300 milliseconds:
>     _ := WaitForIO([] : TimeLimit := 100*Random(1,3));
>     return <orign, niters, maxval>;
> end function;

The wrapper for the worker takes a host and port to contact the manager on, and is a simple work-processing loop thereafter. It uses synchronous I/O calls throughout, as the only I/O it could be waiting for involves the server.
> worker := procedure(host, port)
>     S := Socket(host, port);
>     ExchangeVersions(S);
>     while true do
>         ok, data := ReadObjectCheck(S);
>         if not ok then break; end if;
>         result := collatz(data);
>         WriteObject(S, result);
>     end while;
> end procedure;

First we create the communication channel and then immediately exchange versions with the server. This ensures that the program will work even if the manager and workers are using different versions of Magma. Then we loop on the work, despatching it and returning the results; using object I/O avoids any concerns about incomplete I/O operations. We use ReadObjectCheck in order to gracefully exit on EOF, which we will receive when the server has no more work to allocate.
Now for the manager side of things. The manager will use asynchronous I/O calls in order to remain responsive. For the purposes of this example we will print output about the workers, so we start with a simple function to produce an identifier for each communication channel.
> channelid := func<I | t where _,t := SocketInformation(I)>;

The manager takes a sequence of work items as its argument (in this example, the work items are simply integers). It would also be reasonable for it to take the host and port information that it should listen on (since the workers will need to know this), but we instead let the system select those values and print them out.
> manager := procedure(workleft)
>     server := Socket();
>     host, port := Explode(SocketInformation(server));
>     printf "Manager listening on port %o of %o\n", port, host;
>     busy := {};
>     while #workleft gt 0 or #busy gt 0 do
>         ready := WaitForIO([server] cat Setseq(busy));
>         idle := [ ]; // workers awaiting work
>         for I in ready do
>             if I eq server then // new worker has connected
>                 W := WaitForConnection(I);
>                 printf "New worker %o joined\n", channelid(W);
>                 AsyncExchangeVersions(W);
>                 Append(~idle, W);
>             else
>                 result := ReadObject(I);
>                 n, count, maxv := Explode(result);
>                 printf "%o: %o reached 1 (max %o) after %o iterations\n",
>                     channelid(I), n, maxv, count;
>                 Exclude(~busy, I);
>                 Append(~idle, I);
>             end if;
>         end for;
>         for W in idle do
>             if #workleft eq 0 then break; end if;
>             data := workleft[1];
>             Remove(~workleft, 1);
>             AsyncWriteObject(W, data);
>             AsyncReadObject(W);
>             Include(~busy, W);
>         end for;
>     end while;
> end procedure;

First the manager creates the server socket for workers to connect to; it prints out the associated information so that we know what arguments to give when we start the workers.
During the computation, the set busy will keep track of those channels associated with workers that currently have work assigned to them, and the sequence idle will track those workers which are waiting to have work assigned to them. The manager is done only when all work has been assigned, and all assigned work has been completed.
We wait for new workers to connect (on the server socket) and for completed work to be reported by the busy workers. A subtle point in the above is that we used the version of WaitForIO that only waits for read events. This is the typical case --- the manager queues both the write and the read, and the worker will only send results after having read the data. Thus only the completion of the read event is of interest to the manager.
After a new worker joins then version exchange is queued immediately, since we will be sending and receiving objects. Note the use of the asynchronous AsyncExchangeVersions for this, in order to keep the manager responsive. The worker is then added to the idle list.
When a worker has completed its task and the result is ready, we perform the synchronous ReadObject to get this result. This is safe (that is, will not delay arbitrarily) since the manager earlier issued the corresponding asynchronous request AsyncReadObject and the channel becoming ready for reading indicates that this request has completed. We then mark the worker as idle (and no longer busy).
Finally, for each idle worker we attempt to (asynchronously) assign remaining work to it. We queue both the write and the following read, and mark the worker as busy.
We now demonstrate the code in action with two workers. First we start the manager in order to get the channel information for the workers to use.
> // manager
> manager([1..10]);
Manager listening on port 46119 of 0.0.0.0

Then we launch the workers (in this case, on the same machine).
> // worker 1
> worker("localhost", 46119);
> // worker 2
> worker("localhost", 46119);

And the manager displays the progress of the computation for us. Note that without the introduced delay the first worker would probably have performed all the computation before we could launch the second one.
New worker <"127.0.0.1", 43352> joined
<"127.0.0.1", 43352>: 1 reached 1 (max 1) after 0 iterations
New worker <"127.0.0.1", 43354> joined
<"127.0.0.1", 43354>: 3 reached 1 (max 16) after 7 iterations
<"127.0.0.1", 43352>: 2 reached 1 (max 2) after 1 iterations
<"127.0.0.1", 43352>: 5 reached 1 (max 16) after 5 iterations
<"127.0.0.1", 43354>: 4 reached 1 (max 4) after 2 iterations
<"127.0.0.1", 43354>: 7 reached 1 (max 52) after 16 iterations
<"127.0.0.1", 43352>: 6 reached 1 (max 16) after 8 iterations
<"127.0.0.1", 43354>: 8 reached 1 (max 8) after 3 iterations
<"127.0.0.1", 43352>: 9 reached 1 (max 52) after 19 iterations
<"127.0.0.1", 43354>: 10 reached 1 (max 16) after 6 iterations
A manager cannot tell which user created the worker that is connecting to it. (Indeed, the very concept is not well-defined if the worker is running on another machine.) Thus it is possible for someone on another machine on the same network to connect to "your" manager, whether by accident or otherwise, and this cannot be directly distinguished from an intended connection from a worker under your control. Problems may then arise if this worker does not interact in the expected way with the manager.
If you are reasonably sure that no-one else with access to your machine or its network will act in such a way, then you can simply use the manager/worker functions as described above. If your machine is accessible to a wider network, however, then some kind of authentication process is probably desirable. A simple method is to require that client connections first send some data known only to the server, and have the server reject connections which do not do so.
Unfortunately, such behaviour is not currently integrated with the manager/worker routines described here, and so they cannot be used when authentication is a requirement. The code in Section Implementing Distributed Parallelism can be used as a base for re-implementing manager/worker functionality with authentication added.
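As a rough illustration of that approach (not part of the provided intrinsics), the worker and manager from that section could exchange a shared secret immediately after the version exchange; the details here are only a sketch:

secret := "some long random string known only to you";

// Worker side: identify ourselves right after the version exchange.
S := Socket(host, port);
ExchangeVersions(S);
WriteObject(S, secret);
// ... continue with the normal work loop ...

// Manager side: check the secret before accepting a new connection.
W := WaitForConnection(server);
ExchangeVersions(W);
token := ReadObject(W);
if token cmpne secret then
    delete W; // reject the connection and ignore this worker
end if;
// otherwise add W to the pool of workers as usual

A real implementation would want to perform the manager's check asynchronously, as the manager in that section does for its ordinary reads, so that a connection which never sends anything cannot stall it.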
A more robust and convenient version of authentication is being worked upon, and will appear in a later patch release.