From 71070d7d55689f19319aabff4da6c0404f180cd1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Bigaret?=
 <sebastien.bigaret@telecom-bretagne.eu>
Date: Fri, 23 Sep 2011 17:35:26 +0200
Subject: [PATCH] Platforms now sends fine-grained execution progress.

Progress was previously communicated on the basis of execution sets;
the grain is now quite finer since the execution of every single
programs is notified (both when it begins or ends).

This is done with the help of:
- a new ProgramStatus.Type: WAITING,
- a new event that can be send by platforms to server:
  EXECUTION_PROGRESS
- an ExecutionProgressMonitor specially designed for these events.

This applies to every deploiement configurations, from simple,
standalone ones to configurations embedding the specific
PlatformToClientBridge (which allows local executions along with
distant ones).
---
 .../common/events/PlatformToServerEvent.java  |  4 +-
 .../praxis/core/execution/ExecutionLoop.java  | 22 +++++--
 .../praxis/core/execution/Process.java        | 20 +++----
 .../praxis/core/execution/Result.java         | 12 +++-
 .../praxis/platform/AbstractPlatform.java     | 19 ++++--
 .../platform/PlatformToClientBridge.java      |  8 ++-
 .../platform/execution/PlatformExecution.java | 15 ++++-
 .../praxis/server/Server.java                 | 14 ++++-
 .../server/execution/ExecutionEngine.java     | 58 ++++++++++++++++++-
 .../platform/ProgramExecutionEngine.java      |  4 ++
 .../platform/SGEExecutionEngine.java          |  4 ++
 .../platform/ShellExecutionEngine.java        |  7 ++-
 12 files changed, 152 insertions(+), 35 deletions(-)

diff --git a/src/eu/telecom_bretagne/praxis/common/events/PlatformToServerEvent.java b/src/eu/telecom_bretagne/praxis/common/events/PlatformToServerEvent.java
index c94ad402..52e86f9f 100644
--- a/src/eu/telecom_bretagne/praxis/common/events/PlatformToServerEvent.java
+++ b/src/eu/telecom_bretagne/praxis/common/events/PlatformToServerEvent.java
@@ -31,11 +31,12 @@ public class PlatformToServerEvent extends Event
 	{
 		public void platformSendsConfiguration(Event event);
 		public void platformSendsResourcesAvailabilityUpdates(Event event);
+		public void platformSendsExecutionProgress(Event event);
 		public void platformSendsResults(Event event);
 		public void platformLogout(Event event);
 	}
 	
-	public static enum Type { CONFIGURATION, AVAILABLE_RESOURCES, END_OF_EXECUTION, LOGOUT }
+	public static enum Type { CONFIGURATION, AVAILABLE_RESOURCES, EXECUTION_PROGRESS, END_OF_EXECUTION, LOGOUT }
 	
 	Type type;
 	
@@ -60,6 +61,7 @@ public class PlatformToServerEvent extends Event
 		{
 			case CONFIGURATION:       l.platformSendsConfiguration(this); break;
 			case AVAILABLE_RESOURCES: l.platformSendsResourcesAvailabilityUpdates(this); break;
+			case EXECUTION_PROGRESS:  l.platformSendsExecutionProgress(this); break;
 			case END_OF_EXECUTION:    l.platformSendsResults(this); break;
 			case LOGOUT:              l.platformLogout(this); break;
 		}
diff --git a/src/eu/telecom_bretagne/praxis/core/execution/ExecutionLoop.java b/src/eu/telecom_bretagne/praxis/core/execution/ExecutionLoop.java
index a04970c4..d758da31 100644
--- a/src/eu/telecom_bretagne/praxis/core/execution/ExecutionLoop.java
+++ b/src/eu/telecom_bretagne/praxis/core/execution/ExecutionLoop.java
@@ -221,16 +221,20 @@ public class ExecutionLoop implements Runnable, Serializable
 			// time to execute the activity
 
 			// mark start
