Skip to content
Snippets Groups Projects
Commit ac746db0 authored by BIGARET Sebastien's avatar BIGARET Sebastien
Browse files

Execution of execution sets is now fully functional.

However, the server is currently not notified after every program is
successfully executed; it only gets an execution set's results (status
and files) after the whole execution set is executed.

This means two things:
1. the client does not show execution progress whithin an execution set
   anymore,
2. the exec. loop cannot execute an execution set S2 depending on some
   results in an other one (S1) before the latter (S1) is finished,
   even if the results the former set (S2) is depending on are produced
   in early steps during the execution of S1.

Point 1. will be fixed soon.

Point 2. can happen due to the way the SimpleWorkflowPlanner builds
execution sets.  Solving it means that the platform must be able to
send partial results to the server.
parent a2663330
No related branches found
No related tags found
No related merge requests found
Pipeline #68 failed
...@@ -30,7 +30,7 @@ public class Activity ...@@ -30,7 +30,7 @@ public class Activity
private static synchronized String nextKey() private static synchronized String nextKey()
{ {
// let's assume we won't have // let's assume we won't have
return new java.text.SimpleDateFormat("yyyyMMdd-HHmmssSSS/").format(java.util.Calendar.getInstance() return new java.text.SimpleDateFormat("yyyyMMdd-HHmmssSSS_").format(java.util.Calendar.getInstance()
.getTime())+(++seq).toString(); .getTime())+(++seq).toString();
} }
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
package eu.telecom_bretagne.praxis.core.execution; package eu.telecom_bretagne.praxis.core.execution;
import java.util.AbstractSet; import java.util.AbstractSet;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
...@@ -21,7 +20,13 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial ...@@ -21,7 +20,13 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial
public static class ProgramInfo public static class ProgramInfo
{ {
int sequence; /**
* Set by execution planners to indicate in which order program of an execution set should be executed.
* Programs having the same sequence number will be runned independently or one after another, in a
* non-specified order.
* @see ExecutionSet#programsSortedForExecution()
*/
int sequence = 0;
} }
private HashMap<Program, ProgramInfo> map = new HashMap<Program, ProgramInfo>(); private HashMap<Program, ProgramInfo> map = new HashMap<Program, ProgramInfo>();
...@@ -95,7 +100,7 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial ...@@ -95,7 +100,7 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial
} }
@Override @Override
public boolean removeAll(Collection< ? > c) public boolean removeAll(java.util.Collection< ? > c)
{ {
/* /*
* Extracted from java.util.AbstractSet * Extracted from java.util.AbstractSet
...@@ -182,4 +187,21 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial ...@@ -182,4 +187,21 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial
public void remove() { throw new UnsupportedOperationException(); } public void remove() { throw new UnsupportedOperationException(); }
}; };
} }
public Program[] programsSortedForExecution()
{
final ExecutionSet this_set = this;
Program[] sortedPrograms = map.keySet().toArray(new Program[map.size()]);
java.util.Arrays.sort(sortedPrograms,
new java.util.Comparator<Program>() {
@Override
public int compare(Program o1, Program o2)
{
final int seq1 = this_set.getInfoForProgram(o1).sequence;
final int seq2 = this_set.getInfoForProgram(o2).sequence;
return seq1 > seq2 ? 1 : (seq1 == seq2 ? 0 : -1);
}
});
return sortedPrograms;
}
} }
...@@ -6,6 +6,9 @@ package eu.telecom_bretagne.praxis.core.execution; ...@@ -6,6 +6,9 @@ package eu.telecom_bretagne.praxis.core.execution;
import java.util.ArrayList; import java.util.ArrayList;
import eu.telecom_bretagne.praxis.core.execution.ExecutionSet.ProgramInfo;
import eu.telecom_bretagne.praxis.core.workflow.Program;
/** /**
* @author Sébastien Bigaret * @author Sébastien Bigaret
* *
...@@ -111,22 +114,41 @@ public class SimpleWorkflowPlanner ...@@ -111,22 +114,41 @@ public class SimpleWorkflowPlanner
private void decideExecSet(Activity activity) private void decideExecSet(Activity activity)
{ {
boolean mergeAll = true; boolean mergeAll = true;
for (Activity predecessor: activity.getPredecessors()) java.util.Set <Activity> predecessors = activity.getPredecessors();
for (Activity predecessor: predecessors)
{ {
if (predecessor.getPlatform() != activity.getPlatform()) if (predecessor.getPlatform() != activity.getPlatform())
mergeAll = false; mergeAll = false;
} }
final Process process = activity.getContainer(); final Process process = activity.getContainer();
if (mergeAll) if (!mergeAll)
return;
// determine the max sequence in predecessors
int max = 0;
for (Activity predecessor: predecessors)
{ {
// all predecessors are assigned to the same platform as activity: merge them final ExecutionSet pred_set = predecessor.getExecutionSet();
for (Activity predecessor: activity.getPredecessors()) for (Program p: pred_set)
{ {
activity = process.merge(activity, predecessor); final ProgramInfo pred_info = pred_set.getInfoForProgram(p);
if ( pred_info.sequence > max )
max = pred_info.sequence;
}
} }
// add (max+1) to all our sequence
final ExecutionSet activity_set = activity.getExecutionSet();
for (Program p: activity_set)
{
activity_set.getInfoForProgram(p).sequence += max+1;
} }
// Finally, assign all predecessors to the same platform as activity: merge them
for (Activity predecessor: predecessors)
{
activity = process.merge(activity, predecessor);
}
} }
} }
...@@ -148,7 +148,7 @@ public abstract class SimpleFormatterPlatform ...@@ -148,7 +148,7 @@ public abstract class SimpleFormatterPlatform
{ {
identifyInputFiles(activity); // should be done before the script is generated: getCode_output() relies on it identifyInputFiles(activity); // should be done before the script is generated: getCode_output() relies on it
identifyOutputFiles(activity); identifyOutputFiles(activity);
for (Program program: activity.getExecutionSet()) for (Program program: activity.getExecutionSet().programsSortedForExecution())
{ {
generateScript(program); generateScript(program);
} }
......
...@@ -146,7 +146,7 @@ public class PlatformDescription ...@@ -146,7 +146,7 @@ public class PlatformDescription
} }
/** The type of a platform is sued to determine its execution engine. Example: shell, SGE */ /** The type of a platform is used to determine its execution engine. Example: shell, SGE */
private String type; private String type;
/** The name of the operating system on which the platform runs */ /** The name of the operating system on which the platform runs */
...@@ -354,12 +354,13 @@ public class PlatformDescription ...@@ -354,12 +354,13 @@ public class PlatformDescription
// TODO remplacer ça: getInputFiles() renvoie des infos qu'il n'a pas à avoir // TODO remplacer ça: getInputFiles() renvoie des infos qu'il n'a pas à avoir
// en particulier le nom des fichiers de sortie des progs précédents! // en particulier le nom des fichiers de sortie des progs précédents!
final Program program = activity.getExecutionSet().iterator().next(); final ProcessContext pContext = activity.getContainer().getContext();
for (Program program: activity.getExecutionSet().programsSortedForExecution())
{
final String prgID = program.getProgramID(); final String prgID = program.getProgramID();
final ArrayList<File> missingFiles = new ArrayList<File>(); final ArrayList<File> missingFiles = new ArrayList<File>();
final ArrayList<String> missingOutputs = new ArrayList<String>(); final ArrayList<String> missingOutputs = new ArrayList<String>();
final Result currentResult = activity.getContainer().getCurrentExecutionSummary(); final Result currentResult = activity.getContainer().getCurrentExecutionSummary();
final ProcessContext pContext = activity.getContainer().getContext();
for (Parameter input: program.getInputParameters()) for (Parameter input: program.getInputParameters())
{ {
if (!input.isActivated()) if (!input.isActivated())
...@@ -367,6 +368,8 @@ public class PlatformDescription ...@@ -367,6 +368,8 @@ public class PlatformDescription
// at this point we know they are all connected, this has been checked in prepareForExecution(), above // at this point we know they are all connected, this has been checked in prepareForExecution(), above
if (input.getPrgInput()!=null) if (input.getPrgInput()!=null)
{ {
if (activity.getExecutionSet().contains(input.getPrgInput().getProgram()))
continue;
final Parameter predecessorOutputParam = input.getPrgInput(); final Parameter predecessorOutputParam = input.getPrgInput();
// search the corresponding file generated by the preceding program // search the corresponding file generated by the preceding program
String prgInputId = predecessorOutputParam.getDescription().getId(); String prgInputId = predecessorOutputParam.getDescription().getId();
...@@ -420,12 +423,12 @@ public class PlatformDescription ...@@ -420,12 +423,12 @@ public class PlatformDescription
Log.log.severe(msg.toString()); Log.log.severe(msg.toString());
throw new CannotExecuteException(msg.toString()); throw new CannotExecuteException(msg.toString());
} }
}
// compress... // compress...
File zipFile = null; File zipFile = null;
if (!filesNeededForExec.isEmpty()) if (!filesNeededForExec.isEmpty())
{ {
zipFile = new File(pContext.workingDirectory, "script-" + prgID + ".zip"); zipFile = new File(pContext.workingDirectory, "script-" + activity.getKey() + ".zip");
try try
{ {
Utile.zipFiles(filesNeededForExec, zipFile); Utile.zipFiles(filesNeededForExec, zipFile);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment