From ac746db0caaf061a3b9588e2f525387fb2977993 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Bigaret?=
 <sebastien.bigaret@telecom-bretagne.eu>
Date: Thu, 15 Sep 2011 17:02:25 +0200
Subject: [PATCH] Execution of execution sets is now fully functional.

However, the server is currently not notified after every program is
successfully executed; it only gets an execution set's results (status
and files) after the whole execution set is executed.

This means two things:
1. the client does not show execution progress whithin an execution set
   anymore,
2. the exec. loop cannot execute an execution set S2 depending on some
   results in an other one (S1) before the latter (S1) is finished,
   even if the results the former set (S2) is depending on are produced
   in early steps during the execution of S1.

Point 1. will be fixed soon.

Point 2. can happen due to the way the SimpleWorkflowPlanner builds
execution sets.  Solving it means that the platform must be able to
send partial results to the server.
---
 .../praxis/core/execution/Activity.java       |  2 +-
 .../praxis/core/execution/ExecutionSet.java   | 28 ++++++++++++++--
 .../core/execution/SimpleWorkflowPlanner.java | 32 ++++++++++++++++---
 .../execution/SimpleFormatterPlatform.java    |  2 +-
 .../platform/PlatformDescription.java         | 13 +++++---
 5 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/src/eu/telecom_bretagne/praxis/core/execution/Activity.java b/src/eu/telecom_bretagne/praxis/core/execution/Activity.java
index c694a3a8..135954d7 100644
--- a/src/eu/telecom_bretagne/praxis/core/execution/Activity.java
+++ b/src/eu/telecom_bretagne/praxis/core/execution/Activity.java
@@ -30,7 +30,7 @@ public class Activity
 	private static synchronized String nextKey()
 	{
 		// let's assume we won't have 
-		return new java.text.SimpleDateFormat("yyyyMMdd-HHmmssSSS/").format(java.util.Calendar.getInstance()
+		return new java.text.SimpleDateFormat("yyyyMMdd-HHmmssSSS_").format(java.util.Calendar.getInstance()
 		        .getTime())+(++seq).toString();
 	}
 	
diff --git a/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java b/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java
index 4fe1d7e0..5561901c 100644
--- a/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java
+++ b/src/eu/telecom_bretagne/praxis/core/execution/ExecutionSet.java
@@ -4,7 +4,6 @@
 package eu.telecom_bretagne.praxis.core.execution;
 
 import java.util.AbstractSet;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 
@@ -21,7 +20,13 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial
 
 	public static class ProgramInfo
 	{
-		int sequence;		
+		/**
+		 * Set by execution planners to indicate in which order program of an execution set should be executed.
+		 * Programs having the same sequence number will be runned independently or one after another, in a
+		 * non-specified order.
+		 * @see ExecutionSet#programsSortedForExecution()
+		 */
+		int sequence = 0;
 	}
 	
 	private HashMap<Program, ProgramInfo> map = new HashMap<Program, ProgramInfo>();
@@ -95,7 +100,7 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial
 	}
 	
 	@Override