-			Result result = a.getContainer().result;
+			Result processResult = a.getContainer().result;
+			Result result = new Result(processResult.workflowID, processResult.executionID);
 			for (Program prg: a.getExecutionSet())
 			{
 				ProgramResult pResult = result.getProgramInfo(prg.getProgramID(), true);
-				pResult.status = ProgramStatus.RUNNING;
+				pResult.status = ProgramStatus.WAITING;
 				pResult.start = new java.util.Date();
+				processResult.mergeWith(result, false);
+				processResult.getProgramInfo(prg.getProgramID()).start = pResult.start;
 			}
-			Serveur.resultStore().updateResult(result);
+			result.setStatus(Status.RUNNING);
+			Serveur.resultStore().updateResult(processResult);
 			
-			a.setState(Status.RUNNING);
+			a.setResult(result);
 			Log.log.info("Executing activity: "+a);
 			PlatformDescription.executeActivity(a);
 		}
@@ -278,8 +282,14 @@ public class ExecutionLoop implements Runnable, Serializable
 				Log.log.severe("No activity for result: dropping it: "+result);
 				continue;
 			}
-			activity.setResult(result);
-			
+			activity.setResult(result); // this takes care of informing the requester (client) if appropriate
+
+			// For the moment being, if we received an result indicating the progress of an execution WITHIN an
+			// execution set, we do not examine it and we stop here.
+			// (sent by platform in an event w/ type=EXECUTION_PROGRESS
+			if (result.getStatus().equals(Result.Status.RUNNING))
+				break;
+
 			// mark end
 			for (Program prg: activity.getExecutionSet())
 			{
diff --git a/src/eu/telecom_bretagne/praxis/core/execution/Process.java b/src/eu/telecom_bretagne/praxis/core/execution/Process.java
index eb395df5..8384fa31 100644
--- a/src/eu/telecom_bretagne/praxis/core/execution/Process.java
+++ b/src/eu/telecom_bretagne/praxis/core/execution/Process.java
@@ -205,7 +205,7 @@ public class Process
 	/**
 	 * Tells whether a process is closed (i.e. its execution is finished).  A process is closed if all its activities
 	 * have a closed status.
-	 * @return {@code true} if the process is close, {@code false} otherwise.
+	 * @return {@code true} if the process is closed, {@code false} otherwise.
 	 * @see Result.Status#isClosed()
 	 * @see Activity#getState()
 	 */
@@ -330,19 +330,15 @@ public class Process
 			return summary;
 		}
 		// If this.isClosed() is false,
-		// we store the current status to be able to restore it before returning the result object: 
-		// Result.mergeWith() is intended to be used when everything is finished so it updates the status
-		// and this definitely not what we want here: e.g. if the process is running, we want to return
-		// a Result object saying that it is running.
-		Status currentStatus = null;
-		if (!this.isClosed())
-			currentStatus = summary.getStatus();
+		// we leave the current status unchanged: 
+		// if the process is running, we want to return a Result object saying that it is running.
+		boolean isClosed = this.isClosed();
 		for (Activity activity: activities)
 		{
-			if (activity.getState().isClosed())
-				summary.mergeWith(activity.getResult());
+			if (activity.getResult() != null)
+				summary.mergeWith(activity.getResult(), isClosed);
 			else
-			{ // Only applies to situations where this.isClosed()==false
+			{ // Only applies to situations where all activities have not yet received a valid Result object
 				for (Program prg: activity.getExecutionSet())
 				{
 					String prgID = prg.getProgramID();
@@ -350,8 +346,6 @@ public class Process
 				}
 			}
 		}
-		if (currentStatus!=null)
-			summary.setStatus(currentStatus);
 		return summary;
 	}
 
diff --git a/src/eu/telecom_bretagne/praxis/core/execution/Result.java b/src/eu/telecom_bretagne/praxis/core/execution/Result.java
index 979fa6e0..28c3677a 100644
--- a/src/eu/telecom_bretagne/praxis/core/execution/Result.java
+++ b/src/eu/telecom_bretagne/praxis/core/execution/Result.java
@@ -251,11 +251,16 @@ public class Result
 	 *             when <code>aResult.status==NOT_STARTED</code> or when the two objects do not have the same workflow id
 	 */
 	public synchronized void mergeWith(Result aResult)
