. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2024, Torbjörn Lager, 8 VU University Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(pengines, 39 [ pengine_create/1, % +Options 40 pengine_ask/3, % +Pengine, :Query, +Options 41 pengine_next/2, % +Pengine. +Options 42 pengine_stop/2, % +Pengine. 
+Options 43 pengine_event/2, % -Event, +Options 44 pengine_input/2, % +Prompt, -Term 45 pengine_output/1, % +Term 46 pengine_respond/3, % +Pengine, +Input, +Options 47 pengine_debug/2, % +Format, +Args 48 pengine_self/1, % -Pengine 49 pengine_pull_response/2, % +Pengine, +Options 50 pengine_destroy/1, % +Pengine 51 pengine_destroy/2, % +Pengine, +Options 52 pengine_abort/1, % +Pengine 53 pengine_application/1, % +Application 54 current_pengine_application/1, % ?Application 55 pengine_property/2, % ?Pengine, ?Property 56 pengine_user/1, % -User 57 pengine_event_loop/2, % :Closure, +Options 58 pengine_rpc/2, % +Server, :Goal 59 pengine_rpc/3 % +Server, :Goal, +Options 60 ]).
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error), 77 [ must_be/2, 78 existence_error/2, 79 permission_error/3, 80 domain_error/2 81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option), 88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(sandbox),[safe_goal/1]). 91:- autoload(library(statistics),[thread_statistics/2]). 92:- autoload(library(term_to_json),[term_to_json/2]). 93:- autoload(library(thread_pool), 94 [thread_pool_create/3,thread_create_in_pool/4]). 95:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 96:- autoload(library(uri), 97 [ uri_components/2, 98 uri_query_components/2, 99 uri_data/3, 100 uri_data/4, 101 uri_encoded/3 102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- autoload(library(http/http_dispatch), 106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111 112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json), 114 [http_read_json_dict/2,reply_json_dict/1]). 115 116:- if(exists_source(library(uuid))). 
:- autoload(library(uuid), [uuid/2]).
:- endif.


% Mode declarations for the exported meta-predicates.  pengine_create/1
% receives a module-qualified option list (its clause head is
% pengine_create(M:Options0)) and pengine_rpc/3 a module-qualified goal.
% NOTE(review): pengine_event_loop/2 is defined outside this view; `1`
% assumes Closure is called with one extra argument -- confirm.
:- meta_predicate
    pengine_create(:),
    pengine_rpc(+, :, +),
    pengine_event_loop(1, +).

:- multifile
    write_result/3,                 % +Format, +Event, +Dict
    event_to_json/3,                % +Event, -JSON, +Format
    prepare_module/3,               % +Module, +Application, +Options
    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
    authentication_hook/3,          % +Request, +Application, -User
    not_sandboxed/2,                % +User, +App
    pengine_flush_output_hook/0.

:- predicate_options(pengine_create/1, 1,
                     [ id(-atom),
                       alias(atom),
                       application(atom),
                       destroy(boolean),
                       server(atom),
                       ask(compound),
                       template(compound),
                       chunk(integer;oneof([false])),
                       bindings(list),
                       src_list(list),
                       src_text(any),               % text
                       src_url(atom),
                       src_predicates(list)
                     ]).
:- predicate_options(pengine_ask/3, 3,
                     [ template(any),
                       chunk(integer;oneof([false])),
                       bindings(list)
                     ]).
:- predicate_options(pengine_next/2, 2,
                     [ chunk(integer),
                       pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_stop/2, 2,
                     [ pass_to(pengine_send/3, 3)
                     ]).
% The option list of pengine_respond/3 is its third argument
% (+Pengine, +Input, +Options).
:- predicate_options(pengine_respond/3, 3,
                     [ pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_rpc/3, 3,
                     [ chunk(integer;oneof([false])),
                       pass_to(pengine_create/1, 1)
                     ]).
:- predicate_options(pengine_send/3, 3,
                     [ delay(number)
                     ]).
:- predicate_options(pengine_event/2, 2,
                     [ listen(atom),
                       pass_to(system:thread_get_message/3, 3)
                     ]).
:- predicate_options(pengine_pull_response/2, 2,
                     [ pass_to(http_open/3, 3)
                     ]).
:- predicate_options(pengine_event_loop/2, 2,
                     []).                           % not yet implemented

% :- debug(pengine(transition)).
:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.

%   Compile-time rewrite of the goal `random_delay`: it becomes
%   do_random_delay/0 when the debug topic pengine(delay) is enabled
%   while this file is compiled and `true` otherwise.

goal_expansion(random_delay, Expanded) :-
    (   debugging(pengine(delay))
    ->  Expanded = do_random_delay
    ;   Expanded = true
    ).

%!  do_random_delay is det.
%
%   Sleep for random(20)/1000 seconds, i.e. at most 19 milliseconds.
%   Only reached when goal_expansion/2 rewrote `random_delay` to this
%   predicate.

do_random_delay :-
    Delay is random(20)/1000,
    sleep(Delay).

% Goal arguments are declared `0` because they are called as-is
% (solve/4 hands its goal to findnsols_no_empty/4, which calls it).
% NOTE(review): pengine_event_loop/3 is defined outside this view; the
% position of its closure argument must be confirmed.
:- meta_predicate                       % internal meta predicates
    solve(+, ?, 0, +),
    findnsols_no_empty(+, ?, 0, -),
    pengine_event_loop(+, 1, +).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
%   pengine_create(:Options): normalise the source options and then
%   create the pengine remotely when a server(BaseURL) option is
%   present, or locally otherwise.

pengine_create(Module:RawOptions) :-
    translate_local_sources(RawOptions, Options, Module),
    pengine_create_local_or_remote(Options).

%   Dispatch on the presence of the server/1 option.  The option is
%   removed before the remaining options go to the remote side.

pengine_create_local_or_remote(Options) :-
    select_option(server(BaseURL), Options, RemoteOptions),
    !,
    remote_pengine_create(BaseURL, RemoteOptions).
pengine_create_local_or_remote(Options) :-
    local_pengine_create(Options).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
%
%   Replace all src_predicates/1, src_list/1 and src_text/1 options of
%   OptionsIn by at most one src_text/1 option that is placed in front
%   of the remaining options.  Multiple source texts are concatenated
%   in the order in which they appear.

translate_local_sources(OptionsIn, Options, Module) :-
    translate_local_sources(OptionsIn, Texts, OtherOptions, Module),
    merge_source_texts(Texts, OtherOptions, Options).

%   No text: options unchanged.  One text: used as is.  Several texts:
%   joined with atomics_to_string/2.

merge_source_texts([], OtherOptions, Options) :-
    !,
    Options = OtherOptions.
merge_source_texts([Text], OtherOptions, Options) :-
    !,
    Options = [src_text(Text)|OtherOptions].
merge_source_texts(Texts, OtherOptions, Options) :-
    atomics_to_string(Texts, Text),
    Options = [src_text(Text)|OtherOptions].

%   translate_local_sources(+OptionsIn, -Texts, -OtherOptions, +Module)
%   splits the option list: every bound option that
%   translate_local_source/3 can turn into text contributes to Texts,
%   anything else (including unbound elements) is kept in OtherOptions.

translate_local_sources([], [], [], _).
translate_local_sources([Option|More], Texts, OtherOptions, Module) :-
    (   nonvar(Option),
        translate_local_source(Option, Text, Module)
    ->  Texts = [Text|Texts1],
        OtherOptions = OtherOptions1
    ;   Texts = Texts1,
        OtherOptions = [Option|OtherOptions1]
    ),
    translate_local_sources(More, Texts1, OtherOptions1, Module).

%   translate_local_source(+Option, -Text, +Module): text for a single
%   source option.  Predicates are rendered with listing/1 and terms of
%   a src_list/1 with the ~k format directive.

translate_local_source(src_predicates(PIs), Text, Module) :-
    must_be(list, PIs),
    with_output_to(string(Text),
                   maplist(list_in_module(Module), PIs)).
translate_local_source(src_list(Terms), Text, _) :-
    must_be(list, Terms),
    with_output_to(string(Text),
                   maplist(write_source_list_term, Terms)).
translate_local_source(src_text(Text), Text, _).

write_source_list_term(Term) :-
    format('~k .~n', [Term]).

list_in_module(Module, PI) :-
    listing(Module:PI).
pengine_send(NameOrID, Term, [])
.
*/
%   pengine_send(+NameOrID, +Term): send Term using pengine_send/3
%   with an empty option list.

pengine_send(NameOrID, Term) :-
    NoOptions = [],
    pengine_send(NameOrID, Term, NoOptions).
Any remaining options are passed to http_open/3. */
%   pengine_send(+NameOrID, +Term, +Options): send Term as a
%   pengine_request/1 message.  NameOrID must be an atom.  It is
%   `self` (the queue of the calling thread), the name of a child
%   registered in child/2, or a pengine id.  With a delay(Seconds)
%   option the message is sent from an alarm after that time.

pengine_send(Target, Event, Options) :-
    must_be(atom, Target),
    pengine_send2(Target, Event, Options).

pengine_send2(Target, Event, Options) :-
    pengine_send_destination(Target, Destination),
    delay_message(Destination, Event, Options).

%   Map the user-visible target onto queue(Queue) or pengine(Id).

pengine_send_destination(self, queue(Queue)) :-
    !,
    thread_self(Queue).
pengine_send_destination(Name, pengine(Id)) :-
    child(Name, Id),
    !.
pengine_send_destination(Id, pengine(Id)).

%   Send now, or schedule the send if the delay/1 option is given.
%   `random_delay` is rewritten at compile time by goal_expansion/2.

delay_message(Target, Event, Options) :-
    (   option(delay(Delay), Options)
    ->  alarm(Delay,
              send_message(Target, Event, Options),
              _AlarmID,
              [remove(true)])
    ;   random_delay,
        send_message(Target, Event, Options)
    ).

%   Deliver to a thread queue, a remote pengine or a local pengine
%   thread.  An unknown pengine raises an existence error.

send_message(queue(Queue), Event, _) :-
    thread_send_message(Queue, pengine_request(Event)).
send_message(pengine(Id), Event, Options) :-
    (   pengine_remote(Id, Server)
    ->  remote_pengine_send(Server, Id, Event, Options)
    ;   pengine_thread(Id, Thread)
    ->  thread_send_message(Thread, pengine_request(Event))
    ;   existence_error(pengine, Id)
    ).
%!  pengine_request(-Request) is det.
%
%   Wait for the next pengine_request(Request) message in the queue of
%   the calling thread.  The first second is spent in a plain
%   thread_get_message/3.  If nothing arrives, wait for the remainder
%   of the application's `idle_limit` setting while using thread_idle/2
%   to minimise resources.  If that wait also times out, Request is
%   unified with `destroy`.

pengine_request(Request) :-
    thread_self(Me),
    (   thread_get_message(Me, pengine_request(Request), [timeout(1)])
    ->  true
    ;   pengine_self(Pengine),
        get_pengine_application(Pengine, Application),
        setting(Application:idle_limit, Limit),
        Remaining is Limit-1,           % one second was already spent
        thread_self(Thread),
        (   thread_idle(thread_get_message(Thread, pengine_request(Request),
                                           [timeout(Remaining)]),
                        long)
        ->  true
        ;   Request = destroy
        )
    ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
%   pengine_reply(+Event): send Event to the queue stored in the
%   global variable `pengine_parent`.

pengine_reply(Event) :-
    pengine_parent(Queue),
    pengine_reply(Queue, Event).

%   pengine_reply(+Queue, +Event): send pengine_event(ID, Event) to
%   Queue, where ID is the first argument of Event.  Nothing is sent
%   once the global variable `pengine_idle_limit_exceeded` is `true`.
%   When the calling thread runs pengine ID and ID is not detached,
%   sending is limited by the `idle_limit` setting; on timeout the
%   thread is detached and aborted.

pengine_reply(Queue, Event0) :-
    (   nb_current(pengine_idle_limit_exceeded, true)
    ->  true
    ;   arg(1, Event0, ID),
        wrap_first_answer(ID, Event0, Event),
        random_delay,
        debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
        (   pengine_self(ID),
            \+ pengine_detached(ID, _)
        ->  pengine_reply_within_idle_limit(Queue, ID, Event)
        ;   thread_send_message(Queue, pengine_event(ID, Event))
        )
    ).

pengine_reply_within_idle_limit(Queue, ID, Event) :-
    get_pengine_application(ID, Application),
    setting(Application:idle_limit, IdleLimit),
    debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
    (   thread_send_message(Queue, pengine_event(ID, Event),
                            [ timeout(IdleLimit)
                            ])
    ->  true
    ;   thread_self(Me),
        debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
              [ID, Me]),
        nb_setval(pengine_idle_limit_exceeded, true),
        thread_detach(Me),
        abort
    ).

%   If a create event for ID is pending in
%   wrap_first_answer_in_create_event/2, embed the event as
%   answer(Event0) in that create event, remove the pending fact and
%   return the create event.  Otherwise the event is returned unchanged.

wrap_first_answer(ID, Event0, Event) :-
    Pending = wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
    (   call(Pending),
        arg(1, CreateEvent, ID)
    ->  retract(Pending),
        Event = CreateEvent
    ;   Event = Event0
    ).


%   Discard all messages waiting in the parent queue and report their
%   number on the debug topic pengine(abort).

empty_queue :-
    pengine_parent(Queue),
    empty_queue(Queue, 0, Discarded),
    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

empty_queue(Queue, Count0, Count) :-
    (   thread_get_message(Queue, _Term, [timeout(0)])
    ->  Count1 is Count0+1,
        empty_queue(Queue, Count1, Count)
    ;   Count = Count0
    ).
Options is a list of options:
false
, the
Pengine goal is not executed using findall/3 and friends and
we do not backtrack immediately over the goal. As a result,
changes to backtrackable global state are retained. This is
similar to using set_prolog_flag(toplevel_mode, recursive)
.Name = Var
terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, would we call
pengine_next/2. Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
%   pengine_ask(+NameOrID, @Query, +Options): split Options into the
%   options that travel inside the ask/2 request and those meant for
%   pengine_send/3, then send the request.

pengine_ask(NameOrID, Query, Options) :-
    partition(pengine_ask_option, Options, InRequest, ForSend),
    Request = ask(Query, InRequest),
    pengine_send(NameOrID, Request, ForSend).


%   True for options that belong inside the ask/2 request.

pengine_ask_option(Option) :-
    memberchk(Option,
              [ template(_),
                chunk(_),
                bindings(_),
                breakpoints(_)
              ]).
chunk(false)
.Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
%   pengine_next(+NameOrID, +Options): ask for more solutions.  A
%   chunk(Count) option is removed from Options and sent as
%   next(Count); without it the plain `next` request is sent.

pengine_next(NameOrID, Options) :-
    (   select_option(chunk(Count), Options, SendOptions)
    ->  pengine_send(NameOrID, next(Count), SendOptions)
    ;   pengine_send(NameOrID, next, Options)
    ).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
%   pengine_stop(+NameOrID, +Options): send the `stop` request using
%   pengine_send/3.

pengine_stop(NameOrID, Options) :-
    Request = stop,
    pengine_send(NameOrID, Request, Options).
%   pengine_abort(+NameOrID): abort the running query of a pengine.
%   NameOrID is first resolved through child/2.  A remote pengine is
%   aborted with remote_pengine_abort/3; a local one by signalling
%   throw(abort_query) to its thread.  Errors from signalling are
%   ignored.

pengine_abort(NameOrID) :-
    (   child(NameOrID, Id)
    ->  true
    ;   Id = NameOrID
    ),
    pengine_abort_resolved(Id).

pengine_abort_resolved(Id) :-
    pengine_remote(Id, Server),
    !,
    remote_pengine_abort(Server, Id, []).
pengine_abort_resolved(Id) :-
    pengine_thread(Id, Thread),
    debug(pengine(abort), 'Signalling thread ~p', [Thread]),
    catch(thread_signal(Thread, throw(abort_query)), _, true).
%   pengine_destroy(+NameOrID) and pengine_destroy(+NameOrID, +Options).
%
%   Without force(true) a `destroy` request is sent with
%   pengine_send/3.  If the pengine does not exist, the child/2
%   registration for it is removed instead.  With force(true), the
%   pengine is killed using abort/0 and pengine_destroy/2 succeeds,
%   also if the pengine has no thread or the thread no longer exists.

pengine_destroy(NameOrID) :-
    pengine_destroy(NameOrID, []).

pengine_destroy(NameOrID, Options) :-
    (   option(force(true), Options)
    ->  (   child(NameOrID, Id)
        ->  true
        ;   Id = NameOrID
        ),
        (   pengine_thread(Id, Thread)
        ->  catch(thread_signal(Thread, abort),
                  error(existence_error(thread, _), _), true)
        ;   true
        )
    ;   catch(pengine_send(NameOrID, destroy, Options),
              error(existence_error(pengine, NameOrID), _),
              retractall(child(_,NameOrID)))
    ).


/*================= pengines administration =======================
*/
thread(ThreadId)
remote(URL)
654:- dynamic 655 current_pengine/6, % Id, ParentId, Thread, URL, App, Destroy 656 pengine_queue/4, % Id, Queue, TimeOut, Time 657 output_queue/3, % Id, Queue, Time 658 pengine_user/2, % Id, User 659 pengine_data/2, % Id, Data 660 pengine_detached/2. % Id, Data 661:- volatile 662 current_pengine/6, 663 pengine_queue/4, 664 output_queue/3, 665 pengine_user/2, 666 pengine_data/2, 667 pengine_detached/2. 668 669:- thread_local 670 child/2. % ?Name, ?Child
%   pengine_register_local(+Id, +Thread, +Queue, +URL, +Application,
%   +Destroy): record a pengine running in Thread of this process.
%   Note that current_pengine/6 stores Queue before Thread.

pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
    Record = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
    asserta(Record).

%   pengine_register_remote(+Id, +URL, +Application, +Destroy): record
%   a pengine running at URL.  The thread field is 0 and the queue is
%   the calling thread.

pengine_register_remote(Id, URL, Application, Destroy) :-
    thread_self(Me),
    Record = current_pengine(Id, Me, 0, URL, Application, Destroy),
    asserta(Record).
%   pengine_unregister(+Id): remove the registration of pengine Id
%   for the calling thread, together with its pengine_user/2 and
%   pengine_data/2 facts.  If the URL field of the registration is
%   `http`, the queue is the message queue used to send events to the
%   HTTP workers; it is handed to sync_destroy_queue_from_pengine/2
%   while holding the mutex `pengine`.

pengine_unregister(Id) :-
    thread_self(Me),
    pengine_destroy_http_queue(Id, Me),
    retractall(current_pengine(Id, _, Me, _, _, _)),
    retractall(pengine_user(Id, _)),
    retractall(pengine_data(Id, _)).

pengine_destroy_http_queue(Id, Thread) :-
    current_pengine(Id, Queue, Thread, http, _, _),
    !,
    with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)).
pengine_destroy_http_queue(_, _).

%   pengine_unregister_remote(+Id): remove the registrations of Id
%   that have thread field 0, i.e. those of remote pengines.

pengine_unregister_remote(Id) :-
    retractall(current_pengine(Id, _Queue, 0, _, _, _)).
%   pengine_self(-Id): Id is the pengine registered for the calling
%   thread.

pengine_self(Id) :-
    thread_self(Me),
    current_pengine(Id, _, Me, _, _, _).

%   pengine_parent(-Queue): value of the global variable
%   `pengine_parent`.

pengine_parent(Queue) :-
    nb_getval(pengine_parent, Queue).

%   pengine_thread(+Pengine, -Thread): Thread runs the local pengine.
%   Registrations with thread field 0 (remote pengines) are skipped.

pengine_thread(Pengine, Thread) :-
    current_pengine(Pengine, _, Thread, _, _, _),
    Thread \== 0,
    !.

%   pengine_remote(?Pengine, ?URL): Pengine is registered as remote
%   (thread field 0) at URL.

pengine_remote(Pengine, URL) :-
    current_pengine(Pengine, _, 0, URL, _, _).

%   get_pengine_application(+Pengine, -Application): application of
%   the first matching registration.

get_pengine_application(Pengine, Application) :-
    current_pengine(Pengine, _, _, _, Application, _),
    !.

%   The module of a pengine has the same name as the pengine.

get_pengine_module(Module, Module).

%   pengine_uuid(-Id): create a fresh identifier.  Uses uuid/2 if that
%   predicate is available when this file is loaded and a random
%   integer, converted to an atom, otherwise.

:- if(current_predicate(uuid/2)).
pengine_uuid(Id) :-
    uuid(Id, [version(4)]).             % Version 4 is random.
:- else.
pengine_uuid(Id) :-
    (   current_prolog_flag(max_integer, MaxInt)
    ->  High is MaxInt-1
    ;   High is 1<<128
    ),
    random_between(0, High, Number),
    atom_number(Id, Number).
:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
% Id is a plain argument and Goal is called as-is.
:- meta_predicate protect_pengine(+, 0).

%!  protect_pengine(+Id, :Goal) is semidet.
%
%   Run Goal while holding one of 64 mutexes named pengine_done_<N>,
%   where N is the term hash of Id modulo 64.  Both branches of the
%   conditional call Goal, so Goal also runs if pengine_thread/2 no
%   longer knows Id.  Succeeds or fails as with_mutex/2 does for Goal.

protect_pengine(Id, Goal) :-
    term_hash(Id, Hash),
    LockN is Hash mod 64,
    atom_concat(pengine_done_, LockN, Lock),
    with_mutex(Lock,
               (   pengine_thread(Id, _)
               ->  Goal
               ;   Goal
               )).
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
%   pengine_application(+Application): calling this predicate as a
%   goal always raises a context_error.  It is meant to be used as a
%   directive, which is rewritten through term_expansion/2.

pengine_application(Application) :-
    Culprit = pengine_application(Application),
    throw(error(context_error(nodirective, Culprit), _)).

:- multifile
    system:term_expansion/2,
    current_application/1.
%   current_pengine_application(?Application): Application is known
%   through current_application/1.

current_pengine_application(App) :-
    current_application(App).


% Default settings for all applications

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').


%   pengine_application_setting(?Name, ?Type, ?Comment)
%
%   Settings created for each application, in the order in which their
%   clauses are emitted.  The default of each setting refers to the
%   setting with the same name in module `pengines`.

pengine_application_setting(thread_pool_size, integer,
    'Maximum number of pengines this application can run.').
pengine_application_setting(thread_pool_stacks, list(compound),
    'Maximum stack sizes for pengines this application can run.').
pengine_application_setting(slave_limit, integer,
    'Maximum number of local slave pengines a master pengine can create.').
pengine_application_setting(time_limit, number,
    'Maximum time to wait for output').
pengine_application_setting(idle_limit, number,
    'Pengine auto-destroys when idle for this time').
pengine_application_setting(safe_goal_limit, number,
    'Maximum time to try proving safety of the goal').
pengine_application_setting(program_space, integer,
    'Maximum memory used by predicates').
pengine_application_setting(allow_from, list(atom),
    'IP addresses from which remotes are allowed to connect').
pengine_application_setting(deny_from, list(atom),
    'IP addresses from which remotes are NOT allowed to connect').
pengine_application_setting(debug_info, boolean,
    'Keep information to support source-level debugging').

%   Expand the setting/4 directive for one application setting.

pengine_expand_application_setting(Application,
                                   setting(Name, Type, Comment),
                                   Clauses) :-
    expand_term((:- setting(Application:Name, Type,
                            setting(pengines:Name),
                            Comment)),
                Clauses).

%   Hook into system:term_expansion/2, which this file declares
%   multifile: the directive `:- pengine_application(Application)` is
%   replaced by a current_application/1 fact in module `pengines` and
%   the expanded per-application settings.
%
%   @error type_error if Application is not an atom
%   @error permission_error if Application is a module that was
%          loaded from a file

system:term_expansion((:- pengine_application(Application)), Expanded) :-
    must_be(atom, Application),
    (   module_property(Application, file(_))
    ->  permission_error(create, pengine_application, Application)
    ;   true
    ),
    findall(setting(Name, Type, Comment),
            pengines:pengine_application_setting(Name, Type, Comment),
            Specs),
    maplist(pengines:pengine_expand_application_setting(Application),
            Specs, SettingClauses),
    flatten([ pengines:current_application(Application)
            | SettingClauses
            ], Expanded).

% Register default application

:- pengine_application(pengine_sandbox).
%   pengine_property(?Pengine, ?Property): Property is a property of
%   a registered pengine.  If both arguments are bound, at most one
%   solution is returned.  Properties defined below:
%
%     - self(Id) and module(Id): Id is the identifier of the pengine.
%     - alias(Name): name in child/2 that differs from the identifier,
%       as established by the `alias` option when creating the pengine.
%     - thread(Thread): thread of a pengine whose thread field is not 0.
%     - remote(Server): URL of a pengine whose thread field is 0.
%     - application(Application)
%     - destroy(Destroy): `true` if the pengine is destroyed
%       automatically after completing the query.
%     - parent(Queue)
%     - source(SourceID, Source): taken from pengine_data/2.  The
%       original documentation ties this property to `debug_info`
%       being present.
%     - detached(When): taken from pengine_detached/2.

pengine_property(Pengine, Property) :-
    (   nonvar(Pengine),
        nonvar(Property)
    ->  once(pengine_property2(Property, Pengine))
    ;   pengine_property2(Property, Pengine)
    ).

pengine_property2(self(Pengine), Pengine) :-
    current_pengine(Pengine, _, _, _, _, _).
pengine_property2(module(Pengine), Pengine) :-
    current_pengine(Pengine, _, _, _, _, _).
pengine_property2(alias(Name), Pengine) :-
    child(Name, Pengine),
    Name \== Pengine.
pengine_property2(thread(Thread), Pengine) :-
    current_pengine(Pengine, _, Thread, _, _, _),
    Thread \== 0.
pengine_property2(remote(URL), Pengine) :-
    current_pengine(Pengine, _, 0, URL, _, _).
pengine_property2(application(App), Pengine) :-
    current_pengine(Pengine, _, _, _, App, _).
pengine_property2(destroy(Bool), Pengine) :-
    current_pengine(Pengine, _, _, _, _, Bool).
pengine_property2(parent(Queue), Pengine) :-
    current_pengine(Pengine, Queue, _, _, _, _).
pengine_property2(source(SourceID, Text), Pengine) :-
    pengine_data(Pengine, source(SourceID, Text)).
pengine_property2(detached(When), Pengine) :-
    pengine_detached(Pengine, When).
%   pengine_output(+Term): send Term to the parent as an
%   output(Pengine, Term) event, where Pengine is the pengine of the
%   calling thread.

pengine_output(Term) :-
    pengine_self(Pengine),
    Event = output(Pengine, Term),
    pengine_reply(Event).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
%   pengine_debug(+Format, +Args): format a message and send it to
%   the parent queue as a debug(Pengine, Message) event.  The format
%   call is first checked with safe_goal/1.  If that check raises an
%   exception, the exception itself is turned into the message.

pengine_debug(Format, Args) :-
    pengine_parent(Queue),
    pengine_self(Pengine),
    pengine_debug_message(Format, Args, Message),
    pengine_reply(Queue, debug(Pengine, Message)).

pengine_debug_message(Format, Args, Message) :-
    catch(safe_goal(format(atom(_), Format, Args)), Error, true),
    (   nonvar(Error)
    ->  message_to_string(Error, Message)
    ;   format(atom(Message), Format, Args)
    ).


/*================= Local pengine =======================
*/
%   local_pengine_create(+Options): create a pengine in this process
%   with the calling thread as parent and register it in child/2
%   under its alias.  The application defaults to `pengine_sandbox`
%   and the alias defaults to the pengine id.

local_pengine_create(Options) :-
    option(application(Application), Options, pengine_sandbox),
    thread_self(Parent),
    create(Parent, Id, Options, local, Application),
    option(alias(Alias), Options, Id),
    assertz(child(Alias, Id)).
:- multifile thread_pool:create_pool/1.

%   Clause for the hook thread_pool:create_pool/1, declared multifile
%   above: create the thread pool for a pengine application using the
%   application's `thread_pool_size` and `thread_pool_stacks` settings.
%   Fails for names that are not a current pengine application.

thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, Size),
    setting(Application:thread_pool_stacks, Stacks),
    thread_pool_create(Application, Size, Stacks).
