diff --git a/configurations/configuration.defaults b/configurations/configuration.defaults index a54acd32e69a39d2c42a0d1ea649368dd47b74b4..22dd80a3f00c5a4e6e4b951066e07d17c8abc190 100644 --- a/configurations/configuration.defaults +++ b/configurations/configuration.defaults @@ -208,6 +208,9 @@ collect_usage.in_directory= #servers=rmi socket socket-ssl servers= +# The minimum delay a client spends in a call to retrieveEvent() +server.to.client.retrieveEvent.delay=0 +server.to.client.retrieveEvent.delay_INT-CHECK_ # # Platforms diff --git a/configurations/configuration.template b/configurations/configuration.template index 4d7c85f02f598a96740101510597767030ff1118..1e2e15031121cfeaad2320edb1b569c857ffed91 100644 --- a/configurations/configuration.template +++ b/configurations/configuration.template @@ -114,6 +114,8 @@ collect_usage.in_directory= # The rmi registry and the MXBeans won't be launched/deployed if empty servers=rmi socket socket-ssl +# The minimum delay a client spends in a call to retrieveEvent() +server.to.client.retrieveEvent.delay=0 # # Platforms diff --git a/src/eu/telecom_bretagne/praxis/client/Client.java b/src/eu/telecom_bretagne/praxis/client/Client.java index 1a1371dfa8e59507d2d04eb60a7d1a5ff755c126..d8c23008551743c03a8609b5ebf24defd75105a2 100644 --- a/src/eu/telecom_bretagne/praxis/client/Client.java +++ b/src/eu/telecom_bretagne/praxis/client/Client.java @@ -23,6 +23,7 @@ import eu.telecom_bretagne.praxis.common.ReleaseInfo; import eu.telecom_bretagne.praxis.common.Utile; import eu.telecom_bretagne.praxis.common.events.ClientToServerEvent; import eu.telecom_bretagne.praxis.common.events.CommunicationFacade; +import eu.telecom_bretagne.praxis.common.events.CommunicationFacade.Type; import eu.telecom_bretagne.praxis.common.events.Event; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent.ServerToClientEventAdapter; @@ -282,7 +283,7 @@ public class Client implements ApplicationListener if (cnx != null) throw new IllegalStateException("Client has already been initialized"); - cnx = CommunicationFacade.buildConnection("Client", server); + cnx = CommunicationFacade.buildConnection(Type.CLIENT_TO_SERVER, "Client", server); cnx.start(); } diff --git a/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java index 9d8a048dc878c5a7f8a09b05c7961db76456af9d..3b374c890ab0c5aa9c70d2081fd8b5dfb13e69c4 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java @@ -17,6 +17,7 @@ import eu.telecom_bretagne.praxis.common.events.Event.IncompatibleListenerExcept public abstract class CommunicationFacade extends Thread { + public static enum Type { CLIENT_TO_SERVER, SERVER_TO_CLIENT, SERVER_TO_PLATFORM, PLATFORM_TO_SERVER }; /** * This interface is used by event senders willing to add some post-processing after the event has been sent. For * example, a sender may use one to delete the event's file after it has been transmitted: <code> @@ -47,7 +48,7 @@ public abstract class CommunicationFacade * @throws IllegalStateException * if {@code method} is invalid, or if the hostname or the port number are required and invalid. */ - public static CommunicationFacade buildConnection(String name, String[] connection) throws IOException + public static CommunicationFacade buildConnection(Type type, String name, String[] connection) throws IOException { String method=connection[0]; String hostname = connection[1]; @@ -85,16 +86,20 @@ public abstract class CommunicationFacade { boolean useHTTP = method.endsWith("http"); // CHECK client had: cnx.setPriority(Thread.MIN_PRIORITY); - return new RMICommunicationFacade(name+" (RMI cnx to "+hostname+":"+port+")", hostname, port, useHTTP); + return new RMICommunicationFacade(type, name+" (RMI cnx to "+hostname+":"+port+")", hostname, port, useHTTP); } else if ("socket".equals(method)) - return new SocketCommunicationFacade(name + "(socket)", address, port); + return new SocketCommunicationFacade(type, name + "(socket)", address, port); else if ("socket-ssl".equals(method) || "sockets".equals(method)) - return new SSLSocketCommunicationFacade(name + " (socket-ssl)", address, port); + return new SSLSocketCommunicationFacade(type, name + " (socket-ssl)", address, port); else if ("direct".equals(method)) { - DirectCommunicationFacade cnx = new DirectCommunicationFacade(name + " (direct)"); - DirectCommunicationFacade.accept(cnx); + DirectCommunicationFacade cnx = new DirectCommunicationFacade(type, name + " (direct)"); + if (Type.CLIENT_TO_SERVER.equals(type)) + DirectCommunicationFacade.accept(Type.SERVER_TO_CLIENT, cnx); + else + DirectCommunicationFacade.accept(Type.SERVER_TO_PLATFORM, cnx); + return cnx; } else @@ -108,13 +113,28 @@ public abstract class CommunicationFacade private boolean already_disconnected = false; + private Type type = null; + + protected synchronized void setType(Type type) + { + if ( this.type != null ) + throw new IllegalStateException("type cannot be reassigned"); + this.type = type; + } + + protected Type getType() + { + return this.type; + } + /** * Creates a new façade with the supplied name. * @param name the name given to this façade, for example: "Client (using socket)" */ - public CommunicationFacade(String name) + public CommunicationFacade(Type type, String name) { super(name); + this.type = type; } /** diff --git a/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java index c458df43535a9ccfd2df52520c7f8a5872c18f2b..68dea38ef9c6833594b5c0d9a58ea426242d63a2 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java @@ -28,18 +28,18 @@ public class DirectCommunicationFacade extends CommunicationFacade */ static {} - public static void accept(DirectCommunicationFacade platformFacade) + public static void accept(Type type, DirectCommunicationFacade platformFacade) { - DirectCommunicationFacade serveur_dcf = new DirectCommunicationFacade("Serveur (direct)"); + DirectCommunicationFacade serveur_dcf = new DirectCommunicationFacade(type, "Serveur (direct)"); new Server(serveur_dcf); serveur_dcf.connect(platformFacade); platformFacade.connect(serveur_dcf); serveur_dcf.start(); } - public DirectCommunicationFacade(String name) + public DirectCommunicationFacade(Type type, String name) { - super(name); + super(type, name); } public synchronized void connect(DirectCommunicationFacade platformFacade) diff --git a/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java index 40ecaceecd9c410d5a30f73e83701d2b80d5bdbb..80bbd541d13a0a2c88dc8ac03eb9aaa8df3effee 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java @@ -13,7 +13,7 @@ import java.rmi.server.RMISocketFactory; import java.rmi.server.ServerNotActiveException; import java.rmi.server.UnicastRemoteObject; import java.rmi.server.Unreferenced; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -149,7 +149,7 @@ public class RMICommunicationFacade * @return a remote object exported through RMI * @throws RemoteException */ - public RMIConnection connect() throws RemoteException; + public RMIConnection connect(Type type) throws RemoteException; } /** @@ -178,7 +178,7 @@ public class RMICommunicationFacade super(Configuration.RMI_REGISTRY_PORT, new SslRMIClientSocketFactory(), new SslRMIServerSocketFactory()); } - public CNX connect() throws RemoteException + public CNX connect(Type type) throws RemoteException { String clientIP; try @@ -189,7 +189,7 @@ public class RMICommunicationFacade { clientIP = null; } - RMICommunicationFacade c = new RMICommunicationFacade(clientIP); + RMICommunicationFacade c = new RMICommunicationFacade(type, clientIP); new Server(c); return CNX.buildCNX(c, Configuration.getBoolean("rmi.useSSL")); } @@ -200,6 +200,10 @@ public class RMICommunicationFacade */ public static final int RETRIEVE_EVENT_DELAY = 10; + /** + * Defines the number of seconds between events are sent to the client. + */ + public static final int INTER_EVENT_DELAY = Configuration.getInt("server.to.client.retrieveEvent.delay"); /** * Name under which the RMIServer is registered into the RMI registry * @see RMICommunicationFacade#launchServer() @@ -217,10 +221,12 @@ public class RMICommunicationFacade * On the server-side, events are stored in this list, accessed through {@link #sendEvent(Event)} * {@link #retrieveEvent()} in FIFO order. */ - protected final ArrayList<Event> events = new ArrayList<Event>(); + protected final LinkedList<Event> events = new LinkedList<Event>(); protected final Semaphore events_semaphore = new Semaphore(0); + protected long server_last_sent_timestamp = 0; + /** * The client's IP on the server side. On the client side, it equals to null. */ @@ -255,9 +261,9 @@ public class RMICommunicationFacade * Constructor to be called on the server-side * @param clientIP */ - public RMICommunicationFacade(String clientIP) + public RMICommunicationFacade(Type type, String clientIP) { - super("RMI client " + clientIP); + super(type, "RMI client " + clientIP); this.clientCnxToServer = null; this.clientIP = clientIP; this.useHTTP = false; // not used on the server-side @@ -268,9 +274,9 @@ public class RMICommunicationFacade * @param hostname * @param portNumber */ - public RMICommunicationFacade(String name, String hostname, int portNumber, boolean useHTTP) + public RMICommunicationFacade(Type type, String name, String hostname, int portNumber, boolean useHTTP) { - super(name); + super(type, name); this.clientIP = null; this.useHTTP = useHTTP; try @@ -296,7 +302,10 @@ public class RMICommunicationFacade RMIServerInterface rmiServer = (RMIServerInterface) registry.lookup(RMI_CNX_SERVER_NAME);// (RMIServerInterface)Naming.lookup(rmiServerURL); // String ipClient = InetAddress.getLocalHost().getHostAddress(); - this.clientCnxToServer = rmiServer.connect(); + if (Type.CLIENT_TO_SERVER == type) + this.clientCnxToServer = rmiServer.connect(Type.SERVER_TO_CLIENT); + else + this.clientCnxToServer = rmiServer.connect(Type.SERVER_TO_PLATFORM); } catch (Exception e) { @@ -332,11 +341,28 @@ public class RMICommunicationFacade else sendToServer(evt); } - + protected void sendFromServer(Event event) { synchronized(events) { + if (event instanceof ServerToClientEvent) + { + final ServerToClientEvent evt = (ServerToClientEvent) event; + if ( evt.type == ServerToClientEvent.Type.EXECUTION_STATUS ) + { + for (int i=0; i<events.size(); i++) + { + final ServerToClientEvent e = (ServerToClientEvent) events.get(i); + if ( e == null ) + continue; + if (e.type==ServerToClientEvent.Type.EXECUTION_STATUS && e.workflowID.equals(evt.workflowID)) + events.set(i, null); + } + } + } + else + server_last_sent_timestamp += INTER_EVENT_DELAY; events.add(event); } events_semaphore.release(); @@ -376,6 +402,8 @@ public class RMICommunicationFacade public byte[] retrieveEvent() throws RemoteException { Event event = null; + while (event == null) + { try { Log.log.finest("Waiting events "+this); @@ -389,36 +417,57 @@ public class RMICommunicationFacade */ if (!events_semaphore.tryAcquire(RETRIEVE_EVENT_DELAY, TimeUnit.SECONDS)) return null; + System.out.println("############### 3. tryAcquire OK()"); } catch (InterruptedException e) { return null; } + System.out.println("############### "+(System.currentTimeMillis() - server_last_sent_timestamp)+"-"+INTER_EVENT_DELAY); + if ( Type.SERVER_TO_CLIENT.equals(getType()) && System.currentTimeMillis() - server_last_sent_timestamp < INTER_EVENT_DELAY ) + { + System.out.println("############### 4. release()");//+(System.currentTimeMillis() - server_last_sent_timestamp)+"-"+INTER_EVENT_DELAY); + events_semaphore.release(); + try + { + System.out.println("############### sleep()"); + Thread.sleep(INTER_EVENT_DELAY - System.currentTimeMillis() + server_last_sent_timestamp); + System.out.println("############### / sleep()"); + } + catch (InterruptedException e) + { + ; // IGNORED + } + return null; + } synchronized(events) { // IndexOutOfBounds will not happen: events are added before the semaphore is released - event = events.remove(0); + event = events.remove(0); // may be null + System.out.println("null"); + } } + server_last_sent_timestamp = System.currentTimeMillis(); + Log.log.info("event: "+event.toString()+" w/ file size: "+(event.file_conveyor!=null?event.file_conveyor.length():-1)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); try - { - event.writeTo(baos); - // baos.close(); // useless: ByteArrayOutputStream.close() does nothing - } - catch (IOException e) - { - Log.log.log(Level.SEVERE, "Unable to serialized an event: "+event, e); - throw new RemoteException("Couldn't serialize an event"); - } - + { + event.writeTo(baos); + // baos.close(); // useless: ByteArrayOutputStream.close() does nothing + } + catch (IOException e) + { + Log.log.log(Level.SEVERE, "Unable to serialized an event: "+event, e); + throw new RemoteException("Couldn't serialize an event"); + } + // This is the reason why we serialize the event by hand, instead of letting RMI do it automatically: we // need to mark the event as sent... If the attached callback does some cleanup, such as deleting the attached // file, the event could not be sent with its file! eventSent(event); - return baos.toByteArray(); } diff --git a/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java index 047ed9fa32ab83a9d0f0c4d4f76e7033a7b98151..ea6ed1796be041519548d22a120bd2abdb56025e 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java @@ -48,7 +48,7 @@ public class SSLSocketCommunicationFacade while (true) { Socket socket = serverSocket.accept(); - SocketCommunicationFacade cnx = new SocketCommunicationFacade("Server (socket SSL)", socket); + SocketCommunicationFacade cnx = new SocketCommunicationFacade(null, "Server (socket SSL)", socket); new Server(cnx); cnx.start(); } @@ -65,17 +65,17 @@ public class SSLSocketCommunicationFacade new SSLSocketServer(bindAddress, port).start(); } - protected SSLSocketCommunicationFacade(String name, Socket socket) + protected SSLSocketCommunicationFacade(Type type, String name, Socket socket) throws IOException { - super(name, socket); + super(type, name, socket); } /** Used on client & platform-side */ - public SSLSocketCommunicationFacade(String name, InetAddress addr, int port) + public SSLSocketCommunicationFacade(Type type, String name, InetAddress addr, int port) throws IOException { - super(name, SSLSocketFactory.getDefault().createSocket(addr, port)); + super(type, name, SSLSocketFactory.getDefault().createSocket(addr, port)); } } diff --git a/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java index 26be6c232a925ef43b64d4153eb925595313a135..c327c68afa44d093847792ee0b51ffff516ec292 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java @@ -52,7 +52,7 @@ public class SocketCommunicationFacade while (true) { Socket socket = serverSocket.accept(); - SocketCommunicationFacade cnx = new SocketCommunicationFacade("Server (socket)", socket); + SocketCommunicationFacade cnx = new SocketCommunicationFacade(null, "Server (socket)", socket); new Server(cnx); cnx.start(); } @@ -69,19 +69,19 @@ public class SocketCommunicationFacade new SocketServer(bindAddress, port).start(); } - protected SocketCommunicationFacade(String name, Socket socket) + protected SocketCommunicationFacade(Type type, String name, Socket socket) throws IOException { - super(name); + super(type, name); this.socket = socket; is = socket.getInputStream(); os = socket.getOutputStream(); } - public SocketCommunicationFacade(String name, InetAddress adr, int port) + public SocketCommunicationFacade(Type type, String name, InetAddress adr, int port) throws IOException { - this(name, new Socket(adr,port)); + this(type, name, new Socket(adr,port)); } @Override @@ -132,6 +132,11 @@ public class SocketCommunicationFacade try { event = Event.readFrom(is); + if (getType()==null && event != null) + if ( event instanceof ClientToServerEvent ) // either this or event instanceof ServerToClientEvent + setType(Type.SERVER_TO_CLIENT); + else + setType(Type.SERVER_TO_PLATFORM); handle_event(event); successive_failure = 0; } diff --git a/src/eu/telecom_bretagne/praxis/platform/Platform.java b/src/eu/telecom_bretagne/praxis/platform/Platform.java index 68ca77cb3836c4f1b5f14aeeff74a948a2793037..1182f65e39b27514659dcb74364db75cda7fbec4 100644 --- a/src/eu/telecom_bretagne/praxis/platform/Platform.java +++ b/src/eu/telecom_bretagne/praxis/platform/Platform.java @@ -11,6 +11,7 @@ import eu.telecom_bretagne.praxis.common.Configuration; import eu.telecom_bretagne.praxis.common.ReleaseInfo; import eu.telecom_bretagne.praxis.common.Utile; import eu.telecom_bretagne.praxis.common.events.CommunicationFacade; +import eu.telecom_bretagne.praxis.common.events.CommunicationFacade.Type; import eu.telecom_bretagne.praxis.common.events.Event; import eu.telecom_bretagne.praxis.platform.execution.PlatformExecution; import eu.telecom_bretagne.praxis.server.execution.ExecutionEngine; @@ -141,7 +142,7 @@ public class Platform extends AbstractPlatform public static Platform launchPlatform(Document configuration, String[] connection) throws IOException { - CommunicationFacade cnx = CommunicationFacade.buildConnection("Platform", connection); + CommunicationFacade cnx = CommunicationFacade.buildConnection(Type.PLATFORM_TO_SERVER, "Platform", connection); cnx.start(); Platform p = new Platform(cnx, configuration); return p; diff --git a/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java b/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java index eaba49e08890ad5230a2c2088be3252f09ed5155..d4dead0468e76183e5bbbcc283ce7ccb9461e2f7 100644 --- a/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java +++ b/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java @@ -24,6 +24,7 @@ import eu.telecom_bretagne.praxis.common.Utile; import eu.telecom_bretagne.praxis.common.events.CommunicationFacade; import eu.telecom_bretagne.praxis.common.events.Event; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent; +import eu.telecom_bretagne.praxis.common.events.CommunicationFacade.Type; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent.ServerToClientEventListener; import eu.telecom_bretagne.praxis.core.execution.ExecutionID; import eu.telecom_bretagne.praxis.core.execution.Result; @@ -160,7 +161,7 @@ public class PlatformToClientBridge { // this is the result of an execution that was not sent by us. It may come from a different configuration // where the bridge is not activated. Log it and send it as-is. - Log.log.info("Received a result with result.data==null. This cant be for use. "+result); + Log.log.info("Received a result with result.data==null. This cant be for us. "+result); } this.sendResults(result); if (result.data!=null && result.getStatus().isClosed()) @@ -286,7 +287,7 @@ public class PlatformToClientBridge if (!resource.startsWith("local")) programs.addContent(new Element("program").setAttribute("id", resource)); - CommunicationFacade cnx = CommunicationFacade.buildConnection("Platform", platform_conn); + CommunicationFacade cnx = CommunicationFacade.buildConnection(Type.PLATFORM_TO_SERVER, "Platform", platform_conn); cnx.start(); new PlatformToClientBridge(cnx, conf, client_conn); }