From 325e456a68c086256c8b5cc821933d993f66740a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Bigaret?= <sebastien.bigaret@telecom-bretagne.eu> Date: Thu, 23 Mar 2017 13:01:04 +0100 Subject: [PATCH] =?UTF-8?q?tentative=20d'am=C3=A9liorationdes=20=C3=A9chan?= =?UTF-8?q?ges=20mais=20pour=20le=20moment=20c'est=20d=C3=A9bile?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configurations/configuration.defaults | 3 + configurations/configuration.template | 2 + .../praxis/client/Client.java | 3 +- .../common/events/CommunicationFacade.java | 34 +++++-- .../events/DirectCommunicationFacade.java | 8 +- .../common/events/RMICommunicationFacade.java | 95 ++++++++++++++----- .../events/SSLSocketCommunicationFacade.java | 10 +- .../events/SocketCommunicationFacade.java | 15 ++- .../praxis/platform/Platform.java | 3 +- .../platform/PlatformToClientBridge.java | 5 +- 10 files changed, 130 insertions(+), 48 deletions(-) diff --git a/configurations/configuration.defaults b/configurations/configuration.defaults index a54acd32..22dd80a3 100644 --- a/configurations/configuration.defaults +++ b/configurations/configuration.defaults @@ -208,6 +208,9 @@ collect_usage.in_directory= #servers=rmi socket socket-ssl servers= +# The minimum delay a client spends in a call to retrieveEvent() +server.to.client.retrieveEvent.delay=0 +server.to.client.retrieveEvent.delay_INT-CHECK_ # # Platforms diff --git a/configurations/configuration.template b/configurations/configuration.template index 4d7c85f0..1e2e1503 100644 --- a/configurations/configuration.template +++ b/configurations/configuration.template @@ -114,6 +114,8 @@ collect_usage.in_directory= # The rmi registry and the MXBeans won't be launched/deployed if empty servers=rmi socket socket-ssl +# The minimum delay a client spends in a call to retrieveEvent() +server.to.client.retrieveEvent.delay=0 # # Platforms diff --git a/src/eu/telecom_bretagne/praxis/client/Client.java b/src/eu/telecom_bretagne/praxis/client/Client.java index 1a1371df..d8c23008 100644 --- a/src/eu/telecom_bretagne/praxis/client/Client.java +++ b/src/eu/telecom_bretagne/praxis/client/Client.java @@ -23,6 +23,7 @@ import eu.telecom_bretagne.praxis.common.ReleaseInfo; import eu.telecom_bretagne.praxis.common.Utile; import eu.telecom_bretagne.praxis.common.events.ClientToServerEvent; import eu.telecom_bretagne.praxis.common.events.CommunicationFacade; +import eu.telecom_bretagne.praxis.common.events.CommunicationFacade.Type; import eu.telecom_bretagne.praxis.common.events.Event; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent.ServerToClientEventAdapter; @@ -282,7 +283,7 @@ public class Client implements ApplicationListener if (cnx != null) throw new IllegalStateException("Client has already been initialized"); - cnx = CommunicationFacade.buildConnection("Client", server); + cnx = CommunicationFacade.buildConnection(Type.CLIENT_TO_SERVER, "Client", server); cnx.start(); } diff --git a/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java index 9d8a048d..3b374c89 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/CommunicationFacade.java @@ -17,6 +17,7 @@ import eu.telecom_bretagne.praxis.common.events.Event.IncompatibleListenerExcept public abstract class CommunicationFacade extends Thread { + public static enum Type { CLIENT_TO_SERVER, SERVER_TO_CLIENT, SERVER_TO_PLATFORM, PLATFORM_TO_SERVER }; /** * This interface is used by event senders willing to add some post-processing after the event has been sent. For * example, a sender may use one to delete the event's file after it has been transmitted: <code> @@ -47,7 +48,7 @@ public abstract class CommunicationFacade * @throws IllegalStateException * if {@code method} is invalid, or if the hostname or the port number are required and invalid. */ - public static CommunicationFacade buildConnection(String name, String[] connection) throws IOException + public static CommunicationFacade buildConnection(Type type, String name, String[] connection) throws IOException { String method=connection[0]; String hostname = connection[1]; @@ -85,16 +86,20 @@ public abstract class CommunicationFacade { boolean useHTTP = method.endsWith("http"); // CHECK client had: cnx.setPriority(Thread.MIN_PRIORITY); - return new RMICommunicationFacade(name+" (RMI cnx to "+hostname+":"+port+")", hostname, port, useHTTP); + return new RMICommunicationFacade(type, name+" (RMI cnx to "+hostname+":"+port+")", hostname, port, useHTTP); } else if ("socket".equals(method)) - return new SocketCommunicationFacade(name + "(socket)", address, port); + return new SocketCommunicationFacade(type, name + "(socket)", address, port); else if ("socket-ssl".equals(method) || "sockets".equals(method)) - return new SSLSocketCommunicationFacade(name + " (socket-ssl)", address, port); + return new SSLSocketCommunicationFacade(type, name + " (socket-ssl)", address, port); else if ("direct".equals(method)) { - DirectCommunicationFacade cnx = new DirectCommunicationFacade(name + " (direct)"); - DirectCommunicationFacade.accept(cnx); + DirectCommunicationFacade cnx = new DirectCommunicationFacade(type, name + " (direct)"); + if (Type.CLIENT_TO_SERVER.equals(type)) + DirectCommunicationFacade.accept(Type.SERVER_TO_CLIENT, cnx); + else + DirectCommunicationFacade.accept(Type.SERVER_TO_PLATFORM, cnx); + return cnx; } else @@ -108,13 +113,28 @@ public abstract class CommunicationFacade private boolean already_disconnected = false; + private Type type = null; + + protected synchronized void setType(Type type) + { + if ( this.type != null ) + throw new IllegalStateException("type cannot be reassigned"); + this.type = type; + } + + protected Type getType() + { + return this.type; + } + /** * Creates a new façade with the supplied name. * @param name the name given to this façade, for example: "Client (using socket)" */ - public CommunicationFacade(String name) + public CommunicationFacade(Type type, String name) { super(name); + this.type = type; } /** diff --git a/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java index c458df43..68dea38e 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/DirectCommunicationFacade.java @@ -28,18 +28,18 @@ public class DirectCommunicationFacade extends CommunicationFacade */ static {} - public static void accept(DirectCommunicationFacade platformFacade) + public static void accept(Type type, DirectCommunicationFacade platformFacade) { - DirectCommunicationFacade serveur_dcf = new DirectCommunicationFacade("Serveur (direct)"); + DirectCommunicationFacade serveur_dcf = new DirectCommunicationFacade(type, "Serveur (direct)"); new Server(serveur_dcf); serveur_dcf.connect(platformFacade); platformFacade.connect(serveur_dcf); serveur_dcf.start(); } - public DirectCommunicationFacade(String name) + public DirectCommunicationFacade(Type type, String name) { - super(name); + super(type, name); } public synchronized void connect(DirectCommunicationFacade platformFacade) diff --git a/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java index 40ecacee..80bbd541 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/RMICommunicationFacade.java @@ -13,7 +13,7 @@ import java.rmi.server.RMISocketFactory; import java.rmi.server.ServerNotActiveException; import java.rmi.server.UnicastRemoteObject; import java.rmi.server.Unreferenced; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -149,7 +149,7 @@ public class RMICommunicationFacade * @return a remote object exported through RMI * @throws RemoteException */ - public RMIConnection connect() throws RemoteException; + public RMIConnection connect(Type type) throws RemoteException; } /** @@ -178,7 +178,7 @@ public class RMICommunicationFacade super(Configuration.RMI_REGISTRY_PORT, new SslRMIClientSocketFactory(), new SslRMIServerSocketFactory()); } - public CNX connect() throws RemoteException + public CNX connect(Type type) throws RemoteException { String clientIP; try @@ -189,7 +189,7 @@ public class RMICommunicationFacade { clientIP = null; } - RMICommunicationFacade c = new RMICommunicationFacade(clientIP); + RMICommunicationFacade c = new RMICommunicationFacade(type, clientIP); new Server(c); return CNX.buildCNX(c, Configuration.getBoolean("rmi.useSSL")); } @@ -200,6 +200,10 @@ public class RMICommunicationFacade */ public static final int RETRIEVE_EVENT_DELAY = 10; + /** + * Defines the number of seconds between events are sent to the client. + */ + public static final int INTER_EVENT_DELAY = Configuration.getInt("server.to.client.retrieveEvent.delay"); /** * Name under which the RMIServer is registered into the RMI registry * @see RMICommunicationFacade#launchServer() @@ -217,10 +221,12 @@ public class RMICommunicationFacade * On the server-side, events are stored in this list, accessed through {@link #sendEvent(Event)} * {@link #retrieveEvent()} in FIFO order. */ - protected final ArrayList<Event> events = new ArrayList<Event>(); + protected final LinkedList<Event> events = new LinkedList<Event>(); protected final Semaphore events_semaphore = new Semaphore(0); + protected long server_last_sent_timestamp = 0; + /** * The client's IP on the server side. On the client side, it equals to null. */ @@ -255,9 +261,9 @@ public class RMICommunicationFacade * Constructor to be called on the server-side * @param clientIP */ - public RMICommunicationFacade(String clientIP) + public RMICommunicationFacade(Type type, String clientIP) { - super("RMI client " + clientIP); + super(type, "RMI client " + clientIP); this.clientCnxToServer = null; this.clientIP = clientIP; this.useHTTP = false; // not used on the server-side @@ -268,9 +274,9 @@ public class RMICommunicationFacade * @param hostname * @param portNumber */ - public RMICommunicationFacade(String name, String hostname, int portNumber, boolean useHTTP) + public RMICommunicationFacade(Type type, String name, String hostname, int portNumber, boolean useHTTP) { - super(name); + super(type, name); this.clientIP = null; this.useHTTP = useHTTP; try @@ -296,7 +302,10 @@ public class RMICommunicationFacade RMIServerInterface rmiServer = (RMIServerInterface) registry.lookup(RMI_CNX_SERVER_NAME);// (RMIServerInterface)Naming.lookup(rmiServerURL); // String ipClient = InetAddress.getLocalHost().getHostAddress(); - this.clientCnxToServer = rmiServer.connect(); + if (Type.CLIENT_TO_SERVER == type) + this.clientCnxToServer = rmiServer.connect(Type.SERVER_TO_CLIENT); + else + this.clientCnxToServer = rmiServer.connect(Type.SERVER_TO_PLATFORM); } catch (Exception e) { @@ -332,11 +341,28 @@ public class RMICommunicationFacade else sendToServer(evt); } - + protected void sendFromServer(Event event) { synchronized(events) { + if (event instanceof ServerToClientEvent) + { + final ServerToClientEvent evt = (ServerToClientEvent) event; + if ( evt.type == ServerToClientEvent.Type.EXECUTION_STATUS ) + { + for (int i=0; i<events.size(); i++) + { + final ServerToClientEvent e = (ServerToClientEvent) events.get(i); + if ( e == null ) + continue; + if (e.type==ServerToClientEvent.Type.EXECUTION_STATUS && e.workflowID.equals(evt.workflowID)) + events.set(i, null); + } + } + } + else + server_last_sent_timestamp += INTER_EVENT_DELAY; events.add(event); } events_semaphore.release(); @@ -376,6 +402,8 @@ public class RMICommunicationFacade public byte[] retrieveEvent() throws RemoteException { Event event = null; + while (event == null) + { try { Log.log.finest("Waiting events "+this); @@ -389,36 +417,57 @@ public class RMICommunicationFacade */ if (!events_semaphore.tryAcquire(RETRIEVE_EVENT_DELAY, TimeUnit.SECONDS)) return null; + System.out.println("############### 3. tryAcquire OK()"); } catch (InterruptedException e) { return null; } + System.out.println("############### "+(System.currentTimeMillis() - server_last_sent_timestamp)+"-"+INTER_EVENT_DELAY); + if ( Type.SERVER_TO_CLIENT.equals(getType()) && System.currentTimeMillis() - server_last_sent_timestamp < INTER_EVENT_DELAY ) + { + System.out.println("############### 4. release()");//+(System.currentTimeMillis() - server_last_sent_timestamp)+"-"+INTER_EVENT_DELAY); + events_semaphore.release(); + try + { + System.out.println("############### sleep()"); + Thread.sleep(INTER_EVENT_DELAY - System.currentTimeMillis() + server_last_sent_timestamp); + System.out.println("############### / sleep()"); + } + catch (InterruptedException e) + { + ; // IGNORED + } + return null; + } synchronized(events) { // IndexOutOfBounds will not happen: events are added before the semaphore is released - event = events.remove(0); + event = events.remove(0); // may be null + System.out.println("null"); + } } + server_last_sent_timestamp = System.currentTimeMillis(); + Log.log.info("event: "+event.toString()+" w/ file size: "+(event.file_conveyor!=null?event.file_conveyor.length():-1)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); try - { - event.writeTo(baos); - // baos.close(); // useless: ByteArrayOutputStream.close() does nothing - } - catch (IOException e) - { - Log.log.log(Level.SEVERE, "Unable to serialized an event: "+event, e); - throw new RemoteException("Couldn't serialize an event"); - } - + { + event.writeTo(baos); + // baos.close(); // useless: ByteArrayOutputStream.close() does nothing + } + catch (IOException e) + { + Log.log.log(Level.SEVERE, "Unable to serialized an event: "+event, e); + throw new RemoteException("Couldn't serialize an event"); + } + // This is the reason why we serialize the event by hand, instead of letting RMI do it automatically: we // need to mark the event as sent... If the attached callback does some cleanup, such as deleting the attached // file, the event could not be sent with its file! eventSent(event); - return baos.toByteArray(); } diff --git a/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java index 047ed9fa..ea6ed179 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/SSLSocketCommunicationFacade.java @@ -48,7 +48,7 @@ public class SSLSocketCommunicationFacade while (true) { Socket socket = serverSocket.accept(); - SocketCommunicationFacade cnx = new SocketCommunicationFacade("Server (socket SSL)", socket); + SocketCommunicationFacade cnx = new SocketCommunicationFacade(null, "Server (socket SSL)", socket); new Server(cnx); cnx.start(); } @@ -65,17 +65,17 @@ public class SSLSocketCommunicationFacade new SSLSocketServer(bindAddress, port).start(); } - protected SSLSocketCommunicationFacade(String name, Socket socket) + protected SSLSocketCommunicationFacade(Type type, String name, Socket socket) throws IOException { - super(name, socket); + super(type, name, socket); } /** Used on client & platform-side */ - public SSLSocketCommunicationFacade(String name, InetAddress addr, int port) + public SSLSocketCommunicationFacade(Type type, String name, InetAddress addr, int port) throws IOException { - super(name, SSLSocketFactory.getDefault().createSocket(addr, port)); + super(type, name, SSLSocketFactory.getDefault().createSocket(addr, port)); } } diff --git a/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java b/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java index 26be6c23..c327c68a 100644 --- a/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java +++ b/src/eu/telecom_bretagne/praxis/common/events/SocketCommunicationFacade.java @@ -52,7 +52,7 @@ public class SocketCommunicationFacade while (true) { Socket socket = serverSocket.accept(); - SocketCommunicationFacade cnx = new SocketCommunicationFacade("Server (socket)", socket); + SocketCommunicationFacade cnx = new SocketCommunicationFacade(null, "Server (socket)", socket); new Server(cnx); cnx.start(); } @@ -69,19 +69,19 @@ public class SocketCommunicationFacade new SocketServer(bindAddress, port).start(); } - protected SocketCommunicationFacade(String name, Socket socket) + protected SocketCommunicationFacade(Type type, String name, Socket socket) throws IOException { - super(name); + super(type, name); this.socket = socket; is = socket.getInputStream(); os = socket.getOutputStream(); } - public SocketCommunicationFacade(String name, InetAddress adr, int port) + public SocketCommunicationFacade(Type type, String name, InetAddress adr, int port) throws IOException { - this(name, new Socket(adr,port)); + this(type, name, new Socket(adr,port)); } @Override @@ -132,6 +132,11 @@ public class SocketCommunicationFacade try { event = Event.readFrom(is); + if (getType()==null && event != null) + if ( event instanceof ClientToServerEvent ) // either this or event instanceof ServerToClientEvent + setType(Type.SERVER_TO_CLIENT); + else + setType(Type.SERVER_TO_PLATFORM); handle_event(event); successive_failure = 0; } diff --git a/src/eu/telecom_bretagne/praxis/platform/Platform.java b/src/eu/telecom_bretagne/praxis/platform/Platform.java index 68ca77cb..1182f65e 100644 --- a/src/eu/telecom_bretagne/praxis/platform/Platform.java +++ b/src/eu/telecom_bretagne/praxis/platform/Platform.java @@ -11,6 +11,7 @@ import eu.telecom_bretagne.praxis.common.Configuration; import eu.telecom_bretagne.praxis.common.ReleaseInfo; import eu.telecom_bretagne.praxis.common.Utile; import eu.telecom_bretagne.praxis.common.events.CommunicationFacade; +import eu.telecom_bretagne.praxis.common.events.CommunicationFacade.Type; import eu.telecom_bretagne.praxis.common.events.Event; import eu.telecom_bretagne.praxis.platform.execution.PlatformExecution; import eu.telecom_bretagne.praxis.server.execution.ExecutionEngine; @@ -141,7 +142,7 @@ public class Platform extends AbstractPlatform public static Platform launchPlatform(Document configuration, String[] connection) throws IOException { - CommunicationFacade cnx = CommunicationFacade.buildConnection("Platform", connection); + CommunicationFacade cnx = CommunicationFacade.buildConnection(Type.PLATFORM_TO_SERVER, "Platform", connection); cnx.start(); Platform p = new Platform(cnx, configuration); return p; diff --git a/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java b/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java index eaba49e0..d4dead04 100644 --- a/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java +++ b/src/eu/telecom_bretagne/praxis/platform/PlatformToClientBridge.java @@ -24,6 +24,7 @@ import eu.telecom_bretagne.praxis.common.Utile; import eu.telecom_bretagne.praxis.common.events.CommunicationFacade; import eu.telecom_bretagne.praxis.common.events.Event; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent; +import eu.telecom_bretagne.praxis.common.events.CommunicationFacade.Type; import eu.telecom_bretagne.praxis.common.events.ServerToClientEvent.ServerToClientEventListener; import eu.telecom_bretagne.praxis.core.execution.ExecutionID; import eu.telecom_bretagne.praxis.core.execution.Result; @@ -160,7 +161,7 @@ public class PlatformToClientBridge { // this is the result of an execution that was not sent by us. It may come from a different configuration // where the bridge is not activated. Log it and send it as-is. - Log.log.info("Received a result with result.data==null. This cant be for use. "+result); + Log.log.info("Received a result with result.data==null. This cant be for us. "+result); } this.sendResults(result); if (result.data!=null && result.getStatus().isClosed()) @@ -286,7 +287,7 @@ public class PlatformToClientBridge if (!resource.startsWith("local")) programs.addContent(new Element("program").setAttribute("id", resource)); - CommunicationFacade cnx = CommunicationFacade.buildConnection("Platform", platform_conn); + CommunicationFacade cnx = CommunicationFacade.buildConnection(Type.PLATFORM_TO_SERVER, "Platform", platform_conn); cnx.start(); new PlatformToClientBridge(cnx, conf, client_conn); } -- GitLab