%   create(+Queue, ?Child, +Options, +URL, +Application): create a
%   pengine thread.  Child gets a fresh id unless it is already bound.
%   For URL `local`, errors propagate to the caller.  Otherwise they
%   are sent to Queue as an error(Child, Error) event.

create(Queue, Child, Options, URL, Application) :-
    (   URL = local
    ->  pengine_child_id(Child),
        create0(Queue, Child, Options, local, Application)
    ;   pengine_child_id(Child),
        catch(create0(Queue, Child, Options, URL, Application),
              Error,
              create_error(Queue, Child, Error))
    ).

pengine_child_id(Child) :-
    (   var(Child)
    ->  pengine_uuid(Child)
    ;   true
    ).

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).

%   create0/5 does the actual work: validate the application, enforce
%   the slave limit, start the thread in the application's pool,
%   register the pengine and tell the new thread its id.  The thread
%   receives all options that are not pengine create options.

create0(Queue, Child, Options, URL, Application) :-
    pengine_check_application(Application),
    pengine_check_slave_limit(URL, Application),
    partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), ChildThread,
        [ at_exit(pengine_done)
        | ThreadOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
    thread_send_message(ChildThread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).

pengine_check_application(Application) :-
    (   current_application(Application)
    ->  true
    ;   existence_error(pengine_application, Application)
    ).

%   The limit only applies if the pengine is _not_ a child of the
%   HTTP server thread, i.e. if URL is not `http`.

pengine_check_slave_limit(URL, Application) :-
    (   URL \== http
    ->  aggregate_all(count, child(_,_), Count),
        setting(Application:slave_limit, Max),
        (   Count >= Max
        ->  throw(error(resource_error(max_pengines), _))
        ;   true
        )
    ;   true
    ).

%   Options that are handled by the pengine itself.

pengine_create_option(Option) :-
    memberchk(Option,
              [ src_text(_),
                src_url(_),
                application(_),
                destroy(_),
                ask(_),
                template(_),
                bindings(_),
                chunk(_),
                alias(_),
                user(_)
              ]).
%   pengine_done: cleanup goal of a pengine thread, suited for the
%   thread's `at_exit` option.  Destroys child pengines using
%   pengine_destroy/1.  Cleaning up the Pengine is synchronised by the
%   `pengine_done` mutex.  See read_event/6.
%
%   If the thread ended with an abort exception, it is detached and a
%   destroy(Pengine, abort(Pengine)) event is sent first.  Errors while
%   sending that event are ignored.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Me),
    pengine_done_report_abort(Me),
    forall(child(_Name, Child),
           pengine_destroy(Child)),
    pengine_self(Id),
    protect_pengine(Id, pengine_unregister(Id)).