+	{
+		mergeWith(aResult, true);
+	}
+	
+	public synchronized void mergeWith(Result aResult, boolean updateStatus)
 	{
 		if (!aResult.workflowID.equals(this.workflowID))
 			throw new IllegalArgumentException("Cannot merge two results with a different workflowID");
-		if (aResult.status == Status.NOT_STARTED)
-			throw new IllegalArgumentException("Cannot merge a result with an UNSET status");
+		if (updateStatus && aResult.status == Status.NOT_STARTED)
+			throw new IllegalArgumentException("Cannot merge a result with an NOT_STARTED status");
 		
 		for (ProgramResult prgInfo: aResult.programs.values())
 		{
@@ -264,6 +269,9 @@ public class Result
 		
 		this.date = new Date();
 		
+		if (!updateStatus)
+			return;
+
 		if (this.status == Status.NOT_STARTED || this.status == Status.RUNNING)
 		{
 			this.status = aResult.status;
diff --git a/src/eu/telecom_bretagne/praxis/platform/AbstractPlatform.java b/src/eu/telecom_bretagne/praxis/platform/AbstractPlatform.java
index cb724989..0a0ddc95 100644
--- a/src/eu/telecom_bretagne/praxis/platform/AbstractPlatform.java
+++ b/src/eu/telecom_bretagne/praxis/platform/AbstractPlatform.java
@@ -1,9 +1,7 @@
 /* LICENSE */
 package eu.telecom_bretagne.praxis.platform;
 
-import static eu.telecom_bretagne.praxis.common.events.PlatformToServerEvent.Type.AVAILABLE_RESOURCES;
-import static eu.telecom_bretagne.praxis.common.events.PlatformToServerEvent.Type.CONFIGURATION;
-import static eu.telecom_bretagne.praxis.common.events.PlatformToServerEvent.Type.END_OF_EXECUTION;
+import static eu.telecom_bretagne.praxis.common.events.PlatformToServerEvent.Type.*;
 
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -193,8 +191,19 @@ public abstract class AbstractPlatform implements ServerToPlatformEventListener
 		cnx.send(event);
 	}
 	
-	public void sendResults(Result result) {
-		PlatformToServerEvent event = new PlatformToServerEvent(END_OF_EXECUTION);
+	public void sendProgress(Result result)
+	{
+		sendResult(EXECUTION_PROGRESS, result);
+	}
+
+	public void sendResults(Result result)
+	{
+		sendResult(END_OF_EXECUTION, result);
+	}
+
+	private void sendResult(PlatformToServerEvent.Type eventType, Result result)
+	{
+		PlatformToServerEvent event = new PlatformToServerEvent(eventType);
 		event.data = result;
 		event.file_conveyor=result.zipFile();
 		result.readyToSerialize();
diff --git a/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java b/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java
index 2a22ebfd..b898a9ce 100644
--- a/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java
+++ b/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java
@@ -131,7 +131,13 @@ public class PlatformToClientBridge
 		}
 		if (result.getStatus().isClosed())
 			client.requestResult(result);
-		// TODO else inform the server? / the client
+		else
+		{
+			// Communicate the update using the original workflow ID
+			Result _result = new Result(result);
+			_result.setWorkflowXML((WorkflowID) result.data, result.getWorkflowXML());
+			sendProgress(_result);
+		}
 	}
 
 	@Override
diff --git a/src/eu/telecom_bretagne/praxis/platform/execution/PlatformExecution.java b/src/eu/telecom_bretagne/praxis/platform/execution/PlatformExecution.java
index b15e9d1d..76f51a6d 100644
--- a/src/eu/telecom_bretagne/praxis/platform/execution/PlatformExecution.java
+++ b/src/eu/telecom_bretagne/praxis/platform/execution/PlatformExecution.java
@@ -15,7 +15,7 @@ import eu.telecom_bretagne.praxis.server.execution.ExecutionEngine;
 public class PlatformExecution
     extends Thread
 {
-	protected Platform platform;
+	protected final Platform platform;
 	protected File infiles;
 	protected File executionDirectory;
 	protected ExecutionEngine executionEngine;
@@ -42,6 +42,15 @@ public class PlatformExecution
 	public void run()
 	{
 		Result result = null;
+		executionEngine.setProgressMonitor(new ExecutionEngine.ExecutionProgressMonitor()
+		{
+			@Override
+			public void setProgress(Result result)
+			{
+				platform.sendProgress(result);
+			}
+		});
+
 		try
 		{
 			result = executionEngine.execute(executionDirectory, infiles);
@@ -58,6 +67,10 @@ public class PlatformExecution
                         e);
             e.printStackTrace();
         }
+        finally
+        {
+        	executionEngine.setProgressMonitor(null);
+        }
         // if not interrupted AND results!=null ... TODO
         if (platform.executesWorkflow(executionDirectory) && result != null)        
         	platform.sendResults(result);
diff --git a/src/eu/telecom_bretagne/praxis/server/Server.java b/src/eu/telecom_bretagne/praxis/server/Server.java
index be380383..d87dcf69 100644
--- a/src/eu/telecom_bretagne/praxis/server/Server.java
+++ b/src/eu/telecom_bretagne/praxis/server/Server.java
@@ -172,6 +172,12 @@ implements PlatformToServerEventListener, ClientToServerEventListener
         platform.updateResourcesAvailability(map);
     }
 
+    @Override
+    public void platformSendsExecutionProgress(Event event)
+    {
+    	platformSendsResults(event);
+    }
+
     @Override
     public void platformSendsResults(Event event)
     {
@@ -188,8 +194,12 @@ implements PlatformToServerEventListener, ClientToServerEventListener
     		Log.log.severe("Received result is not known to the execution loop, dropping it: "+result);
     		return;
     	}
-    	Utile.unzip(event.file_conveyor, process.getContext().resultsDirectory());
-        event.file_conveyor.delete();
+    	if (event.file_conveyor != null)
+    	{
+    		// if null, the platform is just updating the status of the execution
+    		Utile.unzip(event.file_conveyor, process.getContext().resultsDirectory());
+    		event.file_conveyor.delete();
+    	}
         ExecutionLoop.executionLoop.add(result);
     }
 
diff --git a/src/eu/telecom_bretagne/praxis/server/execution/ExecutionEngine.java b/src/eu/telecom_bretagne/praxis/server/execution/ExecutionEngine.java
index a6526e9e..4f6c2c93 100644
--- a/src/eu/telecom_bretagne/praxis/server/execution/ExecutionEngine.java
+++ b/src/eu/telecom_bretagne/praxis/server/execution/ExecutionEngine.java
@@ -11,6 +11,8 @@ import eu.telecom_bretagne.praxis.common.Utile;
 import eu.telecom_bretagne.praxis.core.execution.Activity;
 import eu.telecom_bretagne.praxis.core.execution.ExecutionID;
 import eu.telecom_bretagne.praxis.core.execution.Result;
+import eu.telecom_bretagne.praxis.core.execution.ProgramResult.ProgramStatus;
+import eu.telecom_bretagne.praxis.core.workflow.Program;
 import eu.telecom_bretagne.praxis.core.workflow.WorkflowID;
 import eu.telecom_bretagne.praxis.server.execution.platform.PlatformDescription;
 import eu.telecom_bretagne.praxis.server.execution.platform.ProgramExecutionEngine;
@@ -38,6 +40,31 @@ public abstract class ExecutionEngine
 		public abstract ExecutionEngine getEngine(WorkflowID workflowID, ExecutionID executionID);
 	}
 	
