diff --git a/src/eu/telecom_bretagne/praxis/core/execution/Activity.java b/src/eu/telecom_bretagne/praxis/core/execution/Activity.java index c694a3a876534d72f4fc1decea6108edad0eb31e..135954d7824649ebc3b826f7c16b56d70b61904a 100644 --- a/src/eu/telecom_bretagne/praxis/core/execution/Activity.java +++ b/src/eu/telecom_bretagne/praxis/core/execution/Activity.java @@ -30,7 +30,7 @@ public class Activity private static synchronized String nextKey() { // let's assume we won't have - return new java.text.SimpleDateFormat("yyyyMMdd-HHmmssSSS/").format(java.util.Calendar.getInstance() + return new java.text.SimpleDateFormat("yyyyMMdd-HHmmssSSS_").format(java.util.Calendar.getInstance() .getTime())+(++seq).toString(); } diff --git a/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java b/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java index 4fe1d7e0d1994f592713b5c53fd07e33280f0e0d..5561901c602ab87c1d24b797cac9d41d2d089120 100644 --- a/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java +++ b/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java @@ -4,7 +4,6 @@ package eu.telecom_bretagne.praxis.core.execution; import java.util.AbstractSet; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -21,7 +20,13 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial public static class ProgramInfo { - int sequence; + /** + * Set by execution planners to indicate in which order program of an execution set should be executed. + * Programs having the same sequence number will be runned independently or one after another, in a + * non-specified order. + * @see ExecutionSet#programsSortedForExecution() + */ + int sequence = 0; } private HashMap<Program, ProgramInfo> map = new HashMap<Program, ProgramInfo>(); @@ -95,7 +100,7 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial } @Override - public boolean removeAll(Collection< ? > c) + public boolean removeAll(java.util.Collection< ? > c) { /* * Extracted from java.util.AbstractSet @@ -182,4 +187,21 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial public void remove() { throw new UnsupportedOperationException(); } }; } + + public Program[] programsSortedForExecution() + { + final ExecutionSet this_set = this; + Program[] sortedPrograms = map.keySet().toArray(new Program[map.size()]); + java.util.Arrays.sort(sortedPrograms, + new java.util.Comparator<Program>() { + @Override + public int compare(Program o1, Program o2) + { + final int seq1 = this_set.getInfoForProgram(o1).sequence; + final int seq2 = this_set.getInfoForProgram(o2).sequence; + return seq1 > seq2 ? 1 : (seq1 == seq2 ? 0 : -1); + } + }); + return sortedPrograms; + } } diff --git a/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java b/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java index caa91a3db936c00e711fab8243bf801fecf1bde3..5540421ea961c71f3e1734fff2194d11a6f7d562 100644 --- a/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java +++ b/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java @@ -6,6 +6,9 @@ package eu.telecom_bretagne.praxis.core.execution; import java.util.ArrayList; +import eu.telecom_bretagne.praxis.core.execution.ExecutionSet.ProgramInfo; +import eu.telecom_bretagne.praxis.core.workflow.Program; + /** * @author Sébastien Bigaret * @@ -111,22 +114,41 @@ public class SimpleWorkflowPlanner private void decideExecSet(Activity activity) { boolean mergeAll = true; - for (Activity predecessor: activity.getPredecessors()) + java.util.Set <Activity> predecessors = activity.getPredecessors(); + for (Activity predecessor: predecessors) { if (predecessor.getPlatform() != activity.getPlatform()) mergeAll = false; } final Process process = activity.getContainer(); - if (mergeAll) + if (!mergeAll) + return; + + // determine the max sequence in predecessors + int max = 0; + for (Activity predecessor: predecessors) { - // all predecessors are assigned to the same platform as activity: merge them - for (Activity predecessor: activity.getPredecessors()) + final ExecutionSet pred_set = predecessor.getExecutionSet(); + for (Program p: pred_set) { - activity = process.merge(activity, predecessor); + final ProgramInfo pred_info = pred_set.getInfoForProgram(p); + if ( pred_info.sequence > max ) + max = pred_info.sequence; } } + // add (max+1) to all our sequence + final ExecutionSet activity_set = activity.getExecutionSet(); + for (Program p: activity_set) + { + activity_set.getInfoForProgram(p).sequence += max+1; + } + // Finally, assign all predecessors to the same platform as activity: merge them + for (Activity predecessor: predecessors) + { + activity = process.merge(activity, predecessor); + } } } diff --git a/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java b/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java index 68b691f29b7c7416a9fac8ce4694dda6e21bc3dc..f557c12dd0074ecdd0d8e008c602fefca49d159b 100644 --- a/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java +++ b/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java @@ -148,7 +148,7 @@ public abstract class SimpleFormatterPlatform { identifyInputFiles(activity); // should be done before the script is generated: getCode_output() relies on it identifyOutputFiles(activity); - for (Program program: activity.getExecutionSet()) + for (Program program: activity.getExecutionSet().programsSortedForExecution()) { generateScript(program); } diff --git a/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java b/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java index 24376bb136bb73309eea967b4426cc955ed5a383..401661a8a1fa61080766426acc34cc1adb9ee5ef 100644 --- a/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java +++ b/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java @@ -146,7 +146,7 @@ public class PlatformDescription } - /** The type of a platform is sued to determine its execution engine. Example: shell, SGE */ + /** The type of a platform is used to determine its execution engine. Example: shell, SGE */ private String type; /** The name of the operating system on which the platform runs */ @@ -354,12 +354,13 @@ public class PlatformDescription // TODO remplacer ça: getInputFiles() renvoie des infos qu'il n'a pas à avoir // en particulier le nom des fichiers de sortie des progs précédents! - final Program program = activity.getExecutionSet().iterator().next(); + final ProcessContext pContext = activity.getContainer().getContext(); + for (Program program: activity.getExecutionSet().programsSortedForExecution()) + { final String prgID = program.getProgramID(); final ArrayList<File> missingFiles = new ArrayList<File>(); final ArrayList<String> missingOutputs = new ArrayList<String>(); final Result currentResult = activity.getContainer().getCurrentExecutionSummary(); - final ProcessContext pContext = activity.getContainer().getContext(); for (Parameter input: program.getInputParameters()) { if (!input.isActivated()) @@ -367,6 +368,8 @@ public class PlatformDescription // at this point we know they are all connected, this has been checked in prepareForExecution(), above if (input.getPrgInput()!=null) { + if (activity.getExecutionSet().contains(input.getPrgInput().getProgram())) + continue; final Parameter predecessorOutputParam = input.getPrgInput(); // search the corresponding file generated by the preceding program String prgInputId = predecessorOutputParam.getDescription().getId(); @@ -420,12 +423,12 @@ public class PlatformDescription Log.log.severe(msg.toString()); throw new CannotExecuteException(msg.toString()); } - + } // compress... File zipFile = null; if (!filesNeededForExec.isEmpty()) { - zipFile = new File(pContext.workingDirectory, "script-" + prgID + ".zip"); + zipFile = new File(pContext.workingDirectory, "script-" + activity.getKey() + ".zip"); try { Utile.zipFiles(filesNeededForExec, zipFile);