pengine_done_report_abort(Me) :-
    thread_property(Me, status(exception(Ex))),
    abort_exception(Ex),
    thread_detach(Me),
    pengine_self(Pengine),
    !,
    catch(pengine_reply(destroy(Pengine, abort(Pengine))),
          error(_,_), true).
pengine_done_report_abort(_).

abort_exception('$aborted').
abort_exception(unwind(abort)).
:- thread_local wrap_first_answer_in_create_event/2.

% The first argument is module-qualified: the clause head of
% pengine_prepare_source/2 is pengine_prepare_source(Module:Application,
% Options).
:- meta_predicate
    pengine_prepare_source(:, +).

%!  pengine_main(+Parent, +Options, +Application)
%
%   Start goal of a pengine thread.  Waits for the
%   pengine_registered(Self) message, stores Parent in the global
%   variable `pengine_parent`, registers the user and enables the
%   `mitigate_spectre` flag.  It then prepares the source and runs the
%   pengine inside the temporary module Self.  If preparing the source
%   throws `prepare_source_failed`, the pengine is terminated.

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(
              Self,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Self, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Self)).

%!  pengine_create_and_loop(+Self, +Application, +Options)
%
%   Send the create event and enter the main loop.  Without an ask/1
%   option the create event is sent immediately.  With one, the create
%   event is stored in wrap_first_answer_in_create_event/2 with an
%   unbound tail Extra and the query is started with pengine_ask/3, so
%   that the first answer can be delivered inside the create event.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Query0), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Query0)                      % string is not callable
        ->  (   option(template(TemplateS), Options)
            ->  Ask2 = Query0-TemplateS
            ;   Ask2 = Query0
            ),
            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),              % report the parse error
                throw(prepare_source_failed)
            )
        ;   Query = Query0,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
%   ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings)
%
%   Parse a query that is given as text.  AskSpec is either
%   QueryText-TemplateText or just QueryText.  In the first form both
%   texts are read as one term, so they share variables, and Bindings
%   holds the bindings of the variables that appear in the template.
%   In the second form Template is a dict with tag
%   `swish_default_template`, built from the bindings that are not
%   excluded by anon/1.

ask_to_term(QueryText-TemplateText, Module, Query, Template, Bindings) :-
    !,
    format(string(Combined), 't((~s),(~s))', [TemplateText, QueryText]),
    term_string(t(Template,Query), Combined,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    phrase(template_bindings(Template, AllBindings), Bindings).
ask_to_term(QueryText, Module, Query, Template, Bindings) :-
    term_string(Query, QueryText,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    exclude(anon, AllBindings, Bindings),
    dict_create(Template, swish_default_template, Bindings).

%   template_bindings(+Term, +AllBindings)// emits, in left-to-right
%   order, the binding of each variable occurrence in Term that has a
%   binding in AllBindings.

template_bindings(Term, AllBindings) -->
    { var(Term) }, !,
    (   { var_binding(AllBindings, Term, Binding)
        }
    ->  [Binding]
    ;   []
    ).
template_bindings([Head|Tail], AllBindings) -->
    !,
    template_bindings(Head, AllBindings),
    template_bindings(Tail, AllBindings).
template_bindings(Compound, AllBindings) -->
    { compound(Compound), !,
      compound_name_arguments(Compound, _, Args)
    },
    template_bindings(Args, AllBindings).
template_bindings(_, _) --> [].

%   var_binding(+AllBindings, +Var, -Binding): Binding is the first
%   element of AllBindings whose second argument is identical to Var.

var_binding([Candidate|Candidates], Var, Binding) :-
    (   arg(2, Candidate, Value),
        Value == Var
    ->  Binding = Candidate
    ;   var_binding(Candidates, Var, Binding)
    ).
%   fix_streams: if `current_output` is a CGI stream, give the stream
%   `user_output` the alias `current_output` instead.  Always succeeds
%   when the stream is not a CGI stream.

fix_streams :-
    fix_stream(current_output).

fix_stream(Alias) :-
    (   is_cgi_stream(Alias)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Alias]),
        set_stream(user_output, alias(Alias))
    ;   true
    ).
%!  pengine_prepare_source(:Application, +Options) is det.
%
%   Prepare the module of a pengine.  The module qualifier of the
%   first argument is the module to prepare.  Its program space is
%   limited by the application's `program_space` setting, its import
%   from `user` is replaced by an import from Application and the
%   create options are processed.
%
%   @throws prepare_source_failed if preparing the module raised an
%           exception.  The exception is reported with send_error/1.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, SpaceLimit),
    set_module(Module:program_space(SpaceLimit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   var(Error)
    ->  true
    ;   send_error(Error),
        throw(prepare_source_failed)
    ).

%   prep_module(+Module, +Application, +Options): copy module specific
%   flags, run all solutions of the prepare_module/3 hook and load the
%   sources with Module as source module.  The previous source module
%   is restored afterwards.

prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(OldModule, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(OldModule)).

%   copy_flag(+Module, +Application, +Flag): give Module the value that
%   Flag has in Application.  Succeeds without effect if Application
%   has no value for Flag.

copy_flag(Module, Application, Flag) :-
    current_prolog_flag(Application:Flag, Value),
    !,
    set_prolog_flag(Module:Flag, Value).
copy_flag(_, _, _).

%   process_create_option(+Module, +Option): load src_text/1 and
%   src_url/1 options into Module.  Other options are ignored.

process_create_option(Module, src_text(Text)) :-
    !,
    pengine_src_text(Text, Module).
process_create_option(Module, src_url(URL)) :-
    !,
    pengine_src_url(URL, Module).
process_create_option(_, _).
%   [...] `src_text` and `src_url` options

%   pengine_main_loop(+ID): run guarded_main_loop/1.  If the exception
%   `abort_query` is thrown, the parent queue is emptied and the
%   pengine either continues or is destroyed, as decided by
%   destroy_or_continue/1.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID),
          abort_query,
          pengine_aborted(ID)).

pengine_aborted(ID) :-
    thread_self(Thread),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Thread]),
    empty_queue,
    AbortEvent = abort(ID),
    destroy_or_continue(AbortEvent).
%   guarded_main_loop(+ID): wait for the next request and act on it.
%   `destroy` terminates the pengine and ask(Goal, Options) starts a
%   query.  Any other request is answered with a protocol_error event,
%   after which the loop waits for the next request.

guarded_main_loop(ID) :-
    pengine_request(Request),
    pengine_main_loop_dispatch(Request, ID).

pengine_main_loop_dispatch(destroy, ID) :-
    !,
    debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
pengine_main_loop_dispatch(ask(Goal, Options), ID) :-
    !,
    debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
    ask(ID, Goal, Options).
pengine_main_loop_dispatch(_, ID) :-
    debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
    pengine_reply(error(ID, error(protocol_error, _))),
    guarded_main_loop(ID).


%   pengine_terminate(+ID): send the destroy(ID) event and detach the
%   calling thread.

pengine_terminate(ID) :-
    DestroyEvent = destroy(ID),
    pengine_reply(DestroyEvent),
    thread_self(Thread),            % Make the thread silently disappear
    thread_detach(Thread).
