diff --git a/docs/cli.md b/docs/cli.md index 310c97b35d..d9faee429d 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -1134,6 +1134,9 @@ The `run` command is used to execute a local pipeline script or remote pipeline `-cache` : Enable/disable processes caching. +`-cloudcache` +: Enable the use of the Cloud cache plugin for storing cache metadata to an object storage bucket. + `-d, -deep` : Create a shallow clone of the specified depth. diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index f8e2cffbd5..7f8d4ecf24 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -188,6 +188,11 @@ class Session implements ISession { */ boolean debug + /** + * Defines the cloud path where the cache meta-data is stored + */ + Path cloudCachePath + /** * Local path where script generated classes are saved */ @@ -361,11 +366,25 @@ class Session implements ISession { this.workDir = ((config.workDir ?: 'work') as Path).complete() this.setLibDir( config.libDir as String ) + // -- init cloud cache path + this.cloudCachePath = cloudCachePath(config.cloudcache as Map, workDir) + // -- file porter config this.filePorter = new FilePorter(this) } + protected Path cloudCachePath(Map cloudcache, Path workDir) { + if( !cloudcache?.enabled ) + return null + final String path = cloudcache.path + final result = path ? 
FileHelper.asPath(path) : workDir + if( result.scheme !in ['s3','az','gs'] ) { + throw new IllegalArgumentException("Storage path not supported by Cloud-cache - offending value: '${result}'") + } + return result + } + /** * Initialize the session workDir, libDir, baseDir and scriptName variables */ diff --git a/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy b/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy index 67b4fba639..a44201d35a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy @@ -111,6 +111,9 @@ class CmdRun extends CmdBase implements HubOptions { @Parameter(names=['-bucket-dir'], description = 'Remote bucket where intermediate result files are stored') String bucketDir + @Parameter(names=['-cloudcache'], description = 'Enable the use of object storage bucket as storage for cache meta-data') + String cloudCachePath + /** * Defines the parameters to be passed to the pipeline script */ diff --git a/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy b/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy index b41c57a5dc..ad3a32f844 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy @@ -224,6 +224,10 @@ class Launcher { normalized << '%all' } + else if( current == '-cloudcache' && (i==args.size() || args[i].startsWith('-'))) { + normalized << '-' + } + else if( current == '-with-trace' && (i==args.size() || args[i].startsWith('-'))) { normalized << '-' } diff --git a/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy index f19a785619..02270c9aa1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy @@ -709,6 +709,19 @@ class ConfigBuilder { 
config.fusion.enabled = cmdRun.withFusion == 'true' } + // -- set cloudcache options + final envCloudPath = env.get('NXF_CLOUDCACHE_PATH') + if( cmdRun.cloudCachePath || envCloudPath ) { + if( !(config.cloudcache instanceof Map) ) + config.cloudcache = [:] + if( !config.cloudcache.isSet('enabled') ) + config.cloudcache.enabled = true + if( cmdRun.cloudCachePath && cmdRun.cloudCachePath != '-' ) + config.cloudcache.path = cmdRun.cloudCachePath + else if( !config.cloudcache.isSet('path') && envCloudPath ) + config.cloudcache.path = envCloudPath + } + // -- add the command line parameters to the 'taskConfig' object if( cmdRun.hasParams() ) config.params = mergeMaps( (Map)config.params, cmdRun.parsedParams(configVars()), NF.strictMode ) diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/LauncherTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/LauncherTest.groovy index 5a4594dfe2..bb1c82e182 100644 --- a/modules/nextflow/src/test/groovy/nextflow/cli/LauncherTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/cli/LauncherTest.groovy @@ -187,6 +187,10 @@ class LauncherTest extends Specification { launcher.normalizeArgs('run', '-', '-a', '-b') == ['run','-stdin', '-a', '-b'] launcher.normalizeArgs('run') == ['run'] + launcher.normalizeArgs('run','-cloudcache') == ['run', '-cloudcache', '-'] + launcher.normalizeArgs('run','-cloudcache', '-x') == ['run', '-cloudcache', '-', '-x'] + launcher.normalizeArgs('run','-cloudcache', 's3://foo/bar') == ['run', '-cloudcache','s3://foo/bar'] + launcher.normalizeArgs('run','-with-tower') == ['run', '-with-tower', '-'] launcher.normalizeArgs('run','-with-tower', '-x') == ['run', '-with-tower', '-', '-x'] launcher.normalizeArgs('run','-with-tower', 'foo.com') == ['run', '-with-tower','foo.com'] diff --git a/modules/nextflow/src/test/groovy/nextflow/config/ConfigBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/config/ConfigBuilderTest.groovy index de690f0d40..08076f3eca 100644 --- 
a/modules/nextflow/src/test/groovy/nextflow/config/ConfigBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/config/ConfigBuilderTest.groovy @@ -1157,6 +1157,98 @@ class ConfigBuilderTest extends Specification { config.wave.endpoint == 'https://wave.seqera.io' } + def 'should set cloudcache options' () { + + given: + def env = [:] + def builder = [:] as ConfigBuilder + + when: + def config = new ConfigObject() + builder.configRunOptions(config, env, new CmdRun()) + then: + !config.cloudcache + + when: + config = new ConfigObject() + config.cloudcache.path = 's3://foo/bar' + builder.configRunOptions(config, env, new CmdRun()) + then: + config.cloudcache instanceof Map + !config.cloudcache.enabled + config.cloudcache.path == 's3://foo/bar' + + when: + config = new ConfigObject() + builder.configRunOptions(config, env, new CmdRun(cloudCachePath: 's3://this/that')) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + config.cloudcache.path == 's3://this/that' + + when: + config = new ConfigObject() + builder.configRunOptions(config, env, new CmdRun(cloudCachePath: '-')) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + !config.cloudcache.path + + when: + config = new ConfigObject() + config.cloudcache.path = 's3://alpha/delta' + builder.configRunOptions(config, env, new CmdRun(cloudCachePath: '-')) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + config.cloudcache.path == 's3://alpha/delta' + + when: + config = new ConfigObject() + config.cloudcache.path = 's3://alpha/delta' + builder.configRunOptions(config, env, new CmdRun(cloudCachePath: 's3://should/override/config')) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + config.cloudcache.path == 's3://should/override/config' + + when: + config = new ConfigObject() + config.cloudcache.enabled = false + builder.configRunOptions(config, env, new CmdRun(cloudCachePath: 's3://should/override/config')) + then: + 
config.cloudcache instanceof Map + !config.cloudcache.enabled + config.cloudcache.path == 's3://should/override/config' + + when: + config = new ConfigObject() + builder.configRunOptions(config, [NXF_CLOUDCACHE_PATH:'s3://foo'], new CmdRun(cloudCachePath: 's3://should/override/env')) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + config.cloudcache.path == 's3://should/override/env' + + when: + config = new ConfigObject() + config.cloudcache.path = 's3://config/path' + builder.configRunOptions(config, [NXF_CLOUDCACHE_PATH:'s3://foo'], new CmdRun()) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + config.cloudcache.path == 's3://config/path' + + when: + config = new ConfigObject() + config.cloudcache.path = 's3://config/path' + builder.configRunOptions(config, [NXF_CLOUDCACHE_PATH:'s3://foo'], new CmdRun(cloudCachePath: 's3://should/override/config')) + then: + config.cloudcache instanceof Map + config.cloudcache.enabled + config.cloudcache.path == 's3://should/override/config' + + } + def 'should enable conda env' () { given: diff --git a/modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy b/modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy index 5800582a40..5af5b2d2a6 100644 --- a/modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy +++ b/modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy @@ -371,8 +371,8 @@ class PluginsFacade implements PluginStateListener { specs << defaultPlugins.getPlugin('nf-wave') } - // add cloudcache plugin when NXF_CLOUDCACHE_PATH is set - if( env.NXF_CLOUDCACHE_PATH ) { + // add cloudcache plugin when cloudcache is enabled in the config + if( Bolts.navigate(config, 'cloudcache.enabled')==true ) { specs << defaultPlugins.getPlugin('nf-cloudcache') } diff --git a/modules/nf-commons/src/test/nextflow/plugin/PluginsFacadeTest.groovy b/modules/nf-commons/src/test/nextflow/plugin/PluginsFacadeTest.groovy index d8a58b637e..20eb74ac32 100644 
--- a/modules/nf-commons/src/test/nextflow/plugin/PluginsFacadeTest.groovy +++ b/modules/nf-commons/src/test/nextflow/plugin/PluginsFacadeTest.groovy @@ -140,8 +140,8 @@ class PluginsFacadeTest extends Specification { result == [ new PluginSpec('nf-google','2.0.0') ] when: - handler = new PluginsFacade(defaultPlugins: defaults, env: [NXF_CLOUDCACHE_PATH:'xyz']) - result = handler.pluginsRequirement([:]) + handler = new PluginsFacade(defaultPlugins: defaults, env: [:]) + result = handler.pluginsRequirement([cloudcache:[enabled:true]]) then: result == [ new PluginSpec('nf-cloudcache', '0.1.0') ] diff --git a/plugins/nf-amazon/src/test/nextflow/S3SessionTest.groovy b/plugins/nf-amazon/src/test/nextflow/S3SessionTest.groovy new file mode 100644 index 0000000000..6a3b853e64 --- /dev/null +++ b/plugins/nf-amazon/src/test/nextflow/S3SessionTest.groovy @@ -0,0 +1,61 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow + +import java.nio.file.Path + +import nextflow.file.FileHelper +import spock.lang.Specification +import spock.lang.Unroll + +/** + * + * @author Paolo Di Tommaso + */ +class S3SessionTest extends Specification { + + @Unroll + def 'should get cloud cache path' () { + given: + def session = Spy(Session) + + expect: + session.cloudCachePath(CONFIG, FileHelper.asPath(WORKDIR)) == EXPECTED + + where: + CONFIG | WORKDIR | EXPECTED + null | '/foo' | null + [enabled:true] | 's3://foo/work' | FileHelper.asPath('s3://foo/work') + [enabled:true, path:'s3://this/that'] | '/foo' | FileHelper.asPath('s3://this/that') + + } + + + def 'should error with non-cloud bucket' () { + given: + def session = Spy(Session) + + when: + session.cloudCachePath([enabled:true], Path.of('/foo/dir')) + then: + def e = thrown(IllegalArgumentException) + e.message == "Storage path not supported by Cloud-cache - offending value: '/foo/dir'" + + } + +} diff --git a/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheFactory.groovy b/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheFactory.groovy index 41f00b98fb..bcd2de989f 100644 --- a/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheFactory.groovy +++ b/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheFactory.groovy @@ -20,8 +20,8 @@ package nextflow.cache import java.nio.file.Path import groovy.transform.CompileStatic -import nextflow.cache.CacheDB -import nextflow.cache.CacheFactory +import nextflow.Global +import nextflow.Session import nextflow.exception.AbortOperationException import nextflow.plugin.Priority /** @@ -39,7 +39,10 @@ class CloudCacheFactory extends CacheFactory { protected CacheDB newInstance(UUID uniqueId, String runName, Path home) { if( !uniqueId ) throw new AbortOperationException("Missing cache `uuid`") if( !runName ) throw new AbortOperationException("Missing cache `runName`") - final store = new CloudCacheStore(uniqueId, runName, home) + final path = (Global.session as 
Session).cloudCachePath + if( !path ) + throw new IllegalArgumentException("Cloud-cache path not defined - use either -cloudcache run option or NXF_CLOUDCACHE_PATH environment variable") + final store = new CloudCacheStore(uniqueId, runName, path) return new CacheDB(store) } diff --git a/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheStore.groovy b/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheStore.groovy index c9f98bdef4..10f8c1712e 100644 --- a/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheStore.groovy +++ b/plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheStore.groovy @@ -23,7 +23,6 @@ import java.nio.file.Path import com.google.common.hash.HashCode import groovy.transform.CompileStatic -import nextflow.SysEnv import nextflow.exception.AbortOperationException import nextflow.extension.FilesEx import nextflow.util.CacheHelper @@ -58,23 +57,18 @@ class CloudCacheStore implements CacheStore { /** Index file output stream */ private OutputStream indexWriter - CloudCacheStore(UUID uniqueId, String runName, Path basePath=null) { + CloudCacheStore(UUID uniqueId, String runName, Path basePath) { + assert uniqueId, "Missing cloudcache 'uniqueId' argument" + assert runName, "Missing cloudcache 'runName' argument" + assert basePath, "Missing cloudcache 'basePath' argument" this.KEY_SIZE = CacheHelper.hasher('x').hash().asBytes().size() this.uniqueId = uniqueId this.runName = runName - this.basePath = basePath ?: defaultBasePath() + this.basePath = basePath this.dataPath = this.basePath.resolve("$uniqueId") this.indexPath = dataPath.resolve("index.$runName") } - private Path defaultBasePath() { - final basePath = SysEnv.get('NXF_CLOUDCACHE_PATH') - if( !basePath ) - throw new IllegalArgumentException("NXF_CLOUDCACHE_PATH must be defined when using the cloud cache store") - - return basePath as Path - } - @Override CloudCacheStore open() { indexWriter = new BufferedOutputStream(Files.newOutputStream(indexPath)) diff --git 
a/plugins/nf-tower/src/main/io/seqera/tower/plugin/CacheCommand.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/CacheCommand.groovy index 846f35f3b9..bfaf0426cf 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/CacheCommand.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/CacheCommand.groovy @@ -20,7 +20,6 @@ package io.seqera.tower.plugin import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session -import nextflow.SysEnv import nextflow.cli.PluginAbstractExec /** * Implements nextflow cache and restore commands @@ -47,7 +46,7 @@ class CacheCommand implements PluginAbstractExec { protected void cacheBackup() { log.debug "Running Nextflow cache backup" - if( !SysEnv.get('NXF_CLOUDCACHE_PATH')) { + if( !getSession().cloudCachePath ) { // legacy cache manager new CacheManager(System.getenv()).saveCacheFiles() } @@ -69,7 +68,7 @@ class CacheCommand implements PluginAbstractExec { } protected void cacheRestore() { - if( !SysEnv.get('NXF_CLOUDCACHE_PATH')) { + if( !getSession().cloudCachePath ) { log.debug "Running Nextflow cache restore" // legacy cache manager new CacheManager(System.getenv()).restoreCacheFiles() diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy index e3e7f67aa2..3133a5270b 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFactory.groovy @@ -54,7 +54,7 @@ class TowerFactory implements TraceObserverFactory { final tower = createTowerClient(session, config) result.add(tower) // create the logs checkpoint - if( env.containsKey('NXF_CLOUDCACHE_PATH') ) + if( session.cloudCachePath ) result.add( new LogsCheckpoint() ) return result }