1:- module(jsonrpc_connectors_tcp,[
2 create_tcp_server/3,
3 start_jsonrpc_server/2,
4 run_jsonrpc_server/3,
5 stop_jsonrpc_server/2
6 ]). 7
8:- use_module(library(socket)). 9:- use_module(library(log4p)). 10:- use_module('../server'). 11
15
%! jsonrpc_connectors:connect_to_server(+Server, -Connection)
%
%  TCP clause of the connector hook. Server has the shape tcp(Address);
%  Address is handed unchanged to tcp_connect/3. On success Connection is
%  connection(Address, Streams), where Streams is the stream pair returned
%  by tcp_connect/3.
jsonrpc_connectors:connect_to_server(tcp(Address), Connection) :-
  tcp_connect(Address, Streams, []),
  Connection = connection(Address, Streams).
20
%! jsonrpc_connectors:connection_stream_pair(+Connection, -StreamPair)
%
%  Extract the stream pair stored as the second argument of a
%  connection/2 term.
jsonrpc_connectors:connection_stream_pair(Connection, Streams) :-
  Connection = connection(_, Streams).
22
%! jsonrpc_connectors:close_connection(+Connection)
%
%  Close the stream pair held in a connection/2 term. A failure of
%  close/1 is swallowed (the clause still succeeds); an exception raised
%  by close/1 is not caught here and propagates to the caller.
jsonrpc_connectors:close_connection(connection(_, Streams)) :-
  (   close(Streams)
  ->  true
  ;   true
  ).
26
30
%! create_tcp_server(+ServerName, +Port, -Server)
%
%  Build the server descriptor tcp_server(ServerName, Port). Nothing is
%  opened or started here; the term is only constructed.
create_tcp_server(ServerName, Port, Server) :-
  Server = tcp_server(ServerName, Port).
32
%! jsonrpc_connectors:serve_messages(+Server)
%
%  TCP clause of the connector hook: unpack a tcp_server(Name, Port)
%  descriptor and run the server loop via safe_run_jsonrpc_server/2.
jsonrpc_connectors:serve_messages(tcp_server(Name, ListenPort)) :-
  safe_run_jsonrpc_server(Name, ListenPort).
35
% Registry of started servers: jsonrpc_server(ServerName, Port, ServerThreadId),
% asserted by start_jsonrpc_server/2.
:- dynamic(jsonrpc_server/3).

% Registry of live connections:
% jsonrpc_connection(ServerName, Port, Peer, ConnectionThreadId),
% asserted by setup_connection/5.
:- dynamic(jsonrpc_connection/4).
%! start_jsonrpc_server(+ServerName, +Port)
%
%  Start a JSON RPC server in a new thread and record it in
%  jsonrpc_server/3 as jsonrpc_server(ServerName, Port, ThreadId).
%
%  @throws error(server_already_started) if a server named ServerName is
%          already registered.
%  @throws error(server_already_on_port) if any server is already
%          registered on Port.
start_jsonrpc_server(ServerName, _) :-
  jsonrpc_server(ServerName, _, _),
  throw(error(server_already_started)).

start_jsonrpc_server(_, Port) :-
  jsonrpc_server(_, Port, _),
  throw(error(server_already_on_port)).

start_jsonrpc_server(ServerName,Port) :-
  create_tcp_server(ServerName, Port, Server),
  % The thread is deliberately NOT detached: stop_jsonrpc_server/2 calls
  % thread_join/2 on it, which requires a joinable thread. The previous
  % option list held a misspelled, unrecognised `detched(true)` entry.
  thread_create(jsonrpc_connectors:serve_messages(Server), ServerThreadId, []),
  assertz(jsonrpc_server(ServerName,Port,ServerThreadId)).
54
%! stop_jsonrpc_server(+ServerName, +Port)
%
%  Stop the registered server matching ServerName and Port: signal its
%  thread to throw `exit`, join the thread, log the exit status and
%  remove the jsonrpc_server/3 registration.
%
%  @throws error(server_not_started) if no matching server is registered.
stop_jsonrpc_server(ServerName,Port) :-
  jsonrpc_server(ServerName,Port,ServerThreadId),
  % The server thread may already have terminated on its own (for
  % example when binding the port failed). Signalling a thread that is
  % no longer running raises an existence error; tolerate that so the
  % thread is still joined and the registration is still removed,
  % otherwise the name and port would stay blocked.
  catch(
    thread_signal(ServerThreadId,throw(exit)),
    error(existence_error(_,_),_),
    true),
  thread_join(ServerThreadId,Status),
  info('JSON RPC server %t exited with %t',[ServerName,Status]),
  retractall(jsonrpc_server(ServerName, Port, ServerThreadId)),
  !.

stop_jsonrpc_server(_, _) :-
  throw(error(server_not_started)).
65
%! safe_run_jsonrpc_server(+ServerName, +Port)
%
%  Log the start of the server, then run its full life cycle
%  (setup_server/2, run_jsonrpc_server/3, cleanup_server/3) under
%  setup_call_cleanup/3. Any exception leaving that life cycle is caught
%  and reported with warn/2, so this predicate does not rethrow it.
safe_run_jsonrpc_server(ServerName,Port) :-
  info('Started JSON RPC server %w on %w',[ServerName,Port]),
  LifeCycle = setup_call_cleanup(
                setup_server(Port,Socket),
                run_jsonrpc_server(ServerName,Port,Socket),
                cleanup_server(ServerName,Port,Socket)),
  catch(
    LifeCycle,
    Reason,
    warn('Exited JSON RPC server %w: %w',[ServerName,Reason])).
