1:- module(jolog,[ op(1200,xfx,&-) 2 , op(1100,xfy,&) 3 , jolog_import_sentinel/0 4 , send/1 5 , start_jolog/1 6 , start_jolog/2 7 ]). 8:- use_module(library(debug), [debug/3]). 9:- use_module(library(list_util), [split/3, xfy_list/3]). 10:- use_module(library(lists), [same_length/2]). 11:- use_module(library(error), [domain_error/2]). 12 13:- use_module(library(jolog/manager)). 14 15 16:- thread_local channels/2. % channels(Module, Message) 17 18/************* Jolog runtime code *******************/
%       main :-  % starting point for Prolog
%           start_jolog(user, go).
%
%       go &-  % starting point for Jolog
%           ( one_process
%           & another_process
%           ).
%       ...
%
%   start_jolog/2 returns when one of the following is true:
%%  start_jolog(+Module, +Main)
%
%   Runs the Jolog manager for Module: manager_create(Module, Main) is
%   the setup step, manager_loop(Module) is the guarded goal and
%   manager_destroy(Module) is the cleanup step. Routing the three
%   through setup_call_cleanup/3 means the cleanup is requested however
%   the loop ends.
start_jolog(Module, Main) :-
    Setup   = manager_create(Module, Main),
    Loop    = manager_loop(Module),
    Cleanup = manager_destroy(Module),
    setup_call_cleanup(Setup, Loop, Cleanup).

%   create_worker(+Module, +Queue, -ThreadId)
%
%   Starts a detached thread that runs worker_loop(Module, Queue) and
%   unifies ThreadId with the identifier of that thread.
create_worker(Module, Queue, ThreadId) :-
    Options = [detached(true)],
    thread_create(worker_loop(Module, Queue), ThreadId, Options).
%   Like start_jolog/2, but sends `main` as the first message.
%%  start_jolog(+Module)
%
%   Convenience wrapper around start_jolog/2 that passes the atom
%   `main` as the second argument.
start_jolog(Module) :-
    FirstMessage = main,
    start_jolog(Module, FirstMessage).
%   set_meta(+Module, +Name, ?Value)
%
%   Replaces whatever meta/3 facts are stored for the Module/Name pair
%   with the single fact meta(Module, Name, Value): every fact matching
%   the pair is retracted first, then the new one is appended.
set_meta(Module, Name, Value) :-
    Stale = meta(Module, Name, _),
    Fresh = meta(Module, Name, Value),
    retractall(Stale),
    assertz(Fresh).
% meta(Module, Name, Value): per-module runtime settings. This block
% reads the `work_queue` and `manager_queue` entries.
:- dynamic meta/3.


% called by macro-expanded Jolog code to spawn a new, parallel process.
% Should only be called by the manager thread.
%
% Module is used to look up the queues in meta/3; Process is the goal
% wrapped in a run_process/1 message for the workers.
spawn_process(Module, Process) :-
    % debug/3 takes a list of format arguments, as in every other
    % debug/3 call in this file
    debug(jolog, '~w', [spawn_process(Module, Process)]),

    % put process code in the workers' queue
    meta(Module, work_queue, WorkQueue),
    thread_send_message(WorkQueue, run_process(Process)),

    % notify the manager that one more worker is active
    meta(Module, manager_queue, ManagerQueue),
    thread_send_message(ManagerQueue, active(+1)).
%       send(hello)                        % hello/0 channel
%       send(hello(world))                 % hello/1 channel
%       send(foo(alpha,beta,gamma,delta))  % foo/4 channel
%   send(:Message)
%
%   Sends Message on the channel named by its functor and arity. The
%   argument is module-sensitive (hence the `:` meta-argument spec), so
%   the clauses receive it as Module:Message. When a channel with that
%   name and arity is recorded in defined_channel/3 for Module, a
%   send_message(Message) term is posted to the module's manager queue;
%   otherwise a warning is printed and send/1 still succeeds.
:- meta_predicate send(:).
send(Module:Message) :-
    % someone listens on this channel; send the message
    functor(Message, Name, Arity),
    defined_channel(Module, Name, Arity),
    !,
    debug(jolog, '~w', [send(Module,Message)]),
    meta(Module, manager_queue, ManagerQueue),
    thread_send_message(ManagerQueue, send_message(Message)).
send(Module:Message) :-
    % nobody listens on this channel; generate a warning
    % NOTE(review): no prolog:message//1 clause for
    % jolog_nobody_listening/2 is visible in this part of the file;
    % confirm one exists so the warning gets a readable text.
    print_message(warning, jolog_nobody_listening(Module, Message)).

% loop executed by Jolog worker threads
%
% Blocks on Queue for the next message and handles it:
%   * halt              - leave the thread via thread_exit/1
%   * run_process(Goal) - run Goal in Module, then tell the manager
%                         that this worker is idle again: active(-1)
%   * anything else     - raise a domain error
worker_loop(Module, Queue) :-
    debug(jolog,'~w',[worker(waiting)]),
    thread_get_message(Queue, Work),
    debug(jolog,'~w', [worker(job(Work))]),
    ( Work = halt ->
        debug(jolog,'worker exiting',[]),
        thread_exit(halt)
    ; Work = run_process(Goal) ->
        % ignore/1 keeps a failing Goal from stopping the worker;
        % exceptions are reported, not propagated
        catch(Module:ignore(Goal),Ex,report_exception(Module,Goal,Ex)),
        meta(Module, manager_queue, ManagerQueue),
        debug(jolog,'~w',[worker(finished)]),
        thread_send_message(ManagerQueue, active(-1))
    ; % otherwise ->
        domain_error(jolog_worker_message, Work)
    ),
    worker_loop(Module, Queue).


