About parallelizing tool executions
To achieve the parallelization of several executions of the same tool, the CGC implements a Common Workflow Language (CWL) feature called scattering.
Scattering is a mechanism that applies to a particular tool and one of its input ports. If a tool is passed a list of inputs on one port and that port is marked as "scattered", then one job will be created for each input in the list.
The scheduling algorithm runs these jobs in parallel, as far as the available compute resources allow. If not all jobs can be executed in parallel, the remaining jobs are queued and executed as soon as more resources become available.
Scattering on a critical tool in your workflow may shorten the workflow's run time significantly. For an example of how this can be achieved, see this blog post explaining how a whole genome analysis workflow uses scattering.
Note that scattering is different from performing batch analyses. Batching launches multiple tasks, whereas scattering happens within a single task.
Scatter by single input port
This method works the same way in CWL sbg:draft-2 and CWL v1.x.
To understand how scattering on a single input port works, please look at the explanation and the accompanying diagram below:
- We have an input array with two elements, A1 and A2, which can be two files, that we want to provide to an app through a single input port. We want the app to process both input values at the same time and produce an output for each of the values. Note that an input element within an input array can be any available value type, such as File, Directory, string, nested array, etc. A job is created for each element in the input array, regardless of the type of the element.
- The input values are provided on an input port that has scattering enabled. Scattering means that the individual input values, A1 and A2, will be processed in parallel if there are enough computation resources to support the workload.
- Separate jobs (working instances of the app that does the processing) are generated and executed to process A1 and A2 in parallel.
- Outputs resulting from both processing jobs are sent to a single output port.
- The output port produces an array containing output values B1 and B2, resulting from inputs A1 and A2 respectively.
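The steps above can be modeled in a few lines of Python. This is only a sketch of the semantics, not how the CGC executes jobs: `process` is a hypothetical stand-in for the app, and a thread pool stands in for the available compute resources.

```python
from concurrent.futures import ThreadPoolExecutor


def process(value):
    # Hypothetical stand-in for the app: turns input "A1" into output "B1".
    return f"B{value[1:]}"


def scatter_single(tool, values, max_workers=2):
    # One job is created per element of the input array.
    # Executor.map returns results in input order, so the output array
    # lines up with the input array even if jobs finish out of order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(tool, values))


outputs = scatter_single(process, ["A1", "A2"])
print(outputs)  # ['B1', 'B2']
```

The output array has exactly one element per input element, in the same order as the inputs.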
Scatter by multiple input ports
There are several methods for scattering by multiple input ports. Each of the methods is explained in more detail in the sections below.
Dot Product
This method takes one value from each input port and uses them to create a job: the first values from all ports form the first job, the second values form the second job, and so on. The number of jobs therefore equals the number of values provided on each input port, and the method works only if every scattered input port receives the same number of values. If that is not the case, an error is thrown.
To understand how the dot product method works, please look at the explanation and the accompanying diagram below:
- We have two input ports: port A receives an input array containing values A1 and A2, and port B receives an input array containing values B1 and B2. Each port receives two values, which meets the dot product requirement of providing an equal number of values on all scattered input ports.
- Scattering is enabled on all input ports we want to scatter by, in this case both port A and B.
- Separate jobs (working instances of the app that does the processing) are generated by taking the first values from each input (A1 and B1) to create one job, and the second ones (A2 and B2) to create the second job. These are executed in parallel.
- Outputs resulting from both processing jobs are sent to a single output port.
- The output port produces an output array containing output values C11 and C22, the first one resulting from inputs A1 and B1, and the second one resulting from inputs A2 and B2, respectively.
This is how the dot product scatter method is noted in an app's CWL description, with A and B being identifiers of the scattered input ports:
scatter: [A, B]
scatterMethod: "dotproduct"
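As a rough illustration of the pairing rule, the following Python sketch builds the job list for a dot product scatter. The function name `dot_product_jobs` and the dictionary-based job representation are assumptions made for this example, not part of CWL or the CGC.

```python
def dot_product_jobs(inputs):
    """Pair values by position. `inputs` maps port id -> list of values."""
    lengths = {port: len(values) for port, values in inputs.items()}
    # The dot product method requires the same number of values on every port.
    if len(set(lengths.values())) > 1:
        raise ValueError(f"dotproduct needs equal-length inputs, got {lengths}")
    ports = list(inputs)
    # zip() takes the i-th value from each port to form the i-th job.
    return [dict(zip(ports, combo)) for combo in zip(*inputs.values())]


jobs = dot_product_jobs({"A": ["A1", "A2"], "B": ["B1", "B2"]})
print(jobs)  # [{'A': 'A1', 'B': 'B1'}, {'A': 'A2', 'B': 'B2'}]
```

Passing arrays of different lengths, for example two values on A and three on B, raises the error instead of silently dropping the unmatched value.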
Nested Cross Product
This method specifies the Cartesian product of the inputs, producing a job for every combination of input values provided on scattered input ports. The output must be a nested array for each level of scattering in the order in which the input arrays are listed in the scatter field.
To understand how the nested cross product method works, please look at the explanation and the accompanying diagram below:
- We have two input ports: port A receives an input array containing values A1 and A2, and port B receives an input array containing values B1 and B2.
- Scattering is enabled on all input ports we want to scatter by, in this case both port A and B.
- Separate jobs (working instances of the app that does the processing) are generated by combining each input value from port A with each input value from port B (A1 and B1, A1 and B2, A2 and B1, A2 and B2), which gives us a total of 4 jobs when we have two input ports with two input values provided on each port. The jobs are executed in parallel.
- Outputs resulting from all four processing jobs are sent to a single output port.
- The output port produces an output array containing one output value per job, in this case value C11 resulting from inputs A1 and B1, C12 resulting from inputs A1 and B2, C21 resulting from inputs A2 and B1, and C22 resulting from inputs A2 and B2. The important thing to note is that outputs are grouped (nested) for each level of scattering in the order in which the input arrays are listed in the scatter field. For example, if the first field that is selected for scattering is field A, the first group of outputs will be C11 and C12, which are results of processing input value A1 with input values B1 and B2 respectively. The second group of outputs will be C21 and C22, which are results of processing input value A2 with input values B1 and B2 respectively. Represented in array format, this would be the final output [[C11, C12], [C21, C22]].
This is how the nested cross product scatter method is noted in an app's CWL description, with A and B being identifiers of the scattered input ports and the first port in the scattering order being input port A:
scatter: [A, B]
scatterMethod: "nested_crossproduct"
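The grouping of outputs can be sketched in Python as a recursion that adds one nesting level per scattered port, in the order the ports are listed. The names `nested_cross_product` and `tool` are hypothetical and exist only for this illustration.

```python
def tool(a, b):
    # Hypothetical stand-in for the app: inputs "A1" and "B2" give "C12".
    return f"C{a[1:]}{b[1:]}"


def nested_cross_product(run, ports, bound=()):
    """`ports` is a list of value lists, in scatter order."""
    if not ports:
        # Every port has a value bound: this is one job.
        return run(*bound)
    first, rest = ports[0], ports[1:]
    # One list (one nesting level) per value of the current port.
    return [nested_cross_product(run, rest, bound + (v,)) for v in first]


result = nested_cross_product(tool, [["A1", "A2"], ["B1", "B2"]])
print(result)  # [['C11', 'C12'], ['C21', 'C22']]
```

Swapping the order of the two ports changes the grouping: the outer level would then correspond to the values of port B.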
Flat Cross Product
This method specifies the Cartesian product of the inputs, producing a job for every combination of input values provided on scattered input ports. The jobs are created and outputs are produced in the order in which the input ports are selected for scattering.
To understand how the flat cross product method works, please look at the explanation and the accompanying diagram below:
- We have two input ports: port A receives an input array containing values A1 and A2, and port B receives an input array containing values B1 and B2.
- Scattering is enabled on all input ports we want to scatter by, in this case both port A and B.
- Separate jobs (working instances of the app that does the processing) are generated by combining each input value from port A with each input value from port B (A1 and B1, A1 and B2, A2 and B1, A2 and B2), which gives us a total of 4 jobs when we have two input ports with two input values provided on each port. The jobs are executed in parallel.
- Outputs resulting from all four processing jobs are sent to a single output port.
- The output port produces an array containing one output value per job, in this case value C11 resulting from input values A1 and B1, C12 resulting from input values A1 and B2, C21 resulting from input values A2 and B1, and C22 resulting from input values A2 and B2. Note that outputs are presented in the order in which the input ports are selected for scattering. For example, if the first field that is selected for scattering is field A, the first outputs will be C11 and C12, which are results of processing input value A1 with input values B1 and B2 respectively. The following outputs will be C21 and C22, which are results of processing input value A2 with input values B1 and B2 respectively. Unlike the nested cross product method where the output values are grouped, the flat cross product method outputs a completely flat structure with no nesting (grouping) of output values. Represented as an array, this would be the final output [C11, C12, C21, C22].
This is how the flat cross product scatter method is noted in an app's CWL description, with A and B being identifiers of the scattered input ports and the first port in the scattering order being input port A:
scatter: [A, B]
scatterMethod: "flat_crossproduct"
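A minimal Python sketch of the flat variant, assuming the same hypothetical `tool` stand-in for the app: `itertools.product` enumerates every combination with the last port varying fastest, which matches the output order described above.

```python
from itertools import product


def tool(a, b):
    # Hypothetical stand-in for the app: inputs "A1" and "B2" give "C12".
    return f"C{a[1:]}{b[1:]}"


def flat_cross_product(run, *port_values):
    # One job per combination, collected into a single flat list.
    return [run(*combo) for combo in product(*port_values)]


result = flat_cross_product(tool, ["A1", "A2"], ["B1", "B2"])
print(result)  # ['C11', 'C12', 'C21', 'C22']
```

The jobs are the same four as in the nested cross product method; only the shape of the output array differs.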
Configuring scattering in the Workflow Editor
To configure scattering for an input port in a workflow step, first open the step configuration as follows:
- Navigate to a project.
- Click the Apps tab.
- Click the name of the workflow you want to edit.
- In the top-right corner, click Edit. The Workflow Editor opens.
- Double-click the node in the workflow whose input(s) you want to scatter. The Object Inspector opens.
- In the Object Inspector, click the Step tab. From this point on, follow the instructions below depending on the CWL version of your workflow.
To set scattering in CWL v1.x workflows:
- Under Scatter, select the input port(s) you want to scatter by. Hold Cmd (Ctrl) or Shift to select multiple ports.
- If you have selected more than one port in the previous step, under Scatter Method, select the method that best matches the way you want the input values to be paired and processed.
- Click the save icon to save the changes. You have now enabled scattering for the selected input ports.
To set scattering for workflow steps in sbg:draft-2 workflows (this CWL version allows scattering only by one input port):
- Under Scatter, select the input port you want to scatter by.
- Click the save icon to save the changes. You have now enabled scattering for the input port.
Keeping scattering under control
The power of scattering to reduce analysis time lies in making full use of the available compute resources. You can control the resources available for the execution of an app by specifying instance type and the number of instances to be used in parallel.
While scattering is a powerful tool to shorten your analysis run time, it may well increase the overall cost of your analysis if used in combination with certain other settings.
There are two ways in which you can fine-tune how the scattering works on a tool:
- Configuring computational instances on the tool.
- Setting the maximum number of parallel instances.
Controlling via instance type
Based on the scattered tool's resource requirements, you may want to pick an instance that leaves the least CPU and memory to waste for a given number of scattered jobs and maximum number of parallel instances. This blog post explains how to choose an instance suitable for your analysis.
To set the instance type, set the sbg:AWSInstanceType or sbg:GoogleInstanceType hint at the workflow level.
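To get a feel for what "least CPU and memory to waste" means, the Python sketch below estimates how many scattered jobs fit on one instance and what fraction of its resources stays unused. The instance names and sizes are made up for the example and do not describe real instance types or the CGC's actual bin-packing logic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Instance:
    name: str     # hypothetical label, not a real instance type
    cpus: int
    mem_gb: int


def jobs_per_instance(inst, job_cpus, job_mem_gb):
    # The scarcer resource (CPU or memory) limits how many jobs fit.
    return min(inst.cpus // job_cpus, inst.mem_gb // job_mem_gb)


def wasted_fraction(inst, job_cpus, job_mem_gb):
    # Fraction of CPU and memory left idle when the instance is full of jobs.
    n = jobs_per_instance(inst, job_cpus, job_mem_gb)
    return (1 - n * job_cpus / inst.cpus, 1 - n * job_mem_gb / inst.mem_gb)


small = Instance("small-example", cpus=8, mem_gb=16)
large = Instance("large-example", cpus=36, mem_gb=60)

# Each scattered job is assumed to need 4 CPUs and 10 GB of memory.
for inst in (small, large):
    print(inst.name, jobs_per_instance(inst, 4, 10), wasted_fraction(inst, 4, 10))
```

In this made-up case the larger instance runs six jobs with no memory left over, while the smaller one runs a single job and leaves half of its CPUs idle.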
Controlling via maximum number of parallel instances
If you anticipate that the execution of the tool which you are scattering is time-critical for the entire workflow, you can configure the maximum number of instances that the Scheduling Algorithm is allowed to have running at any one time.
If the jobs that would be started as a result of scattering cannot fit onto the provisioned instances according to their tool's resource requirements, those jobs will be queued for execution. As soon as enough resources become available following the completion of other jobs, queued jobs will be executed. This ensures there will be less idle time across the entire task.
The CGC bioinformaticians exploit this technique when tuning workflows in Public Apps.
To set the maximum number of instances, set the sbg:maxNumberOfParallelInstances hint at the workflow level.
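The effect of capping parallelism on run time can be approximated with a small Python simulation. It is a simplified model, not the CGC scheduler: it assumes each job occupies one "slot" (the number of slots being the number of instances multiplied by the jobs that fit on each), and that queued jobs start in order as soon as a slot frees up. The function name `simulate_makespan` is hypothetical.

```python
import heapq


def simulate_makespan(durations, slots):
    """Total wall time when jobs run in order on `slots` parallel slots."""
    if slots < 1:
        raise ValueError("at least one slot is required")
    running = []  # min-heap of finish times of the jobs currently running
    for duration in durations:
        if len(running) < slots:
            start = 0  # a slot is still free at the beginning
        else:
            # All slots are busy: the job waits for the earliest finisher.
            start = heapq.heappop(running)
        heapq.heappush(running, start + duration)
    return max(running, default=0)


jobs = [3, 3, 3, 3]  # four scattered jobs, 3 time units each
print(simulate_makespan(jobs, slots=1))  # 12
print(simulate_makespan(jobs, slots=2))  # 6
print(simulate_makespan(jobs, slots=4))  # 3
```

Raising the cap shortens the run only up to the number of scattered jobs; beyond that, extra instances add cost without reducing wall time.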