34
35:- module(conc_forall,
36 [ conc_forall/2,
37 conc_forall/3,
38 cond_forall/4
39 ]). 40
41:- use_module(countsols). 42:- use_module(library(thread), []). 43:- use_module(library(ordsets)). 44:- use_module(library(debug)). 45:- use_module(library(lists)). 46
47:- meta_predicate
48 handle_result(+, 0 ),
49 conc_forall(0, 0 ),
50 conc_forall(0, 0, 0 ),
51 cond_forall(+, 0, 0, 0 ). 52
70conc_forall(Cond, Action) :-
71 conc_forall(Cond, Action, true).
79conc_forall(Cond, Action, Join) :-
80 current_prolog_flag(cpu_count, CPUCount),
81 message_queue_create(Done),
82 message_queue_create(Queue),
83 ini_counter(0, Counter),
84 SWorkers = workers(0, []),
85 term_variables(Join, JoinVars), sort(JoinVars, OJoinVars),
86 term_variables(Cond, CondVars), sort(CondVars, OCondVars),
87 ord_subtract(OJoinVars, OCondVars, ExternVars),
88 Templ =.. [v|JoinVars],
89 copy_term(t(Join, Templ, ExternVars), t(Join2, Templ2, ExternVars)),
90 forall(Cond,
91 ( SWorkers = workers(WorkerCount, Workers),
92 Counter = count(I1),
93 concur(done(WorkerCount), I1, I2, Join2, Templ2, Done, cleanup(Workers, Queue), Result, [], Exitted),
94 nb_setarg(1, Counter, I2),
95 handle_result(Result,
96 ( subtract(Workers, Exitted, RemainingWorkers),
97 forall(member(_, RemainingWorkers),
98 thread_send_message(Queue, done)),
99 thread:concur_cleanup(Result, RemainingWorkers, [Queue, Done])
100 )),
101 inc_counter(Counter, I),
102 ( WorkerCount < CPUCount
103 -> create_worker(Queue, Done, Id, []),
104 succ(WorkerCount, WorkerCount2),
105 nb_setarg(1, SWorkers, WorkerCount2),
106 nb_setarg(2, SWorkers, [Id|Workers])
107 ; true
108 ),
109 thread_send_message(Queue, goal(I, Action, Templ))
110 )),
111 SWorkers = workers(WorkerCount, Workers),
112 forall(member(_, Workers),
113 thread_send_message(Queue, done)),
114 Counter = count(I),
115 concur(wait, I, _, Join2, Templ2, Done, cleanup(Workers, Queue), Result, [], Exitted),
116 subtract(Workers, Exitted, RemainingWorkers),
117 thread:concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
118 handle_result(Result, true).
119
120handle_result(Result, FailHandler) :-
121 ( Result == true
122 -> true
123 ; Result = false
124 -> FailHandler,
125 fail
126 ; Result = exception(Error)
127 -> throw(Error)
128 ).
129
130create_worker(Queue, Done, Id, Options) :-
131 thread_create(worker(Queue, Done), Id,
132 [ at_exit(thread_send_message(Done, finished(Id)))
133 | Options
134 ]).
140worker(Queue, Done) :-
141 thread_get_message(Queue, Message),
142 debug(concurrent, 'Worker: received ~p', [Message]),
143 ( Message = goal(Id, Goal, Vars)
144 -> ( Goal
145 -> thread_send_message(Done, done(Id, Vars)),
146 worker(Queue, Done)
147 )
148 ; true
149 ).
159concur(done(NW), N, N, _, _, Done, _, true, Exitted, Exitted) :-
160 message_queue_property(Done, size(0 )),
161 N =< NW,
162 !.
163concur(wait, 0, 0, _, _, _, _, true, Exitted, Exitted) :- !.
164concur(Wait, N1, N, Join, Vars, Done, Cleanup, Status, Exitted1, Exitted) :-
165 debug(concurrent, 'Concurrent: waiting for workers ...', []),
166 catch(thread_get_message(Done, Exit), Error,
167 thread:concur_abort(Error, Cleanup, Done, Exitted1)),
168 debug(concurrent, 'Waiting: received ~p', [Exit]),
169 ( Exit = done(Id, Bind)
170 -> debug(concurrent, 'Concurrent: Job ~p completed', [Id]),
171 ignore(\+ ( Vars = Bind,
172 Join
173 )),
174 N2 is N1 - 1,
175 concur(Wait, N2, N, Join, Vars, Done, Cleanup, Status, Exitted1, Exitted)
176 ; Exit = finished(Thread)
177 -> thread_join(Thread, JoinStatus),
178 debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
179 [Thread, JoinStatus]),
180 ( JoinStatus == true
181 -> concur(Wait, N1, N, Join, Vars, Done, Cleanup, Status, [Thread|Exitted1], Exitted)
182 ; Status = JoinStatus,
183 Exitted = [Thread|Exitted1]
184 )
185 ).
186
187cond_forall(true, A, B, C) :- conc_forall(A, B, C).
188cond_forall(false, A, B, C) :- forall(A, (B, ignore(C)))