-	public boolean removeAll(Collection< ? > c)
+	public boolean removeAll(java.util.Collection< ? > c)
 	{
 		/*
 		 * Extracted from java.util.AbstractSet
@@ -182,4 +187,21 @@ public class ExecutionSet extends AbstractSet<Program> implements java.io.Serial
             public void remove() { throw new UnsupportedOperationException(); }
 		};
     }
+	
+	public Program[] programsSortedForExecution()
+	{
+		final ExecutionSet this_set = this;
+		Program[] sortedPrograms = map.keySet().toArray(new Program[map.size()]);
+		java.util.Arrays.sort(sortedPrograms,
+		                      new java.util.Comparator<Program>() {
+			@Override
+			public int compare(Program o1, Program o2)
+			{
+				final int seq1 = this_set.getInfoForProgram(o1).sequence;
+				final int seq2 = this_set.getInfoForProgram(o2).sequence;
+				return seq1 > seq2 ? 1 : (seq1 == seq2 ? 0 : -1);
+			}
+		});
+		return sortedPrograms;
+	}
 }
diff --git a/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java b/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java
index caa91a3d..5540421e 100644
--- a/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java
+++ b/src/eu/telecom_bretagne/praxis/core/execution/SimpleWorkflowPlanner.java
@@ -6,6 +6,9 @@ package eu.telecom_bretagne.praxis.core.execution;
 
 import java.util.ArrayList;
 
+import eu.telecom_bretagne.praxis.core.execution.ExecutionSet.ProgramInfo;
+import eu.telecom_bretagne.praxis.core.workflow.Program;
+
 /**
  * @author Sébastien Bigaret
  *
@@ -111,22 +114,41 @@ public class SimpleWorkflowPlanner
 	private void decideExecSet(Activity activity)
 	{
 		boolean mergeAll = true;
-		for (Activity predecessor: activity.getPredecessors())
+		java.util.Set <Activity> predecessors = activity.getPredecessors();
+		for (Activity predecessor: predecessors)
 		{
 			if (predecessor.getPlatform() != activity.getPlatform())
 				mergeAll = false;
 		}
 		
 		final Process process = activity.getContainer();
-		if (mergeAll)
+		if (!mergeAll)
+			return;
+
+		// determine the max sequence in predecessors
+		int max = 0;
+		for  (Activity predecessor: predecessors)
 		{
-			// all predecessors are assigned to the same platform as activity: merge them
-			for (Activity predecessor: activity.getPredecessors())
+			final ExecutionSet pred_set = predecessor.getExecutionSet();
+			for (Program p: pred_set)
 			{
-				activity = process.merge(activity, predecessor);
+				final ProgramInfo pred_info = pred_set.getInfoForProgram(p);
+				if ( pred_info.sequence > max )
+					max = pred_info.sequence;
 			}
 		}
+		// add (max+1) to all our sequence
+		final ExecutionSet activity_set = activity.getExecutionSet();
+		for (Program p: activity_set)
+		{
+			activity_set.getInfoForProgram(p).sequence += max+1;
+		}
 		
+		// Finally, assign all predecessors to the same platform as activity: merge them
+		for (Activity predecessor: predecessors)
+		{
+			activity = process.merge(activity, predecessor);
+		}
 	}
 	
 }
diff --git a/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java b/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java
index 68b691f2..f557c12d 100644
--- a/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java
+++ b/src/eu/telecom_bretagne/praxis/server/execution/SimpleFormatterPlatform.java
@@ -148,7 +148,7 @@ public abstract class SimpleFormatterPlatform
 	{
 		identifyInputFiles(activity); // should be done before the script is generated: getCode_output() relies on it
 		identifyOutputFiles(activity);
-		for (Program program: activity.getExecutionSet())
+		for (Program program: activity.getExecutionSet().programsSortedForExecution())
 		{
 			generateScript(program);
 		}
diff --git a/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java b/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java
index 24376bb1..401661a8 100644
--- a/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java
+++ b/src/eu/telecom_bretagne/praxis/server/execution/platform/PlatformDescription.java
@@ -146,7 +146,7 @@ public class PlatformDescription
 
 	}
 
-	/** The type of a platform is sued to determine its execution engine. Example: shell, SGE */
+	/** The type of a platform is used to determine its execution engine. Example: shell, SGE */
 	private String type;
 
 	/** The name of the operating system on which the platform runs */
@@ -354,12 +354,13 @@ public class PlatformDescription
 		// TODO remplacer ça: getInputFiles() renvoie des infos qu'il n'a pas à avoir
 		// en particulier le nom des fichiers de sortie des progs précédents!
 
-		final Program program = activity.getExecutionSet().iterator().next();
+		final ProcessContext pContext = activity.getContainer().getContext();
+		for (Program program: activity.getExecutionSet().programsSortedForExecution())
+		{
 		final String prgID = program.getProgramID();
 		final ArrayList<File> missingFiles = new ArrayList<File>();
 		final ArrayList<String> missingOutputs = new ArrayList<String>();
 		final Result currentResult = activity.getContainer().getCurrentExecutionSummary();
-		final ProcessContext pContext = activity.getContainer().getContext();
 		for (Parameter input: program.getInputParameters())
 		{
 			if (!input.isActivated())
@@ -367,6 +368,8 @@ public class PlatformDescription
 			// at this point we know they are all connected, this has been checked in prepareForExecution(), above
 			if (input.getPrgInput()!=null)
 			{
+				if (activity.getExecutionSet().contains(input.getPrgInput().getProgram()))
+					continue; 
 				final Parameter predecessorOutputParam = input.getPrgInput();
 				// search the corresponding file generated by the preceding program
 				String prgInputId = predecessorOutputParam.getDescription().getId();
@@ -420,12 +423,12 @@ public class PlatformDescription
 			Log.log.severe(msg.toString());
 			throw new CannotExecuteException(msg.toString());
 		}
-		
+		}
 		// compress...
 		File zipFile = null;
 		if (!filesNeededForExec.isEmpty())
 		{
-			zipFile = new File(pContext.workingDirectory, "script-" + prgID + ".zip");
+			zipFile = new File(pContext.workingDirectory, "script-" + activity.getKey() + ".zip");
 			try
 			{
 				Utile.zipFiles(filesNeededForExec, zipFile);
-- 
GitLab