+	public static abstract class ExecutionProgressMonitor
+	{
+		/**
+		 * Sets the progress of an execution.
+		 * @param result
+		 *            the result object holding information on the progress of
+		 *            the execution
+		 */
+		public abstract void setProgress(final Result result);
+
+		/**
+		 * {@link #setProgress(Result) Sets progress} and treats {@code result} as if its status is
+		 * {@link Result.Status#RUNNING RUNNING}.  A caller uses this when it builds a Result object that will be
+		 * ultimately sent as the final one (i.e. merging partial results with OK, WARNING and ERROR status during
+		 * execution), and that it wishes to communicate as execution progress.
+		 * @param result the result to set.
+		 */
+		public void setRunningExecutionProgress(final Result result)
+		{
+			Result running = new Result(result);
+			running.setStatus(Result.Status.RUNNING);
+			setProgress(running);
+		}
+	}
+
 	/**
 	 * Should be implemented by any concrete factory for {@link PlatformDescription platform visitors}. Concrete
 	 * PlatformVisitors should register themselves in the factories' repositories using
@@ -71,7 +98,13 @@ public abstract class ExecutionEngine
 	 * Note: this is a {@link HashTable} so that keys and values cannot be <code>null</code>
 	 */
 	private static Map<String, ExecutionEngineConfigurationFactory> factories = null;
