Skip to content

Commit

Permalink
Merge branch 'master' into azure_batch_configurable_start_task
Browse files Browse the repository at this point in the history
  • Loading branch information
adamrtalbot committed Apr 5, 2024
2 parents 119c3ca + 74d7d7a commit b42d983
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 66 deletions.
8 changes: 7 additions & 1 deletion docs/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

# AWS Cloud

:::{tip}
This page describes how to manually set up and use Nextflow with AWS Cloud.
You may be interested in using [Batch Forge](https://docs.seqera.io/platform/latest/compute-envs/aws-batch) in [Seqera Platform](https://seqera.io/platform/),
which automatically creates the required AWS infrastructure for you with minimal intervention.
:::

## AWS security credentials

Nextflow uses the [AWS security credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html) to make programmatic calls to AWS services.
Expand Down Expand Up @@ -256,7 +262,7 @@ There are several reasons why you might need to create your own [AMI (Amazon Mac

### Create your custom AMI

From the EC2 Dashboard, select **Launch Instance**, then select **Browse more AMIs**. In the new page, select
From the EC2 Dashboard, select **Launch Instance**, then select **Browse more AMIs**. In the new page, select
**AWS Marketplace AMIs**, and then search for **Amazon ECS-Optimized Amazon Linux 2 (AL2) x86_64 AMI**. Select the AMI and continue as usual to configure and launch the instance.

:::{note}
Expand Down
2 changes: 1 addition & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ The following settings are available:
: Enable the automatic creation of batch pools depending on the pipeline resources demand (default: `true`).

`azure.batch.copyToolInstallMode`
: Specify where the `azcopy` tool used by Nextflow. When `node` is specified it's copied once during the pool creation. When `task` is provider, it's installed for each task execution (default: `node`).
: Specify where the `azcopy` tool used by Nextflow is installed. When `node` is specified, it's copied once during the pool creation. When `task` is specified, it's installed for each task execution. Finally, when `off` is specified, the `azcopy` tool is not installed (default: `node`).

`azure.batch.deleteJobsOnCompletion`
: Delete all jobs when the workflow completes (default: `false`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

package nextflow.container
import java.nio.file.Path
import java.nio.file.Paths
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
/**
Expand All @@ -29,7 +31,11 @@ import groovy.util.logging.Slf4j
@CompileStatic
@Slf4j
class CharliecloudBuilder extends ContainerBuilder<CharliecloudBuilder> {

protected boolean useSquash

protected boolean writeFake

CharliecloudBuilder(String name) {
this.image = name
}
Expand All @@ -46,6 +52,12 @@ class CharliecloudBuilder extends ContainerBuilder<CharliecloudBuilder> {
if( params.containsKey('runOptions') )
addRunOptions(params.runOptions.toString())

if ( params.containsKey('useSquash') )
this.useSquash = params.useSquash?.toString() == 'true'

if ( params.containsKey('writeFake') )
this.writeFake = params.writeFake?.toString() == 'true'

if( params.containsKey('readOnlyInputs') )
this.readOnlyInputs = params.readOnlyInputs?.toString() == 'true'

Expand All @@ -60,8 +72,37 @@ class CharliecloudBuilder extends ContainerBuilder<CharliecloudBuilder> {
@Override
CharliecloudBuilder build(StringBuilder result) {
assert image
def imageStorage = Paths.get(image).parent.parent
def imageToRun = String

if (!writeFake) {
// define image to run, if --write-fake is not used this is a copy of the image in the current workDir
imageToRun = '"$NXF_TASK_WORKDIR"/container_' + image.split('/')[-1]

// optional squash
if (useSquash) {
imageToRun = imageToRun + '.squashfs'
}

result << 'ch-convert -i ch-image --storage '
// handle storage to deal with cases where CH_IMAGE_STORAGE is not set
result << imageStorage
result << ' '
result << image.split('/')[-1]
result << ' '
result << imageToRun
result << ' && '
}

result << 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env '

if (writeFake) {
result << '--write-fake '
// if we are using writeFake we do not need to create a temporary image
// image is run by name from the storage directory
imageToRun = image.split('/')[-1]
}

if (!readOnlyInputs)
result << '-w '

Expand All @@ -74,8 +115,8 @@ class CharliecloudBuilder extends ContainerBuilder<CharliecloudBuilder> {

if( runOptions )
result << runOptions.join(' ') << ' '

result << image
result << imageToRun
result << ' --'

runCommand = result.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class CharliecloudCache {

private Duration pullTimeout = Duration.of('20min')

private String registry

/** Only for debugging purpose - do not use */
@PackageScope
CharliecloudCache() {}
Expand Down Expand Up @@ -77,8 +79,13 @@ class CharliecloudCache {
String simpleName(String imageUrl) {
def p = imageUrl.indexOf('://')
def name = p != -1 ? imageUrl.substring(p+3) : imageUrl

// add registry
if( registry )
name = registry + name

name = name.replace(':','+').replace('/','%')
return name
return name
}

/**
Expand Down Expand Up @@ -168,20 +175,21 @@ class CharliecloudCache {
return localPath
}

// final file = new File("${localPath.parent.parent.parent}/.${localPath.name}.lock")
final file = new File("${localPath.parent.parent.parent}/.ch-pulling.lock")
final wait = "Another Nextflow instance is pulling the image $imageUrl with Charliecloud -- please wait until the download completes"
final err = "Unable to acquire exclusive lock after $pullTimeout on file: $file"

final mutex = new FileMutex(target: file, timeout: pullTimeout, waitMessage: wait, errorMessage: err)
try {
mutex .lock { downloadCharliecloudImage0(imageUrl, localPath) }
}
finally {
file.delete()
}

int count = 0;
int maxTries = 5;
boolean imagePulled = false
while(!imagePulled) {
try {
downloadCharliecloudImage0(imageUrl, localPath)
imagePulled = true
} catch (e) {
if (++count == maxTries) throw e
log.info "Another image is currently pulled. Attempting again in 30 seconds [$count/$maxTries]"
Thread.sleep(30000)
}
}
return localPath

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package nextflow.executor.local

import java.lang.reflect.InvocationTargetException
import java.util.concurrent.Callable
import java.util.concurrent.Future

Expand Down Expand Up @@ -91,11 +92,15 @@ class NativeTaskHandler extends TaskHandler {
boolean checkIfCompleted() {
if( isRunning() && result.isDone() ) {
status = TaskStatus.COMPLETED
if( result.get() instanceof Throwable ) {
task.error = (Throwable)result.get()
final ret = result.get()
if( ret instanceof InvocationTargetException ) {
task.error = ret.cause
}
else if( ret instanceof Throwable ) {
task.error = (Throwable)ret
}
else {
task.stdout = result.get()
task.stdout = ret
}
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,86 +36,110 @@ class CharliecloudBuilderTest extends Specification {
def path2 = Paths.get('/bar/data/file2')

expect:
new CharliecloudBuilder('busybox')
new CharliecloudBuilder('/cacheDir/img/busybox')
.build()
.runCommand == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" busybox --'
.runCommand == 'ch-convert -i ch-image --storage /cacheDir busybox "$NXF_TASK_WORKDIR"/container_busybox && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_busybox --'

new CharliecloudBuilder('busybox')
new CharliecloudBuilder('/cacheDir/img/busybox')
.params(runOptions: '-j --no-home')
.build()
.runCommand == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" -j --no-home busybox --'
.runCommand == 'ch-convert -i ch-image --storage /cacheDir busybox "$NXF_TASK_WORKDIR"/container_busybox && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" -j --no-home "$NXF_TASK_WORKDIR"/container_busybox --'

new CharliecloudBuilder('busybox')
new CharliecloudBuilder('/cacheDir/img/busybox')
.params(temp: '/foo')
.build()
.runCommand == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /foo:/tmp -b "$NXF_TASK_WORKDIR" busybox --'
.runCommand == 'ch-convert -i ch-image --storage /cacheDir busybox "$NXF_TASK_WORKDIR"/container_busybox && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /foo:/tmp -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_busybox --'

new CharliecloudBuilder('busybox')
new CharliecloudBuilder('/cacheDir/img/busybox')
.addEnv('X=1')
.addEnv(ALPHA:'aaa', BETA: 'bbb')
.build()
.runCommand == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w --set-env=X=1 --set-env=ALPHA=aaa --set-env=BETA=bbb -b "$NXF_TASK_WORKDIR" busybox --'
.runCommand == 'ch-convert -i ch-image --storage /cacheDir busybox "$NXF_TASK_WORKDIR"/container_busybox && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w --set-env=X=1 --set-env=ALPHA=aaa --set-env=BETA=bbb -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_busybox --'

new CharliecloudBuilder('ubuntu')
new CharliecloudBuilder('/cacheDir/img/ubuntu')
.addMount(path1)
.build()
.runCommand == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /foo/data/file1 -b "$NXF_TASK_WORKDIR" ubuntu --'
.runCommand == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /foo/data/file1 -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu --'

new CharliecloudBuilder('ubuntu')
new CharliecloudBuilder('/cacheDir/img/ubuntu')
.addMount(path1)
.addMount(path2)
.build()
.runCommand == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /foo/data/file1 -b /bar/data/file2 -b "$NXF_TASK_WORKDIR" ubuntu --'
.runCommand == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /foo/data/file1 -b /bar/data/file2 -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu --'
}

def db_file = Paths.get('/home/db')
def 'should get run command' () {

when:
def cmd = new CharliecloudBuilder('ubuntu').build().getRunCommand()
def cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.build()
.getRunCommand()
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" ubuntu --'
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu --'

when:
cmd = new CharliecloudBuilder('ubuntu').build().getRunCommand('bwa --this --that file.fastq')
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(useSquash: 'true')
.build()
.getRunCommand()
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" ubuntu -- bwa --this --that file.fastq'
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu.squashfs && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu.squashfs --'

when:
cmd = new CharliecloudBuilder('ubuntu').params(entry:'/bin/sh').build().getRunCommand('bwa --this --that file.fastq')
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(writeFake: 'true')
.build()
.getRunCommand()
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env --write-fake -w -b "$NXF_TASK_WORKDIR" ubuntu --'

when:
cmd = new CharliecloudBuilder('ubuntu').params(entry:'/bin/sh').params(readOnlyInputs: 'true').build().getRunCommand('bwa --this --that file.fastq')
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(entry:'/bin/sh')
.build()
.getRunCommand('bwa --this --that file.fastq')
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -b "$NXF_TASK_WORKDIR" ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'

when:
cmd = new CharliecloudBuilder('ubuntu').params(entry:'/bin/sh').params(readOnlyInputs: 'false').build().getRunCommand('bwa --this --that file.fastq')
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(entry:'/bin/sh')
.params(readOnlyInputs: 'true')
.build()
.getRunCommand('bwa --this --that file.fastq')
then:
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'

when:
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(entry:'/bin/sh')
.params(readOnlyInputs: 'false')
.build()
.getRunCommand('bwa --this --that file.fastq')
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'

when:
cmd = new CharliecloudBuilder('ubuntu')
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(entry:'/bin/sh')
.addMount(db_file)
.addMount(db_file)
.params(readOnlyInputs: 'true')
.build().getRunCommand('bwa --this --that file.fastq')
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -b /home -b "$NXF_TASK_WORKDIR" ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -b /home -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'

when:
cmd = new CharliecloudBuilder('ubuntu')
cmd = new CharliecloudBuilder('/cacheDir/img/ubuntu')
.params(entry:'/bin/sh')
.addMount(db_file)
.addMount(db_file)
.params(readOnlyInputs: 'false')
.build()
.getRunCommand('bwa --this --that file.fastq')
then:
cmd == 'ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /home/db -b "$NXF_TASK_WORKDIR" ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'
cmd == 'ch-convert -i ch-image --storage /cacheDir ubuntu "$NXF_TASK_WORKDIR"/container_ubuntu && ch-run --unset-env="*" -c "$NXF_TASK_WORKDIR" --set-env -w -b /home/db -b "$NXF_TASK_WORKDIR" "$NXF_TASK_WORKDIR"/container_ubuntu -- /bin/sh -c "bwa --this --that file.fastq"'
}

@Unroll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,13 +681,14 @@ class AzBatchService implements Closeable {
.withContainerConfiguration(containerConfig)
}

protected void createPool(AzVmPoolSpec spec) {

def resourceFiles = new ArrayList(10)
protected StartTask createStartTask() {
if( config.batch().getCopyToolInstallMode() != CopyToolInstallMode.node )
return null

final resourceFiles = new ArrayList(10)
resourceFiles << new ResourceFile()
.withHttpUrl(AZCOPY_URL)
.withFilePath('azcopy')
.withHttpUrl(AZCOPY_URL)
.withFilePath('azcopy')

def poolStartTask = new StartTask()
.withCommandLine(startTaskCmd(spec.opts))
Expand All @@ -701,7 +702,11 @@ class AzBatchService implements Closeable {
// same as the number of cores
// https://docs.microsoft.com/en-us/azure/batch/batch-parallel-node-tasks
.withTaskSlotsPerNode(spec.vmType.numberOfCores)
.withStartTask(poolStartTask)

final startTask = createStartTask()
if( startTask ) {
poolParams .withStartTask(startTask)
}

// resource labels
if( spec.metadata ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import java.util.regex.Matcher
import java.util.regex.Pattern

import groovy.transform.CompileStatic
import nextflow.Global
import nextflow.Session
import nextflow.cloud.CloudTransferOptions
import nextflow.fusion.FusionHelper
import nextflow.util.Duration
import nextflow.util.StringUtils

Expand Down Expand Up @@ -136,10 +139,12 @@ class AzBatchOpts implements CloudTransferOptions {
CopyToolInstallMode getCopyToolInstallMode() {
// if the `installAzCopy` is not specified
// `true` is returned when the pool is not created by Nextflow
// since it can be a pol provided by the user which does not
// since it can be a pool provided by the user which does not
// provide the required `azcopy` tool
if( copyToolInstallMode )
return copyToolInstallMode
if( FusionHelper.isFusionEnabled((Session) Global.session) )
return CopyToolInstallMode.off
canCreatePool() ? CopyToolInstallMode.node : CopyToolInstallMode.task
}
}
Loading

0 comments on commit b42d983

Please sign in to comment.