1:- module(jsonrpc_connectors_tcp,[
    2  create_tcp_server/3,
    3  start_jsonrpc_server/2,
    4  run_jsonrpc_server/3,
    5  stop_jsonrpc_server/2
    6  ]).    7
    8:- use_module(library(socket)).    9:- use_module(library(log4p)).   10:- use_module('../server').   11
   12% 
   13% Client methods
   14% 
   15
   16% connect_to_server(Server, Connection)
   17jsonrpc_connectors:connect_to_server(tcp(ServerAddress), Connection) :-
   18  tcp_connect(ServerAddress,StreamPair,[]),
   19  Connection = connection(ServerAddress,StreamPair).
   20
   21jsonrpc_connectors:connection_stream_pair(connection(_, StreamPair), StreamPair).
   22
   23% close_connection(Connnection)
   24jsonrpc_connectors:close_connection(connection(_,StreamPair)) :-
   25  ignore(close(StreamPair)).
   26
   27% 
   28% Server methods
   29% 
   30
   31create_tcp_server(ServerName, Port, tcp_server(ServerName, Port)).
   32
   33jsonrpc_connectors:serve_messages(tcp_server(ServerName, Port)) :-
   34  safe_run_jsonrpc_server(ServerName, Port).
   35
   36% jsonrpc_server(Server, Port, ServerThreadId)
   37:- dynamic jsonrpc_server/3.   38
   39% jsonrpc_connection(Server, Port, Peer, ServerThreadId)
   40:- dynamic jsonrpc_connection/4.   41
   42start_jsonrpc_server(ServerName, _) :-
   43  jsonrpc_server(ServerName, _, _),
   44  throw(error(server_already_started)).
   45
   46start_jsonrpc_server(_, Port) :-
   47  jsonrpc_server(_, Port, _),
   48  throw(error(server_already_on_port)).
   49
   50start_jsonrpc_server(ServerName,Port) :-
   51  create_tcp_server(ServerName, Port, Server),
   52  thread_create(jsonrpc_connectors:serve_messages(Server), ServerThreadId,[detched(true)]),
   53  assertz(jsonrpc_server(ServerName,Port,ServerThreadId)).
   54
   55stop_jsonrpc_server(ServerName,Port) :-
   56  jsonrpc_server(ServerName,Port,ServerThreadId),
   57  thread_signal(ServerThreadId,throw(exit)),
   58  thread_join(ServerThreadId,Status),
   59  info('JSON RPC server %t exited with %t',[ServerName,Status]),
   60  retractall(jsonrpc_server(ServerName, Port, ServerThreadId)),
   61  !.
   62
   63stop_jsonrpc_server(_, _) :-
   64  throw(error(server_not_started)).
   65
   66safe_run_jsonrpc_server(ServerName,Port) :-
   67  info('Started JSON RPC server %w on %w',[ServerName,Port]),
   68  catch(
   69    setup_call_cleanup(
   70      setup_server(Port,Socket),
   71      run_jsonrpc_server(ServerName,Port,Socket),
   72      cleanup_server(ServerName,Port,Socket)
   73      ),
   74    Exception,
   75    warn('Exited JSON RPC server %w: %w',[ServerName,Exception])).
   76
   77setup_server(Port,Socket) :-
   78  tcp_socket(Socket),
   79  tcp_setopt(Socket,reuseaddr),
   80  tcp_bind(Socket, Port).
   81
   82run_jsonrpc_server(ServerName,Port,Socket) :-
   83  tcp_listen(Socket, 5),
   84  tcp_open_socket(Socket, AcceptFd, _),
   85  dispatch_connections(ServerName,Port,AcceptFd).
   86
   87cleanup_server(ServerName,Port,Socket) :-
   88  catch(
   89    tcp_close_socket(Socket),
   90    _,
   91    info('Stopped JSON RPC server %w on %w',[ServerName,Port])),
   92  findall(ConnectionThreadId,jsonrpc_connection(ServerName,Port,_,ConnectionThreadId),ConnectionThreadIds),
   93  forall(member(ThreadId,ConnectionThreadIds),thread_signal(ThreadId,throw(exit))).
   94
   95dispatch_connections(ServerName,Port,ServerFd) :-
   96  tcp_accept(ServerFd, Client, Peer),
   97  info('accepted connection on ~w for ~w',[Port, Client]),
   98  thread_create(safe_handle_connection(ServerName,Port,Client, Peer), _, [detched(true)]),
   99  dispatch_connections(ServerName,Port,ServerFd).
  100
  101safe_handle_connection(ServerName, Port, Socket, Peer) :-
  102  debug('handling connection from %w on %w', [Peer, Port]),
  103  setup_call_cleanup(
  104    setup_connection(ServerName, Port, Socket, Peer, StreamPair),
  105    catch(
  106      handle_connection(ServerName, Peer, StreamPair),
  107      eof,
  108      info('Exiting: connection closed from %w to %w on %w',[Peer,ServerName,Port])),
  109    cleanup_connection(ServerName, Port, Peer, StreamPair)).
  110
  111setup_connection(ServerName, Port, Socket, Peer, StreamPair) :-
  112  thread_self(ThreadId),
  113  \+ jsonrpc_connection(ServerName, Port, Peer, ThreadId),
  114  % Note there is still a chance of races, but this hopefully helps with cleanup 
  115  % of connections; deliberately using asserta here
  116  asserta(jsonrpc_connection(ServerName, Port, Peer,ThreadId)),
  117  info('Setup connection on %w for %w', [Port, Peer]),
  118  tcp_open_socket(Socket, StreamPair).
  119
  120cleanup_connection(ServerName, Port, Peer, StreamPair) :-
  121  close(StreamPair),
  122  thread_self(ThreadId),
  123  retractall(jsonrpc_connection(ServerName, Port, Peer, ThreadId)),
  124  info('Closed connection to %w',[Peer])