-	
+
+	/** An executionprogress monitor doing nothing whose operations are no-op */
+	public static final ExecutionProgressMonitor noopProgressMonitor = new ExecutionProgressMonitor()
+	{
+		@Override public void setProgress(Result result) { }
+	};
+
 	/**
 	 * Registers the default factories shipped with praxis, namely:
 	 * <ul>
@@ -132,6 +165,8 @@ public abstract class ExecutionEngine
 	
 	protected ExecutionEngineConfiguration configuration;
 	
+	private ExecutionProgressMonitor       progressMonitor;
+	
 	protected ExecutionEngine(ExecutionEngineConfiguration configuration, WorkflowID workflowID, ExecutionID executionID)
 	{
 		this.configuration = configuration;
@@ -202,7 +237,9 @@ public abstract class ExecutionEngine
 		if (activity.hasDisconnectedInputFiles())
 			throw new CannotExecuteException("Program could not be executed: a program's input is not connected\n"+//I18N
 			                                 "Impossible d'exécuter le programme: une entrée de programme n'est pas connectée");
-		
+		result.setStatus(Result.Status.RUNNING);
+		for (Program prg: activity.getExecutionSet())
+			result.setExecStatus(prg.getProgramID(), ProgramStatus.WAITING);
 	}
 	
 	/**
@@ -230,13 +267,28 @@ public abstract class ExecutionEngine
 		return Utile.createTempDirectory("exec", ".bsx", new File(System.getProperty("java.io.tmpdir")));
 	}
 
+	/**
+	 * Returns the progress monitor. If it is unset, or {@link #setProgressMonitor(ExecutionProgressMonitor) set} to
+	 * {@code null}, the methods returns {@link #noopProgressMonitor the no-op progress monitor}.
+	 * @return the current progress monitor.
+	 */
+	public synchronized ExecutionProgressMonitor getProgressMonitor()
+	{
+		return progressMonitor != null ? progressMonitor : noopProgressMonitor;
+	}
+
 	public void setKey(String key)
 	{
 		result.key = key;
 	}
-	
+
 	public Result getResult()
 	{
 		return result;
 	}
+
+	public synchronized void setProgressMonitor(ExecutionProgressMonitor progressMonitor)
+	{
+		this.progressMonitor = progressMonitor;
+	}
 }
diff --git a/src/eu/telecom_bretagne/praxis/server/execution/platform/ProgramExecutionEngine.java b/src/eu/telecom_bretagne/praxis/server/execution/platform/ProgramExecutionEngine.java
index 4a041249..603fd9e3 100644
--- a/src/eu/telecom_bretagne/praxis/server/execution/platform/ProgramExecutionEngine.java
+++ b/src/eu/telecom_bretagne/praxis/server/execution/platform/ProgramExecutionEngine.java
@@ -178,6 +178,9 @@ public class ProgramExecutionEngine
 		
 		for (String prgID: scripts_for_prg.keySet())
 		{
+			result.setExecStatus(prgID, ProgramResult.ProgramStatus.RUNNING);
+			getProgressMonitor().setRunningExecutionProgress(result);
+
 			Result prgResult = new Result(result.workflowID(), null);
 			
 			/* move input files */
@@ -257,6 +260,7 @@ public class ProgramExecutionEngine
 				if (f.isFile() && ( ! "script.txt".equals(f.getName()) ) && (!sentFiles.contains(f.getName())))
 					Utile.renameFile(f, new File(new File(executionDirectory, prgID), f.getName())); // TODO set WARNING state if it returns false?
 			result.mergeWith(prgResult);
+			getProgressMonitor().setRunningExecutionProgress(result);
 			prgResult.dumpContent(executionDirectory);
 		}
 		
