View source with raw comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2024, Torbjörn Lager,
    8                              VU University Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(pengines,
   39          [ pengine_create/1,                   % +Options
   40            pengine_ask/3,                      % +Pengine, :Query, +Options
   41            pengine_next/2,                     % +Pengine, +Options
   42            pengine_stop/2,                     % +Pengine, +Options
   43            pengine_event/2,                    % -Event, +Options
   44            pengine_input/2,                    % +Prompt, -Term
   45            pengine_output/1,                   % +Term
   46            pengine_respond/3,                  % +Pengine, +Input, +Options
   47            pengine_debug/2,                    % +Format, +Args
   48            pengine_self/1,                     % -Pengine
   49            pengine_pull_response/2,            % +Pengine, +Options
   50            pengine_destroy/1,                  % +Pengine
   51            pengine_destroy/2,                  % +Pengine, +Options
   52            pengine_abort/1,                    % +Pengine
   53            pengine_application/1,              % +Application
   54            current_pengine_application/1,      % ?Application
   55            pengine_property/2,                 % ?Pengine, ?Property
   56            pengine_user/1,                     % -User
   57            pengine_event_loop/2,               % :Closure, +Options
   58            pengine_rpc/2,                      % +Server, :Goal
   59            pengine_rpc/3                       % +Server, :Goal, +Options
   60          ]).

Pengines: Web Logic Programming Made Easy

The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.

author
- Torbjörn Lager and Jan Wielemaker */
   71:- autoload(library(aggregate),[aggregate_all/3]).   72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   73:- autoload(library(broadcast),[broadcast/1]).   74:- autoload(library(charsio),[open_chars_stream/2]).   75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   76:- autoload(library(error),
   77	    [ must_be/2,
   78	      existence_error/2,
   79	      permission_error/3,
   80	      domain_error/2
   81	    ]).   82:- autoload(library(filesex),[directory_file_path/3]).   83:- autoload(library(listing),[listing/1]).   84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   85:- autoload(library(modules),[in_temporary_module/3]).   86:- autoload(library(occurs),[sub_term/2]).   87:- autoload(library(option),
   88	    [select_option/3,option/2,option/3,select_option/4]).   89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   90:- autoload(library(sandbox),[safe_goal/1]).   91:- autoload(library(statistics),[thread_statistics/2]).   92:- autoload(library(term_to_json),[term_to_json/2]).   93:- autoload(library(thread_pool),
   94	    [thread_pool_create/3,thread_create_in_pool/4]).   95:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   96:- autoload(library(uri),
   97	    [ uri_components/2,
   98	      uri_query_components/2,
   99	      uri_data/3,
  100	      uri_data/4,
  101	      uri_encoded/3
  102	    ]).  103:- autoload(library(http/http_client),[http_read_data/3]).  104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  105:- autoload(library(http/http_dispatch),
  106	    [http_handler/3,http_404/2,http_reply_file/3]).  107:- autoload(library(http/http_open),[http_open/3]).  108:- autoload(library(http/http_parameters),[http_parameters/2]).  109:- autoload(library(http/http_stream),[is_cgi_stream/1]).  110:- autoload(library(http/http_wrapper),[http_peer/2]).  111
  112:- use_module(library(settings),[setting/2,setting/4]).  113:- use_module(library(http/http_json),
  114              [http_read_json_dict/2,reply_json_dict/1]).  115
  116:- if(exists_source(library(uuid))).  117:- autoload(library(uuid), [uuid/2]).  118:- endif.  119
  120
  121:- meta_predicate
  122    pengine_create(:),
  123    pengine_rpc(+, +, :),
  124    pengine_event_loop(1, +).  125
  126:- multifile
  127    write_result/3,                 % +Format, +Event, +Dict
  128    event_to_json/3,                % +Event, -JSON, +Format
  129    prepare_module/3,               % +Module, +Application, +Options
  130    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  131    authentication_hook/3,          % +Request, +Application, -User
  132    not_sandboxed/2,                % +User, +App
  133    pengine_flush_output_hook/0.  134
  135:- predicate_options(pengine_create/1, 1,
  136                     [ id(-atom),
  137                       alias(atom),
  138                       application(atom),
  139                       destroy(boolean),
  140                       server(atom),
  141                       ask(compound),
  142                       template(compound),
  143                       chunk(integer;oneof([false])),
  144                       bindings(list),
  145                       src_list(list),
  146                       src_text(any),           % text
  147                       src_url(atom),
  148                       src_predicates(list)
  149                     ]).  150:- predicate_options(pengine_ask/3, 3,
  151                     [ template(any),
  152                       chunk(integer;oneof([false])),
  153                       bindings(list)
  154                     ]).  155:- predicate_options(pengine_next/2, 2,
  156                     [ chunk(integer),
  157                       pass_to(pengine_send/3, 3)
  158                     ]).  159:- predicate_options(pengine_stop/2, 2,
  160                     [ pass_to(pengine_send/3, 3)
  161                     ]).  162:- predicate_options(pengine_respond/3, 2,
  163                     [ pass_to(pengine_send/3, 3)
  164                     ]).  165:- predicate_options(pengine_rpc/3, 3,
  166                     [ chunk(integer;oneof([false])),
  167                       pass_to(pengine_create/1, 1)
  168                     ]).  169:- predicate_options(pengine_send/3, 3,
  170                     [ delay(number)
  171                     ]).  172:- predicate_options(pengine_event/2, 2,
  173                     [ listen(atom),
  174                       pass_to(system:thread_get_message/3, 3)
  175                     ]).  176:- predicate_options(pengine_pull_response/2, 2,
  177                     [ pass_to(http_open/3, 3)
  178                     ]).  179:- predicate_options(pengine_event_loop/2, 2,
  180                     []).                       % not yet implemented
  181
  182% :- debug(pengine(transition)).
  183:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  184
  185goal_expansion(random_delay, Expanded) :-
  186    (   debugging(pengine(delay))
  187    ->  Expanded = do_random_delay
  188    ;   Expanded = true
  189    ).
  190
  191do_random_delay :-
  192    Delay is random(20)/1000,
  193    sleep(Delay).
  194
  195:- meta_predicate                       % internal meta predicates
  196    solve(+, ?, 0, +),
  197    findnsols_no_empty(+, ?, 0, -),
  198    pengine_event_loop(+, 1, +).
 pengine_create(:Options) is det
Creates a new pengine. Valid options are:
id(-ID)
ID gets instantiated to the id of the created pengine. ID is atomic.
alias(+Name)
The pengine is named Name (an atom). A slave pengine (child) can subsequently be referred to by this name.
application(+Application)
Application in which the pengine runs. See pengine_application/1.
server(+URL)
The pengine will run in (and in the Prolog context of) the pengine server located at URL.
src_list(+List_of_clauses)
Inject a list of Prolog clauses into the pengine.
src_text(+Atom_or_string)
Inject the clauses specified by a source text into the pengine.
src_url(+URL)
Inject the clauses specified in the file located at URL into the pengine.
src_predicates(+List)
Send the local predicates denoted by List to the remote pengine. List is a list of predicate indicators.

Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.

Successful creation of a pengine will return an event term of the following form:

create(ID, Term)
ID is the id of the pengine that was created. Term is not used at the moment.

An error will be returned if the pengine could not be created:

error(ID, Term)
ID is invalid, since no pengine was created. Term is the exception's error term. */
  253pengine_create(M:Options0) :-
  254    translate_local_sources(Options0, Options, M),
  255    (   select_option(server(BaseURL), Options, RestOptions)
  256    ->  remote_pengine_create(BaseURL, RestOptions)
  257    ;   local_pengine_create(Options)
  258    ).
 translate_local_sources(+OptionsIn, -Options, +Module) is det
Translate the src_predicates and src_list options into src_text. We need to do that anyway for remote pengines. For local pengines, we could avoid this step, but there is very little point in transferring source to a local pengine anyway as local pengines can access any Prolog predicate that you make visible to the application.

Multiple sources are concatenated to end up with a single src_text option.

  272translate_local_sources(OptionsIn, Options, Module) :-
  273    translate_local_sources(OptionsIn, Sources, Options2, Module),
  274    (   Sources == []
  275    ->  Options = Options2
  276    ;   Sources = [Source]
  277    ->  Options = [src_text(Source)|Options2]
  278    ;   atomics_to_string(Sources, Source)
  279    ->  Options = [src_text(Source)|Options2]
  280    ).
  281
  282translate_local_sources([], [], [], _).
  283translate_local_sources([H0|T], [S0|S], Options, M) :-
  284    nonvar(H0),
  285    translate_local_source(H0, S0, M),
  286    !,
  287    translate_local_sources(T, S, Options, M).
  288translate_local_sources([H|T0], S, [H|T], M) :-
  289    translate_local_sources(T0, S, T, M).
  290
  291translate_local_source(src_predicates(PIs), Source, M) :-
  292    must_be(list, PIs),
  293    with_output_to(string(Source),
  294                   maplist(list_in_module(M), PIs)).
  295translate_local_source(src_list(Terms), Source, _) :-
  296    must_be(list, Terms),
  297    with_output_to(string(Source),
  298                   forall(member(Term, Terms),
  299                          format('~k .~n', [Term]))).
  300translate_local_source(src_text(Source), Source, _).
  301
  302list_in_module(M, PI) :-
  303    listing(M:PI).
 pengine_send(+NameOrID, +Term) is det
Same as pengine_send(NameOrID, Term, []). */
  310pengine_send(Target, Event) :-
  311    pengine_send(Target, Event, []).
 pengine_send(+NameOrID, +Term, +Options) is det
Succeeds immediately and places Term in the queue of the pengine NameOrID. Options is a list of options:
delay(+Time)
The actual sending is delayed by Time seconds. Time is an integer or a float.

Any remaining options are passed to http_open/3. */

  326pengine_send(Target, Event, Options) :-
  327    must_be(atom, Target),
  328    pengine_send2(Target, Event, Options).
  329
  330pengine_send2(self, Event, Options) :-
  331    !,
  332    thread_self(Queue),
  333    delay_message(queue(Queue), Event, Options).
  334pengine_send2(Name, Event, Options) :-
  335    child(Name, Target),
  336    !,
  337    delay_message(pengine(Target), Event, Options).
  338pengine_send2(Target, Event, Options) :-
  339    delay_message(pengine(Target), Event, Options).
  340
  341delay_message(Target, Event, Options) :-
  342    option(delay(Delay), Options),
  343    !,
  344    alarm(Delay,
  345          send_message(Target, Event, Options),
  346          _AlarmID,
  347          [remove(true)]).
  348delay_message(Target, Event, Options) :-
  349    random_delay,
  350    send_message(Target, Event, Options).
  351
  352send_message(queue(Queue), Event, _) :-
  353    thread_send_message(Queue, pengine_request(Event)).
  354send_message(pengine(Pengine), Event, Options) :-
  355    (   pengine_remote(Pengine, Server)
  356    ->  remote_pengine_send(Server, Pengine, Event, Options)
  357    ;   pengine_thread(Pengine, Thread)
  358    ->  thread_send_message(Thread, pengine_request(Event))
  359    ;   existence_error(pengine, Pengine)
  360    ).
 pengine_request(-Request) is det
To be used by a pengine to wait for the next request. Such messages are placed in the queue by pengine_send/2. Keeps the thread in normal state if an event arrives within a second. Otherwise it waits for the idle_limit setting while using thread_idle/2 to minimise resources.
  370pengine_request(Request) :-
  371    thread_self(Me),
  372    thread_get_message(Me, pengine_request(Request), [timeout(1)]),
  373    !.
  374pengine_request(Request) :-
  375    pengine_self(Self),
  376    get_pengine_application(Self, Application),
  377    setting(Application:idle_limit, IdleLimit0),
  378    IdleLimit is IdleLimit0-1,
  379    thread_self(Me),
  380    (   thread_idle(thread_get_message(Me, pengine_request(Request),
  381                                       [timeout(IdleLimit)]),
  382                    long)
  383    ->  true
  384    ;   Request = destroy
  385    ).
 pengine_reply(+Event) is det
 pengine_reply(+Queue, +Event) is det
Reply Event to the parent of the current Pengine or the given Queue. Such events are read by the other side with pengine_event/1.

If the message cannot be sent within the idle_limit setting of the pengine, abort the pengine.

  398pengine_reply(Event) :-
  399    pengine_parent(Queue),
  400    pengine_reply(Queue, Event).
  401
  402pengine_reply(_Queue, _Event0) :-
  403    nb_current(pengine_idle_limit_exceeded, true),
  404    !.
  405pengine_reply(Queue, Event0) :-
  406    arg(1, Event0, ID),
  407    wrap_first_answer(ID, Event0, Event),
  408    random_delay,
  409    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  410    (   pengine_self(ID),
  411        \+ pengine_detached(ID, _)
  412    ->  get_pengine_application(ID, Application),
  413        setting(Application:idle_limit, IdleLimit),
  414        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
  415        (   thread_send_message(Queue, pengine_event(ID, Event),
  416                                [ timeout(IdleLimit)
  417                                ])
  418        ->  true
  419        ;   thread_self(Me),
  420            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  421                  [ID, Me]),
  422            nb_setval(pengine_idle_limit_exceeded, true),
  423            thread_detach(Me),
  424            abort
  425        )
  426    ;   thread_send_message(Queue, pengine_event(ID, Event))
  427    ).
  428
  429wrap_first_answer(ID, Event0, CreateEvent) :-
  430    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  431    arg(1, CreateEvent, ID),
  432    !,
  433    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  434wrap_first_answer(_ID, Event, Event).
  435
  436
  437empty_queue :-
  438    pengine_parent(Queue),
  439    empty_queue(Queue, 0, Discarded),
  440    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  441
  442empty_queue(Queue, C0, C) :-
  443    thread_get_message(Queue, _Term, [timeout(0)]),
  444    !,
  445    C1 is C0+1,
  446    empty_queue(Queue, C1, C).
  447empty_queue(_, C, C).
 pengine_ask(+NameOrID, @Query, +Options) is det
Asks pengine NameOrID a query Query.

Options is a list of options:

template(+Template)
Template is a variable (or a term containing variables) shared with the query. By default, the template is identical to the query.
chunk(+IntegerOrFalse)
Retrieve solutions in chunks of Integer rather than one by one. 1 means no chunking (default). Other integers indicate the maximum number of solutions to retrieve in one chunk. If false, the Pengine goal is not executed using findall/3 and friends and we do not backtrack immediately over the goal. As a result, changes to backtrackable global state are retained. This is similar to using set_prolog_flag(toplevel_mode, recursive).
bindings(+Bindings)
Sets the global variable '$variable_names' to a list of Name = Var terms, providing access to the actual variable names.

Any remaining options are passed to pengine_send/3.

Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.

success(ID, Terms, Projection, Time, More)
ID is the id of the pengine that succeeded in solving the query. Terms is a list holding instantiations of Template. Projection is a list of variable names that should be displayed. Time is the CPU time used to produce the results and finally, More is either true or false, indicating whether we can expect the pengine to be able to return more solutions or not, should we call pengine_next/2.
failure(ID)
ID is the id of the pengine that failed for lack of solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, like so:

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, AskOptions, SendOptions),
    pengine_send(ID, ask(Query, AskOptions), SendOptions).

*/

  516pengine_ask(ID, Query, Options) :-
  517    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  518    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  519
  520
  521pengine_ask_option(template(_)).
  522pengine_ask_option(chunk(_)).
  523pengine_ask_option(bindings(_)).
  524pengine_ask_option(breakpoints(_)).
 pengine_next(+NameOrID, +Options) is det
Asks pengine NameOrID for the next solution to a query started by pengine_ask/3. Defined options are:
chunk(+Count)
Modify the chunk-size to Count before asking the next set of solutions. This may not be used if the goal was started with chunk(false).

Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.

success(ID, Terms, Projection, Time, More)
See pengine_ask/3.
failure(ID)
ID is the id of the pengine that failed for lack of more solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, as follows:

pengine_next(ID, Options) :-
    pengine_send(ID, next, Options).

*/

  569pengine_next(ID, Options) :-
  570    select_option(chunk(Count), Options, Options1),
  571    !,
  572    pengine_send(ID, next(Count), Options1).
  573pengine_next(ID, Options) :-
  574    pengine_send(ID, next, Options).
 pengine_stop(+NameOrID, +Options) is det
Tells pengine NameOrID to stop looking for more solutions to a query started by pengine_ask/3. Options are passed to pengine_send/3.

Defined in terms of pengine_send/3, like so:

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).

*/

  590pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
 pengine_abort(+NameOrID) is det
Aborts the running query. The pengine goes back to state `2', waiting for new queries.
See also
- pengine_destroy/1. */
  601pengine_abort(Name) :-
  602    (   child(Name, Pengine)
  603    ->  true
  604    ;   Pengine = Name
  605    ),
  606    (   pengine_remote(Pengine, Server)
  607    ->  remote_pengine_abort(Server, Pengine, [])
  608    ;   pengine_thread(Pengine, Thread),
  609        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  610        catch(thread_signal(Thread, throw(abort_query)), _, true)
  611    ).
 pengine_destroy(+NameOrID) is det
 pengine_destroy(+NameOrID, +Options) is det
Destroys the pengine NameOrID. With the option force(true), the pengine is killed using abort/0 and pengine_destroy/2 succeeds. */
  621pengine_destroy(ID) :-
  622    pengine_destroy(ID, []).
  623
  624pengine_destroy(Name, Options) :-
  625    (   child(Name, ID)
  626    ->  true
  627    ;   ID = Name
  628    ),
  629    option(force(true), Options),
  630    !,
  631    (   pengine_thread(ID, Thread)
  632    ->  catch(thread_signal(Thread, abort),
  633              error(existence_error(thread, _), _), true)
  634    ;   true
  635    ).
  636pengine_destroy(ID, Options) :-
  637    catch(pengine_send(ID, destroy, Options),
  638          error(existence_error(pengine, ID), _),
  639          retractall(child(_,ID))).
  640
  641
  642/*================= pengines administration =======================
  643*/
 current_pengine(?Id, ?Parent, ?Location)
Dynamic predicate that registers our known pengines. Id is an atomic unique datatype. Parent is the id of our parent pengine. Location is one of
  654:- dynamic
  655    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  656    pengine_queue/4,                % Id, Queue, TimeOut, Time
  657    output_queue/3,                 % Id, Queue, Time
  658    pengine_user/2,                 % Id, User
  659    pengine_data/2,                 % Id, Data
  660    pengine_detached/2.             % Id, Data
  661:- volatile
  662    current_pengine/6,
  663    pengine_queue/4,
  664    output_queue/3,
  665    pengine_user/2,
  666    pengine_data/2,
  667    pengine_detached/2.  668
  669:- thread_local
  670    child/2.                        % ?Name, ?Child
 pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det
 pengine_register_remote(+Id, +URL, +App, +Destroy) is det
 pengine_unregister(+Id) is det
  676pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  677    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  678
  679pengine_register_remote(Id, URL, Application, Destroy) :-
  680    thread_self(Queue),
  681    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
 pengine_unregister(+Id)
Called by the pengine thread destruction. If we are a remote pengine thread, our URL equals http and the queue is the message queue used to send events to the HTTP workers.
  689pengine_unregister(Id) :-
  690    thread_self(Me),
  691    (   current_pengine(Id, Queue, Me, http, _, _)
  692    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  693    ;   true
  694    ),
  695    retractall(current_pengine(Id, _, Me, _, _, _)),
  696    retractall(pengine_user(Id, _)),
  697    retractall(pengine_data(Id, _)).
  698
  699pengine_unregister_remote(Id) :-
  700    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
 pengine_self(-Id) is det
True if the current thread is a pengine with Id.
  706pengine_self(Id) :-
  707    thread_self(Thread),
  708    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  709
  710pengine_parent(Parent) :-
  711    nb_getval(pengine_parent, Parent).
  712
  713pengine_thread(Pengine, Thread) :-
  714    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  715    Thread \== 0,
  716    !.
  717
  718pengine_remote(Pengine, URL) :-
  719    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  720
  721get_pengine_application(Pengine, Application) :-
  722    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  723    !.
  724
  725get_pengine_module(Pengine, Pengine).
  726
  727:- if(current_predicate(uuid/2)).  728pengine_uuid(Id) :-
  729    uuid(Id, [version(4)]).             % Version 4 is random.
  730:- else.  731pengine_uuid(Id) :-
  732    (   current_prolog_flag(max_integer, Max1)
  733    ->  Max is Max1-1
  734    ;   Max is 1<<128
  735    ),
  736    random_between(0, Max, Num),
  737    atom_number(Id, Num).
  738:- endif.
 protect_pengine(+Id, :Goal) is semidet
Run Goal while protecting the Pengine Id from being destroyed. Used by the HTTP I/O routines to avoid that the Pengine's module disappears while I/O is in progress. We use a pool of locks because the lock may be held relatively long by output routines.

This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.

bug
- After destroy_or_continue/1 takes the destroy route, the module may drop-out at any point in time, resulting in a possible crash. Seems the only safe way out is to do (de)serialization inside the Pengine.
  755:- meta_predicate protect_pengine(+, 0).  756
  757protect_pengine(Id, Goal) :-
  758    term_hash(Id, Hash),
  759    LockN is Hash mod 64,
  760    atom_concat(pengine_done_, LockN, Lock),
  761    with_mutex(Lock,
  762               (   pengine_thread(Id, _)
  763               ->  Goal
  764               ;   Goal
  765               )).
 pengine_application(+Application) is det
Directive that must be used to declare a pengine application module. The module must not be associated to any file. The default application is pengine_sandbox. The example below creates a new application address_book and imports the API defined in the module file adress_book_api.pl into the application.
:- pengine_application(address_book).
:- use_module(address_book:adress_book_api).

*/

  782pengine_application(Application) :-
  783    throw(error(context_error(nodirective,
  784                             pengine_application(Application)), _)).
  785
  786:- multifile
  787    system:term_expansion/2,
  788    current_application/1.
 current_pengine_application(?Application) is nondet
True when Application is a currently defined application.
See also
- pengine_application/1
  796current_pengine_application(Application) :-
  797    current_application(Application).
  798
  799
  800% Default settings for all applications
  801
  802:- setting(thread_pool_size, integer, 100,
  803           'Maximum number of pengines this application can run.').  804:- setting(thread_pool_stacks, list(compound), [],
  805           'Maximum stack sizes for pengines this application can run.').  806:- setting(slave_limit, integer, 3,
  807           'Maximum number of slave pengines a master pengine can create.').  808:- setting(time_limit, number, 300,
  809           'Maximum time to wait for output').  810:- setting(idle_limit, number, 300,
  811           'Pengine auto-destroys when idle for this time').  812:- setting(safe_goal_limit, number, 10,
  813           'Maximum time to try proving safety of the goal').  814:- setting(program_space, integer, 100_000_000,
  815           'Maximum memory used by predicates').  816:- setting(allow_from, list(atom), [*],
  817           'IP addresses from which remotes are allowed to connect').  818:- setting(deny_from, list(atom), [],
  819           'IP addresses from which remotes are NOT allowed to connect').  820:- setting(debug_info, boolean, false,
  821           'Keep information to support source-level debugging').  822
  823
  824system:term_expansion((:- pengine_application(Application)), Expanded) :-
  825    must_be(atom, Application),
  826    (   module_property(Application, file(_))
  827    ->  permission_error(create, pengine_application, Application)
  828    ;   true
  829    ),
  830    expand_term((:- setting(Application:thread_pool_size, integer,
  831                            setting(pengines:thread_pool_size),
  832                            'Maximum number of pengines this \c
  833                            application can run.')),
  834                ThreadPoolSizeSetting),
  835    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  836                            setting(pengines:thread_pool_stacks),
  837                            'Maximum stack sizes for pengines \c
  838                            this application can run.')),
  839                ThreadPoolStacksSetting),
  840    expand_term((:- setting(Application:slave_limit, integer,
  841                            setting(pengines:slave_limit),
  842                            'Maximum number of local slave pengines \c
  843                            a master pengine can create.')),
  844                SlaveLimitSetting),
  845    expand_term((:- setting(Application:time_limit, number,
  846                            setting(pengines:time_limit),
  847                            'Maximum time to wait for output')),
  848                TimeLimitSetting),
  849    expand_term((:- setting(Application:idle_limit, number,
  850                            setting(pengines:idle_limit),
  851                            'Pengine auto-destroys when idle for this time')),
  852                IdleLimitSetting),
  853    expand_term((:- setting(Application:safe_goal_limit, number,
  854                            setting(pengines:safe_goal_limit),
  855                            'Maximum time to try proving safety of the goal')),
  856                SafeGoalLimitSetting),
  857    expand_term((:- setting(Application:program_space, integer,
  858                            setting(pengines:program_space),
  859                            'Maximum memory used by predicates')),
  860                ProgramSpaceSetting),
  861    expand_term((:- setting(Application:allow_from, list(atom),
  862                            setting(pengines:allow_from),
  863                            'IP addresses from which remotes are allowed \c
  864                            to connect')),
  865                AllowFromSetting),
  866    expand_term((:- setting(Application:deny_from, list(atom),
  867                            setting(pengines:deny_from),
  868                            'IP addresses from which remotes are NOT \c
  869                            allowed to connect')),
  870                DenyFromSetting),
  871    expand_term((:- setting(Application:debug_info, boolean,
  872                            setting(pengines:debug_info),
  873                            'Keep information to support source-level \c
  874                            debugging')),
  875                DebugInfoSetting),
  876    flatten([ pengines:current_application(Application),
  877              ThreadPoolSizeSetting,
  878              ThreadPoolStacksSetting,
  879              SlaveLimitSetting,
  880              TimeLimitSetting,
  881              IdleLimitSetting,
  882              SafeGoalLimitSetting,
  883              ProgramSpaceSetting,
  884              AllowFromSetting,
  885              DenyFromSetting,
  886              DebugInfoSetting
  887            ], Expanded).
  888
  889% Register the default application, pengine_sandbox, when this file is loaded.
  890
  891:- pengine_application(pengine_sandbox).
 pengine_property(?Pengine, ?Property) is nondet
True when Property is a property of the given Pengine. Enumerates all pengines that are known to the calling Prolog process. Defined properties are:
self(ID)
Identifier of the pengine. This is the same as the first argument, and can be used to enumerate all known pengines.
alias(Name)
Name is the alias name of the pengine, as provided through the alias option when creating the pengine.
thread(Thread)
If the pengine is a local pengine, Thread is the Prolog thread identifier of the pengine.
remote(Server)
If the pengine is remote, the URL of the server.
application(Application)
Pengine runs the given application
module(Module)
Temporary module used for running the Pengine.
destroy(Destroy)
 Destroy is true if the pengine is destroyed automatically after completing the query.
parent(Queue)
Message queue to which the (local) pengine reports.
source(?SourceID, ?Source)
 Source is the source code with the given SourceID. May be present if the setting debug_info is enabled.
detached(?Time)
Pengine was detached at Time. */
  928pengine_property(Id, Prop) :-
  929    nonvar(Id), nonvar(Prop),
  930    pengine_property2(Prop, Id),
  931    !.
  932pengine_property(Id, Prop) :-
  933    pengine_property2(Prop, Id).
  934
  935pengine_property2(self(Id), Id) :-
  936    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  937pengine_property2(module(Id), Id) :-
  938    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  939pengine_property2(alias(Alias), Id) :-
  940    child(Alias, Id),
  941    Alias \== Id.
  942pengine_property2(thread(Thread), Id) :-
  943    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  944    Thread \== 0.
  945pengine_property2(remote(Server), Id) :-
  946    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  947pengine_property2(application(Application), Id) :-
  948    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  949pengine_property2(destroy(Destroy), Id) :-
  950    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  951pengine_property2(parent(Parent), Id) :-
  952    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  953pengine_property2(source(SourceID, Source), Id) :-
  954    pengine_data(Id, source(SourceID, Source)).
  955pengine_property2(detached(When), Id) :-
  956    pengine_detached(Id, When).
 pengine_output(+Term) is det
Sends Term to the parent pengine or thread. */
  963pengine_output(Term) :-
  964    pengine_self(Me),
  965    pengine_reply(output(Me, Term)).
 pengine_debug(+Format, +Args) is det
Create a message using format/3 from Format and Args and send this to the client. The default JavaScript client will call console.log(Message) if there is a console. The predicate pengine_rpc/3 calls debug(pengine(debug), '~w', [Message]). The debug topic pengine(debug) is enabled by default.
See also
- debug/1 and nodebug/1 for controlling the pengine(debug) topic
- format/2 for format specifications */
  980pengine_debug(Format, Args) :-
  981    pengine_parent(Queue),
  982    pengine_self(Self),
  983    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  984    (   var(E)
  985    ->  format(atom(Message), Format, Args)
  986    ;   message_to_string(E, Message)
  987    ),
  988    pengine_reply(Queue, debug(Self, Message)).
  989
  990
  991/*================= Local pengine =======================
  992*/
 local_pengine_create(+Options)
Creates a local Pengine, which is a thread running pengine_main/2. It maintains two predicates:
 1003local_pengine_create(Options) :-
 1004    thread_self(Self),
 1005    option(application(Application), Options, pengine_sandbox),
 1006    create(Self, Child, Options, local, Application),
 1007    option(alias(Name), Options, Child),
 1008    assert(child(Name, Child)).
 thread_pool:create_pool(+Application) is det
On demand creation of a thread pool for a pengine application.
 1015:- multifile thread_pool:create_pool/1. 1016
 1017thread_pool:create_pool(Application) :-
 1018    current_application(Application),
 1019    setting(Application:thread_pool_size, Size),
 1020    setting(Application:thread_pool_stacks, Stacks),
 1021    thread_pool_create(Application, Size, Stacks).
 create(+Queue, -Child, +Options, +URL, +Application) is det
Create a new pengine thread.
Arguments:
Queue- is the queue (or thread handle) to report to
Child- is the identifier of the created pengine.
URL- is one of local or http
 1031create(Queue, Child, Options, local, Application) :-
 1032    !,
 1033    pengine_child_id(Child),
 1034    create0(Queue, Child, Options, local, Application).
 1035create(Queue, Child, Options, URL, Application) :-
 1036    pengine_child_id(Child),
 1037    catch(create0(Queue, Child, Options, URL, Application),
 1038          Error,
 1039          create_error(Queue, Child, Error)).
 1040
 1041pengine_child_id(Child) :-
 1042    (   nonvar(Child)
 1043    ->  true
 1044    ;   pengine_uuid(Child)
 1045    ).
 1046
 1047create_error(Queue, Child, Error) :-
 1048    pengine_reply(Queue, error(Child, Error)).
 1049
 1050create0(Queue, Child, Options, URL, Application) :-
 1051    (  current_application(Application)
 1052    -> true
 1053    ;  existence_error(pengine_application, Application)
 1054    ),
 1055    (   URL \== http                    % pengine is _not_ a child of the
 1056                                        % HTTP server thread
 1057    ->  aggregate_all(count, child(_,_), Count),
 1058        setting(Application:slave_limit, Max),
 1059        (   Count >= Max
 1060        ->  throw(error(resource_error(max_pengines), _))
 1061        ;   true
 1062        )
 1063    ;   true
 1064    ),
 1065    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 1066    thread_create_in_pool(
 1067        Application,
 1068        pengine_main(Queue, PengineOptions, Application), ChildThread,
 1069        [ at_exit(pengine_done)
 1070        | RestOptions
 1071        ]),
 1072    option(destroy(Destroy), PengineOptions, true),
 1073    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1074    thread_send_message(ChildThread, pengine_registered(Child)),
 1075    (   option(id(Id), Options)
 1076    ->  Id = Child
 1077    ;   true
 1078    ).
 1079
%!  pengine_create_option(?Option) is nondet.
%
%   Table of option terms that are meant for the pengine itself.  It is
%   used as the partition/4 predicate in create0/5 and
%   remote_pengine_create/2 to separate these options from the remaining
%   ones, which are passed on to thread creation or the HTTP request.
 1080pengine_create_option(src_text(_)).
 1081pengine_create_option(src_url(_)).
 1082pengine_create_option(application(_)).
 1083pengine_create_option(destroy(_)).
 1084pengine_create_option(ask(_)).
 1085pengine_create_option(template(_)).
 1086pengine_create_option(bindings(_)).
 1087pengine_create_option(chunk(_)).
 1088pengine_create_option(alias(_)).
 1089pengine_create_option(user(_)).
 pengine_done is det
Called from the pengine thread at_exit option. Destroys child pengines using pengine_destroy/1. Cleaning up the Pengine is synchronised by the pengine_done mutex. See read_event/6.
 1098:- public
 1099    pengine_done/0. 1100
 1101pengine_done :-
 1102    thread_self(Me),
 1103    (   thread_property(Me, status(exception(Ex))),
 1104        abort_exception(Ex),
 1105        thread_detach(Me),
 1106        pengine_self(Pengine)
 1107    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1108              error(_,_), true)
 1109    ;   true
 1110    ),
 1111    forall(child(_Name, Child),
 1112           pengine_destroy(Child)),
 1113    pengine_self(Id),
 1114    protect_pengine(Id, pengine_unregister(Id)).
 1115
 1116abort_exception('$aborted').
 1117abort_exception(unwind(abort)).
 pengine_main(+Parent, +Options, +Application)
 Run a pengine main loop. First acknowledges its creation and then runs pengine_main_loop/1.
 1124:- thread_local wrap_first_answer_in_create_event/2. 1125
 1126:- meta_predicate
 1127    pengine_prepare_source(:, +). 1128
 1129pengine_main(Parent, Options, Application) :-
 1130    fix_streams,
 1131    thread_get_message(pengine_registered(Self)),
 1132    nb_setval(pengine_parent, Parent),
 1133    pengine_register_user(Options),
 1134    set_prolog_flag(mitigate_spectre, true),
 1135    catch(in_temporary_module(
 1136              Self,
 1137              pengine_prepare_source(Application, Options),
 1138              pengine_create_and_loop(Self, Application, Options)),
 1139          prepare_source_failed,
 1140          pengine_terminate(Self)).
 1141
 1142pengine_create_and_loop(Self, Application, Options) :-
 1143    setting(Application:slave_limit, SlaveLimit),
 1144    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1145    (   option(ask(Query0), Options)
 1146    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1147        (   string(Query0)                      % string is not callable
 1148        ->  (   option(template(TemplateS), Options)
 1149            ->  Ask2 = Query0-TemplateS
 1150            ;   Ask2 = Query0
 1151            ),
 1152            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1153                  Error, true),
 1154            (   var(Error)
 1155            ->  true
 1156            ;   send_error(Error),
 1157                throw(prepare_source_failed)
 1158            )
 1159        ;   Query = Query0,
 1160            option(template(Template), Options, Query),
 1161            option(bindings(Bindings), Options, [])
 1162        ),
 1163        option(chunk(Chunk), Options, 1),
 1164        pengine_ask(Self, Query,
 1165                    [ template(Template),
 1166                      chunk(Chunk),
 1167                      bindings(Bindings)
 1168                    ])
 1169    ;   Extra = [],
 1170        pengine_reply(CreateEvent)
 1171    ),
 1172    pengine_main_loop(Self).
 ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings) is det
 Translate the AskSpec into a query, template and bindings. The trick is that we must parse using the operator declarations of the source and we must make sure variable sharing between query and answer template is known.
 1182ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1183    !,
 1184    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1185    term_string(t(Template1,Ask1), AskTemplate,
 1186                [ variable_names(Bindings0),
 1187                  module(Module)
 1188                ]),
 1189    phrase(template_bindings(Template1, Bindings0), Bindings).
 1190ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1191    term_string(Ask1, Ask,
 1192                [ variable_names(Bindings),
 1193                  module(Module)
 1194                ]),
 1195    exclude(anon, Bindings, Bindings1),
 1196    dict_create(Template, swish_default_template, Bindings1).
 1197
 1198template_bindings(Var, Bindings) -->
 1199    { var(Var) }, !,
 1200    (   { var_binding(Bindings, Var, Binding)
 1201        }
 1202    ->  [Binding]
 1203    ;   []
 1204    ).
 1205template_bindings([H|T], Bindings) -->
 1206    !,
 1207    template_bindings(H, Bindings),
 1208    template_bindings(T, Bindings).
 1209template_bindings(Compoound, Bindings) -->
 1210    { compound(Compoound), !,
 1211      compound_name_arguments(Compoound, _, Args)
 1212    },
 1213    template_bindings(Args, Bindings).
 1214template_bindings(_, _) --> [].
 1215
 1216var_binding(Bindings, Var, Binding) :-
 1217    member(Binding, Bindings),
 1218    arg(2, Binding, V),
 1219    V == Var, !.
 fix_streams is det
If we are a pengine that is created from a web server thread, the current output points to a CGI stream.
 1226fix_streams :-
 1227    fix_stream(current_output).
 1228
 1229fix_stream(Name) :-
 1230    is_cgi_stream(Name),
 1231    !,
 1232    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1233    set_stream(user_output, alias(Name)).
 1234fix_stream(_).
 pengine_prepare_source(:Application, +Options) is det
Load the source into the pengine's module.
throws
- prepare_source_failed if it failed to prepare the sources.
 1243pengine_prepare_source(Module:Application, Options) :-
 1244    setting(Application:program_space, SpaceLimit),
 1245    set_module(Module:program_space(SpaceLimit)),
 1246    delete_import_module(Module, user),
 1247    add_import_module(Module, Application, start),
 1248    catch(prep_module(Module, Application, Options), Error, true),
 1249    (   var(Error)
 1250    ->  true
 1251    ;   send_error(Error),
 1252        throw(prepare_source_failed)
 1253    ).
 1254
 1255prep_module(Module, Application, Options) :-
 1256    maplist(copy_flag(Module, Application), [var_prefix]),
 1257    forall(prepare_module(Module, Application, Options), true),
 1258    setup_call_cleanup(
 1259        '$set_source_module'(OldModule, Module),
 1260        maplist(process_create_option(Module), Options),
 1261        '$set_source_module'(OldModule)).
 1262
 1263copy_flag(Module, Application, Flag) :-
 1264    current_prolog_flag(Application:Flag, Value),
 1265    !,
 1266    set_prolog_flag(Module:Flag, Value).
 1267copy_flag(_, _, _).
 1268
 1269process_create_option(Application, src_text(Text)) :-
 1270    !,
 1271    pengine_src_text(Text, Application).
 1272process_create_option(Application, src_url(URL)) :-
 1273    !,
 1274    pengine_src_url(URL, Application).
 1275process_create_option(_, _).
 prepare_module(+Module, +Application, +Options) is semidet
Hook, called to initialize the temporary private module that provides the working context of a pengine. This hook is executed by the pengine's thread. Preparing the source consists of three steps:
  1. Add Application as (first) default import module for Module
  2. Call this hook
   3. Compile the source provided by the src_text and src_url options
Arguments:
Module- is a new temporary module (see in_temporary_module/3) that may be (further) prepared by this hook.
Application- (also a module) associated to the pengine.
Options- is passed from the environment and should (currently) be ignored.
 1298pengine_main_loop(ID) :-
 1299    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1300
 1301pengine_aborted(ID) :-
 1302    thread_self(Self),
 1303    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1304    empty_queue,
 1305    destroy_or_continue(abort(ID)).
 guarded_main_loop(+Pengine) is det
Executes state `2' of the pengine, where it waits for two events:
destroy
Terminate the pengine
ask(:Goal, +Options)
Solve Goal.
 1318guarded_main_loop(ID) :-
 1319    pengine_request(Request),
 1320    (   Request = destroy
 1321    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1322        pengine_terminate(ID)
 1323    ;   Request = ask(Goal, Options)
 1324    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1325        ask(ID, Goal, Options)
 1326    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1327        pengine_reply(error(ID, error(protocol_error, _))),
 1328        guarded_main_loop(ID)
 1329    ).
 1330
 1331
 1332pengine_terminate(ID) :-
 1333    pengine_reply(destroy(ID)),
 1334    thread_self(Me),            % Make the thread silently disappear
 1335    thread_detach(Me).
 solve(+Chunk, +Template, :Goal, +ID) is det
 Solve Goal. Note that because we can ask for a new goal in state `6', we must provide for an ancestral cut (prolog_cut_to/1). We need to be sure to have a choice point before we can call prolog_current_choice/1. This is the reason why this predicate has two clauses.
%   solve(+Chunk, +Template, :Goal, +ID)
%
%   Run Goal for pengine ID and report the outcome as a success, error
%   or failure event.  Chunk is an integer (answers per reply) or
%   `false`; anything else raises a domain_error.
 1346solve(Chunk, Template, Goal, ID) :-
%   Remember the current choice point.  It is handed to more_solutions/4,
%   which cuts back to it with prolog_cut_to/1 when a new ask arrives.
 1347    prolog_current_choice(Choice),
%   State is passed to findnsols_no_empty/4.  more_solutions/5 changes
%   the count in place with nb_setarg/3 on next(Count).
 1348    (   integer(Chunk)
 1349    ->  State = count(Chunk)
 1350    ;   Chunk == false
 1351    ->  State = no_chunk
 1352    ;   domain_error(chunk, Chunk)
 1353    ),
%   Time is a one-argument cell holding the CPU time at which the
%   current chunk started.  more_solutions/5 overwrites it with
%   nb_setarg/3 before backtracking for more answers.
 1354    statistics(cputime, Epoch),
 1355    Time = time(Epoch),
 1356    nb_current('$variable_names', Bindings),
 1357    filter_template(Template, Bindings, Template2),
 1358    '$current_typein_module'(CurrTypeIn),
%   First branch of the disjunction: one chunk of answers was found or
%   an exception was caught.  Second branch: the goal failed.
 1359    (   '$set_typein_module'(ID),
%   query_done/2 is the cleanup handler; it binds Det to `true` and
%   restores the typein module.
%   NOTE(review): this relies on call_cleanup/2 running the handler only
%   once the goal has no choice points left, so that var(Det) below
%   means "more answers may follow" -- confirm against the manual.
 1360        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1361                                              set_projection(Goal, Bindings),
 1362                                              Result),
 1363                           Error, true),
 1364                     query_done(Det, CurrTypeIn)),
 1365        arg(1, Time, T0),
 1366        statistics(cputime, T1),
 1367        CPUTime is T1-T0,
 1368        forall(pengine_flush_output_hook, true),
 1369        (   var(Error)
 1370        ->  projection(Projection),
 1371            (   var(Det)
%   Not finished: reply with More = true and wait for the next request.
 1372            ->  pengine_reply(success(ID, Result, Projection,
 1373                                      CPUTime, true)),
 1374                more_solutions(ID, Choice, State, Time)
 1375            ;   !,                      % commit
 1376                destroy_or_continue(success(ID, Result, Projection,
 1377                                            CPUTime, false))
 1378            )
 1379        ;   !,                          % commit
%   abort_query is re-thrown rather than reported; pengine_main_loop/1
%   catches it.
 1380            (   Error == abort_query
 1381            ->  throw(Error)
 1382            ;   destroy_or_continue(error(ID, Error))
 1383            )
 1384        )
 1385    ;   !,                              % commit
 1386        arg(1, Time, T0),
 1387        statistics(cputime, T1),
 1388        CPUTime is T1-T0,
 1389        destroy_or_continue(failure(ID, CPUTime))
 1390    ).
 1391solve(_, _, _, _).                      % leave a choice point
 1392
%   query_done(-Det, +CurrTypeIn)
%
%   Cleanup handler of solve/4: unify Det with `true` and restore the
%   typein module to CurrTypeIn.
 1393query_done(true, CurrTypeIn) :-
 1394    '$set_typein_module'(CurrTypeIn).
 set_projection(:Goal, +Bindings)
 findnsols_no_empty/4 copies its goal and template to avoid instantiation thereof when it stops after finding N solutions. Using this helper we can obtain a renamed version of Bindings that we can set.
 1403set_projection(Goal, Bindings) :-
 1404    b_setval('$variable_names', Bindings),
 1405    call(Goal).
 1406
 1407projection(Projection) :-
 1408    nb_current('$variable_names', Bindings),
 1409    !,
 1410    maplist(var_name, Bindings, Projection).
 1411projection([]).
 filter_template(+Template0, +Bindings, -Template) is det
Establish the final template. This is there because hooks such as goal_expansion/2 and the SWISH query hooks can modify the set of bindings.
bug
- Projection and template handling is pretty messy.
 1421filter_template(Template0, Bindings, Template) :-
 1422    is_dict(Template0, swish_default_template),
 1423    !,
 1424    dict_create(Template, swish_default_template, Bindings).
 1425filter_template(Template, _Bindings, Template).
 1426
 1427findnsols_no_empty(no_chunk, Template, Goal, List) =>
 1428    List = [Template],
 1429    call(Goal).
 1430findnsols_no_empty(State, Template, Goal, List) =>
 1431    findnsols(State, Template, Goal, List),
 1432    List \== [].
 1433
 1434destroy_or_continue(Event) :-
 1435    arg(1, Event, ID),
 1436    (   pengine_property(ID, destroy(true))
 1437    ->  thread_self(Me),
 1438        thread_detach(Me),
 1439        pengine_reply(destroy(ID, Event))
 1440    ;   pengine_reply(Event),
 1441        guarded_main_loop(ID)
 1442    ).
 more_solutions(+Pengine, +Choice, +State, +Time)
Called after a solution was found while there can be more. This is state `6' of the state machine. It processes these events:
stop
Go back via state `7' to state `2' (guarded_main_loop/1)
next
 Fail. This causes solve/4 to backtrack on the goal asked, providing at most the current chunk solutions.
next(Count)
As next, but sets the new chunk-size to Count.
ask(Goal, Options)
Ask another goal. Note that we must commit the choice point of the previous goal asked for.
 1460more_solutions(ID, Choice, State, Time) :-
 1461    pengine_request(Event),
 1462    more_solutions(Event, ID, Choice, State, Time).
 1463
 1464more_solutions(stop, ID, _Choice, _State, _Time) :-
 1465    !,
 1466    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1467    destroy_or_continue(stop(ID)).
 1468more_solutions(next, ID, _Choice, _State, Time) :-
 1469    !,
 1470    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1471    statistics(cputime, T0),
 1472    nb_setarg(1, Time, T0),
 1473    fail.
 1474more_solutions(next(Count), ID, _Choice, State, Time) :-
 1475    Count > 0,
 1476    State = count(_),                   % else fallthrough to protocol error
 1477    !,
 1478    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1479    nb_setarg(1, State, Count),
 1480    statistics(cputime, T0),
 1481    nb_setarg(1, Time, T0),
 1482    fail.
 1483more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1484    !,
 1485    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1486    prolog_cut_to(Choice),
 1487    ask(ID, Goal, Options).
 1488more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1489    !,
 1490    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1491    pengine_terminate(ID).
 1492more_solutions(Event, ID, Choice, State, Time) :-
 1493    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1494    pengine_reply(error(ID, error(protocol_error, _))),
 1495    more_solutions(ID, Choice, State, Time).
 ask(+Pengine, :Goal, +Options)
 Migrate from state `2' to `3'. This predicate validates that it is safe to call Goal using safe_goal/1 and then calls solve/4 to prove the goal. It takes care of the chunk(N) option.
 1503ask(ID, Goal, Options) :-
 1504    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1505    !,
 1506    (   var(Error)
 1507    ->  option(template(Template), Options, Goal),
 1508        option(chunk(N), Options, 1),
 1509        solve(N, Template, Goal1, ID)
 1510    ;   pengine_reply(error(ID, Error)),
 1511        guarded_main_loop(ID)
 1512    ).
 prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det
Prepare GoalIn for execution in Pengine. This implies we must perform goal expansion and, if the system is sandboxed, check the sandbox.

Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like to write, but this does not work correctly if the user wishes to expand X:Y while interpreting X not as the module in which to run Y. This happens in the CQL package. Possibly we should disallow this reinterpretation?

 1526prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1527    option(bindings(Bindings), Options, []),
 1528    b_setval('$variable_names', Bindings),
 1529    (   prepare_goal(Goal0, Goal1, Options)
 1530    ->  true
 1531    ;   Goal1 = Goal0
 1532    ),
 1533    get_pengine_module(ID, Module),
 1534    setup_call_cleanup(
 1535        '$set_source_module'(Old, Module),
 1536        expand_goal(Goal1, Goal),
 1537        '$set_source_module'(_, Old)),
 1538    (   pengine_not_sandboxed(ID)
 1539    ->  true
 1540    ;   get_pengine_application(ID, App),
 1541        setting(App:safe_goal_limit, Limit),
 1542        catch(call_with_time_limit(
 1543                  Limit,
 1544                  safe_goal(Module:Goal)), E, true)
 1545    ->  (   var(E)
 1546        ->  true
 1547        ;   E = time_limit_exceeded
 1548        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1549        ;   throw(E)
 1550        )
 1551    ).
 prepare_goal(+Goal0, -Goal1, +Options) is semidet
Pre-preparation hook for running Goal0. The hook runs in the context of the pengine. Goal is the raw goal given to ask. The returned Goal1 is subject to goal expansion (expand_goal/2) and sandbox validation (safe_goal/1) prior to execution. If this goal fails, Goal0 is used for further processing.
Arguments:
Options- provides the options as given to ask
 pengine_not_sandboxed(+Pengine) is semidet
True when pengine does not operate in sandboxed mode. This implies a user must be registered by authentication_hook/3 and the hook pengines:not_sandboxed(User, Application) must succeed.
 1571pengine_not_sandboxed(ID) :-
 1572    pengine_user(ID, User),
 1573    pengine_property(ID, application(App)),
 1574    not_sandboxed(User, App),
 1575    !.
 not_sandboxed(+User, +Application) is semidet
 This hook is called to see whether the Pengine must be executed in a protected environment. It is only called after authentication_hook/3 has confirmed the authenticity of the current user. If this hook succeeds, both loading the code and executing the query are executed without enforcing sandbox security. Typically, one should:
  1. Provide a safe user authentication hook.
  2. Enable HTTPS in the server or put it behind an HTTPS proxy and ensure that the network between the proxy and the pengine server can be trusted.
 pengine_pull_response(+Pengine, +Options) is det
Pulls a response (an event term) from the slave Pengine if Pengine is a remote process, else does nothing at all. */
 1597pengine_pull_response(Pengine, Options) :-
 1598    pengine_remote(Pengine, Server),
 1599    !,
 1600    remote_pengine_pull_response(Server, Pengine, Options).
 1601pengine_pull_response(_ID, _Options).
 pengine_input(+Prompt, -Term) is det
Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be any term, compound as well as atomic. */
 1610pengine_input(Prompt, Term) :-
 1611    pengine_self(Self),
 1612    pengine_parent(Parent),
 1613    pengine_reply(Parent, prompt(Self, Prompt)),
 1614    pengine_request(Request),
 1615    (   Request = input(Input)
 1616    ->  Term = Input
 1617    ;   Request == destroy
 1618    ->  abort
 1619    ;   throw(error(protocol_error,_))
 1620    ).
 pengine_respond(+Pengine, +Input, +Options) is det
Sends a response in the form of the term Input to a slave (child) pengine that has prompted its master (parent) for input.

Defined in terms of pengine_send/3, as follows:

pengine_respond(Pengine, Input, Options) :-
    pengine_send(Pengine, input(Input), Options).

*/

 1637pengine_respond(Pengine, Input, Options) :-
 1638    pengine_send(Pengine, input(Input), Options).
 send_error(+Error) is det
Send an error to my parent. Remove non-readable blobs from the error term first using replace_blobs/2. If the error contains a stack-trace, this is resolved to a string before sending.
 1647send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1648    is_list(Frames),
 1649    !,
 1650    with_output_to(string(Stack),
 1651                   print_prolog_backtrace(current_output, Frames)),
 1652    pengine_self(Self),
 1653    replace_blobs(Formal, Formal1),
 1654    replace_blobs(Message, Message1),
 1655    pengine_reply(error(Self, error(Formal1,
 1656                                    context(prolog_stack(Stack), Message1)))).
 1657send_error(Error) :-
 1658    pengine_self(Self),
 1659    replace_blobs(Error, Error1),
 1660    pengine_reply(error(Self, Error1)).
 replace_blobs(Term0, Term) is det
Copy Term0 to Term, replacing non-text blobs. This is required for error messages that may hold streams and other handles to non-readable objects.
 1668replace_blobs(Blob, Atom) :-
 1669    blob(Blob, Type), Type \== text,
 1670    !,
 1671    format(atom(Atom), '~p', [Blob]).
 1672replace_blobs(Term0, Term) :-
 1673    compound(Term0),
 1674    !,
 1675    compound_name_arguments(Term0, Name, Args0),
 1676    maplist(replace_blobs, Args0, Args),
 1677    compound_name_arguments(Term, Name, Args).
 1678replace_blobs(Term, Term).
 1679
 1680
 1681/*================= Remote pengines =======================
 1682*/
 1683
 1684
 1685remote_pengine_create(BaseURL, Options) :-
 1686    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1687        (       option(ask(Query), PengineOptions0),
 1688                \+ option(template(_Template), PengineOptions0)
 1689        ->      PengineOptions = [template(Query)|PengineOptions0]
 1690        ;       PengineOptions = PengineOptions0
 1691        ),
 1692    options_to_dict(PengineOptions, PostData),
 1693    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1694    arg(1, Reply, ID),
 1695    (   option(id(ID2), Options)
 1696    ->  ID = ID2
 1697    ;   true
 1698    ),
 1699    option(alias(Name), Options, ID),
 1700    assert(child(Name, ID)),
 1701    (   (   functor(Reply, create, _)   % actually created
 1702        ;   functor(Reply, output, _)   % compiler messages
 1703        )
 1704    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1705        option(destroy(Destroy), PengineOptions, true),
 1706        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1707    ;   true
 1708    ),
 1709    thread_self(Queue),
 1710    pengine_reply(Queue, Reply).
 1711
 1712options_to_dict(Options, Dict) :-
 1713    select_option(ask(Ask), Options, Options1),
 1714    select_option(template(Template), Options1, Options2),
 1715    !,
 1716    no_numbered_var_in(Ask+Template),
 1717    findall(AskString-TemplateString,
 1718            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1719            [ AskString-TemplateString ]),
 1720    options_to_dict(Options2, Dict0),
 1721    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1722options_to_dict(Options, Dict) :-
 1723    maplist(prolog_option, Options, Options1),
 1724    dict_create(Dict, _, Options1).
 1725
 1726no_numbered_var_in(Term) :-
 1727    sub_term(Sub, Term),
 1728    subsumes_term('$VAR'(_), Sub),
 1729    !,
 1730    domain_error(numbered_vars_free_term, Term).
 1731no_numbered_var_in(_).
 1732
 1733ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1734    numbervars(Ask+Template, 0, _),
 1735    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1736    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1737                                            Template, WOpts
 1738                                          ]),
 1739    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1740
 1741prolog_option(Option0, Option) :-
 1742    create_option_type(Option0, term),
 1743    !,
 1744    Option0 =.. [Name,Value],
 1745    format(string(String), '~k', [Value]),
 1746    Option =.. [Name,String].
 1747prolog_option(Option, Option).
 1748
 1749create_option_type(ask(_),         term).
 1750create_option_type(template(_),    term).
 1751create_option_type(application(_), atom).
 1752
 1753remote_pengine_send(BaseURL, ID, Event, Options) :-
 1754    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1755    thread_self(Queue),
 1756    pengine_reply(Queue, Reply).
 1757
 1758remote_pengine_pull_response(BaseURL, ID, Options) :-
 1759    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1760    thread_self(Queue),
 1761    pengine_reply(Queue, Reply).
 1762
 %!  remote_pengine_abort(+BaseURL, +ID, +Options)
 %
 %   Issue the `abort` action for the remote pengine ID at BaseURL and
 %   hand the reply to pengine_reply/2 with the calling thread as
 %   target.

 remote_pengine_abort(BaseURL, ID, Options) :-
     thread_self(Self),
     remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
     pengine_reply(Self, Reply).
 remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
Issue a GET request on Server and unify Reply with the replied term.
 1773remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1774    !,
 1775    server_url(Server, Action, [id=ID], URL),
 1776    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1777              [ post(prolog(Event))     % makes it impossible to interrupt.
 1778              | Options
 1779              ]),
 1780    call_cleanup(
 1781        read_prolog_reply(Stream, Reply),
 1782        close(Stream)).
 1783remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1784    server_url(Server, Action, [id=ID|Params], URL),
 1785    http_open(URL, Stream, Options),
 1786    call_cleanup(
 1787        read_prolog_reply(Stream, Reply),
 1788        close(Stream)).
 1789
 %!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
 %
 %   POST Data as JSON to the URL for Action on Server and unify Reply
 %   with the Prolog term read from the response. The target is probed
 %   first using probe/3.

 remote_post_rec(Server, Action, Data, Reply, Options) :-
     server_url(Server, Action, [], URL),
     probe(Action, URL, Options),
     PostOptions = [post(json(Data))|Options],
     http_open(URL, Stream, PostOptions),
     call_cleanup(read_prolog_reply(Stream, Reply),
                  close(Stream)).
 probe(+Action, +URL) is det
Probe the target. This is a good idea before posting a large document and be faced with an authentication challenge. Possibly we should make this an option for simpler scenarios.
 1807probe(create, URL, Options) :-
 1808    !,
 1809    http_open(URL, Stream, [method(options)|Options]),
 1810    close(Stream).
 1811probe(_, _, _).
 1812
 %!  read_prolog_reply(+In, -Reply)
 %
 %   Read one Prolog term from the stream In, decoding it as UTF-8,
 %   and pass it through rebind_cycles/2 to obtain Reply.

 read_prolog_reply(In, Reply) :-
     set_stream(In, encoding(utf8)),
     read(In, RawReply),
     rebind_cycles(RawReply, Reply).
 1817
 %!  rebind_cycles(+Reply0, -Reply) is det.
 %
 %   If Reply0 has the shape `@(Reply, Bindings)` with Bindings a
 %   proper list, execute each `Var = Value` in Bindings (see bind/1)
 %   and return the inner Reply. Any other term is returned unchanged.

 rebind_cycles(Reply0, Reply) :-
     (   Reply0 = @(Reply, Bindings),
         is_list(Bindings)
     ->  maplist(bind, Bindings)
     ;   Reply = Reply0
     ).
 1823
 %!  bind(+Binding) is semidet.
 %
 %   Binding is a term `Var = Value`; unify its two sides.

 bind(Value = Value).
 1826
 %!  server_url(+Server, +Action, +Params, -URL) is det.
 %
 %   URL is Server with its path edited to `pengine/Action` and its
 %   query string set from Params, using uri_edit/3.

 server_url(Server, Action, Params, URL) :-
     atom_concat('pengine/', Action, Path),
     Edits = [ path(Path),
               search(Params)
             ],
     uri_edit(Edits, Server, URL).
 pengine_event(?EventTerm) is det
 pengine_event(?EventTerm, +Options) is det
Examines the pengine's event queue and if necessary blocks execution until a term that unifies to Term arrives in the queue. After a term from the queue has been unified to Term, the term is deleted from the queue.

Valid options are:

timeout(+Time)
Time is a float or integer and specifies the maximum time to wait in seconds. If no event has arrived before the time is up EventTerm is bound to the atom timeout.
listen(+Id)
Only listen to events from the pengine identified by Id. */
 1852pengine_event(Event) :-
 1853    pengine_event(Event, []).
 1854
 1855pengine_event(Event, Options) :-
 1856    thread_self(Self),
 1857    option(listen(Id), Options, _),
 1858    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1859    ->  true
 1860    ;   Event = timeout
 1861    ),
 1862    update_remote_destroy(Event).
 1863
 %!  update_remote_destroy(+Event) is det.
 %
 %   If Event is a destroy event (see destroy_event/1) whose first
 %   argument is the id of a known remote pengine, unregister that
 %   remote pengine. Otherwise do nothing.

 update_remote_destroy(Event) :-
     (   destroy_event(Event),
         arg(1, Event, Id),
         pengine_remote(Id, _Server)
     ->  pengine_unregister_remote(Id)
     ;   true
     ).
 1871
 %!  destroy_event(+Event) is semidet.
 %
 %   True if Event tells us the pengine is destroyed: destroy/1,
 %   destroy/2, or a create/2 event whose feature list holds a bound
 %   answer(Answer) that is itself a destroy event.

 destroy_event(destroy(_)).
 destroy_event(destroy(_, _)).
 destroy_event(create(_, Features)) :-
     memberchk(answer(Answer), Features),
     !,
     nonvar(Answer),
     destroy_event(Answer).
 pengine_event_loop(:Closure, +Options) is det
Starts an event loop accepting event terms sent to the current pengine or thread. For each such event E, calls ignore(call(Closure, E)). A closure thus acts as a handler for the event. Some events are also treated specially:
create(ID, Term)
The ID is placed in a list of active pengines.
destroy(ID)
The ID is removed from the list of active pengines. When the last pengine ID is removed, the loop terminates.
output(ID, Term)
The predicate pengine_pull_response/2 is called.

Valid options are:

autoforward(+To)
Forwards received event terms to slaves. To is either all, all_but_sender or a Prolog list of NameOrIDs. [not yet implemented]

*/

 1907pengine_event_loop(Closure, Options) :-
 1908    child(_,_),
 1909    !,
 1910    pengine_event(Event),
 1911    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1912    ->  forall(child(_,ID), pengine_send(ID, Event))
 1913    ;   true
 1914    ),
 1915    pengine_event_loop(Event, Closure, Options).
 1916pengine_event_loop(_, _).
 1917
 % The second argument of pengine_process_event/4 is a closure that is
 % called with one additional argument (the event).
 :- meta_predicate pengine_process_event(+, 1, -, +).
 %!  pengine_event_loop(+Event, :Closure, +Options) is det.
 %
 %   Process Event using pengine_process_event/4 and re-enter the
 %   event loop if that reports `true` as continuation.

 pengine_event_loop(Event, Closure, Options) :-
     pengine_process_event(Event, Closure, Continue, Options),
     (   Continue \== true
     ->  true
     ;   pengine_event_loop(Closure, Options)
     ).
 1927
 %!  pengine_process_event(+Event, :Closure, -Continue, +Options)
 %
 %   Handle a single Event for pengine_event_loop/2. Closure is called
 %   with the (possibly simplified) event; except for error events its
 %   failure is ignored. Continue tells the caller whether to keep
 %   looping.
 %
 %     - A create event that carries answer(First) is split: Closure
 %       sees the create event without the answer, after which First
 %       is processed.
 %     - output and debug events are followed by
 %       pengine_pull_response/2.
 %     - For an error event, if Closure fails all children are
 %       destroyed and the error is re-thrown.
 %     - destroy(ID, Event) processes Event and then destroy(ID).

 pengine_process_event(create(ID, Features), Closure, Continue, Options) :-
     debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(Features)]),
     (   select(answer(First), Features, Rest)
     ->  ignore(call(Closure, create(ID, Rest))),
         pengine_process_event(First, Closure, Continue, Options)
     ;   ignore(call(Closure, create(ID, Features))),
         Continue = true
     ).
 pengine_process_event(output(ID, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Message)]),
     ignore(call(Closure, output(ID, Message))),
     pengine_pull_response(ID, []).
 pengine_process_event(debug(ID, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Message)]),
     ignore(call(Closure, debug(ID, Message))),
     pengine_pull_response(ID, []).
 pengine_process_event(prompt(ID, Prompt), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Prompt)]),
     ignore(call(Closure, prompt(ID, Prompt))).
 pengine_process_event(success(ID, Answers, _Proj, _Time, More),
                       Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 6/2',
           [ID, success(Answers, More)]),
     ignore(call(Closure, success(ID, Answers, More))).
 pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
     ignore(call(Closure, failure(ID))).
 pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
     (   call(Closure, error(ID, Error))
     ->  Continue = true
     ;   % unhandled error: tear down all children and propagate
         forall(child(_, Child), pengine_destroy(Child)),
         throw(Error)
     ).
 pengine_process_event(stop(ID), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
     ignore(call(Closure, stop(ID))).
 pengine_process_event(destroy(ID, Final), Closure, Continue, Options) :-
     pengine_process_event(Final, Closure, _, Options),
     pengine_process_event(destroy(ID), Closure, Continue, Options).
 pengine_process_event(destroy(ID), Closure, true, _Options) :-
     retractall(child(_, ID)),
     debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
     ignore(call(Closure, destroy(ID))).
 pengine_rpc(+URL, +Query) is nondet
 pengine_rpc(+URL, +Query, +Options) is nondet
Semantically equivalent to the sequence below, except that the query is executed in (and in the Prolog context of) the pengine server referred to by URL, rather than locally.
  copy_term_nat(Query, Copy),  % attributes are not copied to the server
  call(Copy),			 % executed on server at URL
  Query = Copy.

Valid options are:

chunk(+IntegerOrFalse)
Can be used to reduce the number of network roundtrips being made. See pengine_ask/3.
timeout(+Time)
Wait at most Time seconds for the next event from the server. The default is defined by the setting pengines:time_limit.

Remaining options (except the server option) are passed to pengine_create/1. */

 1998pengine_rpc(URL, Query) :-
 1999    pengine_rpc(URL, Query, []).
 2000
 2001pengine_rpc(URL, Query, M:Options0) :-
 2002    translate_local_sources(Options0, Options1, M),
 2003    (  option(timeout(_), Options1)
 2004    -> Options = Options1
 2005    ;  setting(time_limit, Limit),
 2006       Options = [timeout(Limit)|Options1]
 2007    ),
 2008    term_variables(Query, Vars),
 2009    Template =.. [v|Vars],
 2010    State = destroy(true),              % modified by process_event/4
 2011    setup_call_catcher_cleanup(
 2012        pengine_create([ ask(Query),
 2013                         template(Template),
 2014                         server(URL),
 2015                         id(Id)
 2016                       | Options
 2017                       ]),
 2018        wait_event(Template, State, [listen(Id)|Options]),
 2019        Why,
 2020        pengine_destroy_and_wait(State, Id, Why, Options)).
 2021
 %!  pengine_destroy_and_wait(+State, +Id, +Why, +Options)
 %
 %   Cleanup handler for pengine_rpc/3. If State is still
 %   `destroy(true)`, destroy pengine Id and wait for it to go away
 %   using wait_destroy/2 with 10 retries. Otherwise only emit a debug
 %   message. Why is the catcher term; it is used for debugging only.

 pengine_destroy_and_wait(State, Id, Why, Options) :-
     (   State = destroy(true)
     ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
         pengine_destroy(Id, Options),
         wait_destroy(Id, 10)
     ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
     ).
 2029
 %!  wait_destroy(+Id, +Retries)
 %
 %   Wait until pengine Id is no longer a child. Events from Id are
 %   read with a 10 second timeout. A destroy event removes the child
 %   registration. Any other event (including a timeout) is retried
 %   while Retries can be decremented; after that the remote pengine is
 %   unregistered and the child registration removed anyway.

 wait_destroy(Id, Retries) :-
     (   \+ child(_, Id)
     ->  true
     ;   once(pengine_event(Event, [listen(Id),timeout(10)])),
         (   destroy_event(Event)
         ->  retractall(child(_,Id))
         ;   succ(Retries1, Retries)
         ->  wait_destroy(Id, Retries1)
         ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
             pengine_unregister_remote(Id),
             retractall(child(_,Id))
         )
     ).
 2044
 %!  wait_event(+Template, +State, +Options) is nondet.
 %
 %   Wait for the next pengine event, as filtered by Options, and hand
 %   it to process_event/4.

 wait_event(Template, State, Options) :-
     pengine_event(Received, Options),
     debug(pengine(event), 'Received ~p', [Received]),
     process_event(Received, Template, State, Options).
 2049
 %!  process_event(+Event, +Template, +State, +Options) is nondet.
 %
 %   Handle Event on behalf of pengine_rpc/3. Solutions are returned
 %   by unifying Template with the members of the answer list; more
 %   solutions are fetched from the pengine on backtracking.
 %
 %     - An error event throws the error; a failure event fails.
 %     - prompt, output and debug events are served and followed by
 %       waiting for the next event.
 %     - A destroy(ID, Event) event removes the child registration and
 %       sets the first argument of State to `false` (destructively,
 %       using nb_setarg/3) before Event itself is processed.

 process_event(create(_ID, Features), Template, State, Options) :-
     memberchk(answer(First), Features),
     process_event(First, Template, State, Options).
 process_event(error(_ID, Error), _Template, _State, _Options) :-
     throw(Error).
 process_event(failure(_ID, _Time), _Template, _State, _Options) :-
     fail.
 process_event(prompt(ID, Prompt), Template, State, Options) :-
     pengine_rpc_prompt(ID, Prompt, Reply),
     pengine_send(ID, input(Reply)),
     wait_event(Template, State, Options).
 process_event(output(ID, Term), Template, State, Options) :-
     pengine_rpc_output(ID, Term),
     pengine_pull_response(ID, Options),
     wait_event(Template, State, Options).
 process_event(debug(ID, Message), Template, State, Options) :-
     debug(pengine(debug), '~w', [Message]),
     pengine_pull_response(ID, Options),
     wait_event(Template, State, Options).
 process_event(success(_ID, Answers, _Proj, _Time, false),
               Template, _State, _Options) :-
     !,
     member(Template, Answers).
 process_event(success(ID, Answers, _Proj, _Time, true),
               Template, State, Options) :-
     (   member(Template, Answers)
     ;   % this chunk is exhausted: ask for the next one
         pengine_next(ID, Options),
         wait_event(Template, State, Options)
     ).
 process_event(destroy(ID, Final), Template, State, Options) :-
     !,
     retractall(child(_,ID)),
     nb_setarg(1, State, false),
     debug(pengine(destroy), 'State: ~p~n', [State]),
     process_event(Final, Template, State, Options).
 % compatibility with older versions of the protocol.
 process_event(success(ID, Answers, Time, More),
               Template, State, Options) :-
     process_event(success(ID, Answers, _Proj, Time, More),
                   Template, State, Options).
 2090
 2091
 %!  pengine_rpc_prompt(+ID, +Prompt, -Term)
 %
 %   Obtain the reply Term for a prompt event. The hook prompt/3 is
 %   tried first. If it fails, Term is read using read/1 with Prompt
 %   temporarily installed as the prompt.

 pengine_rpc_prompt(ID, Prompt, Term) :-
     (   prompt(ID, Prompt, Hooked)
     ->  Term = Hooked
     ;   setup_call_cleanup(
             prompt(Saved, Prompt),
             read(Term),
             prompt(_, Saved))
     ).
 2101
 %!  pengine_rpc_output(+ID, +Term) is det.
 %
 %   Handle an output event: try the hook output/2 and fall back to
 %   print/1 if the hook fails.

 pengine_rpc_output(ID, Term) :-
     (   output(ID, Term)
     ->  true
     ;   print(Term)
     ).
 prompt(+ID, +Prompt, -Term) is semidet
Hook to handle pengine_input/2 from the remote pengine. If the hooks fails, pengine_rpc/3 calls read/1 using the current prompt.
 2113:- multifile prompt/3.
 output(+ID, +Term) is semidet
Hook to handle pengine_output/1 from the remote pengine. If the hook fails, it calls print/1 on Term.
 2120:- multifile output/2. 2121
 2122
 /*================= HTTP handlers =======================
 */

 %   Declare the HTTP locations we serve and how. Note that we use
 %   time_limit(infinite) because pengines have their own timeout. Also
 %   note that we use spawn. This is needed because we can easily get
 %   many clients waiting for some action on a pengine to complete.
 %   Without spawning, we would quickly exhaust the worker pool of the
 %   HTTP server.
 %
 %   FIXME: probably we should wait for a short time for the pengine on
 %   the default worker thread. Only if that time has expired, we can
 %   call http_spawn/2 to continue waiting on a new thread. That would
 %   improve the performance and reduce the usage of threads.

 :- http_handler(root(pengine), http_404([]),
                 [ id(pengines) ]).
 :- http_handler(root(pengine/create), http_pengine_create,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/send), http_pengine_send,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/pull_response), http_pengine_pull_response,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/abort), http_pengine_abort, []).
 :- http_handler(root(pengine/detach), http_pengine_detach, []).
 :- http_handler(root(pengine/list), http_pengine_list, []).
 :- http_handler(root(pengine/ping), http_pengine_ping, []).
 :- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []).

 :- http_handler(root(pengine/'pengines.js'),
                 http_reply_file(library('http/web/js/pengines.js'), []),
                 []).
 :- http_handler(root(pengine/'plterm.css'),
                 http_reply_file(library('http/web/css/plterm.css'), []),
                 []).
 http_pengine_create(+Request)
HTTP POST handler for =/pengine/create=. This API accepts the pengine creation parameters both as application/json and as www-form-encoded. Accepted parameters:
ParameterDefaultComment
formatprologOutput format
applicationpengine_sandboxPengine application
chunk1Chunk-size for results
collate0 (off)Join output events
solutionschunkedIf all, emit all results
ask-The query
template-Output template
src_text""Program
src_url-Program to download
disposition-Download location

Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.

Using chunk=false simulates the recursive toplevel. See pengine_ask/3.

 2186http_pengine_create(Request) :-
 2187    reply_options(Request, [post]),
 2188    !.
 2189http_pengine_create(Request) :-
 2190    memberchk(content_type(CT), Request),
 2191    sub_atom(CT, 0, _, _, 'application/json'),
 2192    !,
 2193    http_read_json_dict(Request, Dict),
 2194    dict_atom_option(format, Dict, Format, prolog),
 2195    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2196    http_pengine_create(Request, Application, Format, Dict).
 2197http_pengine_create(Request) :-
 2198    Optional = [optional(true)],
 2199    OptString = [string|Optional],
 2200    Form = [ format(Format, [default(prolog)]),
 2201             application(Application, [default(pengine_sandbox)]),
 2202             chunk(_, [nonneg;oneof([false]), default(1)]),
 2203             collate(_, [number, default(0)]),
 2204             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2205             ask(_, OptString),
 2206             template(_, OptString),
 2207             src_text(_, OptString),
 2208             disposition(_, OptString),
 2209             src_url(_, Optional)
 2210           ],
 2211    http_parameters(Request, Form),
 2212    form_dict(Form, Dict),
 2213    http_pengine_create(Request, Application, Format, Dict).
 2214
 %!  dict_atom_option(+Key, +Dict, -Atom, +Default) is det.
 %
 %   Atom is the value for Key in Dict converted to an atom, or
 %   Default if Dict has no such key.

 dict_atom_option(Key, Dict, Atom, _Default) :-
     get_dict(Key, Dict, Text),
     !,
     atom_string(Atom, Text).
 dict_atom_option(_Key, _Dict, Default, Default).
 2220
 %!  form_dict(+Form, -Dict) is det.
 %
 %   Dict is a dict (with unbound tag) that holds the parameters of
 %   Form that received a value. See form_values/2.

 form_dict(Form, Dict) :-
     form_values(Form, NameValuePairs),
     dict_pairs(Dict, _Tag, NameValuePairs).
 2224
 %!  form_values(+Form, -Pairs) is det.
 %
 %   Form is a list of terms Name(Value, ...). Pairs is a list of
 %   Name-Value for those elements whose first argument is bound;
 %   elements with an unbound first argument are skipped.

 form_values([], []).
 form_values([Param|Params], Pairs) :-
     (   arg(1, Param, Value),
         nonvar(Value)
     ->  functor(Param, Name, _),
         Pairs = [Name-Value|Rest]
     ;   Pairs = Rest
     ),
     form_values(Params, Rest).
 http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2239http_pengine_create(Request, Application, Format, Dict) :-
 2240    current_application(Application),
 2241    !,
 2242    allowed(Request, Application),
 2243    authenticate(Request, Application, UserOptions),
 2244    dict_to_options(Dict, Application, CreateOptions0),
 2245    append(UserOptions, CreateOptions0, CreateOptions),
 2246    pengine_uuid(Pengine),
 2247    message_queue_create(Queue, [max_size(25)]),
 2248    setting(Application:time_limit, TimeLimit),
 2249    get_time(Now),
 2250    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2251    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2252    create(Queue, Pengine, CreateOptions, http, Application),
 2253    create_wait_and_output_result(Pengine, Queue, Format,
 2254                                  TimeLimit, Dict),
 2255    gc_abandoned_queues.
 2256http_pengine_create(_Request, Application, Format, _Dict) :-
 2257    Error = existence_error(pengine_application, Application),
 2258    pengine_uuid(ID),
 2259    output_result(ID, Format, error(ID, error(Error, _))).
 2260
 2261
 %!  dict_to_options(+Dict, +Application, -CreateOptions) is det.
 %
 %   Translate the key-value pairs of Dict into pengine create options
 %   using pairs_create_options/3.

 dict_to_options(Dict, Application, CreateOptions) :-
     dict_pairs(Dict, _Tag, KeyValues),
     pairs_create_options(KeyValues, Application, CreateOptions).
 2265
 %!  pairs_create_options(+Pairs, +Application, -Options) is det.
 %
 %   Translate Name-Value pairs into create options. A pair is kept
 %   if Name(_) is accepted by pengine_create_option/1 and Name is not
 %   `user`; all other pairs are silently dropped. Values of options
 %   typed `atom` by create_option_type/2 are converted to an atom;
 %   other values are passed as they are.

 pairs_create_options([], _, []) :- !.
 pairs_create_options([Name-Value0|Pairs], App, [Option|Options]) :-
     Option =.. [Name,Value],
     pengine_create_option(Option), Name \== user,
     !,
     (   create_option_type(Option, atom)
     ->  atom_string(Value, Value0)
     ;   % Terms are passed as text: term creation must be done once we
         % created the source and know the operators.
         Value = Value0
     ),
     pairs_create_options(Pairs, App, Options).
 pairs_create_options([_Dropped|Pairs], App, Options) :-
     pairs_create_options(Pairs, App, Options).
 wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Collate) is det
Wait for the Pengine's Queue and if there is a message, send it to the requester using output_result/1. If Pengine does not answer within the time specified by the setting time_limit, Pengine is aborted and the result is error(time_limit_exceeded, _).
 2288wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
 2289    Collate is min(Collate0, TimeLimit/10),
 2290    get_time(Epoch),
 2291    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2292                                 [ timeout(TimeLimit)
 2293                                 ]),
 2294              Error, true)
 2295    ->  (   var(Error)
 2296        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2297            (   collating_event(Collate, Event)
 2298            ->  Deadline is Epoch+TimeLimit,
 2299                collect_events(Pengine, Collate, Queue, Deadline, 100, More),
 2300                Events = [Event|More],
 2301                ignore(destroy_queue_from_http(Pengine, Events, Queue)),
 2302                protect_pengine(Pengine, output_result(Pengine, Format, Events))
 2303            ;   ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2304                protect_pengine(Pengine, output_result(Pengine, Format, Event))
 2305            )
 2306        ;   output_result(Pengine, Format, died(Pengine))
 2307        )
 2308    ;   time_limit_exceeded(Pengine, Format)
 2309    ).
 collect_events(+Pengine, +CollateTime, +Queue, +Deadline, +Max, -Events)
Collect more events as long as they are not separated by more than CollateTime seconds and collect at most Max.
 2316collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
 2317    !.
 2318collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
 2319    debug(pengine(wait), 'Waiting to collate events', []),
 2320    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2321                                 [ timeout(Collate)
 2322                                 ]),
 2323              Error, true)
 2324    ->  (   var(Error)
 2325        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2326            Events = [Event|More],
 2327            (   collating_event(Collate, Event)
 2328            ->  Max2 is Max - 1,
 2329                collect_events(Pengine, Collate, Queue, Deadline, Max2, More)
 2330            ;   More = []
 2331            )
 2332        ;   Events = [died(Pengine)]
 2333        )
 2334    ;   get_time(Now),
 2335        Now > Deadline
 2336    ->  time_limit_event(Pengine, TimeLimitEvent),
 2337        Events = [TimeLimitEvent]
 2338    ;   Events = []
 2339    ).
 2340
 %!  collating_event(+Collate, +Event) is semidet.
 %
 %   True if Event is an output event and collation is enabled, i.e.,
 %   the collate time Collate is a positive number.
 %
 %   The test is numeric rather than a unification with the integer
 %   `0`, so that a collate time of `0.0` (e.g. from `collate=0.0` in
 %   the request) or a negative value also disables collation instead
 %   of collating with a zero timeout.

 collating_event(Collate, output(_,_)) :-
     Collate > 0.
 create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict) is det
 Intercepts the `solutions=all` case used for downloading results. Dict may contain a `disposition` key to denote the download location.
 % create_wait_and_output_result/5.
 %
 % Clause 1 handles the case where Dict holds `solutions: all`. It is
 % a failure-driven loop: between/3 generates page numbers and each
 % iteration waits at most TimeLimit seconds for the next event on
 % Queue. Every exit path below cuts the between/3 choice point; only
 % the "more solutions" branch fails back into it for the next page.
 2353create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2354    get_dict(solutions, Dict, all),
 2355    !,
 2356    between(1, infinite, Page),
 2357    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2358                                 [ timeout(TimeLimit)
 2359                                 ]),
 2360              Error, true)
 2361    ->  (   var(Error)
 2362        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 % destroy_queue_from_http/3 succeeded: emit this page and stop.
 2363            (   destroy_queue_from_http(Pengine, Event, Queue)
 2364            ->  !,
 2365                protect_pengine(Pengine,
 2366                                output_result_2(Format, page(Page, Event), Dict))
 % More solutions: ask the pengine thread for the next chunk, emit
 % this page and fail to re-enter between/3 with the next page number.
 2367            ;   is_more_event(Event)
 2368            ->  pengine_thread(Pengine, Thread),
 2369                thread_send_message(Thread, pengine_request(next)),
 2370                protect_pengine(Pengine,
 2371                                output_result_2(Format, page(Page, Event), Dict)),
 2372                fail
 % Any other event: emit it as the last page.
 2373            ;   !,
 2374                protect_pengine(Pengine,
 2375                                output_result_2(Format, page(Page, Event), Dict))
 2376            )
 % thread_get_message/3 raised an exception: report died(Pengine).
 2377        ;   !, output_result(Pengine, Format, died(Pengine))
 2378        )
 % No event arrived within TimeLimit.
 2379    ;   !, time_limit_exceeded(Pengine, Format)
 2380    ),
 2381    !.
 % Clause 2: the normal case. Delegate to wait_and_output_result/5,
 % using the `collate` key of Dict (default 0) as collate time.
 2382create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2383    wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)).
 2384
 %!  is_more_event(+Event) is semidet.
 %
 %   True if Event is a success event whose last argument (more) is
 %   `true`, or a create event that carries such an event as answer.

 is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 is_more_event(create(_Id, Features)) :-
     memberchk(answer(Answer), Features),
     is_more_event(Answer).
 time_limit_exceeded(+Pengine, +Format)
The Pengine did not reply within its time limit. Send a reply to the client in the requested format and interrupt the Pengine.
bug
- Ideally, if the Pengine has destroy set to false, we should get the Pengine back to its main loop. Unfortunately we only have normal exceptions that may be caught by the Pengine and abort which cannot be caught and thus destroys the Pengine.
 2402time_limit_exceeded(Pengine, Format) :-
 2403    time_limit_event(Pengine, Event),
 2404    call_cleanup(
 2405        pengine_destroy(Pengine, [force(true)]),
 2406        output_result(Pengine, Format, Event)).
 2407
 %!  time_limit_event(?Pengine, ?Event) is semidet.
 %
 %   Event is the event that tells the client that Pengine was
 %   destroyed because it exceeded its time limit.

 time_limit_event(Pengine, Event) :-
     Event = destroy(Pengine, error(Pengine, time_limit_exceeded)).
 2410
 %!  destroy_pengine_after_output(+Pengine, +Events) is det.
 %
 %   If Events is a list that ends in the time limit event for
 %   Pengine, forcefully destroy Pengine. Failure of the destroy and
 %   exceptions of the shape error(_,_) are ignored. In all other
 %   cases do nothing.

 destroy_pengine_after_output(Pengine, Events) :-
     (   is_list(Events),
         last(Events, Last),
         time_limit_event(Pengine, Last)
     ->  catch(ignore(pengine_destroy(Pengine, [force(true)])),
               error(_,_),
               true)
     ;   true
     ).
 destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet
Consider destroying the output queue for Pengine after sending Event back to the HTTP client. We can destroy the queue if
To be done
- If the client did not request all output, the queue will not be destroyed. We need some timeout and GC for that.
 2432destroy_queue_from_http(ID, _, Queue) :-
 2433    output_queue(ID, Queue, _),
 2434    !,
 2435    destroy_queue_if_empty(Queue).
 2436destroy_queue_from_http(ID, Event, Queue) :-
 2437    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2438    is_destroy_event(Event),
 2439    !,
 2440    message_queue_property(Queue, size(Waiting)),
 2441    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2442    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2443
 %!  is_destroy_event(+Event) is semidet.
 %
 %   True if Event is destroy/1, destroy/2 or a create event that
 %   carries such an event as answer.

 is_destroy_event(destroy(_)).
 is_destroy_event(destroy(_, _)).
 is_destroy_event(create(_, Features)) :-
     memberchk(answer(Answer), Features),
     is_destroy_event(Answer).
 2449
 %!  destroy_queue_if_empty(+Queue) is det.
 %
 %   If Queue holds no messages, remove its output_queue/3
 %   registration and destroy it. A queue that still holds a message
 %   is left untouched.

 destroy_queue_if_empty(Queue) :-
     (   thread_peek_message(Queue, _)
     ->  true
     ;   retractall(output_queue(_, Queue, _)),
         message_queue_destroy(Queue)
     ).
 gc_abandoned_queues
Check whether there are queues that have been abadoned. This happens if the stream contains output events and not all of them are read by the client.
 2463:- dynamic
 2464    last_gc/1. 2465
 2466gc_abandoned_queues :-
 2467    consider_queue_gc,
 2468    !,
 2469    get_time(Now),
 2470    (   output_queue(_, Queue, Time),
 2471        Now-Time > 15*60,
 2472        retract(output_queue(_, Queue, Time)),
 2473        message_queue_destroy(Queue),
 2474        fail
 2475    ;   retractall(last_gc(_)),
 2476        asserta(last_gc(Now))
 2477    ).
 2478gc_abandoned_queues.
 2479
 %!  consider_queue_gc is semidet.
 %
 %   True if garbage collecting abandoned queues is worthwhile: there
 %   are more than 100 output_queue/3 registrations and the previous
 %   collection, if any, was more than 5 minutes ago.

 consider_queue_gc :-
     predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
     Count > 100,
     (   last_gc(Last),
         get_time(Now),
         Now-Last > 5*60
     ->  true
     ;   \+ last_gc(_)                   % never collected before
     ).
 sync_destroy_queue_from_http(+Pengine, +Queue) is det
 sync_delay_destroy_queue(+Pengine, +Queue) is det
Handle destruction of the message queue connecting the HTTP side to the pengine. We cannot delete the queue when the pengine dies because the queue may contain output events. Termination of the pengine and finishing the HTTP exchange may happen in both orders. This means we need handle this using synchronization.
sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called (indirectly) from pengine_done/1 if the pengine's thread dies.
sync_destroy_queue_from_http(+Pengine, +Queue)
Called from destroy_queue/3, from wait_and_output_result/5, i.e., from the HTTP side.
 2506:- dynamic output_queue_destroyed/1. 2507
 2508sync_destroy_queue_from_http(ID, Queue) :-
 2509    (   output_queue(ID, Queue, _)
 2510    ->  destroy_queue_if_empty(Queue)
 2511    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2512    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2513              [Queue]),
 2514        get_time(Now),
 2515        asserta(output_queue(ID, Queue, Now))
 2516    ;   message_queue_destroy(Queue),
 2517        asserta(output_queue_destroyed(Queue))
 2518    ).
 sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called from pengine_unregister/1 when the pengine thread terminates. It is called while the mutex pengine held.
 2525sync_destroy_queue_from_pengine(ID, Queue) :-
 2526    (   retract(output_queue_destroyed(Queue))
 2527    ->  true
 2528    ;   get_time(Now),
 2529        asserta(output_queue(ID, Queue, Now))
 2530    ),
 2531    retractall(pengine_queue(ID, Queue, _, _)).
 2532
 2533
 %!  http_pengine_send(+Request)
 %
 %   HTTP handler for /pengine/send. Reads the event from the `event`
 %   parameter or the posted document, forwards it to the thread of
 %   the pengine identified by the `id` parameter and replies with the
 %   resulting event(s) in the requested `format`.
 %
 %   If the pengine does not exist, the reply is an existence error
 %   for the pengine. Any other error while reading the event is sent
 %   to the client as an error event.

 http_pengine_send(Request) :-
     reply_options(Request, [get,post]),
     !.
 http_pengine_send(Request) :-
     http_parameters(Request,
                     [ id(ID, [ type(atom) ]),
                       event(EventString, [optional(true)]),
                       collate(Collate, [number, default(0)]),
                       format(Format, [default(prolog)])
                     ]),
     catch(read_event(ID, Request, Format, EventString, Event),
           ReadError,
           true),
     (   var(ReadError)
     ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
         (   pengine_thread(ID, Thread)
         ->  pengine_queue(ID, Queue, TimeLimit, _),
             random_delay,
             broadcast(pengine(send(ID, Event))),
             thread_send_message(Thread, pengine_request(Event)),
             wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
         ;   atom(ID)
         ->  pengine_died(Format, ID)
         ;   http_404([], Request)
         )
     ;   ReadError = error(existence_error(pengine, ID), _)
     ->  pengine_died(Format, ID)
     ;   output_result(ID, Format, error(ID, ReadError))
     ).
 2563
 %!  pengine_died(+Format, +Pengine)
 %
 %   Reply in Format with an error event that holds an existence
 %   error for Pengine.

 pengine_died(Format, Pengine) :-
     Error = error(existence_error(pengine, Pengine), _),
     output_result(Pengine, Format, error(Pengine, Error)).
 read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
Read an event on behalve of Pengine. Note that the pengine's module should not be deleted while we are reading using its syntax (module). This is ensured using the pengine_done mutex.
See also
- pengine_done/0.
 2577read_event(Pengine, Request, Format, EventString, Event) :-
 2578    protect_pengine(
 2579        Pengine,
 2580        ( get_pengine_module(Pengine, Module),
 2581          read_event_2(Request, EventString, Module, Event0, Bindings)
 2582        )),
 2583    !,
 2584    fix_bindings(Format, Event0, Bindings, Event).
 2585read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2586    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2587    discard_post_data(Request),
 2588    existence_error(pengine, Pengine).
 read_event_(+Request, +EventString, +Module, -Event, -Bindings)
Read the sent event. The event is a Prolog term that is either in the event parameter or as a posted document.
 2596read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2597    nonvar(EventString),
 2598    !,
 2599    term_string(Event, EventString,
 2600                [ variable_names(Bindings),
 2601                  module(Module)
 2602                ]).
 2603read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2604    option(method(post), Request),
 2605    http_read_data(Request,     Event,
 2606                   [ content_type('application/x-prolog'),
 2607                     module(Module),
 2608                     variable_names(Bindings)
 2609                   ]).
 discard_post_data(+Request) is det
If this is a POST request, discard the posted data.
 % discard_post_data(+Request)
 %
 % For a POST request, read the posted data into a null stream so it
 % is consumed and dropped.  Any other request is left untouched.
 discard_post_data(Request) :-
     (   option(method(post), Request)
     ->  setup_call_cleanup(
             open_null_stream(Sink),
             http_read_data(Request, _, [to(stream(Sink))]),
             close(Sink))
     ;   true
     ).
 fix_bindings(+Format, +EventIn, +Bindings, -Event) is det
Generate the template for json(-s) Format from the variables in the asked Goal. Variables whose name starts with an underscore followed by a capital letter are excluded from the template.
 % fix_bindings(+Format, +EventIn, +Bindings, -Event)
 %
 % For an ask(Goal, Options) event in a JSON format, extend the
 % options with template(Template), chunk(Paging) (default 1) and
 % bindings(NamedBindings), where NamedBindings are the Bindings that
 % anon/1 does not reject.  Every other event is passed unchanged.
 fix_bindings(Format, EventIn, Bindings, Event) :-
     (   EventIn = ask(Goal, AskOptions),
         Event = ask(Goal, NewOptions),
         json_lang(Format)
     ->  exclude(anon, Bindings, Named),
         template(Named, Template, AskOptions, NoTemplate),
         select_option(chunk(Paging), NoTemplate, Rest, 1),
         NewOptions = [ template(Template),
                        chunk(Paging),
                        bindings(Named)
                      | Rest
                      ]
     ;   Event = EventIn
     ).
 2644
 % template(+Bindings, -Template, +Options0, -Options)
 %
 % Use the template(Template) option when present, removing it from
 % the options.  Otherwise build a swish_default_template dict from
 % Bindings and leave the options as they are.
 template(Bindings, Template, Options0, Options) :-
     (   select_option(template(Template), Options0, Options)
     ->  true
     ;   Options = Options0,
         dict_create(Template, swish_default_template, Bindings)
     ).
 2650
 % anon(+Binding)
 %
 % True if Binding is Name=_ where Name starts with an underscore
 % whose next character can start a Prolog variable name.
 anon(Name=_) :-
     sub_atom(Name, 0, 2, _, Prefix),
     atom_chars(Prefix, ['_', Second]),
     char_type(Second, prolog_var_start).
 2655
 % var_name(+Binding, -Name): Name is the left-hand side of Name=Value.
 var_name(Binding, Name) :-
     Binding = (Name = _).
 json_lang(+Format) is semidet
True if Format is a JSON variation.
 % json_lang(+Format)
 %
 % True if Format is `json` or an atom that starts with `json-`.
 json_lang(Format) :-
     (   Format = json
     ->  true
     ;   sub_atom(Format, 0, _, _, 'json-')
     ).
 http_pengine_pull_response(+Request)
HTTP handler for /pengine/pull_response. Pulls possible pending messages from the pengine.
 % http_pengine_pull_response(+Request)
 %
 % Answer an OPTIONS request through reply_options/2.  Otherwise
 % reattach the pengine, find its queue (falling back to
 % output_queue/3 with a time limit of 0) and wait for output on it.
 % Reply 404 if no queue is found.
 http_pengine_pull_response(Request) :-
     (   reply_options(Request, [get])
     ->  true
     ;   http_parameters(Request,
                         [ id(ID, []),
                           format(Format, [default(prolog)]),
                           collate(Collate, [number, default(0)])
                         ]),
         reattach(ID),
         (   (   pengine_queue(ID, Queue, TimeLimit, _)
             ->  true
             ;   output_queue(ID, Queue, _),
                 TimeLimit = 0
             )
         ->  wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
         ;   http_404([], Request)
         )
     ).
 http_pengine_abort(+Request)
HTTP handler for /pengine/abort. Note that abort may be sent at any time and the reply may be handled by a pull_response. In that case, our pengine has already died before we get to wait_and_output_result/5.
 % http_pengine_abort(+Request)
 %
 % Answer an OPTIONS request through reply_options/2.  Otherwise, if
 % the pengine has a thread: broadcast the abort, abort threads that
 % are producing output for it, abort the pengine and reply `true`.
 % Reply 404 if the pengine has no thread.
 http_pengine_abort(Request) :-
     (   reply_options(Request, [get,post])
     ->  true
     ;   http_parameters(Request, [id(ID, [])]),
         (   pengine_thread(ID, _)
         ->  broadcast(pengine(abort(ID))),
             abort_pending_output(ID),
             pengine_abort(ID),
             reply_json_dict(true)
         ;   http_404([], Request)
         )
     ).
 http_pengine_detach(+Request)
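Detach a Pengine while keeping it running. This has the following consequences:
- A destroy_all request that includes the id of this pengine ignores it.
- The size limit of the pengine's output queue is raised, so more output can be stored before a client reattaches.
- The Pengine is reported by `/pengine/list`.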
 % http_pengine_detach(+Request)
 %
 % Answer an OPTIONS request through reply_options/2.  Otherwise read
 % the JSON client data and, if the peer is allowed and authenticated
 % for the pengine's application, record the pengine as detached
 % (client data plus the current time), raise the queue limit to 1000
 % and queue a detached(ID) reply.  Reply 404 if the checks fail.
 http_pengine_detach(Request) :-
     (   reply_options(Request, [post])
     ->  true
     ;   http_parameters(Request, [id(ID, [])]),
         http_read_json_dict(Request, ClientData),
         (   pengine_property(ID, application(Application)),
             allowed(Request, Application),
             authenticate(Request, Application, _)
         ->  broadcast(pengine(detach(ID))),
             get_time(Stamp),
             Record = ClientData.put(time, Stamp),
             assertz(pengine_detached(ID, Record)),
             pengine_queue(ID, Queue, _, _),
             message_queue_set(Queue, max_size(1000)),
             pengine_reply(Queue, detached(ID)),
             reply_json_dict(true)
         ;   http_404([], Request)
         )
     ).
 2743
 % reattach(+ID)
 %
 % If ID is recorded as detached and still has a queue, drop the
 % detached record and set the queue limit back to 25.  Always
 % succeeds otherwise.
 reattach(ID) :-
     retract(pengine_detached(ID, _)),
     pengine_queue(ID, Queue, _, _),
     !,
     message_queue_set(Queue, max_size(25)).
 reattach(_).
 http_pengine_destroy_all(+Request)
Destroy a list of pengines. Normally called by pengines.js if the browser window is closed.
 % http_pengine_destroy_all(+Request)
 %
 % Answer an OPTIONS request through reply_options/2.  Otherwise
 % split the comma-separated `ids` parameter and force-destroy every
 % listed pengine that is not recorded as detached, then reply "ok".
 http_pengine_destroy_all(Request) :-
     (   reply_options(Request, [get,post])
     ->  true
     ;   http_parameters(Request, [ids(IDsAtom, [])]),
         atomic_list_concat(IDs, ',', IDsAtom),
         forall(( member(ID, IDs),
                  \+ pengine_detached(ID, _)
                ),
                pengine_destroy(ID, [force(true)])),
         reply_json_dict("ok")
     ).
 http_pengine_ping(+Request)
HTTP handler for /pengine/ping. If the requested Pengine is alive, an event ping(Pengine, Stats) is created, where Stats is the return of thread_statistics/2. Otherwise the event died(Pengine) is created.
 % http_pengine_ping(+Request)
 %
 % Answer an OPTIONS request through reply_options/2.  Otherwise emit
 % ping(Pengine, Stats) when the pengine has a thread whose statistics
 % can be fetched, and died(Pengine) when it has none or fetching the
 % statistics raises an error(_,_) exception.
 http_pengine_ping(Request) :-
     (   reply_options(Request, [get])
     ->  true
     ;   http_parameters(Request,
                         [ id(Pengine, []),
                           format(Format, [default(prolog)])
                         ]),
         (   pengine_thread(Pengine, Thread),
             catch(thread_statistics(Thread, Stats), error(_,_), fail)
         ->  output_result(Pengine, Format, ping(Pengine, Stats))
         ;   output_result(Pengine, Format, died(Pengine))
         )
     ).
 http_pengine_list(+Request)
HTTP handler for `/pengine/list`, providing information about running Pengines.
To be done
- Only list detached Pengines associated to the logged in user.
 % http_pengine_list(+Request)
 %
 % Answer an OPTIONS request through reply_options/2.  Otherwise,
 % after the access and authentication checks, reply a JSON object
 % whose `pengines` key lists the matching pengines of Application.
 % The only accepted status is `detached`.
 http_pengine_list(Request) :-
     (   reply_options(Request, [get])
     ->  true
     ;   http_parameters(Request,
                         [ status(Status, [default(detached), oneof([detached])]),
                           application(Application, [default(pengine_sandbox)])
                         ]),
         allowed(Request, Application),
         authenticate(Request, Application, _),
         findall(Info, listed_pengine(Application, Status, Info), Infos),
         reply_json_dict(json{pengines: Infos})
     ).
 2811
 % listed_pengine(+Application, +Status, -State)
 %
 % Enumerate detached pengines of Application.  State is a dict with
 % the pengine id, detach time, number of queued messages and thread
 % statistics; the statistics are thread{status:died} when the
 % pengine has no thread or its statistics cannot be fetched.
 listed_pengine(Application, detached, State) :-
     State = pengine{id:Id,
                     detached:Since,
                     queued:Pending,
                     stats:Stats},
     pengine_property(Id, application(Application)),
     pengine_property(Id, detached(Since)),
     pengine_queue(Id, Queue, _, _),
     message_queue_property(Queue, size(Pending)),
     (   pengine_thread(Id, Thread),
         catch(thread_statistics(Thread, Stats), _, fail)
     ->  true
     ;   Stats = thread{status:died}
     ).
 output_result(+Pengine, +Format, +EventTerm) is det
 output_result(+Pengine, +Format, +EventTerm, +OptionsDict) is det
Formulate an HTTP response from a pengine event term. Format is one of prolog, json or json-s.
Arguments:
EventTerm- is either a single event or a list of events.
 :- dynamic
     pengine_replying/2.             % +Pengine, +Thread

 % output_result(+Pengine, +Format, +Event)
 %
 % Write Event as the HTTP reply in Format.  While the reply is being
 % written the calling thread is registered in pengine_replying/2;
 % a pengine_abort_output exception thrown during writing is caught
 % and ends the output quietly.  Afterwards
 % destroy_pengine_after_output/2 is called.
 output_result(Pengine, Format, Event) :-
     thread_self(Me),
     cors_enable,            % contingent on http:cors setting
     disable_client_cache,
     setup_call_cleanup(
         asserta(pengine_replying(Pengine, Me), Mark),
         catch(output_result_2(Format, Event, _{}),
               pengine_abort_output,
               true),
         erase(Mark)),
     destroy_pengine_after_output(Pengine, Event).
 2851
 % output_result_2(+Lang, +Event, +Dict)
 %
 % Emit Event in Lang.  The write_result/3 hook gets the first chance.
 % Otherwise `prolog` writes the term as text/x-prolog, a JSON
 % language replies the converted event as JSON, and anything else
 % raises a domain_error.
 output_result_2(Lang, Event, Dict) :-
     (   write_result(Lang, Event, Dict)
     ->  true
     ;   Lang = prolog
     ->  format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
         write_term(Event,
                    [ quoted(true),
                      ignore_ops(true),
                      fullstop(true),
                      blobs(portray),
                      portray_goal(portray_blob),
                      nl(true)
                    ])
     ;   json_lang(Lang)
     ->  (   event_term_to_json_data(Event, JSON, Lang)
         ->  reply_json_dict(JSON)
         ;   % conversion failed: re-run it under assertion/1 to report
             assertion(event_term_to_json_data(Event, _, Lang))
         )
     ;   % FIXME: allow for non-JSON format
         domain_error(pengine_format, Lang)
     ).
 portray_blob(+Blob, +Options) is det
Portray non-text blobs that may appear in output terms. Not really sure about that. Basically such terms need to be avoided as they are meaningless outside the process. The generated error is hard to debug though, so now we send them as '$BLOB'(Type). Future versions may include more info, depending on Type.
 :- public portray_blob/2.               % called from write-term

 % portray_blob(+Blob, +Options)
 %
 % Write Blob as the quoted term '$BLOB'(Type), where Type is the
 % blob type reported by blob/2.  Options is ignored.
 portray_blob(Blob, _Options) :-
     blob(Blob, Type),
     Stub = '$BLOB'(Type),
     writeq(Stub).
 abort_pending_output(+Pengine) is det
If we get an abort, it is possible that output is being produced for the client. This predicate aborts these threads.
 % abort_pending_output(+Pengine)
 %
 % Signal every thread registered in pengine_replying/2 as writing a
 % reply for Pengine, so that it aborts its output.
 abort_pending_output(Pengine) :-
     forall(pengine_replying(Pengine, Writer),
            abort_output_thread(Writer)).
 2897
 % abort_output_thread(+Thread)
 %
 % Make Thread throw pengine_abort_output.  An existence error for a
 % thread (Thread is already gone) is silently ignored.
 abort_output_thread(Thread) :-
     Gone = error(existence_error(thread, _), _),
     catch(thread_signal(Thread, throw(pengine_abort_output)),
           Gone,
           true).
 write_result(+Lang, +Event, +Dict) is semidet
Hook that allows for different output formats. The core Pengines library supports prolog and various JSON dialects. The hook event_to_json/3 can be used to refine the JSON dialects. This hook must be used if a completely different output format is desired.
 disable_client_cache
Make sure the client will not cache our page.
See also
- http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 % disable_client_cache
 %
 % Emit the Cache-Control, Pragma and Expires header lines that tell
 % the client not to cache the reply.
 disable_client_cache :-
     format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
             Pragma: no-cache\r\n\c
             Expires: 0\r\n').
 2921
 % event_term_to_json_data(+EventTerm, -JSON, +Lang)
 %
 % Translate a pengine event term, or a list of event terms, into
 % data for reply_json_dict/1.  A list is translated element-wise.
 % For a single event the event_to_json/3 hook is tried first; the
 % remaining clauses are the built-in translations:
 %
 %   - success/5 (language `json` only): bindings via term_to_json/2
 %   - destroy/2 and the answer(_) feature of create/2: the nested
 %     event is translated recursively
 %   - error/2: message text plus error code and location details
 %   - failure/2, and generic events of arity 1 or 2
 %
 % Fails if no clause applies.  An unreachable second copy of the
 % destroy/2 clause has been removed; the first one commits with a cut.
 event_term_to_json_data(Events, JSON, Lang) :-
     is_list(Events),
     !,
     events_to_json_data(Events, JSON, Lang).
 event_term_to_json_data(Event, JSON, Lang) :-
     event_to_json(Event, JSON, Lang),
     !.
 event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                         json{event:success, id:ID, time:Time,
                              data:Bindings, more:More, projection:Projection},
                         json) :-
     !,
     term_to_json(Bindings0, Bindings).
 event_term_to_json_data(destroy(ID, Event),
                         json{event:destroy, id:ID, data:JSON},
                         Style) :-
     !,
     event_term_to_json_data(Event, JSON, Style).
 event_term_to_json_data(create(ID, Features0), JSON, Style) :-
     !,
     (   select(answer(First0), Features0, Features1)
     ->  event_term_to_json_data(First0, First, Style),
         Features = [answer(First)|Features1]
     ;   Features = Features0
     ),
     dict_create(JSON, json, [event(create), id(ID)|Features]).
 event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
     !,
     % Message is still unbound when the dict is built; it is filled
     % in by message_to_string/2 below.
     Error0 = json{event:error, id:ID, data:Message},
     add_error_details(ErrorTerm, Error0, Error),
     message_to_string(ErrorTerm, Message).
 event_term_to_json_data(failure(ID, Time),
                         json{event:failure, id:ID, time:Time}, _) :-
     !.
 event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
     functor(EventTerm, F, 1),
     !,
     arg(1, EventTerm, ID).
 event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
     functor(EventTerm, F, 2),
     arg(1, EventTerm, ID),
     arg(2, EventTerm, Data),
     term_to_json(Data, JSON).
 2969
 % events_to_json_data(+Events, -JSONList, +Lang)
 %
 % Translate each event in Events with event_term_to_json_data/3,
 % keeping the order.
 events_to_json_data([], [], _).
 events_to_json_data([Event|Events], [JSON|JSONs], Lang) :-
     event_term_to_json_data(Event, JSON, Lang),
     events_to_json_data(Events, JSONs, Lang).
 2974
 2975:- public add_error_details/3.
 add_error_details(+Error, +JSON0, -JSON)
Add format error code and location information to an error. Also used by pengines_io.pl.
 % add_error_details(+Error, +JSON0, -JSON)
 %
 % Extend the dict JSON0 first with the error code and then with the
 % source location of Error, where available.
 add_error_details(Error, JSON0, JSON) :-
     add_error_code(Error, JSON0, WithCode),
     add_error_location(Error, WithCode, JSON).
 add_error_code(+Error, +JSON0, -JSON) is det
Add a code field to JSON0 if Error is an ISO error term. The error code is the functor name of the formal part of the error, e.g., syntax_error, type_error, etc. Some errors carry more information:
existence_error(Type, Obj)
{arg1:Type, arg2:Obj}, where Obj is stringified if it is not atomic.
 % add_error_code(+Error, +Error0, -Error)
 %
 % Add a `code` key to the dict Error0.  An existence_error with an
 % atom type also gets `arg1` and `arg2`.  Any other error whose
 % formal part is callable gets the name of that formal term as code.
 % Everything else leaves the dict unchanged.
 add_error_code(ErrorTerm, Error0, Error) :-
     (   ErrorTerm = error(existence_error(Type, Obj), _),
         atom(Type)
     ->  to_atomic(Obj, Value),
         Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value})
     ;   ErrorTerm = error(Formal, _),
         callable(Formal)
     ->  functor(Formal, Code, _),
         Error = Error0.put(code, Code)
     ;   Error = Error0
     ).
 3008
 % to_atomic(+Obj, -Atomic)
 %
 % Atoms, numbers and strings are returned as they are; any other
 % term is converted to a string with term_string/2.
 % What to do with large integers?
 to_atomic(Obj, Atomic) :-
     (   (   atom(Obj)
         ;   number(Obj)
         ;   string(Obj)
         )
     ->  Atomic = Obj
     ;   term_string(Obj, Atomic)
     ).
 add_error_location(+Error, +JSON0, -JSON) is det
Add a location property if the error can be associated with a source location. The location is an object with properties file and line and, if available, the character location in the line.
 % add_error_location(+Error, +Term0, -Term)
 %
 % If the context of Error is file(Path, Line, Ch, _) with an atom
 % Path and an integer Line, add a `location` dict to Term0.  The
 % `ch` key is only included when Ch is an integer other than -1.
 % Otherwise Term0 is returned unchanged.
 add_error_location(Error, Term0, Term) :-
     (   Error = error(_, file(Path, Line, -1, _)),
         atom(Path),
         integer(Line)
     ->  Term = Term0.put(_{location:_{file:Path, line:Line}})
     ;   Error = error(_, file(Path, Line, Ch, _)),
         atom(Path),
         integer(Line),
         integer(Ch)
     ->  Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}})
     ;   Term = Term0
     ).
 event_to_json(+Event, -JSONTerm, +Lang) is semidet
Hook that translates a Pengine event structure into a term suitable for reply_json_dict/1, according to the language specification Lang. This can be used to massage general Prolog terms, notably associated with success(ID, Bindings, Projection, Time, More) and output(ID, Term) into a format suitable for processing at the client side.
 3041%:- multifile pengines:event_to_json/3.
 3042
 3043
 3044                 /*******************************
 3045                 *        ACCESS CONTROL        *
 3046                 *******************************/
 allowed(+Request, +Application) is det
Check whether the peer is allowed to connect. Returns a forbidden header if contact is not allowed.
 % allowed(+Request, +Application)
 %
 % Succeed if the peer matches the application's allow_from setting
 % and does not match its deny_from setting.  Otherwise throw an
 % http_reply(forbidden(URI)) exception for the request URI.
 allowed(Request, Application) :-
     (   setting(Application:allow_from, Allow),
         match_peer(Request, Allow),
         setting(Application:deny_from, Deny),
         \+ match_peer(Request, Deny)
     ->  true
     ;   memberchk(request_uri(Here), Request),
         throw(http_reply(forbidden(Here)))
     ).
 3062
 % match_peer(+Request, +Allowed)
 %
 % True if the peer of Request is covered by the list Allowed.  The
 % atom `*` in Allowed matches any peer and an empty list matches
 % none.  Otherwise the peer must be a member of Allowed or match one
 % of its patterns.
 match_peer(Request, Allowed) :-
     (   memberchk(*, Allowed)
     ->  true
     ;   Allowed = []
     ->  fail
     ;   http_peer(Request, Peer),
         debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
         (   memberchk(Peer, Allowed)
         ->  true
         ;   member(Pattern, Allowed),
             match_peer_pattern(Pattern, Peer)
         )
     ).
 3075
 % match_peer_pattern(+Pattern, +Peer)
 %
 % True (once) if Pattern and Peer translate to unifiable terms
 % through ip_term/2.
 match_peer_pattern(Pattern, Peer) :-
     once(( ip_term(Pattern, IP),
            ip_term(Peer, IP)
          )).
 3080
 % ip_term(+Text, -Term)
 %
 % Split Text at the dots and translate the parts with ip_pattern/2.
 ip_term(Peer, Pattern) :-
     split_string(Peer, ".", "", Parts),
     ip_pattern(Parts, Pattern).
 3084
 % ip_pattern(+Parts, -Pattern)
 %
 % Translate the dot-separated Parts of an address into a list of
 % numbers.  A final part `*` is a wildcard: the rest of Pattern is
 % left unbound, so it unifies with any remaining numbers.  Fails if
 % a part is neither a number nor a final `*`.
 %
 % The parts come from split_string/4 and are therefore strings.  The
 % wildcard test compares text, so it accepts both the string "*" and
 % the atom `*`; matching only the atom never succeeded for strings.
 ip_pattern([], []).
 ip_pattern([Part], _) :-
     nonvar(Part),
     atom_string(*, Part),
     !.
 ip_pattern([S|T0], [N|T]) :-
     number_string(N, S),
     ip_pattern(T0, T).
 authenticate(+Request, +Application, -UserOptions:list) is det
Call authentication_hook/3, returning either [user(User)], [] or an exception.
 % authenticate(+Request, +Application, -UserOptions)
 %
 % Call authentication_hook/3.  If it succeeds, User must be ground
 % and UserOptions is [user(User)].  If it fails, UserOptions is [].
 authenticate(Request, Application, UserOptions) :-
     (   authentication_hook(Request, Application, User)
     ->  must_be(ground, User),
         UserOptions = [user(User)]
     ;   UserOptions = []
     ).
 authentication_hook(+Request, +Application, -User) is semidet
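This hook is called from the =/pengine/create= HTTP handler to discover whether the server is accessed by an authorized user. It can react in three ways:
- Succeed, binding User to a ground term. The caller then continues with the option user(User).
- Fail. The caller then continues without a user.
- Throw an exception, which prevents the request from being processed.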
See also
- http_authenticate/3 can be used to implement this hook using default HTTP authentication data.
 % pengine_register_user(+Options)
 %
 % If Options contains user(User), record User for the current
 % pengine in pengine_user/2.  Succeeds without effect otherwise.
 pengine_register_user(Options) :-
     (   option(user(User), Options)
     ->  pengine_self(Self),
         asserta(pengine_user(Self, User))
     ;   true
     ).
 pengine_user(-User) is semidet
True when the pengine was created by an HTTP request that authorized User.
See also
- authentication_hook/3 can be used to extract authorization from the HTTP header.
 % pengine_user(-User)
 %
 % True when User is recorded in pengine_user/2 for the current
 % pengine.
 pengine_user(User) :-
     pengine_self(Self),
     pengine_user(Self, User).
 reply_options(+Request, +Methods) is semidet
Reply the HTTP OPTIONS request
 % reply_options(+Request, +Allowed)
 %
 % If Request is an HTTP OPTIONS request, enable CORS for the Allowed
 % methods and reply with an empty text/plain document.  Fails for
 % any other method.
 reply_options(Request, Allowed) :-
     (   option(method(options), Request)
     ->  cors_enable(Request, [methods(Allowed)]),
         format('Content-type: text/plain\r\n'),
         format('~n')                    % empty body
     ).
 3156
 3157
 3158                 /*******************************
 3159                 *        COMPILE SOURCE        *
 3160                 *******************************/
 pengine_src_text(+SrcText, +Module) is det
Asserts the clauses defined in SrcText in the private database of the current Pengine. This predicate processes the `src_text` option of pengine_create/1.
 % pengine_src_text(+Src, +Module)
 %
 % Load the program text Src into Module from a character stream,
 % silently and under the source id pengine://<Self>/src.  The load
 % options are extended by extra_load_options/2.  Afterwards the text
 % is handed to keep_source/3.
 pengine_src_text(Src, Module) :-
     pengine_self(Self),
     format(atom(ID), 'pengine://~w/src', [Self]),
     extra_load_options(Self, Extra),
     setup_call_cleanup(
         open_chars_stream(Src, In),
         load_files(Module:ID,
                    [ stream(In),
                      module(Module),
                      silent(true)
                    | Extra
                    ]),
         close(In)),
     keep_source(Self, ID, Src).
 3183
 % system:'#file'(+File, +Line)
 %
 % Set the file name of the stream being loaded to File and switch
 % position recording off and on again.  Line is ignored.
 system:'#file'(File, _Line) :-
     prolog_load_context(stream, In),
     set_stream(In, file_name(File)),
     set_stream(In, record_position(false)),
     set_stream(In, record_position(true)).
 pengine_src_url(+URL, +Module) is det
Asserts the clauses defined in URL in the private database of the current Pengine. This predicate processes the `src_url` option of pengine_create/1.
To be done
- Make a sensible guess at the encoding.
 % pengine_src_url(+URL, +Module)
 %
 % Load the program at URL into Module under the source id
 % pengine://<Self>/url/<Path>, reading it as UTF-8.  If the
 % application's debug_info setting is false the HTTP stream is
 % loaded directly.  Otherwise the text is first read into a string,
 % loaded from that string and handed to keep_source/3.
 pengine_src_url(URL, Module) :-
     pengine_self(Self),
     uri_encoded(path, URL, Path),
     format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
     extra_load_options(Self, Extra),
     LoadOptions = [module(Module)|Extra],
     (   get_pengine_application(Self, Application),
         setting(Application:debug_info, false)
     ->  setup_call_cleanup(
             http_open(URL, In, []),
             ( set_stream(In, encoding(utf8)),
               load_files(Module:ID, [stream(In)|LoadOptions])
             ),
             close(In))
     ;   setup_call_cleanup(
             http_open(URL, Remote, []),
             ( set_stream(Remote, encoding(utf8)),
               read_string(Remote, _, Src)
             ),
             close(Remote)),
         setup_call_cleanup(
             open_chars_stream(Src, In),
             load_files(Module:ID, [stream(In)|LoadOptions]),
             close(In)),
         keep_source(Self, ID, Src)
     ).
 3231
 3232
 % extra_load_options(+Pengine, -Options)
 %
 % Options is [] when pengine_not_sandboxed/1 holds for Pengine and
 % [sandboxed(true)] otherwise.
 extra_load_options(Pengine, Options) :-
     (   pengine_not_sandboxed(Pengine)
     ->  Options = []
     ;   Options = [sandboxed(true)]
     ).
 3238
 3239
 % keep_source(+Pengine, +ID, +SrcText)
 %
 % If the application's debug_info setting is true, store SrcText as
 % a string in pengine_data/2 under source(ID, String).  Succeeds
 % without effect otherwise.
 keep_source(Pengine, ID, SrcText) :-
     (   get_pengine_application(Pengine, Application),
         setting(Application:debug_info, true)
     ->  to_string(SrcText, SrcString),
         assertz(pengine_data(Pengine, source(ID, SrcString)))
     ;   true
     ).
 3247
 % to_string(+Text, -String)
 %
 % A string is returned as it is; anything else is converted with
 % atom_string/2.
 to_string(Text, String) :-
     (   Text = String,
         string(Text)
     ->  true
     ;   once(atom_string(Text, String))
     ).
 3254
 3255		 /*******************************
 3256		 *            SANDBOX		*
 3257		 *******************************/
 3258
 % Declare the pengine I/O and debug predicates as safe primitives
 % for the sandbox.
 :- multifile sandbox:safe_primitive/1.

 sandbox:safe_primitive(pengines:pengine_input(_, _)).
 sandbox:safe_primitive(pengines:pengine_output(_)).
 sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3264sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3265
 3266
 3267                 /*******************************
 3268                 *            MESSAGES          *
 3269                 *******************************/
 3270
 3271prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3272    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3273      'This is normally caused by an insufficiently instantiated'-[], nl,
 3274      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3275      'find all possible instantations of Var.'-[]
 3276    ]