76
%! setup_server(+Port, -Socket)
%
%  Create a TCP socket, enable address reuse on it and bind it to Port.
%
%  This runs as the setup goal of setup_call_cleanup/3, whose cleanup is
%  skipped when setup raises. The socket is therefore closed here if
%  configuring or binding it fails, and the original error is rethrown.
setup_server(Port,Socket) :-
  tcp_socket(Socket),
  catch(
    ( tcp_setopt(Socket,reuseaddr),
      tcp_bind(Socket, Port)
    ),
    Error,
    ( % Best-effort close; the bind/setopt error is the one to report.
      catch(tcp_close_socket(Socket), _, true),
      throw(Error)
    )).
81
%! run_jsonrpc_server(+ServerName, +Port, +Socket)
%
%  Put the bound Socket into listening mode with a backlog of 5, obtain
%  the handle used for accepting via tcp_open_socket/3 (its third
%  argument is ignored) and enter dispatch_connections/3.
run_jsonrpc_server(ServerName,Port,Socket) :-
  Backlog = 5,
  tcp_listen(Socket, Backlog),
  tcp_open_socket(Socket, ListenFd, _Unused),
  dispatch_connections(ServerName,Port,ListenFd).
86
%! cleanup_server(+ServerName, +Port, +Socket)
%
%  Shut a server down: close its listening socket, log that the server
%  stopped, and signal every connection thread registered in
%  jsonrpc_connection/4 for ServerName and Port to throw `exit`.
cleanup_server(ServerName,Port,Socket) :-
  % Closing is best effort: an error while closing must not prevent the
  % connection threads from being signalled.
  catch(tcp_close_socket(Socket), _, true),
  % Previously this message was the catch recovery goal and was therefore
  % only logged when closing the socket raised; log it unconditionally.
  info('Stopped JSON RPC server %w on %w',[ServerName,Port]),
  forall(
    jsonrpc_connection(ServerName,Port,_,ConnectionThreadId),
    % A connection thread may have finished since it registered itself;
    % signalling it then raises an existence error, which must not stop
    % the remaining threads from being signalled.
    catch(
      thread_signal(ConnectionThreadId,throw(exit)),
      error(existence_error(_,_),_),
      true)).
94
%! dispatch_connections(+ServerName, +Port, +ServerFd)
%
%  Accept loop: wait for a client on ServerFd, hand the accepted socket
%  to a new thread running safe_handle_connection/4, then recurse to
%  accept the next client. This predicate has no terminating clause; it
%  ends only when a goal in it raises or fails.
dispatch_connections(ServerName,Port,ServerFd) :-
  tcp_accept(ServerFd, Client, Peer),
  % NOTE(review): this message uses ~w while the other log calls in this
  % file use %w -- confirm which directive log4p expects.
  info('accepted connection on ~w for ~w',[Port, Client]),
  % Connection threads are never joined anywhere in this file, so they
  % must be detached to be reclaimed when they finish. The option was
  % previously misspelled `detched(true)`.
  thread_create(safe_handle_connection(ServerName,Port,Client, Peer), _, [detached(true)]),
  dispatch_connections(ServerName,Port,ServerFd).
100
%! safe_handle_connection(+ServerName, +Port, +Socket, +Peer)
%
%  Serve one accepted client. setup_connection/5 produces the stream
%  pair, handle_connection/3 runs the session, and cleanup_connection/4
%  is the cleanup goal of the surrounding setup_call_cleanup/3. Only an
%  `eof` exception from the session is caught (and logged); any other
%  exception propagates after cleanup.
safe_handle_connection(ServerName, Port, Socket, Peer) :-
  debug('handling connection from %w on %w', [Peer, Port]),
  Session = catch(
              handle_connection(ServerName, Peer, Streams),
              eof,
              info('Exiting: connection closed from %w to %w on %w',[Peer,ServerName,Port])),
  setup_call_cleanup(
    setup_connection(ServerName, Port, Socket, Peer, Streams),
    Session,
    cleanup_connection(ServerName, Port, Peer, Streams)).
110
%! setup_connection(+ServerName, +Port, +Socket, +Peer, -StreamPair)
%
%  Prepare a client connection in the calling thread: open a stream pair
%  on Socket and register the connection as
%  jsonrpc_connection(ServerName, Port, Peer, ThreadId). Fails if this
%  thread is already registered for the same server, port and peer.
setup_connection(ServerName, Port, Socket, Peer, StreamPair) :-
  thread_self(ThreadId),
  \+ jsonrpc_connection(ServerName, Port, Peer, ThreadId),
  % Open the streams before registering. This predicate is the setup
  % goal of setup_call_cleanup/3, so if it raises, cleanup_connection/4
  % never runs; registering first would leave a stale
  % jsonrpc_connection/4 fact behind when tcp_open_socket/2 throws.
  tcp_open_socket(Socket, StreamPair),
  asserta(jsonrpc_connection(ServerName, Port, Peer,ThreadId)),
  info('Setup connection on %w for %w', [Port, Peer]).
119
120cleanup_connection(ServerName, Port, Peer, StreamPair) :-
121 close(StreamPair),
122 thread_self(ThreadId),
123 retractall(jsonrpc_connection(ServerName, Port, Peer, ThreadId)),
124 info('Closed connection to %w',[Peer])