Skip to content
Snippets Groups Projects
Commit 0306c709 authored by PUBERT Quentin's avatar PUBERT Quentin
Browse files

switched ws2p to using single proc per connection

parent db145bc5
No related branches found
No related tags found
No related merge requests found
Pipeline #1137 failed
......@@ -5,7 +5,6 @@ defmodule MyWebsocketApp do
port = String.to_integer(System.get_env("PORT") || "20900")
children = [
{Task.Supervisor, name: WS2P.TaskSupervisor},
{Registry, keys: :unique, name: WS2P.Connection.Registry},
{DynamicSupervisor, strategy: :one_for_one, name: WS2P.ConnectionSupervisor},
{Task, fn -> WS2P.Endpoint.accept_connections!(port, "/") end}
......
......@@ -2,6 +2,9 @@ defmodule WS2P.Connection do
use GenServer, restart: :transient
require Logger
@keypair Ed25519.generate_key_pair()
@currency_name "g1-test"
def start_link({:connect, address_and_port, socket_options}) do
GenServer.start_link(__MODULE__, {:connect, address_and_port, socket_options},
name: {:via, Registry, {WS2P.Connection.Registry, address_and_port}}
......@@ -9,206 +12,226 @@ defmodule WS2P.Connection do
end
def start_link({:accept, socket}) do
GenServer.start_link(__MODULE__, {:accept, socket},
name: {:via, Registry, {WS2P.Connection.Registry, Socket.Web.remote!(socket)}}
address_and_port = Socket.Web.remote!(socket)
GenServer.start_link(__MODULE__, {:accept, address_and_port, socket},
name: {:via, Registry, {WS2P.Connection.Registry, address_and_port}}
)
end
def init({:connect, {address, port}, socket_options}) do
socket = Socket.Web.connect!({address, port}, socket_options)
Logger.info("Connected socket to #{address}:#{port}")
{:ok, %{socket: socket}}
def init({:connect, address_and_port, socket_options}) do
socket = Socket.Web.connect!(address_and_port, socket_options)
Logger.info("Connected socket to #{inspect(address_and_port)}")
schedule_poll_remote_packets()
{:ok,
%{
socket: socket,
keypair: @keypair,
currency_name: @currency_name,
address_and_port: address_and_port
}, {:continue, :send_connect_object}}
end
def init({:accept, address_and_port, socket}) do
Logger.info("Accepted socket from #{inspect(address_and_port)}")
schedule_poll_remote_packets()
{:ok,
%{
socket: socket,
keypair: @keypair,
currency_name: @currency_name,
address_and_port: address_and_port
}, {:continue, :send_connect_object}}
end
def init({:accept, socket}) do
Socket.Web.accept!(socket)
Logger.info("Accepted socket from #{inspect Socket.Web.remote!(socket)}")
{:ok, %{socket: socket}}
def handle_continue(
:send_connect_object,
%{
address_and_port: address_and_port,
keypair: {sec, pub},
currency_name: currency_name
} = state
) do
local_challenge = UUID.uuid4() <> UUID.uuid4()
pub_as_base58 = pub |> Base58.encode()
raw_format = "WS2P:CONNECT:#{currency_name}:#{pub_as_base58}:#{local_challenge}"
signature = Ed25519.signature(raw_format, sec, pub)
connect_object = %{
"auth" => "CONNECT",
"pub" => pub_as_base58,
"challenge" => local_challenge,
"sig" => signature |> Base.encode64()
}
Logger.info("Sending CONNECT to node #{inspect(address_and_port)}\n#{raw_format}")
handle_local_object(connect_object, Map.put_new(state, :local_challenge, local_challenge))
end
def start_handshake_and_loop(connection, keypair, currency_name) do
connection
|> send_connect(keypair, currency_name)
|> loop()
defp schedule_poll_remote_packets do
Process.send_after(self(), :poll_remote_packets, 1000)
end
defp loop(connection) do
case connection |> WS2P.Connection.receive_object() do
{_object, :answer, answer} ->
connection |> WS2P.Connection.send_object(answer)
def handle_info(:poll_remote_packets, %{socket: socket} = state) do
poll_remote_packets(socket)
|> Enum.map(fn packet ->
GenServer.cast(self(), {:remote_packet, packet})
end)
_ ->
{}
schedule_poll_remote_packets()
{:noreply, state}
end
loop(connection)
defp poll_remote_packets(socket, remote_packets \\ []) do
case Socket.Web.recv(socket, timeout: 0) do
{:ok, packet} ->
poll_remote_packets(socket, [packet | remote_packets])
{:error, :timeout} ->
remote_packets
{:error, code} ->
Logger.error(code)
raise Socket.Error, reason: code
end
end
def handle_object(
%{
"auth" => "OK",
"sig" => _sig
} = ok_object,
%{
socket: socket
} = state
) do
Logger.info("Received OK from node #{inspect(Socket.Web.remote!(socket))}")
{:reply, {ok_object, :noanswer}, state}
defp handle_local_packet(packet, %{socket: socket} = state) do
Socket.Web.send!(socket, packet)
{:noreply, state}
end
defp handle_local_msg(msg, state) do
handle_local_packet({:text, msg}, state)
end
defp handle_local_object(object, state) do
msg = Poison.encode!(object, pretty: true)
handle_local_msg(msg, state)
end
def handle_cast({:local_msg, msg}, state) do
handle_local_msg(msg, state)
end
def handle_cast({:local_object, object}, state) do
handle_local_object(object, state)
end
def handle_cast({:remote_packet, packet}, state) do
handle_remote_packet(packet, state)
end
def handle_object(
defp handle_remote_object(
%{
"auth" => "ACK",
"auth" => "CONNECT",
"challenge" => challenge,
"pub" => _pub,
"sig" => _sig
} = ack_object,
},
%{
socket: socket,
keypair: {my_sec, my_pub},
currency_name: currency_name,
local_challenge: local_challenge
address_and_port: address_and_port
} = state
) do
Logger.info("Received ACK from node #{inspect(Socket.Web.remote!(socket))}")
Logger.info("Received CONNECT from node #{inspect(address_and_port)}")
my_pub_as_base58 = my_pub |> Base58.encode()
raw_format = "WS2P:OK:#{currency_name}:#{my_pub_as_base58}:#{local_challenge}"
raw_format = "WS2P:ACK:#{currency_name}:#{my_pub_as_base58}:#{challenge}"
signature = Ed25519.signature(raw_format, my_sec, my_pub)
ok_object = %{
"auth" => "OK",
ack_object = %{
"auth" => "ACK",
"pub" => my_pub_as_base58,
"sig" => signature |> Base.encode64()
}
Logger.info("Sent OK to node #{inspect(Socket.Web.remote!(socket))}\n#{raw_format}")
{:reply, {ack_object, :answer, ok_object}, state}
Logger.info("Sending ACK to node #{inspect(address_and_port)}\n#{raw_format}")
handle_local_object(ack_object, state)
end
def handle_object(
defp handle_remote_object(
%{
"auth" => "CONNECT",
"challenge" => challenge,
"auth" => "ACK",
"pub" => _pub,
"sig" => _sig
} = connect_object,
%{socket: socket, keypair: {my_sec, my_pub}, currency_name: currency_name} = state
},
%{
keypair: {my_sec, my_pub},
currency_name: currency_name,
local_challenge: local_challenge,
address_and_port: address_and_port
} = state
) do
Logger.info("Received CONNECT from node #{inspect(Socket.Web.remote!(socket))}")
Logger.info("Received ACK from node #{inspect(address_and_port)}")
my_pub_as_base58 = my_pub |> Base58.encode()
raw_format = "WS2P:ACK:#{currency_name}:#{my_pub_as_base58}:#{challenge}"
raw_format = "WS2P:OK:#{currency_name}:#{my_pub_as_base58}:#{local_challenge}"
signature = Ed25519.signature(raw_format, my_sec, my_pub)
ack_object = %{
"auth" => "ACK",
"pub" => my_pub_as_base58,
ok_object = %{
"auth" => "OK",
"sig" => signature |> Base.encode64()
}
Logger.info("Sent ACK to node #{inspect(Socket.Web.remote!(socket))}\n#{raw_format}")
{:reply, {connect_object, :answer, ack_object}, state}
Logger.info("Sent OK to node #{inspect(address_and_port)}\n#{raw_format}")
handle_local_object(ok_object, state)
end
def handle_object(object, state) do
IO.puts("Received unhandled object:")
IO.inspect(object)
{:reply, {object, :noanswer}, state}
end
def handle_packet(:close, %{socket: socket} = state) do
Logger.info("Remote #{inspect(Socket.Web.remote!(socket))} closed socket connection")
Socket.Web.close(socket)
{:stop, :shutdown, state}
defp handle_remote_object(
%{
"auth" => "OK",
"sig" => _sig
},
%{
address_and_port: address_and_port
} = state
) do
Logger.info("Received OK from node #{inspect(address_and_port)}")
{:noreply, state}
end
def handle_packet({:close, code, reason}, %{socket: socket} = state) do
Logger.debug(
"Remote #{inspect(Socket.Web.remote!(socket))} closed socket connection with code \"#{code}\""
)
Logger.debug("Reason: \"#{reason}\"")
Socket.Web.close(socket)
{:stop, :shutdown, state}
defp handle_remote_object(object, state) do
IO.puts("Received unhandled remote object:")
IO.inspect(object)
{:noreply, state}
end
def handle_packet({:text, msg}, state) do
defp handle_remote_packet({:text, msg}, state) do
case Poison.decode(msg) do
{:ok, object} ->
WS2P.Connection.handle_object(object, state)
handle_remote_object(object, state)
{:error, error} ->
Logger.warn("Could not decode text message:\n#{inspect(error, pretty_print: true)}")
{:reply, :noanwser, state}
Logger.warn("Could not decode message:\n#{inspect(error, pretty_print: true)}")
{:noreply, state}
end
end
def handle_packet({type, msg}, %{socket: socket} = state) when type != :text do
defp handle_remote_packet({type, msg}, %{socket: socket} = state) do
Logger.debug("Received message of type #{inspect(type)} from #{socket.origin}:\n#{msg}")
{:reply, :noanswer, state}
end
def handle_call(:receive, _from, %{socket: socket} = state) do
case socket |> Socket.Web.recv() do
{:ok, packet} ->
WS2P.Connection.handle_packet(packet, state)
{:error, error} ->
{:stop, error}
end
end
def handle_cast({:send_packet, packet}, %{socket: socket} = state) do
socket |> Socket.Web.send!(packet)
{:noreply, state}
end
def handle_cast({:send_object, object}, state) do
msg = Poison.encode!(object, pretty: true)
handle_cast({:send_packet, {:text, msg}}, state)
defp handle_remote_packet(:close, %{socket: socket, address_and_port: address_and_port} = state) do
Logger.info("Remote #{inspect(address_and_port)} closed socket connection")
Socket.Web.close(socket)
{:stop, :shutdown, state}
end
def handle_cast(
{:send_connect, {sec, pub} = keypair, currency_name},
%{socket: socket} = state
defp handle_remote_packet(
{:close, code, reason},
%{socket: socket, address_and_port: address_and_port} = state
) do
challenge = UUID.uuid4() <> UUID.uuid4()
pub_as_base58 = pub |> Base58.encode()
raw_format = "WS2P:CONNECT:#{currency_name}:#{pub_as_base58}:#{challenge}"
signature = Ed25519.signature(raw_format, sec, pub)
connect_object = %{
"auth" => "CONNECT",
"pub" => pub_as_base58,
"challenge" => challenge,
"sig" => signature |> Base.encode64()
}
{:noreply, state} = handle_cast({:send_object, connect_object}, state)
Logger.info("Sent CONNECT to node #{inspect(Socket.Web.remote!(socket))}\n#{raw_format}")
new_state =
Map.merge(state, %{
local_challenge: challenge,
keypair: keypair,
currency_name: currency_name
})
{:noreply, new_state}
end
def receive_object(connection) do
GenServer.call(connection, :receive, :infinity)
end
def send_object(connection, object) do
connection |> GenServer.cast({:send_object, object})
connection
end
Logger.debug(
"Remote #{inspect(address_and_port)} closed socket connection with code \"#{code}\""
)
def send_connect(connection, keypair, currency_name) do
connection |> GenServer.cast({:send_connect, keypair, currency_name})
connection
Logger.debug("Reason: \"#{reason}\"")
Socket.Web.close(socket)
{:stop, :shutdown, state}
end
end
defmodule WS2P.Endpoint do
require Logger
@keypair Ed25519.generate_key_pair()
@currency_name "g1-test"
defp start_connection!(args) do
{:ok, connection} =
DynamicSupervisor.start_child(
......@@ -11,34 +8,25 @@ defmodule WS2P.Endpoint do
{WS2P.Connection, args}
)
{:ok, _task_pid} =
Task.Supervisor.start_child(WS2P.TaskSupervisor, fn ->
WS2P.Connection.start_handshake_and_loop(connection, @keypair, @currency_name)
end)
connection
end
def connect!({address, port}, socket_options \\ []) do
start_connection!({:connect, {address, port}, socket_options})
def connect!(address_and_port, socket_options \\ []) do
start_connection!({:connect, address_and_port, socket_options})
end
def accept_connections!(port, path) do
websocket_listener = Socket.Web.listen!(port)
Logger.info("Listening for websocket connections on port #{port} at #{path}")
{_sec, pub} = @keypair
Logger.info("Public key: #{pub |> Base58.encode()}")
loop_acceptor!(websocket_listener, path)
loop(websocket_listener, path)
end
defp loop_acceptor!(websocket_listener, path) do
defp loop(websocket_listener, path) do
client_socket = Socket.Web.accept!(websocket_listener)
if client_socket.path == path do
start_connection!({:accept, client_socket})
end
loop_acceptor!(websocket_listener, path)
loop(websocket_listener, path)
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment