diff --git a/src/slang/parallel_map_chunked.sl b/src/slang/parallel_map_chunked.sl new file mode 100644 index 0000000000000000000000000000000000000000..93907a8a2981dbbb3bd9a3ff3bed5d5f0a6c1c91 --- /dev/null +++ b/src/slang/parallel_map_chunked.sl @@ -0,0 +1,171 @@ +private variable __parallel_map_chunk_register = struct { + returnTypes = NULL, + userFunction = NULL, + args = NULL +}; + +private define __parallel_map_chunk_run_task(id) { + variable ii; + if ( + __parallel_map_chunk_register.returnTypes == NULL || + __parallel_map_chunk_register.userFunction == NULL || + __parallel_map_chunk_register.args == NULL + ) { + throw InternalError, "Argument passing register not fully populated"; + } + variable args = {}; + _for ii(0, length(__parallel_map_chunk_register.returnTypes)-1, 1) { + list_append(args, __parallel_map_chunk_register.returnTypes[ii]); + } + variable nRet = length(args); + list_append(args, __parallel_map_chunk_register.userFunction); + _for ii(0, length(__parallel_map_chunk_register.args)-1, 1) { + list_append(args, __parallel_map_chunk_register.args[ii][id]); + } + array_map(__push_list(args);; __qualifiers); + % Leaving the result purposfully on the stack to be picked up by the caller +} + +%!%+ +%\function{parallel_map_chunked} +%\synopsis{More efficient version of parallel_map} +%\usage{[(Return_Values, ...) =] parallel_map_chunked ([Return_Types...,] &func, args, ... [; qualifiers])} +%\description +% Performs the same operation as parallel_map, but does not spawn an +% individual process for each element but only one process per slave. +% This can be much more efficient under certain circumstances, for example +% when iterating very often using a function with short execution time. +% See the help of parallel_map for more information. +% Under the hood it uses a combination of parallel_map and array_map. +% Make sure to set Isis_Slaves.num_slaves to a useful value, i.e. an integer +% of at least 1. +% +% IMPORTANT: Make sure that the supplied function does not have any +% sideeffects, since this function does not fork for each new process. +%\example +% See the example in parallel_map +%\seealso{parallel_map, array_map} +%!%- +define parallel_map_chunked() { + variable ii, jj; + if ( _NARGS < 2 ) { + throw UsageError, "At least 2 arguments required"; + } + variable args = __pop_list(_NARGS); + variable returnTypes = {}; + variable userFunction; + ii = 0; + while ( typeof(args[0]) == DataType_Type ) { + list_append(returnTypes, args[0]); + list_delete(args, 0); + } + if ( typeof(args[0]) != Ref_Type || not __is_callable(args[0]) ) { + throw UsageError, "First non datatype argument is not a callable reference"; + } + userFunction = args[0]; + list_delete(args, 0); + + if ( length(args) == 0 ) { + throw UsageError, "No arguments after the function reference given"; + } + + variable containerArgsMask = Integer_Type[length(args)]; + _for ii(0, length(args)-1, 1) { + containerArgsMask[ii] = typeof(args[ii]) == Array_Type || typeof(args[ii]) == List_Type; + } + variable lengths = Integer_Type[length(args)]; + _for ii(0, length(args)-1, 1) { + lengths[ii] = length(args[ii]); + } + variable lengthsVectors = lengths[where(containerArgsMask)]; + if ( length(lengthsVectors) > 0 && length(unique(lengthsVectors)) != 1 ) { + throw UsageError, "Lengths of vectors must be equal"; + } + + if ( Isis_Slaves.num_slaves == NULL ) { + throw UsageError, "Isis_Slaves.num_slaves is set to NULL"; + } + if ( Isis_Slaves.num_slaves == 0 ) { + throw UsageError, "Isis_Slaves.num_slaves is set to 0"; + } + if ( Isis_Slaves.num_slaves < 0 ) { + throw UsageError, "Isis_Slaves.num_slaves is negative?"; + } + variable numTasks = length(lengthsVectors) > 0 ? lengthsVectors[0] : 1; + variable numProcs = min([Isis_Slaves.num_slaves, numTasks]); + variable tasksPerProc = numTasks / numProcs; + variable tasksOverhead = numTasks mod numProcs; + + variable todoPerProc = Integer_Type[numProcs]; + _for ii(0, numProcs-1, 1) { + todoPerProc[ii] = tasksPerProc + (ii < tasksOverhead); + } + if ( sum(todoPerProc) != numTasks ) { + throw InternalError, "Sum of tasks per process does not match total number of tasks"; + } + + variable procTasksIStart = Integer_Type[numProcs]; + variable procTasksIEnd = Integer_Type[numProcs]; + procTasksIStart[*] = -1; + procTasksIEnd[*] = -1; + + procTasksIStart[0] = 0; + procTasksIEnd[0] = todoPerProc[0] - 1; + _for ii(1, numProcs-1, 1) { + procTasksIStart[ii] = procTasksIEnd[ii-1] + 1; + procTasksIEnd[ii] = procTasksIStart[ii] + todoPerProc[ii] - 1; + if ( procTasksIEnd[ii] - procTasksIStart[ii] + 1 != todoPerProc[ii] ) { + throw InternalError, sprintf("Difference in indicies does not match tasks per process in segment %d", ii+1); + } + } + + variable subArgs = {}; + _for ii(0, length(args)-1, 1) { + variable thing; + !if ( containerArgsMask[ii] ) { + thing = @typeof(args[ii])[numProcs]; + thing[*] = args[ii]; + list_append(subArgs, thing); + continue; + } + thing = @typeof(args[ii])[numProcs]; + _for jj(0, numProcs-1, 1) { + thing[jj] = args[ii][[procTasksIStart[jj]:procTasksIEnd[jj]]]; + } + list_append(subArgs, thing); + } + + % There are several reasons to do it this way + % 1.: References can not be passed as an argument to parallel_map due to + % a limitation of fwrite + % 2.: Putting this into a global variable where it can easily be accessed + % by the child processes combined with copy on write makes the access + % faster than putting it through the stack and passing via IPC + __parallel_map_chunk_register.returnTypes = returnTypes; + __parallel_map_chunk_register.userFunction = userFunction; + __parallel_map_chunk_register.args = subArgs; + + variable parallelArgs = {}; + _for ii(0, length(returnTypes)-1, 1) { + list_append(parallelArgs, Array_Type); + } + list_append(parallelArgs, &__parallel_map_chunk_run_task); + list_append(parallelArgs, [0:numProcs-1]); + try { + parallel_map(__push_list(parallelArgs);; __qualifiers); + } finally { + __parallel_map_chunk_register.returnTypes = NULL; + __parallel_map_chunk_register.userFunction = NULL; + __parallel_map_chunk_register.args = NULL; + } + variable results = __pop_list(length(returnTypes)); + variable ret = {}; + _for ii(0, length(returnTypes)-1, 1) { + thing = returnTypes[ii][numTasks]; + _for jj(0, numProcs-1, 1) { + thing[[procTasksIStart[jj]:procTasksIEnd[jj]]] = results[ii][jj]; + } + list_append(ret, thing); + } + __push_list(ret); +}