% Prints two warnings for an exception caught in a worker: one naming
% the Module:Goal that was running, one with the exception term itself.
report_exception(Module,Goal,Ex) :-
    print_message(warning,jolog_worker_crashed(Module,Goal)),
    print_message(warning,Ex).


% Text for the jolog_worker_crashed/2 warning. print_message/2 looks
% message texts up through the prolog:message//1 hook, so the rule has
% to be defined under that qualified name.
:- multifile prolog:message//1.
prolog:message(jolog_worker_crashed(Module,Goal)) -->
    ["Caught exception in Jolog worker running ~q."-[Module:Goal]].


/*************************** Macro expansion code ***********************/
% Marker predicate with no behaviour of its own. wants_jolog_expansion/0
% checks whether the loading module imported it from jolog.
jolog_import_sentinel.


% True if the currently loading module wants jolog macro expansion,
% i.e. if it sees jolog_import_sentinel/0 as imported from jolog.
wants_jolog_expansion :-
    prolog_load_context(module, LoadingModule),
    Sentinel = LoadingModule:jolog_import_sentinel,
    predicate_property(Sentinel, imported_from(jolog)).


% Parse a jolog clause into its constituent parts:
%   Patterns  - the comma-separated join patterns of the head
%   Guards    - body goals written before the `then` marker
%               ([] when the body has no `then`)
%   Processes - the process goals of the body
parse_join_clause((Head &- Body), Patterns, Guards, Processes) :-
    xfy_list(',', Head, Patterns),
    xfy_list(',', Body, BodyGoals),
    split(BodyGoals, then, BodyParts),
    body_parts_guards_(BodyParts, Guards, ProcessTerms),
    process_terms_goals_(ProcessTerms, Processes).

% Two parts: guards, then process terms. One part: the `then` goal is
% missing, so there are no guards. Any other shape fails.
body_parts_guards_([Guards, ProcessTerms], Guards, ProcessTerms) :- !.
body_parts_guards_([ProcessTerms], [], ProcessTerms).

% No process terms: no processes. Exactly one term: split it on `&`.
% Several terms: join them into one conjunction run as a single process.
process_terms_goals_([], Processes) :-
    !,
    Processes = [].
process_terms_goals_([ProcessDisjunction], Processes) :-
    !,
    xfy_list('&', ProcessDisjunction, Processes).
process_terms_goals_(ProcessTerms, Processes) :-
    xfy_list(',', Process, ProcessTerms),
    Processes = [Process].
%   build_peek_goal(+Module, +Pattern, ?MessageRef, -PeekGoal)
%
%   PeekGoal is a clause/3 call, qualified with the jolog module, that
%   looks for a channels(Module, Pattern) fact whose body is `true` and
%   yields its clause reference as MessageRef. The goal is only
%   constructed here, not called.
build_peek_goal(Module, Pattern, MessageRef,
                jolog:clause(channels(Module,Pattern), true, MessageRef)).
% defined_channel(Module, Name, Arity): a join pattern with this name
% and arity has been seen in a Jolog clause of Module.
:- dynamic defined_channel/3.

% Records the channel of Pattern (its functor name and arity) for
% Module, unless that channel is already recorded.
remember_channel(Module, Pattern) :-
    functor(Pattern, Name, Arity),
    ( defined_channel(Module,Name,Arity) ->
        true
    ; % otherwise ->
        assertz(defined_channel(Module,Name,Arity))
    ).


% Jolog macro expansion. The loader only consults the term_expansion/2
% hook under its module-qualified name, so the clauses must be defined
% as user:term_expansion/2.
:- multifile user:term_expansion/2.
:- dynamic   user:term_expansion/2.

% Rewrites `Head &- Body` into a '$jolog_code'/0 clause which peeks at
% one stored message per join pattern, runs the guards, commits,
% erases the matched messages and spawns the processes.
user:term_expansion((Head &- Body), ('$jolog_code' :- Goals)) :-
    wants_jolog_expansion,
    parse_join_clause((Head &- Body), Patterns, Guards, Processes),

    % build goals to peek at messages
    same_length(Patterns, MessageRefs),
    prolog_load_context(module, Module),
    maplist(build_peek_goal(Module), Patterns, MessageRefs, Peeks),

    % remember which channels have been defined
    maplist(remember_channel(Module), Patterns),

    % build jolog clause body
    xfy_list(',', PeekGoals, Peeks),
    ( Guards=[] -> GuardGoals=true; xfy_list(',', GuardGoals, Guards) ),
    Module:dynamic('$jolog_code'/0),
    Goals = (
        debug(jolog,'Does head match? ~w', [Head]),
        PeekGoals,
        GuardGoals,
        !,
        maplist(erase, MessageRefs),
        maplist(jolog:spawn_process(Module), Processes)
    ).
user:term_expansion(end_of_file, _) :-
    % create Jolog clause to handle system halt
    wants_jolog_expansion,
    prolog_load_context(module, Module),

    % only add a 'halt' rule if there are other rules
    Module:once(clause('$jolog_code', _)),

    % expand the halt rule with the clause above; all of its goals sit
    % before `then`, so they are guards and no process is spawned
    term_expansion((
        halt &-
            debug(jolog, 'halting', []),
            jolog:meta(Module, manager_queue, ManagerQueue),
            thread_send_message(ManagerQueue, halt),
            then
    ), Clause),
    Module:asserta(Clause),  % halt clause goes first
    remember_channel(Module, halt),

    fail.  % let others have a chance to expand end_of_file