%!  solve(+Chunk, +Template, :Goal, +ID) is det.
%
%   Run Goal for pengine ID and report the outcome using
%   pengine_reply/1 or destroy_or_continue/1.  Chunk is either an
%   integer (solutions are collected in chunks of that size) or `false`
%   (no chunking).  Any other value raises a domain error.
%
%   The term time(CPU) holds the CPU time at which the current chunk
%   was started; it is destructively updated by more_solutions/5.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    (   integer(Chunk)
    ->  State = count(Chunk)
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Started),
    Time = time(Started),
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(OldTypeIn),
    (   '$set_typein_module'(ID),
        call_cleanup(
            catch(findnsols_no_empty(State, Template2,
                                     set_projection(Goal, Bindings),
                                     Result),
                  Error, true),
            query_done(Det, OldTypeIn)),
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        forall(pengine_flush_output_hook, true),
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)                % cleanup did not run: more to come
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,                              % commit
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point

%   query_done(-Det, +TypeIn)
%
%   Cleanup handler of solve/4: flags that the goal is finished by
%   binding Det to `true` and restores the typein module.

query_done(true, OldTypeIn) :-
    '$set_typein_module'(OldTypeIn).
%!  set_projection(:Goal, +Bindings)
%
%   Call Goal with the backtrackable global variable `$variable_names`
%   set to Bindings.

set_projection(Goal, Bindings) :-
    b_setval('$variable_names', Bindings),
    call(Goal).

%   projection(-Projection)
%
%   Projection is the list of variable names found in the global
%   variable `$variable_names`, or [] if this variable does not exist.

projection(Projection) :-
    (   nb_current('$variable_names', Bindings)
    ->  maplist(var_name, Bindings, Projection)
    ;   Projection = []
    ).
%!  filter_template(+Template0, +Bindings, -Template) is det.
%
%   If Template0 is a dict with tag `swish_default_template`, Template
%   is a fresh dict with that tag created from Bindings.  Otherwise
%   Template is Template0.

filter_template(Template0, Bindings, Template) :-
    is_dict(Template0, swish_default_template),
    !,
    dict_create(Template, swish_default_template, Bindings).
filter_template(Same, _Bindings, Same).

%   findnsols_no_empty(+State, ?Template, :Goal, -List)
%
%   Without chunking (State is `no_chunk`), List is the singleton
%   [Template] for each solution of Goal.  Otherwise collect solutions
%   using findnsols/4, failing if the resulting chunk is empty.

findnsols_no_empty(no_chunk, Template, Goal, Answers) =>
    Answers = [Template],
    call(Goal).
findnsols_no_empty(State, Template, Goal, Answers) =>
    findnsols(State, Template, Goal, Answers),
    Answers \== [].

%   destroy_or_continue(+Event)
%
%   Event is a term whose first argument is the pengine id.  If the
%   pengine has the property destroy(true), detach the calling thread
%   and reply with destroy(ID, Event).  Otherwise reply with Event and
%   re-enter guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, ID),
    (   pengine_property(ID, destroy(true))
    ->  thread_self(Self),
        thread_detach(Self),
        pengine_reply(destroy(ID, Event))
    ;   pengine_reply(Event),
        guarded_main_loop(ID)
    ).
chunk
solutions.next
% , but sets the new chunk-size to Count.

%!  more_solutions(+Pengine, +Choice, +State, +Time)
%
%   Wait for the next request after a non-final answer and process it
%   using more_solutions/5.

more_solutions(ID, Choice, State, Time) :-
    pengine_request(Request),
    more_solutions(Request, ID, Choice, State, Time).

%   more_solutions(+Request, +Pengine, +Choice, +State, +Time)
%
%   Request is one of
%
%     - stop
%       Pass stop(Pengine) to destroy_or_continue/1.
%     - next
%       Reset the start time in Time and fail, which backtracks into
%       the running query.
%     - next(Count)
%       As `next`, but first sets the chunk size in State to Count.
%       Only applies if Count > 0 and State is count(_).
%     - ask(Goal, Options)
%       Cut to Choice and start the new query.
%     - destroy
%       Terminate the pengine.
%
%   Anything else is answered with a protocol error, after which we
%   wait for the next request.

more_solutions(stop, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
    destroy_or_continue(stop(ID)).
more_solutions(next, ID, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(next(Count), ID, _Choice, State, Time) :-
    Count > 0,
    State = count(_),               % else fallthrough to protocol error
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
    nb_setarg(1, State, Count),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
    prolog_cut_to(Choice),
    ask(ID, Goal, Options).
more_solutions(destroy, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
more_solutions(Request, ID, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Request)]),
    pengine_reply(error(ID, error(protocol_error, _))),
    more_solutions(ID, Choice, State, Time).
chunk(N)
option.
%!  ask(+Pengine, :Goal, +Options)
%
%   Prepare Goal using prepare_goal/4 and solve it.  The options
%   template(Template) (default: Goal) and chunk(N) (default: 1) are
%   passed to solve/4.  If preparing the goal raises an exception, the
%   exception is sent as error(Pengine, Error) and we return to
%   guarded_main_loop/1.

ask(ID, Goal, Options) :-
    catch(prepare_goal(ID, Goal, Prepared, Options), Error, true),
    !,
    (   nonvar(Error)
    ->  pengine_reply(error(ID, Error)),
        guarded_main_loop(ID)
    ;   option(template(Template), Options, Goal),
        option(chunk(ChunkSize), Options, 1),
        solve(ChunkSize, Template, Prepared, ID)
    ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
%
%   GoalOut is a term Module:Goal where Module is the module of Pengine
%   and Goal results from (1) the hook prepare_goal/3, if it succeeds,
%   and (2) expand_goal/2 with Module as the source module.  Unless
%   Pengine is not sandboxed, Module:Goal is verified using safe_goal/1
%   within the time limit of the application setting `safe_goal_limit`.
%
%   @throws error(sandbox(time_limit_exceeded, Limit), _) if the safety
%           check does not complete in time.  Other exceptions of the
%           safety check are re-thrown.

prepare_goal(ID, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(Goal0, Hooked, Options)
    ->  true
    ;   Hooked = Goal0                  % no hook: use the goal as is
    ),
    get_pengine_module(ID, Module),
    setup_call_cleanup(
        '$set_source_module'(OldSource, Module),
        expand_goal(Hooked, Goal),
        '$set_source_module'(_, OldSource)),
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, Application),
        setting(Application:safe_goal_limit, Limit),
        catch(call_with_time_limit(Limit, safe_goal(Module:Goal)),
              Ex, true)
    ->  (   var(Ex)
        ->  true
        ;   Ex = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(Ex)
        )
    ).
not_sandboxed(User, Application)
must succeed.
%!  pengine_not_sandboxed(+Pengine) is semidet.
%
%   True when the user and application of Pengine satisfy the hook
%   not_sandboxed/2.  Succeeds at most once.

pengine_not_sandboxed(Pengine) :-
    pengine_user(Pengine, User),
    pengine_property(Pengine, application(Application)),
    not_sandboxed(User, Application),
    !.
%!  pengine_pull_response(+Pengine, +Options) is det.
%
%   If Pengine is a remote pengine, pull a response from its server
%   using remote_pengine_pull_response/3.  For other pengines this is
%   a no-op.

pengine_pull_response(Pengine, Options) :-
    (   pengine_remote(Pengine, Server)
    ->  remote_pengine_pull_response(Server, Pengine, Options)
    ;   true
    ).
%!  pengine_input(+Prompt, -Term) is det.
%
%   Send prompt(Self, Prompt) to the parent of the calling pengine and
%   wait for the next request.  If the request is input(Input), Term is
%   unified with Input.  If the request is `destroy`, the computation
%   is aborted.
%
%   @throws error(protocol_error, _) on any other request.

pengine_input(Prompt, Term) :-
    pengine_self(Me),
    pengine_parent(Parent),
    pengine_reply(Parent, prompt(Me, Prompt)),
    pengine_request(Request),
    (   Request = input(Reply)
    ->  Term = Reply
    ;   Request == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
%!  pengine_respond(+Pengine, +Input, +Options) is det.
%
%   Send the event input(Input) to Pengine using pengine_send/3.

pengine_respond(Pengine, Input, Options) :-
    Event = input(Input),
    pengine_send(Pengine, Event, Options).
%!  send_error(+Error) is det.
%
%   Send Error to the parent as error(Self, Error1), where Error1 is
%   Error after processing by replace_blobs/2.  If the error context
%   holds a list of stack frames, the frames are replaced by a string
%   holding the printed backtrace.

send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
    is_list(Frames),
    !,
    with_output_to(string(Backtrace),
                   print_prolog_backtrace(current_output, Frames)),
    pengine_self(Me),
    replace_blobs(Formal, SafeFormal),
    replace_blobs(Message, SafeMessage),
    Context = context(prolog_stack(Backtrace), SafeMessage),
    pengine_reply(error(Me, error(SafeFormal, Context))).
send_error(Error) :-
    pengine_self(Me),
    replace_blobs(Error, SafeError),
    pengine_reply(error(Me, SafeError)).
%!  replace_blobs(+Term0, -Term) is det.
%
%   Term is a copy of Term0 in which each blob whose type is not `text`
%   is replaced by an atom holding its printed (`~p`) representation.

replace_blobs(Blob, Atom) :-
    blob(Blob, Type), Type \== text,
    !,
    format(atom(Atom), '~p', [Blob]).
replace_blobs(Compound0, Compound) :-
    compound(Compound0),
    !,
    compound_name_arguments(Compound0, Name, Args0),
    maplist(replace_blobs, Args0, Args),
    compound_name_arguments(Compound, Name, Args).
replace_blobs(Term, Term).


/*================= Remote pengines =======================
*/


%   remote_pengine_create(+BaseURL, +Options)
%
%   Create a pengine on the server at BaseURL.  The pengine create
%   options are posted to the server; the remaining options are passed
%   to the HTTP request.  If there is an ask(Query) option without a
%   template, the query is used as template.  The new pengine is
%   registered as child (using the alias(Name) option or its id as
%   name) and the server reply is forwarded to the calling thread.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, CreateOptions0, HTTPOptions),
    (   option(ask(Query), CreateOptions0),
        \+ option(template(_Template), CreateOptions0)
    ->  CreateOptions = [template(Query)|CreateOptions0]
    ;   CreateOptions = CreateOptions0
    ),
    options_to_dict(CreateOptions, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, HTTPOptions),
    arg(1, Reply, ID),
    (   option(id(ID2), Options)
    ->  ID = ID2
    ;   true
    ),
    option(alias(Name), Options, ID),
    assert(child(Name, ID)),
    (   (   functor(Reply, create, _)   % actually created
        ;   functor(Reply, output, _)   % compiler messages
        )
    ->  option(application(Application), CreateOptions, pengine_sandbox),
        option(destroy(Destroy), CreateOptions, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Self),
    pengine_reply(Self, Reply).

%   options_to_dict(+Options, -Dict)
%
%   Translate the create options into a dict.  If both ask(Ask) and
%   template(Template) are present they are written together, such that
%   shared variables get the same name in both strings.  The findall/3
%   call undoes the bindings made by numbervars/3.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, Options1),
    select_option(template(Template), Options1, Options2),
    !,
    no_numbered_var_in(Ask+Template),
    findall(AskString-TemplateString,
            ask_template_to_strings(Ask, Template, AskString, TemplateString),
            [ AskString-TemplateString ]),
    options_to_dict(Options2, Dict0),
    put_dict(_{ask:AskString,template:TemplateString}, Dict0, Dict).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, DictData),
    dict_create(Dict, _, DictData).

%   no_numbered_var_in(+Term)
%
%   @throws domain_error(numbered_vars_free_term, Term) if Term has a
%           sub term of the shape '$VAR'(_).

no_numbered_var_in(Term) :-
    sub_term(Sub, Term),
    subsumes_term('$VAR'(_), Sub),
    !,
    domain_error(numbered_vars_free_term, Term).
no_numbered_var_in(_).

%   ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Number the variables of Ask and Template and write both, separated
%   by a newline, after which the text is split at the newline.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(Text), '~W\n~W', [ Ask, WriteOptions,
                                     Template, WriteOptions
                                   ]),
    split_string(Text, "\n", "", [AskString, TemplateString]).

%   prolog_option(+Option0, -Option)
%
%   Options whose value is a Prolog term (see create_option_type/2) get
%   their value replaced by a string written using `~k`.

prolog_option(Option0, Option) :-
    create_option_type(Option0, term),
    !,
    Option0 =.. [Name,Term],
    format(string(Text), '~k', [Term]),
    Option =.. [Name,Text].
prolog_option(Option, Option).

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).

%   remote_pengine_send(+BaseURL, +ID, +Event, +Options)
%
%   Send Event to the remote pengine ID and forward the server reply
%   to the calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
    thread_self(Self),
    pengine_reply(Self, Reply).

%   remote_pengine_pull_response(+BaseURL, +ID, +Options)
%
%   Pull a response for the remote pengine ID and forward it to the
%   calling thread.