diff --git a/src/eu/telecom_bretagne/praxis/server/execution/platform/SGEExecutionEngine.java b/src/eu/telecom_bretagne/praxis/server/execution/platform/SGEExecutionEngine.java
index 9038462f..f2898999 100644
--- a/src/eu/telecom_bretagne/praxis/server/execution/platform/SGEExecutionEngine.java
+++ b/src/eu/telecom_bretagne/praxis/server/execution/platform/SGEExecutionEngine.java
@@ -247,6 +247,9 @@ public class SGEExecutionEngine
 		
 		for (String prgID: scripts_for_prg.keySet())
 		{
+			result.setExecStatus(prgID, ProgramResult.ProgramStatus.RUNNING);
+			getProgressMonitor().setRunningExecutionProgress(result);
+
 			Result prgResult = new Result(result.workflowID(), null);
 			
 			/* Write the script */
@@ -319,6 +322,7 @@ public class SGEExecutionEngine
 				if (f.isFile() && ( ! "script.txt".equals(f.getName()) ) && (!sentFiles.contains(f.getName())))
 					Utile.renameFile(f, new File(new File(executionDirectory, prgID), f.getName())); // TODO set WARNING state if it returns false?
 			result.mergeWith(prgResult);
+			getProgressMonitor().setRunningExecutionProgress(result);
 			prgResult.dumpContent(executionDirectory);
 		}
 		
diff --git a/src/eu/telecom_bretagne/praxis/server/execution/platform/ShellExecutionEngine.java b/src/eu/telecom_bretagne/praxis/server/execution/platform/ShellExecutionEngine.java
index 7851ddd0..72bba1bc 100644
--- a/src/eu/telecom_bretagne/praxis/server/execution/platform/ShellExecutionEngine.java
+++ b/src/eu/telecom_bretagne/praxis/server/execution/platform/ShellExecutionEngine.java
@@ -15,6 +15,7 @@ import eu.telecom_bretagne.praxis.core.execution.Activity;
 import eu.telecom_bretagne.praxis.core.execution.ExecutionID;
 import eu.telecom_bretagne.praxis.core.execution.ProgramResult;
 import eu.telecom_bretagne.praxis.core.execution.Result;
+import eu.telecom_bretagne.praxis.core.execution.Result.Status;
 import eu.telecom_bretagne.praxis.core.workflow.Program;
 import eu.telecom_bretagne.praxis.core.workflow.WorkflowID;
 import eu.telecom_bretagne.praxis.server.execution.ExecutionEngine;
@@ -191,8 +192,11 @@ public class ShellExecutionEngine
 		
 		for (String prgID: scripts_for_prg.keySet())
 		{
+			result.setExecStatus(prgID, ProgramResult.ProgramStatus.RUNNING);
+			getProgressMonitor().setRunningExecutionProgress(result);
+
 			Result prgResult = new Result(result.workflowID(), null);
-			
+
 			/* dos2unix = Runtime.getRuntime().exec("dos2unix "+scriptFile); */
 			File scriptFile = new File(executionDirectory, "script.txt");
 			try
@@ -256,6 +260,7 @@ public class ShellExecutionEngine
 				if (f.isFile() && ( ! "script.txt".equals(f.getName()) ) && (!sentFiles.contains(f.getName())))
 					Utile.renameFile(f, new File(new File(executionDirectory, prgID), f.getName())); // TODO set WARNING state if it returns false?
 			result.mergeWith(prgResult);
+			getProgressMonitor().setRunningExecutionProgress(result);
 			prgResult.dumpContent(executionDirectory);
 		}
 		
-- 
GitLab