1:- module(chan, [new/2, send/2, recv/2, recvd/2, close/1]). 2
3:- use_module(library(error),[]). 4
:- multifile error:has_type/2.

%   Hooks into library(error) type checking for the three channel types:
%   `chan` (either end), `tx_chan` (sending end), `rx_chan` (receiving end).

%   chan: commit to the first concrete channel type that accepts the term.
error:has_type(chan, Candidate) :-
    (   error:has_type(tx_chan, Candidate)
    ->  true
    ;   error:has_type(rx_chan, Candidate)
    ->  true
    ).
%   tx_chan: tx_chan(Queue, State) where Queue passes is_message_queue/1
%   and State is exactly the atom `open` or the atom `closed`.
%   The ==/2 tests never bind State, so an unbound status is rejected.
error:has_type(tx_chan, tx_chan(Queue, State)) :-
    is_message_queue(Queue),
    (   State == open
    ->  true
    ;   State == closed
    ).
%   rx_chan: rx_chan(Queue) where Queue passes is_message_queue/1.
error:has_type(rx_chan, rx_chan(Queue)) :-
    is_message_queue(Queue).
%!  new(-Tx, -Rx) is det.
%
%   Create a fresh channel. Tx is the sending end, tx_chan(Queue, open),
%   and Rx is the receiving end, rx_chan(Queue); both share one message
%   queue created with max_size(1).
new(Tx, Rx) :-
    % Shape both ends before creating the queue so that a caller passing
    % an incompatible term fails without a queue having been created.
    Tx = tx_chan(Queue, open),
    Rx = rx_chan(Queue),
    message_queue_create(Queue, [max_size(1)]).
%!  send(+Chan, +Term) is semidet.
%
%   Put Term on the channel through its sending end. The term travels on
%   the underlying message queue wrapped as msg(Term).
%
%   Fails if Chan is a tx_chan whose status is neither `open` nor `closed`.
%
%   @throws not_allowed_on_channel(send, tx_closed) if the sending end has
%           already been closed.
%   @throws not_allowed_on_channel(send, rx) if Chan is a receiving end.
send(tx_chan(Q, Status), Term) :-
    (   Status = open
    ->  thread_send_message(Q, msg(Term), [])
    ;   Status = closed
    ->  throw(not_allowed_on_channel(send, tx_closed))
    ).
% This clause must have arity 2: written with a single argument it would
% define an unrelated send/1 and send/2 would silently fail on an rx_chan.
send(rx_chan(_), _) :-
    throw(not_allowed_on_channel(send, rx)).
%!  recv(+Chan, -Term) is semidet.
%
%   Take the next message from the receiving end of a channel and unify
%   its payload with Term.
%
%   Fails when the channel has been closed: reading the `close` marker
%   destroys the underlying queue, and any error raised by
%   thread_get_message/2 afterwards (or at any other time) is caught and
%   turned into failure.
%
%   @throws unexpected_channel_term(Msg) if the message read is neither
%           `close` nor msg(Payload) with Payload unifying with Term.
%           Note that this includes the case where Term is already bound
%           to something different from the payload.
%   @throws not_allowed_on_channel(recv, tx) if Chan is a sending end.
recv(rx_chan(Q), Term) :-
    catch(thread_get_message(Q, MaybeTerm), _, fail),
    (   MaybeTerm = close
    ->  % The sender is done: release the queue and report end of channel.
        message_queue_destroy(Q),
        fail
    ;   MaybeTerm = msg(Term)
    ->  true
    ;   otherwise
    ->  throw(unexpected_channel_term(MaybeTerm))
    ).
% This clause must have arity 2: written with a single argument it would
% define an unrelated recv/1 and recv/2 would silently fail on a tx_chan.
recv(tx_chan(_, _), _) :-
    throw(not_allowed_on_channel(recv, tx)).
%!  recvd(+Rx, -Term) is nondet.
%
%   Enumerate the messages of a channel on backtracking. Each solution
%   binds Term to the payload of one recv/2 call; asking for another
%   solution performs the next recv/2. Once recv/2 fails, no further
%   solutions are produced. Exceptions raised by recv/2 propagate.
recvd(Rx, Term) :-
    repeat,
    (   recv(Rx, Term)
    ->  true
    ;   % recv/2 failed: drop the repeat/0 choice point so that the
        % enumeration ends instead of retrying forever.
        !,
        fail
    ).
% close/1 is also the name of a system predicate (stream closing); this
% directive lets module chan supply its own definition.
:- redefine_system_predicate(chan:close(_)).

%!  close(+Tx) is semidet.
%
%   Close the sending end of a channel. A `close` marker is put on the
%   underlying queue and the status argument of Tx is destructively
%   changed to `closed` with nb_setarg/3.
%
%   @throws not_allowed_on_channel(close, tx_closed) if Tx is already
%           closed.
%   @throws unexpected_channel_status(Status) if the status is neither
%           `open` nor `closed`.
%   @throws not_allowed_on_channel(close, rx) if given a receiving end.
close(Handle) :-
    Handle = tx_chan(Queue, State),
    !,
    (   State = open
    ->  thread_send_message(Queue, close),
        nb_setarg(2, Handle, closed)
    ;   % Not open: pick the matching error term, then raise it.
        (   State = closed
        ->  Error = not_allowed_on_channel(close, tx_closed)
        ;   otherwise
        ->  Error = unexpected_channel_status(State)
        ),
        throw(Error)
    ).
close(rx_chan(_)) :-
    throw(not_allowed_on_channel(close, rx)).
88
89
106
107
109is_message_queue(Q) :-
110 ground(Q),
111 catch(message_queue_property(Q,size(_)),_,fail)