remote_pengine_pull_response(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
    thread_self(Self),
    pengine_reply(Self, Reply).

%   remote_pengine_abort(+BaseURL, +ID, +Options)
%
%   Ask the server to abort the remote pengine ID and forward the
%   reply to the calling thread.

remote_pengine_abort(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
    thread_self(Self),
    pengine_reply(Self, Reply).
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Issue Action for pengine ID on Server and read the Prolog reply.
%   If Params is [event=Event], the event is posted as a Prolog term.
%   Otherwise Params are added to the query string of the URL.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    HTTPOptions = [ post(prolog(Event))
                  | Options
                  ],
    http_open(URL, In, HTTPOptions),    % putting this in setup_call_cleanup/3
                                        % makes it impossible to interrupt.
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    server_url(Server, Action, [id=ID|Params], URL),
    http_open(URL, In, Options),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).

%   remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   Post Data as JSON to Action on Server and read the Prolog reply.
%   The URL is first probed using probe/3.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL, Options),
    HTTPOptions = [ post(json(Data))
                  | Options
                  ],
    http_open(URL, In, HTTPOptions),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
%!  probe(+Action, +URL, +Options) is det.
%
%   For the `create` action, open URL using the HTTP method OPTIONS
%   and close the connection.  Other actions are not probed.

probe(create, URL, Options) :-
    !,
    http_open(URL, In, [method(options)|Options]),
    close(In).
probe(_, _, _).

%   read_prolog_reply(+In, -Reply)
%
%   Read a Prolog term from the stream In, which is set to UTF-8
%   encoding, and process it using rebind_cycles/2.

read_prolog_reply(In, Reply) :-
    set_stream(In, encoding(utf8)),
    read(In, Raw),
    rebind_cycles(Raw, Reply).

%   rebind_cycles(+Reply0, -Reply)
%
%   If Reply0 is a term @(Reply, Bindings) where Bindings is a list of
%   Var = Value terms, perform the unifications.  Otherwise Reply is
%   Reply0.

rebind_cycles(@(Reply, Bindings), Reply) :-
    is_list(Bindings),
    !,
    maplist(bind, Bindings).
rebind_cycles(Reply, Reply).

bind(Value = Value).

%   server_url(+Server, +Action, +Params, -URL)
%
%   URL is Server with the path set to `pengine/Action` and the query
%   string to Params, using uri_edit/3.

server_url(Server, Action, Params, URL) :-
    atom_concat('pengine/', Action, Path),
    Edits = [ path(Path),
              search(Params)
            ],
    uri_edit(Edits, Server, URL).
Valid options are:
timeout
%!  pengine_event(?Event) is det.
%!  pengine_event(?Event, +Options) is det.
%
%   Wait for a message pengine_event(Id, Event) in the queue of the
%   calling thread.  The option listen(Id) restricts the pengine we
%   listen to.  Options is also passed to thread_get_message/3; if that
%   call fails, Event is unified with `timeout`.

pengine_event(Event) :-
    pengine_event(Event, []).

pengine_event(Event, Options) :-
    thread_self(Queue),
    option(listen(Id), Options, _),
    (   thread_get_message(Queue, pengine_event(Id, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).

%   update_remote_destroy(+Event)
%
%   Unregister the remote pengine if Event is a destroy event for a
%   registered remote pengine.

update_remote_destroy(Event) :-
    destroy_event(Event),
    arg(1, Event, Id),
    pengine_remote(Id, _Server),
    !,
    pengine_unregister_remote(Id).
update_remote_destroy(_).

%   destroy_event(+Event)
%
%   True if Event is a destroy event or a create event whose
%   answer(Answer) feature is a destroy event.

destroy_event(destroy(_)).
destroy_event(destroy(_,_)).
destroy_event(create(_,Features)) :-
    memberchk(answer(Answer), Features),
    !,
    nonvar(Answer),
    destroy_event(Answer).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
%!  pengine_event_loop(:Closure, +Options) is det.
%
%   Process pengine events as long as there are child pengines.  Each
%   event is passed to pengine_process_event/4.  If Options contains
%   autoforward(all), each event is also sent to all children.

pengine_event_loop(Closure, Options) :-
    child(_,_),
    !,
    pengine_event(Event),
    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
    ->  forall(child(_,ID), pengine_send(ID, Event))
    ;   true
    ),
    pengine_event_loop(Event, Closure, Options).
pengine_event_loop(_, _).

% Closure is called as call(Closure, Event) and thus has one
% additional argument.
:- meta_predicate
    pengine_process_event(+, 1, -, +).

%   pengine_event_loop(+Event, :Closure, +Options)
%
%   Process Event and continue the loop if the handler tells us to
%   continue.

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, Continue, Options),
    (   Continue == true
    ->  pengine_event_loop(Closure, Options)
    ;   true
    ).

%   pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Process a single event by calling Closure on it.  Failure of
%   Closure is ignored, except for error events: if Closure fails on
%   error(ID, Error), all children are destroyed and Error is thrown.
%   Specific handling:
%
%     - create(ID, Features)
%       If Features holds answer(First), Closure is called on the
%       create event without this answer, after which First is
%       processed as an event.
%     - output(ID, Msg), debug(ID, Msg)
%       After calling Closure, the next response is pulled.
%     - success(ID, Sol, Proj, Time, More)
%       Closure is called on success(ID, Sol, More).
%     - failure(ID, Time)
%       Closure is called on failure(ID).
%     - destroy(ID, Event)
%       Event is processed first, followed by destroy(ID).
%     - destroy(ID)
%       Removes ID from the child/2 administration.

