Skip to content

Commit

Permalink
Merge pull request #423 from rabix/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
milos-ljubinkovic authored Feb 14, 2018
2 parents c88c0f8 + 1818ddc commit fe9a68e
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static Object evaluate(Object context, Object self, String expr, CWLRunti
}

Context cx = Context.enter();
cx.setLanguageVersion(Context.VERSION_ES6);
cx.setOptimizationLevel(OPTIMIZATION_LEVEL);
cx.setMaximumInterpreterStackDepth(MAX_STACK_DEPTH);
cx.setClassShutter(new CWLExpressionDenyAllClassShutter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private String stagePath(String path, StageInput stageInput) throws BindingExcep
return destinationFile.toString();
case LINK:
try {
Files.createLink(destinationFile, file);
Files.createSymbolicLink(destinationFile, file);
} catch (IOException e) {
throw new BindingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ public static List<Map<String, Object>> getSecondaryFiles(SBJob job, HashAlgorit
Path pathToSec = Paths.get(secondaryFilePath);
if (Files.exists(pathToSec) || !onlyExisting) {
Map<String, Object> file = SBFileValueHelper.pathToRawFile(pathToSec, hashAlgorithm, Paths.get(SBFileValueHelper.getPath(fileValue)));
boolean loadContents = SBBindingHelper.loadContents(binding);
if (loadContents) {
SBFileValueHelper.setContents(file);
}
secondaryFileMaps.add(file);
}
} catch (IOException | URISyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private String stagePath(String path, StageInput stageInput) throws BindingExcep
return destinationFile.getAbsolutePath();
case LINK:
try {
Files.createLink(destinationFile.toPath(), file.toPath());
Files.createSymbolicLink(destinationFile.toPath(), file.toPath());
} catch (IOException e) {
throw new BindingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOExce
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
if(dir.getFileName() != null) {
if (matcher.matches(dir.getFileName()) && isDir) {
if (matcher.matches(dir.getFileName()) && isDir && !dir.equals(globDir.toPath())) {
files.add(dir.toFile());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,33 @@
package org.rabix.engine.store.postgres.jdbi.impl;

import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.rabix.engine.store.postgres.jdbi.impl.JDBIIntermediaryFilesRepository.IntermediaryFileEntityMapper;
import org.rabix.engine.store.repository.IntermediaryFilesRepository;
import org.skife.jdbi.v2.SQLStatement;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.Binder;
import org.skife.jdbi.v2.sqlobject.BinderFactory;
import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.*;
import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.unstable.BindIn;

import java.lang.annotation.*;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.UUID;


@RegisterMapper(IntermediaryFileEntityMapper.class)
@UseStringTemplate3StatementLocator
public interface JDBIIntermediaryFilesRepository extends IntermediaryFilesRepository {

@SqlUpdate("insert into intermediary_files (root_id,filename,count) values (:root_id,:filename,:count)")
void insert(@Bind("root_id") UUID root_id, @Bind("filename") String filename, @Bind("count") Integer count);

@SqlUpdate("insert into intermediary_files (root_id,filename,count) values (:root_id,:filename,:count) on conflict(root_id, filename) do nothing")
void insertIfNotExists(@Bind("root_id") UUID root_id, @Bind("filename") String filename, @Bind("count") Integer count);

@SqlUpdate("update intermediary_files set count=:count where root_id=:root_id and filename=:filename")
void update(@Bind("root_id") UUID root_id, @Bind("filename") String filename, @Bind("count") Integer count);

Expand All @@ -42,21 +36,21 @@ public interface JDBIIntermediaryFilesRepository extends IntermediaryFilesReposi
@Override
@SqlUpdate("insert into intermediary_files (root_id,filename,count) values (:root_id,:filename,1) on conflict (root_id, filename) do update set count=intermediary_files.count+1")
void increment(@Bind("root_id") UUID rootId, @Bind("filename") String filename);

@SqlUpdate("delete from intermediary_files where root_id=:root_id and filename=:filename")
void delete(@Bind("root_id") UUID rootId, @Bind("filename") String filename);

@SqlUpdate("delete from intermediary_files where root_id=:root_id")
void delete(@Bind("root_id") UUID root_id);

@Override
@SqlUpdate("delete from intermediary_files where root_id in (<ids>)")
void deleteByRootIds(@BindIn("ids") Set<UUID> rootIds);

@Override
@SqlQuery("select * from intermediary_files where root_id=:root_id")
List<IntermediaryFileEntity> get(@Bind("root_id") UUID root_id);

@BindingAnnotation(BindIntermediaryFileEntity.IntermediaryFileEntityBinderFactory.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.PARAMETER })
Expand All @@ -72,7 +66,7 @@ public void bind(SQLStatement<?> q, BindIntermediaryFileEntity bind, Intermediar
}
}
}

public static class IntermediaryFileEntityMapper implements ResultSetMapper<IntermediaryFileEntity> {
public IntermediaryFileEntity map(int index, ResultSet resultSet, StatementContext ctx) throws SQLException {
UUID rootId = resultSet.getObject("root_id", UUID.class);
Expand All @@ -81,5 +75,5 @@ public IntermediaryFileEntity map(int index, ResultSet resultSet, StatementConte
return new IntermediaryFileEntity(rootId, filename, count);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,61 @@

public class InMemoryIntermediaryFilesRepository implements IntermediaryFilesRepository {

private final Map<UUID, List<IntermediaryFileEntity>> intermediaryFilesRepository;
private final Map<UUID, Map<String, IntermediaryFileEntity>> intermediaryFilesRepository;

public InMemoryIntermediaryFilesRepository() {
intermediaryFilesRepository = new ConcurrentHashMap<>();
}

@Override
public void insert(UUID rootId, String filename, Integer count) {
Map<String, IntermediaryFileEntity> intermediaryPerRoot = intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>());
intermediaryPerRoot.put(filename, new IntermediaryFileEntity(rootId, filename, count));
intermediaryFilesRepository.put(rootId, intermediaryPerRoot);
}

if(intermediaryFilesRepository.containsKey(rootId)) {
List<IntermediaryFileEntity> intermediaryPerRoot = intermediaryFilesRepository.get(rootId);
intermediaryPerRoot.add(new IntermediaryFileEntity(rootId, filename, count));
}
else {
List<IntermediaryFileEntity> intermediaryPerRoot = new ArrayList<>();
intermediaryPerRoot.add(new IntermediaryFileEntity(rootId, filename, count));
@Override
public void insertIfNotExists(UUID rootId, String filename, Integer count) {
Map<String, IntermediaryFileEntity> intermediaryPerRoot = intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>());
IntermediaryFileEntity intermediaryFileEntity = intermediaryPerRoot.get(filename);

if (intermediaryFileEntity == null) {
intermediaryFileEntity = new IntermediaryFileEntity(rootId, filename, count);

intermediaryPerRoot.put(filename, intermediaryFileEntity);
intermediaryFilesRepository.put(rootId, intermediaryPerRoot);
}
}

@Override
public void update(UUID rootId, String filename, Integer count) {
if(intermediaryFilesRepository.containsKey(rootId)) {
List<IntermediaryFileEntity> intermediaryPerRoot = intermediaryFilesRepository.get(rootId);
for(IntermediaryFileEntity file: intermediaryPerRoot) {
if(file.getFilename().equals(filename)) {
file.setCount(count);
break;
}
Map<String, IntermediaryFileEntity> intermediaryPerRoot = intermediaryFilesRepository.get(rootId);
if (intermediaryPerRoot == null) {
insert(rootId, filename, count);
} else {
IntermediaryFileEntity intermediaryFileEntity = intermediaryPerRoot.get(filename);
if (intermediaryFileEntity == null) {
intermediaryFileEntity = new IntermediaryFileEntity(rootId, filename, count);
} else {
intermediaryFileEntity.setCount(count);
}
intermediaryPerRoot.put(filename, intermediaryFileEntity);
}
}

@Override
public void delete(UUID rootId, String filename) {
if(intermediaryFilesRepository.containsKey(rootId)) {
List<IntermediaryFileEntity> intermediaryPerRoot = intermediaryFilesRepository.get(rootId);
for (Iterator<IntermediaryFileEntity> iterator = intermediaryPerRoot.iterator(); iterator.hasNext();) {
IntermediaryFileEntity file = iterator.next();
if (file.getFilename().equals(filename)) {
iterator.remove();
break;
}
}
}
intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()).remove(filename);
}

@Override
public void delete(UUID rootId) {
if(intermediaryFilesRepository.containsKey(rootId)) {
intermediaryFilesRepository.remove(rootId);
}
intermediaryFilesRepository.remove(rootId);
}

@Override
public List<IntermediaryFileEntity> get(UUID rootId) {
if(intermediaryFilesRepository.containsKey(rootId)) {
return intermediaryFilesRepository.get(rootId);
}
return Collections.emptyList();
return new ArrayList<>(intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()).values());
}

@Override
Expand All @@ -78,10 +73,31 @@ public void deleteByRootIds(Set<UUID> rootIds) {

@Override
public void decrement(UUID rootId, String filename) {
Map<String, IntermediaryFileEntity> intermediaryFiles = intermediaryFilesRepository.get(rootId);
if (intermediaryFiles == null) {
return;
}

IntermediaryFileEntity intermediaryFileEntity = intermediaryFiles.get(filename);
if (intermediaryFileEntity == null) {
return;
}

intermediaryFileEntity.decrement();
}

@Override
public void increment(UUID rootId, String filename) {
Map<String, IntermediaryFileEntity> intermediaryFiles = intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>());

IntermediaryFileEntity intermediaryFileEntity = intermediaryFiles.get(filename);
if (intermediaryFileEntity == null) {
intermediaryFileEntity = new IntermediaryFileEntity(rootId, filename, 0);
}

intermediaryFileEntity.increment();

intermediaryFiles.put(filename, intermediaryFileEntity);
intermediaryFilesRepository.put(rootId, intermediaryFiles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@
public interface IntermediaryFilesRepository {

void insert(UUID rootId, String filename, Integer count);


void insertIfNotExists(UUID rootId, String filename, Integer count);

void update(UUID rootId, String filename, Integer count);

void delete(UUID rootId, String filename);

void delete(UUID rootId);

void deleteByRootIds(Set<UUID> rootIds);

List<IntermediaryFileEntity> get(UUID rootId);

void decrement(UUID rootId, String filename);
public class IntermediaryFileEntity {

class IntermediaryFileEntity {

UUID rootId;
String filename;
Integer count;

public IntermediaryFileEntity(UUID rootId, String filename, Integer count) {
super();
this.rootId = rootId;
Expand All @@ -35,28 +37,41 @@ public IntermediaryFileEntity(UUID rootId, String filename, Integer count) {
public UUID getRootId() {
return rootId;
}

public void setRootId(UUID rootId) {
this.rootId = rootId;
}

public String getFilename() {
return filename;
}

public void setFilename(String filename) {
this.filename = filename;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}


public void decrement() {
count = count == null ? -1 : count - 1;
}

public void increment() {
count = count == null ? 1 : count + 1;
}

@Override
public String toString() {
return "IntermediaryFileEntity [filename=" + filename + ", count=" + count + "]";
return "IntermediaryFileEntity [rootId=" + rootId + ", filename=" + filename + ", count=" + count + "]";
}
}

void increment(UUID rootId, String filename);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,24 @@ public class InputEventHandler implements EventHandler<InputUpdateEvent> {
@Inject
private EventProcessor eventProcessor;
@Inject
private IntermediaryFilesService filesService;

private IntermediaryFilesService intermediaryFilesService;
private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void handle(InputUpdateEvent event, EventHandlingMode mode) throws EventHandlerException {
logger.debug(event.toString());
JobRecord job = jobService.find(event.getJobId(), event.getContextId());

if (job == null) {
logger.info("Possible stale message. Job {} for root {} doesn't exist.", event.getJobId(), event.getContextId());
return;
}

filesService.handleInputSent(event.getContextId(), event.getValue());
VariableRecord variable = variableService.find(event.getJobId(), event.getPortId(), LinkPortType.INPUT, event.getContextId());
if (!job.isContainer() && !job.isScatterWrapper()) {
intermediaryFilesService.incrementInputFilesReferences(event.getContextId(), event.getValue());
}

VariableRecord variable = variableService.find(event.getJobId(), event.getPortId(), LinkPortType.INPUT, event.getContextId());
DAGNode node = dagNodeService.get(InternalSchemaHelper.normalizeId(job.getId()), event.getContextId(), job.getDagHash());

if (event.isLookAhead()) {
Expand Down
Loading

0 comments on commit fe9a68e

Please sign in to comment.