Skip to content

Commit

Permalink
Add -cloudcache CLI option (#4385)
Browse files Browse the repository at this point in the history
This commit adds the -cloudcache run command line option, which
allows storing the Nextflow cache metadata in a cloud object storage
bucket instead of storing it in the local .nextflow directory.

The target cloud cache path is specified as a command line argument,
e.g. -cloudcache s3://bucket-name/path.

Moreover, this commit allows controlling the cloudcache via the
nextflow.config file as shown below:

```
cloudcache.enabled = true|false
cloudcache.path = '<some object storage path>'
```

The supported object storage services are AWS S3, Azure Blob Storage and Google Cloud Storage.

Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso authored Oct 9, 2023
1 parent d26c42b commit 73fda58
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 22 deletions.
3 changes: 3 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,9 @@ The `run` command is used to execute a local pipeline script or remote pipeline
`-cache`
: Enable/disable processes caching.

`-cloudcache`
: Enable the use of the Cloud cache plugin for storing cache metadata to an object storage bucket.

`-d, -deep`
: Create a shallow clone of the specified depth.

Expand Down
19 changes: 19 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ class Session implements ISession {
*/
boolean debug

/**
* Defines the cloud path where the cache meta-data is stored
*/
Path cloudCachePath

/**
* Local path where script generated classes are saved
*/
Expand Down Expand Up @@ -361,11 +366,25 @@ class Session implements ISession {
this.workDir = ((config.workDir ?: 'work') as Path).complete()
this.setLibDir( config.libDir as String )

// -- init cloud cache path
this.cloudCachePath = cloudCachePath(config.cloudcache as Map, workDir)

// -- file porter config
this.filePorter = new FilePorter(this)

}

/**
 * Resolve the location used to store the cache meta-data when the cloud cache is enabled.
 *
 * @param cloudcache The {@code cloudcache} configuration scope; may be {@code null}
 * @param workDir The path used as fallback when the scope does not define a {@code path}
 * @return The resolved cache path, or {@code null} when the scope is missing or its
 *      {@code enabled} attribute is not truthy
 * @throws IllegalArgumentException when the scheme of the resolved path is not one of
 *      {@code s3}, {@code az} or {@code gs}
 */
protected Path cloudCachePath(Map cloudcache, Path workDir) {
    if( cloudcache?.enabled ) {
        final String location = cloudcache.path
        // an explicit `path` setting wins, otherwise fall back on the work directory
        Path target = workDir
        if( location )
            target = FileHelper.asPath(location)
        // only object storage paths are accepted
        final supported = ['s3','az','gs']
        if( target.scheme in supported )
            return target
        throw new IllegalArgumentException("Storage path not supported by Cloud-cache - offending value: '${target}'")
    }
    return null
}

/**
* Initialize the session workDir, libDir, baseDir and scriptName variables
*/
Expand Down
3 changes: 3 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class CmdRun extends CmdBase implements HubOptions {
@Parameter(names=['-bucket-dir'], description = 'Remote bucket where intermediate result files are stored')
String bucketDir

@Parameter(names=['-cloudcache'], description = 'Enable the use of object storage bucket as storage for cache meta-data')
String cloudCachePath

/**
* Defines the parameters to be passed to the pipeline script
*/
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ class Launcher {
normalized << '%all'
}

else if( current == '-cloudcache' && (i==args.size() || args[i].startsWith('-'))) {
normalized << '-'
}

else if( current == '-with-trace' && (i==args.size() || args[i].startsWith('-'))) {
normalized << '-'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,19 @@ class ConfigBuilder {
config.fusion.enabled = cmdRun.withFusion == 'true'
}

// -- set cloudcache options
final envCloudPath = env.get('NXF_CLOUDCACHE_PATH')
if( cmdRun.cloudCachePath || envCloudPath ) {
if( !(config.cloudcache instanceof Map) )
config.cloudcache = [:]
if( !config.cloudcache.isSet('enabled') )
config.cloudcache.enabled = true
if( cmdRun.cloudCachePath && cmdRun.cloudCachePath != '-' )
config.cloudcache.path = cmdRun.cloudCachePath
else if( !config.cloudcache.isSet('path') && envCloudPath )
config.cloudcache.path = envCloudPath
}

// -- add the command line parameters to the 'taskConfig' object
if( cmdRun.hasParams() )
config.params = mergeMaps( (Map)config.params, cmdRun.parsedParams(configVars()), NF.strictMode )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ class LauncherTest extends Specification {
launcher.normalizeArgs('run', '-', '-a', '-b') == ['run','-stdin', '-a', '-b']
launcher.normalizeArgs('run') == ['run']

launcher.normalizeArgs('run','-cloudcache') == ['run', '-cloudcache', '-']
launcher.normalizeArgs('run','-cloudcache', '-x') == ['run', '-cloudcache', '-', '-x']
launcher.normalizeArgs('run','-cloudcache', 's3://foo/bar') == ['run', '-cloudcache','s3://foo/bar']

launcher.normalizeArgs('run','-with-tower') == ['run', '-with-tower', '-']
launcher.normalizeArgs('run','-with-tower', '-x') == ['run', '-with-tower', '-', '-x']
launcher.normalizeArgs('run','-with-tower', 'foo.com') == ['run', '-with-tower','foo.com']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,98 @@ class ConfigBuilderTest extends Specification {
config.wave.endpoint == 'https://wave.seqera.io'
}

// Verifies how `configRunOptions` fills the `cloudcache` config scope from the
// `-cloudcache` CLI option (CmdRun.cloudCachePath), the NXF_CLOUDCACHE_PATH
// environment variable and the settings already present in the config object.
def 'should set cloudcache options' () {

given:
def env = [:]
def builder = [:] as ConfigBuilder

// no CLI option, no env variable, empty config => the scope stays unset
when:
def config = new ConfigObject()
builder.configRunOptions(config, env, new CmdRun())
then:
!config.cloudcache

// only the path is given in the config => it is kept but the cache is not enabled
when:
config = new ConfigObject()
config.cloudcache.path = 's3://foo/bar'
builder.configRunOptions(config, env, new CmdRun())
then:
config.cloudcache instanceof Map
!config.cloudcache.enabled
config.cloudcache.path == 's3://foo/bar'

// CLI option with a path => the cache is enabled using that path
when:
config = new ConfigObject()
builder.configRunOptions(config, env, new CmdRun(cloudCachePath: 's3://this/that'))
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
config.cloudcache.path == 's3://this/that'

// CLI option holding the '-' placeholder => enabled, but no path is set
when:
config = new ConfigObject()
builder.configRunOptions(config, env, new CmdRun(cloudCachePath: '-'))
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
!config.cloudcache.path

// '-' placeholder plus a config path => enabled, the config path is preserved
when:
config = new ConfigObject()
config.cloudcache.path = 's3://alpha/delta'
builder.configRunOptions(config, env, new CmdRun(cloudCachePath: '-'))
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
config.cloudcache.path == 's3://alpha/delta'

// an explicit CLI path takes precedence over the config path
when:
config = new ConfigObject()
config.cloudcache.path = 's3://alpha/delta'
builder.configRunOptions(config, env, new CmdRun(cloudCachePath: 's3://should/override/config'))
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
config.cloudcache.path == 's3://should/override/config'

// `enabled = false` in the config is not flipped by the CLI option, the path is still overridden
when:
config = new ConfigObject()
config.cloudcache.enabled = false
builder.configRunOptions(config, env, new CmdRun(cloudCachePath: 's3://should/override/config'))
then:
config.cloudcache instanceof Map
!config.cloudcache.enabled
config.cloudcache.path == 's3://should/override/config'

// the CLI path takes precedence over the NXF_CLOUDCACHE_PATH variable
when:
config = new ConfigObject()
builder.configRunOptions(config, [NXF_CLOUDCACHE_PATH:'s3://foo'], new CmdRun(cloudCachePath: 's3://should/override/env'))
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
config.cloudcache.path == 's3://should/override/env'

// env variable without CLI option => enabled, but the config path wins over the env value
when:
config = new ConfigObject()
config.cloudcache.path = 's3://config/path'
builder.configRunOptions(config, [NXF_CLOUDCACHE_PATH:'s3://foo'], new CmdRun())
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
config.cloudcache.path == 's3://config/path'

// all three sources defined => the CLI path wins
when:
config = new ConfigObject()
config.cloudcache.path = 's3://config/path'
builder.configRunOptions(config, [NXF_CLOUDCACHE_PATH:'s3://foo'], new CmdRun(cloudCachePath: 's3://should/override/config'))
then:
config.cloudcache instanceof Map
config.cloudcache.enabled
config.cloudcache.path == 's3://should/override/config'

}

def 'should enable conda env' () {

given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ class PluginsFacade implements PluginStateListener {
specs << defaultPlugins.getPlugin('nf-wave')
}

// add cloudcache plugin when NXF_CLOUDCACHE_PATH is set
if( env.NXF_CLOUDCACHE_PATH ) {
// add cloudcache plugin when cloudcache is enabled in the config
if( Bolts.navigate(config, 'cloudcache.enabled')==true ) {
specs << defaultPlugins.getPlugin('nf-cloudcache')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ class PluginsFacadeTest extends Specification {
result == [ new PluginSpec('nf-google','2.0.0') ]

when:
handler = new PluginsFacade(defaultPlugins: defaults, env: [NXF_CLOUDCACHE_PATH:'xyz'])
result = handler.pluginsRequirement([:])
handler = new PluginsFacade(defaultPlugins: defaults, env: [:])
result = handler.pluginsRequirement([cloudcache:[enabled:true]])
then:
result == [ new PluginSpec('nf-cloudcache', '0.1.0') ]

Expand Down
61 changes: 61 additions & 0 deletions plugins/nf-amazon/src/test/nextflow/S3SessionTest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow

import java.nio.file.Path

import nextflow.file.FileHelper
import spock.lang.Specification
import spock.lang.Unroll

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class S3SessionTest extends Specification {

    /**
     * Checks the path returned by {@code Session#cloudCachePath} for a missing
     * config scope, an enabled scope falling back on the work directory and
     * an enabled scope with an explicit path.
     */
    @Unroll
    def 'should get cloud cache path' () {
        given:
        def target = Spy(Session)
        def workDir = FileHelper.asPath(WORKDIR)

        expect:
        target.cloudCachePath(CONFIG, workDir) == EXPECTED

        where:
        CONFIG << [
                null,
                [enabled:true],
                [enabled:true, path:'s3://this/that'] ]
        WORKDIR << [
                '/foo',
                's3://foo/work',
                '/foo' ]
        EXPECTED << [
                null,
                FileHelper.asPath('s3://foo/work'),
                FileHelper.asPath('s3://this/that') ]
    }

    /**
     * An enabled cloud cache resolving to a local path must be rejected.
     */
    def 'should error with non-cloud bucket' () {
        given:
        def target = Spy(Session)
        def localDir = Path.of('/foo/dir')

        when:
        target.cloudCachePath([enabled:true], localDir)

        then:
        IllegalArgumentException failure = thrown()
        failure.message == "Storage path not supported by Cloud-cache - offending value: '/foo/dir'"
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package nextflow.cache
import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.cache.CacheDB
import nextflow.cache.CacheFactory
import nextflow.Global
import nextflow.Session
import nextflow.exception.AbortOperationException
import nextflow.plugin.Priority
/**
Expand All @@ -39,7 +39,10 @@ class CloudCacheFactory extends CacheFactory {
protected CacheDB newInstance(UUID uniqueId, String runName, Path home) {
if( !uniqueId ) throw new AbortOperationException("Missing cache `uuid`")
if( !runName ) throw new AbortOperationException("Missing cache `runName`")
final store = new CloudCacheStore(uniqueId, runName, home)
final path = (Global.session as Session).cloudCachePath
if( !path )
throw new IllegalArgumentException("Cloud-cache path not defined - use either -cloudcache run option or NXF_CLOUDCACHE_PATH environment variable")
final store = new CloudCacheStore(uniqueId, runName, path)
return new CacheDB(store)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.nio.file.Path

import com.google.common.hash.HashCode
import groovy.transform.CompileStatic
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.extension.FilesEx
import nextflow.util.CacheHelper
Expand Down Expand Up @@ -58,23 +57,18 @@ class CloudCacheStore implements CacheStore {
/** Index file output stream */
private OutputStream indexWriter

CloudCacheStore(UUID uniqueId, String runName, Path basePath=null) {
CloudCacheStore(UUID uniqueId, String runName, Path basePath) {
assert uniqueId, "Missing cloudcache 'uniqueId' argument"
assert runName, "Missing cloudcache 'runName' argument"
assert basePath, "Missing cloudcache 'basePath' argument"
this.KEY_SIZE = CacheHelper.hasher('x').hash().asBytes().size()
this.uniqueId = uniqueId
this.runName = runName
this.basePath = basePath ?: defaultBasePath()
this.basePath = basePath
this.dataPath = this.basePath.resolve("$uniqueId")
this.indexPath = dataPath.resolve("index.$runName")
}

private Path defaultBasePath() {
final basePath = SysEnv.get('NXF_CLOUDCACHE_PATH')
if( !basePath )
throw new IllegalArgumentException("NXF_CLOUDCACHE_PATH must be defined when using the cloud cache store")

return basePath as Path
}

@Override
CloudCacheStore open() {
indexWriter = new BufferedOutputStream(Files.newOutputStream(indexPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package io.seqera.tower.plugin
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.SysEnv
import nextflow.cli.PluginAbstractExec
/**
* Implements nextflow cache and restore commands
Expand All @@ -47,7 +46,7 @@ class CacheCommand implements PluginAbstractExec {

protected void cacheBackup() {
log.debug "Running Nextflow cache backup"
if( !SysEnv.get('NXF_CLOUDCACHE_PATH')) {
if( !getSession().cloudCachePath ) {
// legacy cache manager
new CacheManager(System.getenv()).saveCacheFiles()
}
Expand All @@ -69,7 +68,7 @@ class CacheCommand implements PluginAbstractExec {
}

protected void cacheRestore() {
if( !SysEnv.get('NXF_CLOUDCACHE_PATH')) {
if( !getSession().cloudCachePath ) {
log.debug "Running Nextflow cache restore"
// legacy cache manager
new CacheManager(System.getenv()).restoreCacheFiles()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class TowerFactory implements TraceObserverFactory {
final tower = createTowerClient(session, config)
result.add(tower)
// create the logs checkpoint
if( env.containsKey('NXF_CLOUDCACHE_PATH') )
if( session.cloudCachePath )
result.add( new LogsCheckpoint() )
return result
}
Expand Down

0 comments on commit 73fda58

Please sign in to comment.