pengine_process_event(create(ID, T), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
    (   select(answer(First), T, T1)
    ->  ignore(call(Closure, create(ID, T1))),
        pengine_process_event(First, Closure, Continue, Options)
    ;   ignore(call(Closure, create(ID, T))),
        Continue = true
    ).
pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
    ignore(call(Closure, output(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
    ignore(call(Closure, debug(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
    ignore(call(Closure, prompt(ID, Term))).
pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
    ignore(call(Closure, success(ID, Sol, More))).
pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
    ignore(call(Closure, failure(ID))).
pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
    (   call(Closure, error(ID, Error))
    ->  Continue = true
    ;   forall(child(_,Child), pengine_destroy(Child)),
        throw(Error)
    ).
pengine_process_event(stop(ID), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
    ignore(call(Closure, stop(ID))).
pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
    pengine_process_event(Event, Closure, _, Options),
    pengine_process_event(destroy(ID), Closure, Continue, Options).
pengine_process_event(destroy(ID), Closure, true, _Options) :-
    retractall(child(_,ID)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
    ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
%!  pengine_rpc(+URL, +Query) is nondet.
%!  pengine_rpc(+URL, +Query, +Options) is nondet.
%
%   Run Query in a pengine on the server at URL and enumerate its
%   solutions.  Unless Options holds a timeout(_) option, the timeout
%   is taken from the setting `time_limit`.  The pengine is destroyed
%   when the call is finished, unless the pengine already reported its
%   own destruction.

pengine_rpc(URL, Query) :-
    pengine_rpc(URL, Query, []).

pengine_rpc(URL, Query, M:Options0) :-
    translate_local_sources(Options0, Options1, M),
    (   option(timeout(_), Options1)
    ->  Options = Options1
    ;   setting(time_limit, Limit),
        Options = [timeout(Limit)|Options1]
    ),
    term_variables(Query, Vars),
    Template =.. [v|Vars],
    State = destroy(true),              % modified by process_event/4
    CreateOptions = [ ask(Query),
                      template(Template),
                      server(URL),
                      id(Id)
                    | Options
                    ],
    setup_call_catcher_cleanup(
        pengine_create(CreateOptions),
        wait_event(Template, State, [listen(Id)|Options]),
        Why,
        pengine_destroy_and_wait(State, Id, Why, Options)).

%   pengine_destroy_and_wait(+State, +Id, +Why, +Options)
%
%   Cleanup handler of pengine_rpc/3.  If State is still destroy(true)
%   the pengine is destroyed and we wait for this to be confirmed.

pengine_destroy_and_wait(destroy(true), Id, Why, Options) :-
    !,
    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
    pengine_destroy(Id, Options),
    wait_destroy(Id, 10).
pengine_destroy_and_wait(_, _, Why, _) :-
    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).

%   wait_destroy(+Id, +Retries)
%
%   Wait until Id is no longer a child.  Events are read with a
%   timeout of 10; after Retries events that are not a destroy event
%   we give up and remove the administration of the pengine.

wait_destroy(Id, _) :-
    \+ child(_, Id),
    !.
wait_destroy(Id, Retries) :-
    pengine_event(Event, [listen(Id),timeout(10)]),
    !,
    (   destroy_event(Event)
    ->  retractall(child(_,Id))
    ;   succ(Retries1, Retries)
    ->  wait_destroy(Id, Retries1)
    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
        pengine_unregister_remote(Id),
        retractall(child(_,Id))
    ).

%   wait_event(?Template, +State, +Options)
%
%   Wait for the next event and hand it to process_event/4.

wait_event(Template, State, Options) :-
    pengine_event(Event, Options),
    debug(pengine(event), 'Received ~p', [Event]),
    process_event(Event, Template, State, Options).

%   process_event(+Event, ?Template, +State, +Options)
%
%   Handle Event for pengine_rpc/3.  Solutions are unified with
%   Template; if the pengine has more solutions, the next chunk is
%   requested on backtracking.  A destroy(ID, Event) event sets the
%   first argument of State to `false`.

process_event(create(_ID, Features), Template, State, Options) :-
    memberchk(answer(First), Features),
    process_event(First, Template, State, Options).
process_event(error(_ID, Error), _Template, _, _Options) :-
    throw(Error).
process_event(failure(_ID, _Time), _Template, _, _Options) :-
    fail.
process_event(prompt(ID, Prompt), Template, State, Options) :-
    pengine_rpc_prompt(ID, Prompt, Reply),
    pengine_send(ID, input(Reply)),
    wait_event(Template, State, Options).
process_event(output(ID, Term), Template, State, Options) :-
    pengine_rpc_output(ID, Term),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(debug(ID, Message), Template, State, Options) :-
    debug(pengine(debug), '~w', [Message]),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(success(_ID, Solutions, _Proj, _Time, false),
              Template, _, _Options) :-
    !,
    member(Template, Solutions).
process_event(success(ID, Solutions, _Proj, _Time, true),
              Template, State, Options) :-
    (   member(Template, Solutions)
    ;   pengine_next(ID, Options),
        wait_event(Template, State, Options)
    ).
process_event(destroy(ID, Event), Template, State, Options) :-
    !,
    retractall(child(_,ID)),
    nb_setarg(1, State, false),
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Event, Template, State, Options).
% compatibility with older versions of the protocol.
process_event(success(ID, Solutions, Time, More),
              Template, State, Options) :-
    process_event(success(ID, Solutions, _Proj, Time, More),
                  Template, State, Options).


%   pengine_rpc_prompt(+ID, +Prompt, -Term)
%
%   Obtain input for a prompting pengine.  Uses the hook prompt/3 if
%   it succeeds and otherwise reads a term from the current input,
%   using Prompt as prompt.

pengine_rpc_prompt(ID, Prompt, Term) :-
    prompt(ID, Prompt, Answer),
    !,
    Term = Answer.
pengine_rpc_prompt(_ID, Prompt, Term) :-
    setup_call_cleanup(
        prompt(OldPrompt, Prompt),
        read(Term),
        prompt(_, OldPrompt)).

%   pengine_rpc_output(+ID, +Term)
%
%   Handle output from the pengine.  Uses the hook output/2 if it
%   succeeds and otherwise prints Term.

pengine_rpc_output(ID, Term) :-
    output(ID, Term),
    !.
pengine_rpc_output(_ID, Term) :-
    print(Term).
% Hooks used by pengine_rpc/3: prompt/3 is tried to obtain input for a
% prompting pengine and output/2 is tried to handle its output.

:- multifile prompt/3, output/2.


/*================= HTTP handlers =======================
*/

%   Declare HTTP locations we serve and how.  Note that we use
%   time_limit(infinite) because pengines have their own timeout.  Also
%   note that we use spawn.  This is needed because we can easily get
%   many clients waiting for some action on a pengine to complete.
%   Without spawning, we would quickly exhaust the worker pool of the
%   HTTP server.
%
%   FIXME: probably we should wait for a short time for the pengine on
%   the default worker thread.  Only if that time has expired, we can
%   call http_spawn/2 to continue waiting on a new thread.  That would
%   improve the performance and reduce the usage of threads.

:- http_handler(root(pengine),               http_404([]),
                [ id(pengines) ]).
:- http_handler(root(pengine/create),        http_pengine_create,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/send),          http_pengine_send,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/abort),         http_pengine_abort,         []).
:- http_handler(root(pengine/detach),        http_pengine_detach,        []).
:- http_handler(root(pengine/list),          http_pengine_list,          []).
:- http_handler(root(pengine/ping),          http_pengine_ping,          []).
:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []).

% Static resources for the JavaScript client.
:- http_handler(root(pengine/'pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []), []).
:- http_handler(root(pengine/'plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
---|---|---|
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
collate | 0 (off) | Join output events |
solutions | chunked | If `all`, emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
Using chunk=false
simulates the recursive toplevel. See
pengine_ask/3.
%!  http_pengine_create(+Request)
%
%   HTTP handler for creating a pengine.  After handling an HTTP
%   OPTIONS request using reply_options/2, the parameters are either
%   read from a JSON document (content type `application/json`) or
%   from the HTTP parameters.  In both cases they are collected in a
%   dict that is passed to http_pengine_create/4.

http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Dict),
    dict_atom_option(format, Dict, Format, prolog),
    dict_atom_option(application, Dict, Application, pengine_sandbox),
    http_pengine_create(Request, Application, Format, Dict).
http_pengine_create(Request) :-
    Optional = [optional(true)],
    OptString = [string|Optional],
    Form = [ format(Format, [default(prolog)]),
             application(Application, [default(pengine_sandbox)]),
             chunk(_, [nonneg;oneof([false]), default(1)]),
             collate(_, [number, default(0)]),
             solutions(_, [oneof([all,chunked]), default(chunked)]),
             ask(_, OptString),
             template(_, OptString),
             src_text(_, OptString),
             disposition(_, OptString),
             src_url(_, Optional)
           ],
    http_parameters(Request, Form),
    form_dict(Form, Dict),
    http_pengine_create(Request, Application, Format, Dict).

%   dict_atom_option(+Key, +Dict, -Atom, +Default)
%
%   Atom is the value for Key in Dict converted to an atom, or Default
%   if Dict has no such key.

dict_atom_option(Key, Dict, Atom, Default) :-
    (   get_dict(Key, Dict, Text)
    ->  atom_string(Atom, Text)
    ;   Atom = Default
    ).

%   form_dict(+Form, -Dict)
%
%   Dict is an anonymous dict holding the parameters of Form that have
%   a value.

form_dict(Form, Dict) :-
    form_values(Form, Pairs),
    dict_pairs(Dict, _, Pairs).

%   form_values(+Form, -Pairs)
%
%   Pairs is a list Name-Value for each parameter declaration of Form
%   whose first argument is instantiated.

form_values([], []).
form_values([Param|Params], Pairs) :-
    (   arg(1, Param, Value),
        nonvar(Value)
    ->  functor(Param, Name, _),
        Pairs = [Name-Value|Rest]
    ;   Pairs = Rest
    ),
    form_values(Params, Rest).
%!  http_pengine_create(+Request, +Application, +Format, +Dict)
%
%   Create a pengine for Application from the parameters in Dict and
%   reply with the first result(s) in Format.  If Application is not a
%   known application, an existence error is sent as reply.

http_pengine_create(Request, Application, Format, Dict) :-
    current_application(Application),
    !,
    allowed(Request, Application),
    authenticate(Request, Application, UserOptions),
    dict_to_options(Dict, Application, DictOptions),
    append(UserOptions, DictOptions, CreateOptions),
    pengine_uuid(Pengine),
    message_queue_create(Queue, [max_size(25)]),
    setting(Application:time_limit, TimeLimit),
    get_time(Created),
    asserta(pengine_queue(Pengine, Queue, TimeLimit, Created)),
    broadcast(pengine(create(Pengine, Application, CreateOptions))),
    create(Queue, Pengine, CreateOptions, http, Application),
    create_wait_and_output_result(Pengine, Queue, Format,
                                  TimeLimit, Dict),
    gc_abandoned_queues.
http_pengine_create(_Request, Application, Format, _Dict) :-
    Error = existence_error(pengine_application, Application),
    pengine_uuid(ID),
    output_result(ID, Format, error(ID, error(Error, _))).


%   dict_to_options(+Dict, +Application, -CreateOptions)
%
%   Translate the key-value pairs of Dict into pengine create options.

dict_to_options(Dict, Application, CreateOptions) :-
    dict_pairs(Dict, _, Pairs),
    pairs_create_options(Pairs, Application, CreateOptions).

%   pairs_create_options(+Pairs, +Application, -Options)
%
%   Options holds Name(Value) for each Name-Value in Pairs that is a
%   pengine create option and whose name is not `user`.  Values of
%   options that have type `atom` are converted to an atom.  All other
%   pairs are skipped.

pairs_create_options([], _, []) :- !.
pairs_create_options([Name-Value0|Pairs], App, [Option|Options]) :-
    Option =.. [Name,Value],
    pengine_create_option(Option), Name \== user,
    !,
    (   create_option_type(Option, atom)
    ->  atom_string(Value, Value0)      % term creation must be done if
    ;   Value = Value0                  % we created the source and know
    ),                                  % the operators.
    pairs_create_options(Pairs, App, Options).
pairs_create_options([_|Pairs], App, Options) :-
    pairs_create_options(Pairs, App, Options).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded,
_)
.
%!  wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Collate)
%
%   Wait at most TimeLimit for an event in Queue and send it as reply
%   in Format.  If the first event is a collating event, subsequent
%   events are collected using collect_events/6 and sent as a list.
%   The collate time is limited to one tenth of TimeLimit.  If waiting
%   raises an exception, died(Pengine) is sent.  On timeout,
%   time_limit_exceeded/2 is called.

wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
    Collate is min(Collate0, TimeLimit/10),
    get_time(Started),
    (   catch(thread_get_message(Queue, pengine_event(_, First),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Got ~q from ~q', [First, Queue]),
            (   collating_event(Collate, First)
            ->  Deadline is Started+TimeLimit,
                collect_events(Pengine, Collate, Queue, Deadline, 100, More),
                Events = [First|More],
                ignore(destroy_queue_from_http(Pengine, Events, Queue)),
                protect_pengine(Pengine,
                                output_result(Pengine, Format, Events))
            ;   ignore(destroy_queue_from_http(Pengine, First, Queue)),
                protect_pengine(Pengine,
                                output_result(Pengine, Format, First))
            )
        ;   output_result(Pengine, Format, died(Pengine))
        )
    ;   time_limit_exceeded(Pengine, Format)
    ).
%!  collect_events(+Pengine, +Collate, +Queue, +Deadline, +Max, -Events)
%
%   Collect at most Max additional events from Queue, waiting Collate
%   for each of them.  Collection stops after the first event that is
%   not a collating event.  If waiting raises an exception, Events is
%   [died(Pengine)].  If no event arrives in time, Events is the time
%   limit event if Deadline has passed and [] otherwise.

collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
    !.
collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
    debug(pengine(wait), 'Waiting to collate events', []),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(Collate)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
            Events = [Event|More],
            (   collating_event(Collate, Event)
            ->  Left is Max - 1,
                collect_events(Pengine, Collate, Queue, Deadline, Left, More)
            ;   More = []
            )
        ;   Events = [died(Pengine)]
        )
    ;   get_time(Now),
        Now > Deadline
    ->  time_limit_event(Pengine, TimeLimitEvent),
        Events = [TimeLimitEvent]
    ;   Events = []
    ).

%   collating_event(+Collate, +Event)
%
%   True if Event is an output event and Collate is not 0.

collating_event(0, _) :-
    !,
    fail.
collating_event(_, output(_,_)).
disposition
key to denote the
% download location.

%!  create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict)
%
%   Wait for the results of a newly created pengine and send them as
%   HTTP reply.  If Dict has `solutions` set to `all`, events are read
%   in a failure driven loop: each event is emitted as page(Page, Event)
%   and, as long as the event indicates that there are more solutions,
%   the pengine is asked for the next chunk.  Otherwise we call
%   wait_and_output_result/5 using the `collate` value from Dict
%   (default 0).

create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, Page),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
            (   destroy_queue_from_http(Pengine, Event, Queue)
            ->  !,
                protect_pengine(Pengine,
                                output_result_2(Format, page(Page, Event), Dict))
            ;   is_more_event(Event)
            ->  pengine_thread(Pengine, Thread),
                thread_send_message(Thread, pengine_request(next)),
                protect_pengine(Pengine,
                                output_result_2(Format, page(Page, Event), Dict)),
                fail                    % backtrack into between/3: next page
            ;   !,
                protect_pengine(Pengine,
                                output_result_2(Format, page(Page, Event), Dict))
            )
        ;   !, output_result(Pengine, Format, died(Pengine))
        )
    ;   !, time_limit_exceeded(Pengine, Format)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)).

%   is_more_event(+Event)
%
%   True if Event is a success event that indicates there may be more
%   solutions, or a create event whose answer is such an event.

is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
is_more_event(create(_, Features)) :-
    memberchk(answer(Answer), Features),
    is_more_event(Answer).
%!  time_limit_exceeded(+Pengine, +Format)
%
%   Forcefully destroy Pengine and, as cleanup, send the time limit
%   event as reply.

time_limit_exceeded(Pengine, Format) :-
    time_limit_event(Pengine, TimeLimitEvent),
    call_cleanup(
        pengine_destroy(Pengine, [force(true)]),
        output_result(Pengine, Format, TimeLimitEvent)).

%   time_limit_event(+Pengine, -Event)
%
%   Event is the event that signals that Pengine exceeded its time
%   limit.

time_limit_event(Pengine,
                 destroy(Pengine, error(Pengine, time_limit_exceeded))).

%   destroy_pengine_after_output(+Pengine, +Events)
%
%   If Events is a list that ends in the time limit event, forcefully
%   destroy Pengine, ignoring failure and error(_,_) exceptions.
%   Otherwise do nothing.

destroy_pengine_after_output(Pengine, Events) :-
    is_list(Events),
    last(Events, Final),
    time_limit_event(Pengine, Final),
    !,
    catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true).
destroy_pengine_after_output(_, _).
%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
%
%   Consider destroying the output queue of Pengine.  If the queue is
%   registered in output_queue/3, it is destroyed when empty.
%   Otherwise, if Event is a destroy event, destruction is synchronised
%   with the pengine using the mutex `pengine`.  Fails if neither
%   applies.

destroy_queue_from_http(ID, _, Queue) :-
    output_queue(ID, Queue, _),
    !,
    destroy_queue_if_empty(Queue).
destroy_queue_from_http(ID, Event, Queue) :-
    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
    is_destroy_event(Event),
    !,
    message_queue_property(Queue, size(Pending)),
    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Pending]),
    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).

%   is_destroy_event(+Event)
%
%   True if Event is a destroy event or a create event whose answer is
%   a destroy event.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_,_)).
is_destroy_event(create(_, Features)) :-
    memberchk(answer(Answer), Features),
    is_destroy_event(Answer).

%   destroy_queue_if_empty(+Queue)
%
%   Destroy Queue and remove it from output_queue/3, unless it still
%   holds a message.

destroy_queue_if_empty(Queue) :-
    thread_peek_message(Queue, _),
    !.
destroy_queue_if_empty(Queue) :-
    retractall(output_queue(_, Queue, _)),
    message_queue_destroy(Queue).
% last_gc(Time): time stamp of the last run of gc_abandoned_queues/0.
:- dynamic last_gc/1.

%!  gc_abandoned_queues
%
%   If consider_queue_gc/0 holds, destroy all queues registered in
%   output_queue/3 that were registered more than 15 minutes ago and
%   record the time of this run.

gc_abandoned_queues :-
    consider_queue_gc,
    !,
    get_time(Now),
    (   output_queue(_, Queue, Registered),
        Now-Registered > 15*60,
        retract(output_queue(_, Queue, Registered)),
        message_queue_destroy(Queue),
        fail
    ;   retractall(last_gc(_)),
        asserta(last_gc(Now))
    ).
gc_abandoned_queues.

%   consider_queue_gc
%
%   True if output_queue/3 has more than 100 clauses and the last run
%   was more than 5 minutes ago or there was no previous run.

consider_queue_gc :-
    predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(LastRun),
        get_time(Now),
        Now-LastRun > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
% output_queue_destroyed(Queue): Queue was destroyed by the HTTP side.
:- dynamic output_queue_destroyed/1.

%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
%
%   Called from destroy_queue_from_http/3 while holding the mutex
%   `pengine`.  If Queue is already registered in output_queue/3 it is
%   destroyed when empty.  If it still holds an output event, the
%   destruction is delayed by registering the queue in output_queue/3.
%   Otherwise the queue is destroyed and this is recorded in
%   output_queue_destroyed/1.

sync_destroy_queue_from_http(ID, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
              [Queue]),
        get_time(Registered),
        asserta(output_queue(ID, Queue, Registered))
    ;   message_queue_destroy(Queue),
        asserta(output_queue_destroyed(Queue))
    ).
pengine
% held.

%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
%
%   If the HTTP side already destroyed Queue, remove that record.
%   Otherwise register Queue in output_queue/3 with the current time.
%   In both cases the pengine_queue/4 registration is removed.

sync_destroy_queue_from_pengine(ID, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Registered),
        asserta(output_queue(ID, Queue, Registered))
    ),
    retractall(pengine_queue(ID, Queue, _, _)).


