Job arrays #3892

Merged on May 9, 2024 (113 commits)
Changes from 62 commits
Commits
41ac662
Add initial array executor
bentsherman Apr 21, 2023
a0900ae
Add support for SLURM array jobs
bentsherman Apr 21, 2023
b5bad00
Fix failing test
bentsherman Apr 21, 2023
713e33e
Document array executor
bentsherman Apr 21, 2023
76de0d4
Use concurrent queues in array executor, add fallback for leftover tasks
bentsherman Apr 21, 2023
1ef005c
Move batching logic to array task polling monitor
bentsherman Apr 21, 2023
27728eb
Cache status checks in array task handler
bentsherman Apr 21, 2023
343b331
Finalize support for SLURM array jobs
bentsherman Apr 24, 2023
a70e6b4
Merge branch 'master' into 1477-job-array-executor [ci skip]
bentsherman Apr 25, 2023
e5ca280
Refactor array executor as process directive
bentsherman Apr 25, 2023
ed5089d
minor edits
bentsherman Apr 25, 2023
5b36129
Support array jobs for LSF, PBS, SGE
bentsherman Apr 25, 2023
d162948
Replace synchronized with atomics and locks
bentsherman Apr 26, 2023
3e38ec0
Merge branch 'master' into 1477-job-array-executor
pditommaso Apr 30, 2023
99e92d5
Add array job support to AWS Batch
bentsherman Apr 30, 2023
33502e7
Merge branch '1477-job-array-executor' of github.com:nextflow-io/next…
bentsherman Apr 30, 2023
75135d7
Fix bug in awsbatch + fusion + array job
bentsherman Apr 30, 2023
2ee42ae
Support grid + fusion + array jobs
bentsherman Apr 30, 2023
3d84594
Refactor ArrayTask* to TaskArray*
bentsherman May 1, 2023
fc78470
Reduce code duplication
bentsherman May 1, 2023
45a1c35
minor edits
bentsherman May 1, 2023
16af92f
Fix failing tests
bentsherman May 1, 2023
98cfb0e
Reduce code duplication
bentsherman May 2, 2023
f02de40
Reduce code duplication
bentsherman May 2, 2023
eac8099
Merge branch 'master' into 1477-job-array-executor
abhi18av May 2, 2023
603913f
minor edits
bentsherman May 2, 2023
fe3a3d7
minor edits
bentsherman May 2, 2023
aaaa35d
Use TaskRun subclass to submit array job in a generic manner
bentsherman May 2, 2023
6945450
Minor changes
bentsherman May 3, 2023
f95c7d0
Fix bugs in array submit script
bentsherman May 3, 2023
44f6825
Fix Fusion support
bentsherman May 3, 2023
12612a3
Replace array submitter with array handling logic in the task polling…
bentsherman May 3, 2023
b9dafe0
Fix task reporting in log observer
bentsherman May 3, 2023
5a9f6de
Merge branch 'master' into 1477-job-array-executor
abhi18av May 7, 2023
ffc6d31
Fix missing method error
bentsherman May 3, 2023
7ec397f
Add support for AWS SSE env variables
pditommaso May 24, 2023
324681b
Add support for google batch
bentsherman May 26, 2023
eb0750f
Merge branch 'master' into 1477-job-array-executor
bentsherman May 26, 2023
d3d0377
Fix static compilation errors
bentsherman May 26, 2023
acea09f
Delete entire AWS Batch array job on workflow termination
bentsherman May 26, 2023
91b61a5
Merge branch 'master' into 1477-job-array-executor
bentsherman May 29, 2023
d78daa8
Fix failing test
bentsherman May 29, 2023
75dc404
Fix issues with google batch array jobs
bentsherman May 31, 2023
7c7a7b3
Rename getSubmitCommand() to getLaunchCommand()
bentsherman May 31, 2023
52f3195
Fix SLURM+Fusion+array jobs, cleanup
bentsherman May 31, 2023
c5e8b76
Remove unrelated changes
bentsherman May 31, 2023
14b2455
Submit retried tasks directly
bentsherman May 31, 2023
0de3c84
Add unit tests
bentsherman Jun 1, 2023
55d7941
Update docs [ci fast]
bentsherman Jun 13, 2023
eeeb12d
Fix issues with LSF array jobs
bentsherman Jun 13, 2023
fd10361
Add array index name to env whitelist
bentsherman Jun 14, 2023
782dea2
Disable non-native container run for task array
bentsherman Jun 15, 2023
eb45069
Fix incorrect index offset in array job script
bentsherman Jun 21, 2023
50c300c
Change SGE array index start to 1
bentsherman Jun 22, 2023
c206a5c
minor edits
bentsherman Jun 23, 2023
d4ab783
Change SGE array index start to 1 (for real this time)
bentsherman Jul 13, 2023
786a0b3
Use Bolts.withLock
bentsherman Jul 27, 2023
68ea23f
Add array jobs to CRG executor
bentsherman Sep 15, 2023
28dbe47
Add array jobs to CRG executor
bentsherman Sep 15, 2023
6c7494a
Add array job to SGE parseJobId
bentsherman Sep 18, 2023
dfca759
Merge branch 'master' into 1477-job-array-executor
bentsherman Jan 24, 2024
1929773
Refactor "array job" -> "job array"
bentsherman Jan 24, 2024
fcdd6b7
Merge branch 'master' into 1477-job-array-executor
pditommaso Apr 21, 2024
50c8f6f
Fix intellij compiler errors [ci fast]
pditommaso Apr 21, 2024
c5a12ec
Apply suggestions from review
bentsherman Apr 22, 2024
9897a6c
Fix failing tests
bentsherman Apr 23, 2024
c15e5ec
Stability improvements
bentsherman Apr 23, 2024
ba176e0
Improve construction of job array
bentsherman Apr 24, 2024
584c961
Add process directives to wrapper script for Fusion
bentsherman Apr 24, 2024
7d8aa5c
Merge branch 'master' into 1477-job-array-executor
pditommaso Apr 26, 2024
49f7fbd
Prefer try/finally idiom to avoid closure [ci fast]
pditommaso Apr 26, 2024
99a1ec3
Apply suggestions from review
bentsherman Apr 26, 2024
bf04e3f
Fix bug with job array context
bentsherman Apr 29, 2024
67f76a1
Merge branch 'master' into 1477-job-array-executor
pditommaso May 1, 2024
16dfb25
Add integration tests
pditommaso May 1, 2024
0e88bf7
Restore private method access (#4961) [ci fast]
pditommaso May 1, 2024
11aeaae
Minor changes
pditommaso May 1, 2024
519c93a
Add aws batch integration test
pditommaso May 1, 2024
ade72e2
Revert re-use of NXF_CHDIR in job array script
bentsherman May 1, 2024
8c13ec3
Add unit tests for grid executors
bentsherman May 1, 2024
b7e829d
Add unit test for crg executor
bentsherman May 1, 2024
6f6e5a8
Fix TaskArrayRun access to private methods
pditommaso May 1, 2024
4bb0e98
Fix typ
pditommaso May 2, 2024
beddbbd
Add integration tests
pditommaso May 2, 2024
1713167
Use idiomatic name for task array dir var
pditommaso May 2, 2024
5137589
Merge branch 'master' into 1477-job-array-executor
pditommaso May 2, 2024
bf859bd
Use TaskArrayRun as return type
pditommaso May 2, 2024
a4518a3
Promote status update methods private
pditommaso May 2, 2024
56eb754
Minor change
pditommaso May 2, 2024
8afc6a9
Refactor task name in task meta comment
pditommaso May 2, 2024
b42ccbb
Force new test
pditommaso May 3, 2024
7e411fd
Disable job log when submitting job arrays to grid schedulers
bentsherman May 3, 2024
653064f
Fix collectFile saving to GCS with sort: false (#4965)
bentsherman May 2, 2024
ea1cc2c
Fix script error text alignment (#4681)
mahesh-panchal May 2, 2024
7367687
Use for instead eachLine in error formatting [ci fast]
pditommaso May 3, 2024
be536f8
Update aws.md to include Cluster access (#4951) [ci skip]
Kartstig May 3, 2024
d899ebc
Add Wave and Fusion info to workflow metadata (#4945)
marcodelapierre May 3, 2024
11d4a87
Strip auth secret from logs
pditommaso May 4, 2024
a9d5d86
[ci skip] empty
pditommaso May 5, 2024
174bdfb
Job array refactor (#4973)
pditommaso May 6, 2024
12656bc
Merge branch 'master' into 1477-job-array-executor
bentsherman May 6, 2024
a77cfd5
Move misplaced test
bentsherman May 3, 2024
4e41436
Strengthen aws batch deletion logi
pditommaso May 7, 2024
e8b701d
Strengthen google batch deletion logic
pditommaso May 7, 2024
3291f6c
Merge branch 'master' into 1477-job-array-executor
pditommaso May 7, 2024
b7ce0a1
minor edits
bentsherman May 7, 2024
3e29adc
Update google batch logging to select task logs
bentsherman May 7, 2024
247b721
Fix race condition in LogsCheckpoint
bentsherman May 7, 2024
e086674
Revert "Fix race condition in LogsCheckpoint"
pditommaso May 8, 2024
cbbbc46
Resolve conflicts
pditommaso May 8, 2024
6b8bb49
Restore grid handler names [ci fast]
pditommaso May 9, 2024
912f90d
Revert submit naming
pditommaso May 9, 2024
4b45aa0
Revert volatile change
pditommaso May 9, 2024
58 changes: 58 additions & 0 deletions docs/process.md
@@ -1349,6 +1349,64 @@ Allowed values for the `arch` directive are as follows, grouped by equivalent fa

Examples of values for the architecture `target` option are `cascadelake`, `icelake`, `zen2` and `zen3`. See the Spack documentation for the full and up-to-date [list of meaningful targets](https://spack.readthedocs.io/en/latest/basic_usage.html#support-for-specific-microarchitectures).

(process-array)=

### array

:::{versionadded} 24.02.0-edge
:::

:::{warning} *Experimental: may change in a future release.*
:::

The `array` directive allows you to submit tasks as *job arrays* for executors that support them.

A job array is a collection of jobs with the same resource requirements and the same script (parameterized by an index). Job arrays incur significantly less scheduling overhead than individual jobs, so HPC schedulers prefer them where possible.

The directive should be specified with a given array size, along with an executor that supports job arrays. For example:

```groovy
process cpu_task {
    executor 'slurm'
    array 100

    '''
    your_command --here
    '''
}
```

Nextflow currently supports job arrays for the following executors:

- {ref}`awsbatch-executor`
- {ref}`google-batch-executor`
- {ref}`lsf-executor`
- {ref}`pbs-executor`
- {ref}`pbspro-executor`
- {ref}`sge-executor`
- {ref}`slurm-executor`

A process using job arrays will collect tasks and submit each batch as a job array when it is ready. Any "leftover" tasks will be submitted as a partial job array.
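
As a rough illustration of the batching described above, the following standalone Groovy sketch groups a list of pending tasks into batches of at most the array size. The task names and counts are hypothetical, and the sketch only shows the grouping arithmetic, not how Nextflow collects tasks internally:

```groovy
// Hypothetical values: 250 pending tasks and `array 100`
def arraySize = 100
def tasks = (1..250).collect { "task-${it}".toString() }

// Group the tasks into batches of at most `arraySize`;
// the last batch holds the leftover tasks (a partial job array)
def batches = tasks.collate(arraySize)

assert batches.size() == 3
assert batches*.size() == [100, 100, 50]
```

Under these assumptions, the first two batches would be submitted as full job arrays and the last 50 tasks as a partial one.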

Once a job array is submitted, each "child" task is executed as an independent job. Any tasks that fail (and can be retried) will be retried without interfering with the tasks that succeeded. Retried tasks are submitted individually rather than through a job array, in order to allow for the use of [dynamic resources](#dynamic-computing-resources).

The following directives must be uniform across all tasks in a process that uses job arrays, because these directives are specified once for the entire job array:

- {ref}`process-accelerator`
- {ref}`process-clusterOptions`
- {ref}`process-cpus`
- {ref}`process-disk`
- {ref}`process-machineType`
- {ref}`process-memory`
- {ref}`process-queue`
- {ref}`process-resourcelabels`
- {ref}`process-time`

For cloud-based executors like AWS Batch, or when using Fusion with any executor, the following additional directives must be uniform:

- {ref}`process-container`
- {ref}`process-containerOptions`
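
For instance, a process that keeps these directives uniform might look like the following sketch. The process name, queue name, command, and resource values are hypothetical and only illustrate fixed settings that are the same for every task:

```groovy
process align_reads {
    executor 'slurm'
    array 50

    // fixed values, specified once for the entire job array
    cpus 4
    memory '8 GB'
    time '2h'
    queue 'short'

    input:
    path sample

    script:
    """
    your_command --input ${sample}
    """
}
```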

(process-beforescript)=

### beforeScript

@@ -36,7 +36,7 @@ import org.apache.commons.lang.StringUtils
*/
@Slf4j
@CompileStatic
-abstract class AbstractGridExecutor extends Executor {
+abstract class AbstractGridExecutor extends Executor implements TaskArrayAware {

protected Duration queueInterval

@@ -407,5 +407,10 @@ abstract class AbstractGridExecutor extends Executor {
// Instead, it is the command wrapper script that is launched within a container process.
return isFusionEnabled()
}

String getArrayTaskId(String jobId, int index) {
throw new UnsupportedOperationException("Executor '${name}' does not support job arrays")
}

}

@@ -18,6 +18,7 @@ package nextflow.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskArray
import nextflow.processor.TaskRun
/**
* An executor specialised for CRG cluster
@@ -41,6 +42,11 @@ class CrgExecutor extends SgeExecutor {
task.config.penv = 'smp'
}

if( task instanceof TaskArray ) {
final arraySize = task.getArraySize()
result << '-t' << "1-${arraySize}".toString()
}

result << '-N' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'y'

@@ -81,7 +81,7 @@ abstract class Executor {
*
* @param task A {@code TaskRun} instance
*/
-final void submit( TaskRun task ) {
+void submit( TaskRun task ) {
log.trace "Scheduling process: ${task}"

if( session.isTerminated() ) {

@@ -38,11 +38,13 @@ import nextflow.exception.ProcessNonZeroExitStatusException
import nextflow.file.FileHelper
import nextflow.fusion.FusionAwareTask
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArray
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.trace.TraceRecord
import nextflow.util.CmdLineHelper
import nextflow.util.Duration
import nextflow.util.Escape
import nextflow.util.Throttle
/**
* Handles a job execution in the underlying grid platform
@@ -100,6 +102,27 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
this.sanityCheckInterval = duration
}

@Override
void prepareLauncher() {
// -- create the wrapper script
createTaskWrapper(task).build()
}

@Override
String getWorkDir() {
fusionEnabled()
? FusionHelper.toContainerMount(task.workDir).toString()
: task.workDir.toString()
}

@Override
List<String> getLaunchCommand() {
final workDir = Escape.path(getWorkDir())
final cmd = "bash ${workDir}/${TaskRun.CMD_RUN} 2>&1 | tee ${workDir}/${TaskRun.CMD_LOG}"

List.of('bash', '-o', 'pipefail', '-c', cmd.toString())
}

protected ProcessBuilder createProcessBuilder() {

// -- log the qsub command
@@ -189,7 +212,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
// -- wait until the process completes
final result = process.text
final exitStatus = process.waitFor()
-final cmd = launchCmd0(builder,pipeScript)
+final cmd = submitCmd0(builder,pipeScript)

if( exitStatus ) {
throw new ProcessNonZeroExitStatusException("Failed to submit process to grid scheduler for execution", result, exitStatus, cmd)
@@ -237,12 +260,12 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
return result
}

-protected String launchCmd0(ProcessBuilder builder, String pipeScript) {
+protected String submitCmd0(ProcessBuilder builder, String pipeScript) {
def result = CmdLineHelper.toLine(builder.command())
if( pipeScript ) {
-result = "cat << 'LAUNCH_COMMAND_EOF' | ${result}\n"
+result = "cat << 'SUBMIT_COMMAND_EOF' | ${result}\n"
result += pipeScript.trim() + '\n'
-result += 'LAUNCH_COMMAND_EOF\n'
+result += 'SUBMIT_COMMAND_EOF\n'
}
return result
}
@@ -254,17 +277,15 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
void submit() {
ProcessBuilder builder = null
try {
-// -- create the wrapper script
-createTaskWrapper(task).build()
// -- start the execution and notify the event to the monitor
builder = createProcessBuilder()
// -- forward the job launcher script to the command stdin if required
final stdinScript = executor.pipeLauncherScript() ? stdinLauncherScript() : null
// -- execute with a re-triable strategy
final result = safeExecute( () -> processStart(builder, stdinScript) )
-// -- save the JobId in the
-this.jobId = executor.parseJobId(result)
-this.status = SUBMITTED
+// -- save the job id
+final jobId = (String)executor.parseJobId(result)
+this.onSubmit(jobId)
log.debug "[${executor.name.toUpperCase()}] submitted process ${task.name} > jobId: $jobId; workDir: ${task.workDir}"

}
@@ -284,6 +305,18 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {

}

void onSubmit(String jobId) {
if( task instanceof TaskArray ) {
task.children.eachWithIndex { handler, i ->
final arrayTaskId = executor.getArrayTaskId(jobId, i)
((GridTaskHandler)handler).onSubmit(arrayTaskId)
}
}
else {
this.jobId = jobId
this.status = SUBMITTED
}
}

private long startedMillis

@@ -23,6 +23,7 @@ import java.util.regex.Pattern
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArray
import nextflow.processor.TaskRun
/**
* Processor for LSF resource manager
@@ -104,7 +105,13 @@ class LsfExecutor extends AbstractGridExecutor {
}

// -- the job name
-result << '-J' << getJobNameFor(task)
+if( task instanceof TaskArray ) {
+final arraySize = task.getArraySize()
+result << '-J' << "${getJobNameFor(task)}[1-${arraySize}]".toString()
+}
+else {
+result << '-J' << getJobNameFor(task)
+}

// -- at the end append the command script wrapped file name
result.addAll( task.config.getClusterOptionsAsList() )
@@ -304,4 +311,14 @@ class LsfExecutor extends AbstractGridExecutor {
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}

@Override
String getArrayIndexName() { 'LSB_JOBINDEX' }

@Override
int getArrayIndexStart() { 1 }

@Override
String getArrayTaskId(String jobId, int index) { "${jobId}[${index + 1}]" }

}
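
The `getArrayIndexName()` and `getArrayIndexStart()` overrides above suggest that each scheduler exposes the array index through its own environment variable (`LSB_JOBINDEX` for LSF) and may start counting at 1 rather than 0. The job array script that consumes these values is not shown in this diff, so the following is only a minimal sketch of the offset arithmetic; `workDirs` and `selectWorkDir` are illustrative names, not part of Nextflow:

```groovy
// Hypothetical work directories of the child tasks in one job array
def workDirs = ['/work/aa/111', '/work/bb/222', '/work/cc/333']

// Map a scheduler-provided index to a child task.
// indexStart is 0 for schedulers that count from 0 and 1 for those that count from 1.
String selectWorkDir(List<String> dirs, int schedulerIndex, int indexStart) {
    return dirs[schedulerIndex - indexStart]
}

assert selectWorkDir(workDirs, 0, 0) == '/work/aa/111'   // zero-based scheduler
assert selectWorkDir(workDirs, 1, 1) == '/work/aa/111'   // one-based scheduler, e.g. LSF
assert selectWorkDir(workDirs, 3, 1) == '/work/cc/333'
```

Getting this offset wrong shifts every child task by one, which is presumably what the "Fix incorrect index offset in array job script" commit addressed.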

@@ -21,6 +21,7 @@ import java.util.regex.Pattern

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskArray
import nextflow.processor.TaskRun
/**
* Implements an executor for PBS/Torque cluster
@@ -43,6 +44,11 @@ class PbsExecutor extends AbstractGridExecutor {
protected List<String> getDirectives( TaskRun task, List<String> result ) {
assert result !=null

if( task instanceof TaskArray ) {
final arraySize = task.getArraySize()
result << '-J' << "0-${arraySize - 1}".toString()
}

result << '-N' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'oe'
@@ -174,4 +180,13 @@ class PbsExecutor extends AbstractGridExecutor {
static protected boolean matchOptions(String value) {
value ? OPTS_REGEX.matcher(value).find() : null
}

@Override
String getArrayIndexName() { 'PBS_ARRAY_INDEX' }

@Override
String getArrayTaskId(String jobId, int index) {
jobId.replace('[]', "[$index]")
}

}

@@ -18,6 +18,7 @@ package nextflow.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskArray
import nextflow.processor.TaskRun
/**
* Implements an executor for PBSPro cluster
@@ -44,7 +45,12 @@ class PbsProExecutor extends PbsExecutor {
@Override
protected List<String> getDirectives(TaskRun task, List<String> result ) {
assert result !=null


if( task instanceof TaskArray ) {
final arraySize = task.getArraySize()
result << '-J' << "0-${arraySize - 1}".toString()
}

// when multiple competing directives are provided, only the first one will take effect
// therefore clusterOptions is added as first to give priority over other options as expected
// by the clusterOptions semantics -- see https://github.com/nextflow-io/nextflow/pull/2036

@@ -19,6 +19,7 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArray
import nextflow.processor.TaskRun
/**
* Execute a task script by running it on the SGE/OGE cluster
@@ -37,6 +38,11 @@ class SgeExecutor extends AbstractGridExecutor {
*/
protected List<String> getDirectives(TaskRun task, List<String> result) {

if( task instanceof TaskArray ) {
final arraySize = task.getArraySize()
result << '-t' << "1-${arraySize}".toString()
}

result << '-N' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'y'
@@ -114,8 +120,14 @@ class SgeExecutor extends AbstractGridExecutor {
if( entry.toString().isLong() )
return entry

if( (id=entry.tokenize('.').get(0)).isLong() )
return id

if( entry.startsWith('Your job array') && entry.endsWith('has been submitted') && (id=entry.tokenize().get(3)) )
return id.tokenize('.').get(0)

if( entry.startsWith('Your job') && entry.endsWith('has been submitted') && (id=entry.tokenize().get(2)) )
return id
}

throw new IllegalStateException("Invalid SGE submit response:\n$text\n\n")
@@ -185,4 +197,14 @@ class SgeExecutor extends AbstractGridExecutor {
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}

@Override
String getArrayIndexName() { 'SGE_TASK_ID' }

@Override
int getArrayIndexStart() { 1 }

@Override
String getArrayTaskId(String jobId, int index) { "${jobId}.${index}" }

}

@@ -22,6 +22,7 @@ import java.util.regex.Pattern
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArray
import nextflow.processor.TaskRun
/**
* Processor for SLURM resource manager
@@ -54,6 +55,11 @@ class SlurmExecutor extends AbstractGridExecutor {
*/
protected List<String> getDirectives(TaskRun task, List<String> result) {

if( task instanceof TaskArray ) {
final arraySize = task.getArraySize()
result << '--array' << "0-${arraySize - 1}".toString()
}

result << '-J' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) // -o OUTFILE and no -e option => stdout and stderr merged to stdout/OUTFILE
result << '--no-requeue' << '' // note: directives need to be returned as pairs
@@ -211,4 +217,11 @@ class SlurmExecutor extends AbstractGridExecutor {
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}

@Override
String getArrayIndexName() { 'SLURM_ARRAY_TASK_ID' }

@Override
String getArrayTaskId(String jobId, int index) { "${jobId}_${index}" }

}
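
To compare the scheduler-specific `getArrayTaskId()` overrides in this diff side by side, the sketch below collects them into one standalone Groovy function. The `arrayTaskId` helper and the job ids are hypothetical; the string formats are copied from the LSF, PBS, SGE, and SLURM executors as they appear at this point in the PR, and `index` is the zero-based position passed by `eachWithIndex` in `GridTaskHandler.onSubmit()`:

```groovy
// Standalone sketch mirroring the getArrayTaskId() overrides shown in the diff
String arrayTaskId(String scheduler, String jobId, int index) {
    switch( scheduler ) {
        case 'lsf':   return "${jobId}[${index + 1}]"          // LSF indices start at 1
        case 'pbs':   return jobId.replace('[]', "[$index]")   // fill in the empty brackets
        case 'sge':   return "${jobId}.${index}"
        case 'slurm': return "${jobId}_${index}"
        default:
            throw new UnsupportedOperationException("Executor '${scheduler}' does not support job arrays")
    }
}

// Hypothetical job ids
assert arrayTaskId('lsf', '1234', 0) == '1234[1]'
assert arrayTaskId('pbs', '1234[].server', 2) == '1234[2].server'
assert arrayTaskId('sge', '1234', 3) == '1234.3'
assert arrayTaskId('slurm', '1234', 0) == '1234_0'
```

Each child handler receives one of these ids through `onSubmit()`, so from then on it can be tracked like an individually submitted job.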