%   http_pengine_send(+Request)
%
%   HTTP handler that sends an event to a pengine and replies with the
%   resulting event(s).  If the pengine has no thread, or reading the
%   event raises an existence error on the pengine, we reply using
%   pengine_died/2.  Other errors while reading the event are sent as
%   error event.

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(ID, [ type(atom) ]),
                      event(EventString, [optional(true)]),
                      collate(Collate, [number, default(0)]),
                      format(Format, [default(prolog)])
                    ]),
    catch(read_event(ID, Request, Format, EventString, Event),
          Error,
          true),
    (   var(Error)
    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
        (   pengine_thread(ID, Thread)
        ->  pengine_queue(ID, Queue, TimeLimit, _),
            random_delay,
            broadcast(pengine(send(ID, Event))),
            thread_send_message(Thread, pengine_request(Event)),
            wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
        ;   atom(ID)
        ->  pengine_died(Format, ID)
        ;   http_404([], Request)
        )
    ;   Error = error(existence_error(pengine, ID), _)
    ->  pengine_died(Format, ID)
    ;   output_result(ID, Format, error(ID, Error))
    ).

%   pengine_died(+Format, +Pengine)
%
%   Reply with an existence error on Pengine.

pengine_died(Format, Pengine) :-
    Formal = existence_error(pengine, Pengine),
    output_result(Pengine, Format,
                  error(Pengine, error(Formal,_))).
%!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det.
%
%   Read the event sent to Pengine.  The event is read by
%   read_event_2/5 in the module of Pengine, under protect_pengine/2,
%   and then passed through fix_bindings/4.  If that fails, any posted
%   data is discarded and an existence error for Pengine is raised.
%
%   NOTE(review): the residual documentation mentions `pengine_done`
%   and a mutex; the exact locking contract is not visible here.
%
%   @error existence_error(pengine, Pengine)

read_event(Pengine, Request, Format, EventString, Event) :-
    protect_pengine(
        Pengine,
        (   get_pengine_module(Pengine, Module),
            read_event_2(Request, EventString, Module, RawEvent, VarNames)
        )),
    !,
    fix_bindings(Format, RawEvent, VarNames, Event).
read_event(Pengine, Request, _Format, _EventString, _Event) :-
    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
    discard_post_data(Request),
    existence_error(pengine, Pengine).
%!  read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read the event either from the `event` parameter or as a posted
%   document.  If EventString is instantiated it is parsed with
%   term_string/3; otherwise the request must be a POST and the event
%   is read from its body as `application/x-prolog`.  Bindings is
%   unified with the variable names of the term read.

read_event_2(Request, EventString, Module, Event, Bindings) :-
    (   nonvar(EventString)
    ->  term_string(Event, EventString,
                    [ variable_names(Bindings),
                      module(Module)
                    ])
    ;   option(method(post), Request),
        http_read_data(Request, Event,
                       [ content_type('application/x-prolog'),
                         module(Module),
                         variable_names(Bindings)
                       ])
    ).
%!  discard_post_data(+Request) is det.
%
%   If Request is a POST, read its data into a null stream so that it
%   is consumed and thrown away.  Other requests are left untouched.

discard_post_data(Request) :-
    (   option(method(post), Request)
    ->  setup_call_cleanup(
            open_null_stream(Sink),
            http_read_data(Request, _, [to(stream(Sink))]),
            close(Sink))
    ;   true
    ).
%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
%
%   For a JSON Format (see json_lang/1) and an ask(Goal, Options)
%   event, extend the options with template(Template), chunk(Paging)
%   and bindings(NamedBindings).  The template is taken from the
%   options if present; otherwise it is created from the variables in
%   the asked Goal.  Variables starting with an underscore, followed
%   by a capital letter are ignored from the template.  The chunk size
%   defaults to 1.  All other events are passed unchanged.

fix_bindings(Format,
             ask(Goal, OptionsIn), Bindings,
             ask(Goal, OptionsOut)) :-
    json_lang(Format),
    !,
    exclude(anon, Bindings, Named),
    template(Named, Template, OptionsIn, NoTemplate),
    select_option(chunk(Paging), NoTemplate, Rest, 1),
    OptionsOut = [ template(Template),
                   chunk(Paging),
                   bindings(Named)
                 | Rest
                 ].
fix_bindings(_, Command, _, Command).

%   template(+Bindings, -Template, +Options0, -Options)
%
%   Use the template(Template) option if there is one, removing it
%   from the options.  Otherwise create a dict with tag
%   swish_default_template from Bindings.

template(Bindings, Template, Options0, Options) :-
    (   select_option(template(Template), Options0, Options)
    ->  true
    ;   Options = Options0,
        dict_create(Template, swish_default_template, Bindings)
    ).

%   anon(+Binding)
%
%   True when the variable name of Binding starts with an underscore
%   whose next character can start a Prolog variable.

anon(Name=_) :-
    sub_atom(Name, 0, _, _, '_'),
    sub_atom(Name, 1, 1, _, Second),
    char_type(Second, prolog_var_start).

var_name(Name=_, Name).
%!  json_lang(+Format) is semidet.
%
%   True if Format is `json` or an atom that starts with `json-`.

json_lang(Format) :-
    (   Format = json
    ->  true
    ;   sub_atom(Format, 0, _, _, 'json-')
    ).
%!  http_pengine_pull_response(+Request)
%
%   HTTP handler that waits for and outputs the next result of the
%   pengine named by the `id` parameter.  The pengine is reattached
%   first (see reattach/1).  The queue is taken from pengine_queue/4
%   or, failing that, from output_queue/3 with a time limit of 0.  If
%   neither knows the id, the reply is 404.

http_pengine_pull_response(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_pull_response(Request) :-
    http_parameters(Request,
                    [ id(ID, []),
                      format(Format, [default(prolog)]),
                      collate(Collate, [number, default(0)])
                    ]),
    reattach(ID),
    (   pengine_queue(ID, Queue, TimeLimit, _)
    ->  wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
    ;   output_queue(ID, Queue, _)
    ->  wait_and_output_result(ID, Queue, Format, 0, Collate)
    ;   http_404([], Request)
    ).
%!  http_pengine_abort(+Request)
%
%   HTTP handler that aborts the pengine named by the `id` parameter.
%   If the pengine has a thread, the abort is broadcast, pending
%   output is aborted, the pengine is aborted and the reply is the
%   JSON value `true`.  If it has no thread, the reply is 404.

http_pengine_abort(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_abort(Request) :-
    http_parameters(Request,
                    [ id(ID, [])
                    ]),
    (   \+ pengine_thread(ID, _)
    ->  http_404([], Request)
    ;   broadcast(pengine(abort(ID))),
        abort_pending_output(ID),
        pengine_abort(ID),
        reply_json_dict(true)
    ).
%!  http_pengine_detach(+Request)
%
%   HTTP handler that detaches the pengine named by the `id`
%   parameter.  The posted JSON document is stored, extended with a
%   `time` key, in pengine_detached/2.  The maximum size of the
%   pengine's queue is raised to 1000 and a detached(ID) event is sent
%   to it.  The reply is the JSON value `true`.  If the pengine has no
%   application, or access or authentication is refused by failure,
%   the reply is 404.

http_pengine_detach(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_detach(Request) :-
    http_parameters(Request,
                    [ id(ID, [])
                    ]),
    http_read_json_dict(Request, ClientData),
    (   pengine_property(ID, application(App)),
        allowed(Request, App),
        authenticate(Request, App, _)
    ->  broadcast(pengine(detach(ID))),
        get_time(Stamp),
        Stamped = ClientData.put(time, Stamp),
        assertz(pengine_detached(ID, Stamped)),
        pengine_queue(ID, Queue, _, _),
        message_queue_set(Queue, max_size(1000)),
        pengine_reply(Queue, detached(ID)),
        reply_json_dict(true)
    ;   http_404([], Request)
    ).

%!  reattach(+ID) is det.
%
%   If ID is registered in pengine_detached/2, remove that
%   registration and set the maximum size of its queue back to 25.
%   Otherwise do nothing.

reattach(ID) :-
    retract(pengine_detached(ID, _)),
    pengine_queue(ID, Queue, _, _),
    !,
    message_queue_set(Queue, max_size(25)).
reattach(_).
%!  http_pengine_destroy_all(+Request)
%
%   HTTP handler that destroys a set of pengines.  The `ids`
%   parameter is a comma-separated list of pengine ids.  Ids that are
%   registered in pengine_detached/2 are skipped; all others are
%   destroyed with the option force(true).  The reply is the JSON
%   string "ok".

http_pengine_destroy_all(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_destroy_all(Request) :-
    http_parameters(Request,
                    [ ids(IDsAtom, [])
                    ]),
    atomic_list_concat(IDs, ',', IDsAtom),
    forall(destroy_all_candidate(IDs, ID),
           pengine_destroy(ID, [force(true)])),
    reply_json_dict("ok").

%   Enumerate the members of IDs that are not detached.

destroy_all_candidate(IDs, ID) :-
    member(ID, IDs),
    \+ pengine_detached(ID, _).
%!  http_pengine_ping(+Request)
%
%   HTTP handler that reports whether the pengine named by the `id`
%   parameter is alive.  If the pengine has a thread whose statistics
%   can be fetched, the reply is the event ping(Pengine, Stats), where
%   Stats is the return of thread_statistics/2.  Otherwise the reply
%   is died(Pengine).

http_pengine_ping(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_ping(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)])
                    ]),
    (   pengine_thread(Pengine, Thread),
        catch(thread_statistics(Thread, Stats), error(_,_), fail)
    ->  Reply = ping(Pengine, Stats)
    ;   Reply = died(Pengine)
    ),
    output_result(Pengine, Format, Reply).
%!  http_pengine_list(+Request)
%
%   HTTP handler that lists the pengines of an application as the JSON
%   object `{pengines: [...]}`.  The `status` parameter only accepts
%   `detached`, which is also its default.  The `application`
%   parameter defaults to `pengine_sandbox`.  The request must pass
%   allowed/2 and authenticate/3.

http_pengine_list(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_list(Request) :-
    http_parameters(Request,
                    [ status(Status, [default(detached), oneof([detached])]),
                      application(App, [default(pengine_sandbox)])
                    ]),
    allowed(Request, App),
    authenticate(Request, App, _),
    findall(State, listed_pengine(App, Status, State), States),
    reply_json_dict(json{pengines: States}).

%   listed_pengine(+Application, +Status, -State)
%
%   State is a dict that describes a detached pengine of Application:
%   its id, the detached property, the number of queued messages and
%   the thread statistics.

listed_pengine(Application, detached, State) :-
    State = pengine{id:Id,
                    detached:Since,
                    queued:Pending,
                    stats:Stats},
    pengine_property(Id, application(Application)),
    pengine_property(Id, detached(Since)),
    pengine_queue(Id, Queue, _, _),
    message_queue_property(Queue, size(Pending)),
    listed_pengine_stats(Id, Stats).

%   Stats are the thread statistics of pengine Id, or
%   thread{status:died} if they cannot be fetched.

listed_pengine_stats(Id, Stats) :-
    (   pengine_thread(Id, Thread),
        catch(thread_statistics(Thread, Stats), _, fail)
    ->  true
    ;   Stats = thread{status:died}
    ).
:- dynamic
    pengine_replying/2.             % +Pengine, +Thread

%!  output_result(+Pengine, +Format, +Event) is det.
%
%   Emit Event as the HTTP reply in Format.  The clauses below handle
%   `prolog` and the JSON formats accepted by json_lang/1, e.g. `json`
%   or `json-s`; the write_result/3 hook is tried first.  While the
%   reply is written, the replying thread is registered in
%   pengine_replying/2.  If the exception `pengine_abort_output`
%   arrives during that time, the output is silently abandoned.
%   Finally destroy_pengine_after_output/2 is called.

output_result(Pengine, Format, Event) :-
    thread_self(Me),
    cors_enable,            % contingent on http:cors setting
    disable_client_cache,
    setup_call_cleanup(
        asserta(pengine_replying(Pengine, Me), Ref),
        catch(output_result_2(Format, Event, _{}),
              pengine_abort_output,
              true),
        erase(Ref)),
    destroy_pengine_after_output(Pengine, Event).

%   output_result_2(+Lang, +Event, +Dict)
%
%   Write Event.  The write_result/3 hook comes first, then the
%   built-in `prolog` format, then the JSON formats.  Any other Lang
%   raises a domain error.

output_result_2(Lang, Event, Dict) :-
    write_result(Lang, Event, Dict),
    !.
output_result_2(prolog, Event, _) :-
    !,
    WriteOptions = [ quoted(true),
                     ignore_ops(true),
                     fullstop(true),
                     blobs(portray),
                     portray_goal(portray_blob),
                     nl(true)
                   ],
    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
    write_term(Event, WriteOptions).
output_result_2(Lang, Event, _) :-
    json_lang(Lang),
    !,
    (   event_term_to_json_data(Event, JSON, Lang)
    ->  reply_json_dict(JSON)
    ;   % conversion failed: re-run under assertion/1 to report it
        assertion(event_term_to_json_data(Event, _, Lang))
    ).
output_result_2(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
    domain_error(pengine_format, Lang).
%!  portray_blob(+Blob, +Options) is det.
%
%   Portray a blob as the term '$BLOB'(Type), written quoted.  Future
%   versions may include more info, depending on Type.

:- public portray_blob/2.               % called from write-term
portray_blob(Blob, _Options) :-
    blob(Blob, Type),
    Placeholder = '$BLOB'(Type),
    writeq(Placeholder).
%!  abort_pending_output(+Pengine) is det.
%
%   Signal every thread registered in pengine_replying/2 for Pengine
%   to stop writing its reply.

abort_pending_output(Pengine) :-
    findall(Thread, pengine_replying(Pengine, Thread), Threads),
    maplist(abort_output_thread, Threads).

%   Make Thread throw `pengine_abort_output`.  A thread that no
%   longer exists is ignored.

abort_output_thread(Thread) :-
    Gone = error(existence_error(thread, _), _),
    catch(thread_signal(Thread, throw(pengine_abort_output)),
          Gone,
          true).
%   Output formats.  The residual documentation at this point states
%   that `prolog` and various JSON dialects are supported, that the
%   hook event_to_json/3 can be used to refine the JSON dialects, and
%   that a hook must be used if a completely different output format
%   is desired (presumably write_result/3 -- confirm).

%!  disable_client_cache is det.
%
%   Write the HTTP header lines Cache-Control, Pragma and Expires with
%   values that tell the client not to cache the reply.

disable_client_cache :-
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
            Pragma: no-cache\r\n\c
            Expires: 0\r\n').

%!  event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
%
%   Translate a pengine event term, or a list of such terms, into JSON
%   data.  The event_to_json/3 hook is tried before the built-in
%   translations.  Note that the built-in success/5 translation only
%   applies when Lang is `json`.
%
%   The `destroy(ID, Event)` case used to appear twice.  The second
%   copy could never run because the first always cuts, so it has
%   been removed.

event_term_to_json_data(Events, JSON, Lang) :-
    is_list(Events),
    !,
    events_to_json_data(Events, JSON, Lang).
event_term_to_json_data(Event, JSON, Lang) :-
    event_to_json(Event, JSON, Lang),
    !.
event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:Projection},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    % An initial answer is itself an event and must be translated.
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _) :-
    !.
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).

%   events_to_json_data(+Events, -JSONList, +Lang)
%
%   Translate each event of a list using event_term_to_json_data/3.

events_to_json_data([], [], _).
events_to_json_data([E|T0], [J|T], Lang) :-
    event_term_to_json_data(E, J, Lang),
    events_to_json_data(T0, T, Lang).

:- public add_error_details/3.
%!  add_error_details(+Error, +JSON0, -JSON)
%
%   Extend the JSON0 dict with details about Error.  The error code is
%   added first (add_error_code/3), then the source location
%   (add_error_location/3).  The residual documentation mentions
%   pengines_io.pl, presumably as an external caller -- confirm.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
%!  add_error_code(+Error, +JSON0, -JSON) is det.
%
%   Add a `code` field to JSON0 if Error is an ISO error term.  The
%   error code is the functor name of the formal part of the error,
%   e.g., `syntax_error`, `type_error`, etc.  Some errors carry more
%   information:
%
%     - existence_error(Type, Obj)
%       If Type is an atom, also add `arg1` (Type) and `arg2` (Obj
%       made atomic by to_atomic/2).
%
%   Any other term leaves the dict unchanged.

add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
    atom(Type),
    !,
    to_atomic(Obj, Value),
    Extra = _{code:existence_error, arg1:Type, arg2:Value},
    Error = Error0.put(Extra).
add_error_code(error(Formal, _), Error0, Error) :-
    callable(Formal),
    !,
    functor(Formal, Code, _),
    Error = Error0.put(code, Code).
add_error_code(_, Error, Error).

%   to_atomic(+Obj, -Atomic)
%
%   Atoms, numbers and strings are passed as is.  Anything else is
%   converted to a string by term_string/2.
%
%   What to do with large integers?

to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
%!  add_error_location(+Error, +JSON0, -JSON) is det.
%
%   Add a `location` property if the error can be associated with a
%   source location.  The location is an object with properties `file`
%   and `line` and, if available, the character location in the line
%   as `ch`.  A character position of -1 is treated as "not
%   available".  Errors without a file(Path, Line, Ch, CharNo) context
%   leave the dict unchanged.

add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
    atom(Path), integer(Line),
    !,
    Location = _{file:Path, line:Line},
    Term = Term0.put(_{location:Location}).
add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
    atom(Path), integer(Line), integer(Ch),
    !,
    Location = _{file:Path, line:Line, ch:Ch},
    Term = Term0.put(_{location:Location}).
add_error_location(_, Term, Term).
%   Residual hook documentation (presumably for event_to_json/3 --
%   confirm): translate events such as
%   success(ID, Bindings, Projection, Time, More) and output(ID, Term)
%   into a format suitable for processing at the client side.

%:- multifile pengines:event_to_json/3.


                 /*******************************
                 *        ACCESS CONTROL        *
                 *******************************/
%!  allowed(+Request, +Application) is det.
%
%   Succeed if the peer of Request may contact Application.  The peer
%   must match the setting Application:allow_from and must not match
%   Application:deny_from.  Otherwise reply with a `forbidden` header
%   by throwing http_reply(forbidden(URI)).

allowed(Request, Application) :-
    setting(Application:allow_from, Allow),
    match_peer(Request, Allow),
    setting(Application:deny_from, Deny),
    \+ match_peer(Request, Deny),
    !.
allowed(Request, _Application) :-
    memberchk(request_uri(Here), Request),
    throw(http_reply(forbidden(Here))).

%   match_peer(+Request, +Allowed)
%
%   True if Allowed contains `*`, the literal peer address of Request,
%   or a pattern that matches it.  An empty list matches nothing.

match_peer(_, Allowed) :-
    memberchk(*, Allowed),
    !.
match_peer(_, []) :- !, fail.
match_peer(Request, Allowed) :-
    http_peer(Request, Peer),
    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
    (   memberchk(Peer, Allowed)
    ->  true
    ;   member(Pattern, Allowed),
        match_peer_pattern(Pattern, Peer)
    ).

%   match_peer_pattern(+Pattern, +Peer)
%
%   True if the dotted address Peer matches Pattern, which may end in
%   a `*` component, e.g. '192.168.*'.

match_peer_pattern(Pattern, Peer) :-
    ip_term(Pattern, IP),
    ip_term(Peer, IP),
    !.

%   ip_term(+Text, -Parts)
%
%   Split dotted Text into a list of numbers.  A final `*` component
%   leaves the tail of the list unbound, so it unifies with any
%   remaining components.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", PartStrings),
    ip_pattern(PartStrings, Pattern).

ip_pattern([], []).
ip_pattern([Last], _) :-
    % split_string/4 yields strings, so the wildcard arrives as the
    % string "*" and never unifies with the atom `*`.  Convert it to
    % an atom before comparing.
    atom_string(Star, Last),
    Star == '*',
    !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
%!  authenticate(+Request, +Application, -UserOptions:list) is det.
%
%   Call authentication_hook/3.  UserOptions is either [user(User)],
%   [] or an exception.  It is [user(User)] when the hook succeeds
%   (User must be ground) and [] when the hook fails.

authenticate(Request, Application, UserOptions) :-
    (   authentication_hook(Request, Application, User)
    ->  must_be(ground, User),
        UserOptions = [user(User)]
    ;   UserOptions = []
    ).
%   Residual documentation preceding this predicate (presumably part
%   of the authentication_hook/3 description -- confirm) lists two
%   ways a hook may refuse a request:
%
%     - throw(http_reply(authorise(basic(Realm))))
%       Start a normal HTTP login challenge (reply 401)
%     - throw(http_reply(forbidden(Path)))
%       Reject the request using a 403 reply.

%!  pengine_register_user(+Options) is det.
%
%   If Options contains user(User), record User for the calling
%   pengine in pengine_user/2.  Otherwise do nothing.

pengine_register_user(Options) :-
    (   option(user(User), Options)
    ->  pengine_self(Self),
        asserta(pengine_user(Self, User))
    ;   true
    ).
%!  pengine_user(-User) is semidet.
%
%   True when User is recorded in pengine_user/2 for the calling
%   pengine.

pengine_user(User) :-
    pengine_self(Self),
    pengine_user(Self, User).
%!  reply_options(+Request, +Methods) is semidet.
%
%   Answer an HTTP OPTIONS request: enable CORS for Methods and send
%   a text/plain reply with an empty body.  Fails if Request does not
%   use the method `options`.

reply_options(Request, Methods) :-
    option(method(options), Request),
    !,
    CorsOptions = [ methods(Methods)
                  ],
    cors_enable(Request, CorsOptions),
    format('Content-type: text/plain\r\n'),
    format('~n').                   % empty body


                 /*******************************
                 *        COMPILE SOURCE        *
                 *******************************/
%!  pengine_src_text(+SrcText, +Module) is det.
%
%   Load SrcText into Module on behalf of the calling pengine.  The
%   text is loaded from a memory stream under the source id
%   `pengine://<Self>/src`, silently and with the options from
%   extra_load_options/2.  Afterwards keep_source/3 is called on the
%   text.

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    format(atom(ID), 'pengine://~w/src', [Self]),
    extra_load_options(Self, Options),
    setup_call_cleanup(
        open_chars_stream(Src, Stream),
        load_files(Module:ID,
                   [ stream(Stream),
                     module(Module),
                     silent(true)
                   | Options
                   ]),
        close(Stream)),
    keep_source(Self, ID, Src).

%   system:'#file'(+File, +Line)
%
%   Set the file name of the stream being loaded to File and restart
%   its position recording.  The module qualifier is restored here:
%   the head read `system'#file'(...)`, which is not valid syntax.
%
%   NOTE(review): presumably this is a hook for `#file` directives in
%   loaded source -- confirm.

system:'#file'(File, _Line) :-
    prolog_load_context(stream, Stream),
    set_stream(Stream, file_name(File)),
    % Toggle position recording off and on again.
    set_stream(Stream, record_position(false)),
    set_stream(Stream, record_position(true)).
%!  pengine_src_url(+URL, +Module) is det.
%
%   Load the source found at URL into Module on behalf of the calling
%   pengine, under the source id `pengine://<Self>/url/<Path>`.  If
%   the application setting `debug_info` is `false`, the source is
%   loaded directly from the HTTP stream.  Otherwise the text is first
%   read into a string, loaded from memory and handed to
%   keep_source/3.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, Path),
    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
    extra_load_options(Self, Extra),
    LoadOptions = [ stream(In),
                    module(Module)
                  | Extra
                  ],
    (   get_pengine_application(Self, App),
        setting(App:debug_info, false)
    ->  setup_call_cleanup(
            http_open(URL, In, []),
            ( set_stream(In, encoding(utf8)),
              load_files(Module:ID, LoadOptions)
            ),
            close(In))
    ;   setup_call_cleanup(
            http_open(URL, Remote, []),
            ( set_stream(Remote, encoding(utf8)),
              read_string(Remote, _, Src)
            ),
            close(Remote)),
        setup_call_cleanup(
            open_chars_stream(Src, In),
            load_files(Module:ID, LoadOptions),
            close(In)),
        keep_source(Self, ID, Src)
    ).


%   extra_load_options(+Pengine, -Options)
%
%   Options is [] for a pengine that is not sandboxed and
%   [sandboxed(true)] otherwise.

extra_load_options(Pengine, Options) :-
    (   pengine_not_sandboxed(Pengine)
    ->  Options = []
    ;   Options = [sandboxed(true)]
    ).


%   keep_source(+Pengine, +ID, +SrcText)
%
%   If the application of Pengine has the setting `debug_info` set to
%   `true`, store the source text as a string in pengine_data/2.

keep_source(Pengine, ID, SrcText) :-
    (   get_pengine_application(Pengine, App),
        setting(App:debug_info, true)
    ->  to_string(SrcText, SrcString),
        assertz(pengine_data(Pengine, source(ID, SrcString)))
    ;   true
    ).

%   to_string(+Text, -String)
%
%   A string is passed unchanged; other text is converted with
%   atom_string/2.

to_string(Text, String) :-
    (   string(Text),
        String = Text
    ->  true
    ;   atom_string(Text, String)
    ->  true
    ).

                 /*******************************
                 *            SANDBOX           *
                 *******************************/

:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3265 3266 3267 /******************************* 3268 * MESSAGES * 3269 *******************************/ 3270 3271prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3272 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3273 'This is normally caused by an insufficiently instantiated'-[], nl, 3274 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3275 'find all possible instantations of Var.'-[] 3276 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.