1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2022, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 35*/ 36 37:- module(thread_httpd, 38 [ http_current_server/2, % ?:Goal, ?Port 39 http_server_property/2, % ?Port, ?Property 40 http_server/2, % :Goal, +Options 41 http_workers/2, % +Port, ?WorkerCount 42 http_add_worker/2, % +Port, +Options 43 http_current_worker/2, % ?Port, ?ThreadID 44 http_stop_server/2, % +Port, +Options 45 http_spawn/2, % :Goal, +Options 46 47 http_requeue/1, % +Request 48 http_close_connection/1, % +Request 49 http_enough_workers/3 % +Queue, +Why, +Peer 50 ]). 51:- use_module(library(debug)). 52:- use_module(library(error)). 53:- use_module(library(option)). 54:- use_module(library(socket)). 55:- use_module(library(thread_pool)). 56:- use_module(library(gensym)). 57:- use_module(http_wrapper). 58:- use_module(http_path). 59 60:- autoload(library(uri), [uri_resolve/3]). 61 62:- predicate_options(http_server/2, 2, 63 [ port(any), 64 unix_socket(atom), 65 entry_page(atom), 66 tcp_socket(any), 67 workers(positive_integer), 68 timeout(number), 69 keep_alive_timeout(number), 70 silent(boolean), 71 ssl(list(any)), % if http/http_ssl_plugin is loaded 72 pass_to(system:thread_create/3, 3) 73 ]). 74:- predicate_options(http_spawn/2, 2, 75 [ pool(atom), 76 pass_to(system:thread_create/3, 3), 77 pass_to(thread_pool:thread_create_in_pool/4, 4) 78 ]). 79:- predicate_options(http_add_worker/2, 2, 80 [ timeout(number), 81 keep_alive_timeout(number), 82 max_idle_time(number), 83 pass_to(system:thread_create/3, 3) 84 ]).
112:- meta_predicate 113 http_server( , ), 114 http_current_server( , ), 115 http_spawn( , ). 116 117:- dynamic 118 current_server/6, % Port, Goal, Thread, Queue, Scheme, StartTime 119 queue_worker/2, % Queue, ThreadID 120 queue_options/2. % Queue, Options 121 122:- multifile 123 make_socket_hook/3, 124 accept_hook/2, 125 close_hook/1, 126 open_client_hook/6, 127 discard_client_hook/1, 128 http:create_pool/1, 129 http:schedule_workers/1. 130 131:- meta_predicate 132 thread_repeat_wait( ).
main
thread.
If you need to control resource usage you may consider the
spawn
option of http_handler/3 and library(thread_pool).true
(default false
), do not print an informational
message that the server was started.A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
:- use_module(library(http/thread_httpd)). :- use_module(library(http/http_dispatch)). start_server(Port) :- http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
197http_server(Goal, M:Options0) :- 198 server_address(Address, Options0), 199 !, 200 make_socket(Address, M:Options0, Options), 201 create_workers(Options), 202 create_server(Goal, Address, Options), 203 ( option(silent(true), Options0) 204 -> true 205 ; print_message(informational, 206 httpd_started_server(Address, Options0)) 207 ). 208http_server(_Goal, _:Options0) :- 209 existence_error(server_address, Options0). 210 211server_address(Address, Options) :- 212 ( option(port(Port), Options) 213 -> Address = Port 214 ; option(unix_socket(Path), Options) 215 -> Address = unix_socket(Path) 216 ). 217 218address_port(_IFace:Port, Port) :- !. 219address_port(unix_socket(Path), Path) :- !. 220address_port(Address, Address) :- !. 221 222tcp_address(Port) :- 223 var(Port), 224 !. 225tcp_address(Port) :- 226 integer(Port), 227 !. 228tcp_address(_Iface:_Port).
queue(QueueId)
.
238make_socket(Address, M:Options0, Options) :- 239 tcp_address(Address), 240 make_socket_hook(Address, M:Options0, Options), 241 !. 242make_socket(Address, _:Options0, Options) :- 243 option(tcp_socket(_), Options0), 244 !, 245 make_addr_atom('httpd', Address, Queue), 246 Options = [ queue(Queue) 247 | Options0 248 ]. 249make_socket(Address, _:Options0, Options) :- 250 tcp_address(Address), 251 !, 252 tcp_socket(Socket), 253 tcp_setopt(Socket, reuseaddr), 254 tcp_bind(Socket, Address), 255 tcp_listen(Socket, 64), 256 make_addr_atom('httpd', Address, Queue), 257 Options = [ queue(Queue), 258 tcp_socket(Socket) 259 | Options0 260 ]. 261:- if(current_predicate(unix_domain_socket/1)). 262make_socket(Address, _:Options0, Options) :- 263 Address = unix_socket(Path), 264 !, 265 unix_domain_socket(Socket), 266 tcp_bind(Socket, Path), 267 tcp_listen(Socket, 64), 268 make_addr_atom('httpd', Address, Queue), 269 Options = [ queue(Queue), 270 tcp_socket(Socket) 271 | Options0 272 ]. 273:- endif.
280make_addr_atom(Scheme, Address, Atom) :- 281 phrase(address_parts(Address), Parts), 282 atomic_list_concat([Scheme,@|Parts], Atom). 283 284address_parts(Var) --> 285 { var(Var), 286 !, 287 instantiation_error(Var) 288 }. 289address_parts(Atomic) --> 290 { atomic(Atomic) }, 291 !, 292 [Atomic]. 293address_parts(Host:Port) --> 294 !, 295 address_parts(Host), [:], address_parts(Port). 296address_parts(ip(A,B,C,D)) --> 297 !, 298 [ A, '.', B, '.', C, '.', D ]. 299address_parts(unix_socket(Path)) --> 300 [Path]. 301address_parts(Address) --> 302 { domain_error(http_server_address, Address) }.
310create_server(Goal, Address, Options) :- 311 get_time(StartTime), 312 memberchk(queue(Queue), Options), 313 scheme(Scheme, Options), 314 autoload_https(Scheme), 315 address_port(Address, Port), 316 make_addr_atom(Scheme, Port, Alias), 317 thread_self(Initiator), 318 thread_create(accept_server(Goal, Initiator, Options), _, 319 [ alias(Alias) 320 ]), 321 thread_get_message(server_started), 322 assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)). 323 324scheme(Scheme, Options) :- 325 option(scheme(Scheme), Options), 326 !. 327scheme(Scheme, Options) :- 328 ( option(ssl(_), Options) 329 ; option(ssl_instance(_), Options) 330 ), 331 !, 332 Scheme = https. 333scheme(http, _). 334 335autoload_https(https) :- 336 \+ clause(accept_hook(_Goal, _Options), _), 337 exists_source(library(http/http_ssl_plugin)), 338 !, 339 use_module(library(http/http_ssl_plugin)). 340autoload_https(_).
348http_current_server(Goal, Port) :-
349 current_server(Port, Goal, _, _, _, _).
http
or https
.365http_server_property(_:Port, Property) :- 366 integer(Port), 367 !, 368 server_property(Property, Port). 369http_server_property(Port, Property) :- 370 server_property(Property, Port). 371 372server_property(goal(Goal), Port) :- 373 current_server(Port, Goal, _, _, _, _). 374server_property(scheme(Scheme), Port) :- 375 current_server(Port, _, _, _, Scheme, _). 376server_property(start_time(Time), Port) :- 377 current_server(Port, _, _, _, _, Time).
387http_workers(Port, Workers) :- 388 must_be(ground, Port), 389 current_server(Port, _, _, Queue, _, _), 390 !, 391 ( integer(Workers) 392 -> resize_pool(Queue, Workers) 393 ; findall(W, queue_worker(Queue, W), WorkerIDs), 394 length(WorkerIDs, Workers) 395 ). 396http_workers(Port, _) :- 397 existence_error(http_server, Port).
410http_add_worker(Port, Options) :- 411 must_be(ground, Port), 412 current_server(Port, _, _, Queue, _, _), 413 !, 414 queue_options(Queue, QueueOptions), 415 merge_options(Options, QueueOptions, WorkerOptions), 416 atom_concat(Queue, '_', AliasBase), 417 create_workers(1, 1, Queue, AliasBase, WorkerOptions). 418http_add_worker(Port, _) :- 419 existence_error(http_server, Port).
429http_current_worker(Port, ThreadID) :-
430 current_server(Port, _, _, Queue, _, _),
431 queue_worker(Queue, ThreadID).
439accept_server(Goal, Initiator, Options) :- 440 catch(accept_server2(Goal, Initiator, Options), http_stop, true), 441 thread_self(Thread), 442 debug(http(stop), '[~p]: accept server received http_stop', [Thread]), 443 retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)), 444 close_pending_accepts(Queue), 445 close_server_socket(Options). 446 447accept_server2(Goal, Initiator, Options) :- 448 thread_send_message(Initiator, server_started), 449 repeat, 450 ( catch(accept_server3(Goal, Options), E, true) 451 -> ( var(E) 452 -> fail 453 ; accept_rethrow_error(E) 454 -> throw(E) 455 ; print_message(error, E), 456 fail 457 ) 458 ; print_message(error, % internal error 459 goal_failed(accept_server3(Goal, Options))), 460 fail 461 ). 462 463accept_server3(Goal, Options) :- 464 accept_hook(Goal, Options), 465 !. 466accept_server3(Goal, Options) :- 467 memberchk(tcp_socket(Socket), Options), 468 memberchk(queue(Queue), Options), 469 debug(http(connection), 'Waiting for connection', []), 470 tcp_accept(Socket, Client, Peer), 471 sig_atomic(send_to_worker(Queue, Client, Goal, Peer)), 472 http_enough_workers(Queue, accept, Peer). 473 474send_to_worker(Queue, Client, Goal, Peer) :- 475 debug(http(connection), 'New HTTP connection from ~p', [Peer]), 476 thread_send_message(Queue, tcp_client(Client, Goal, Peer)). 477 478accept_rethrow_error(http_stop). 479accept_rethrow_error('$aborted').
486close_server_socket(Options) :- 487 close_hook(Options), 488 !. 489close_server_socket(Options) :- 490 memberchk(tcp_socket(Socket), Options), 491 !, 492 tcp_close_socket(Socket).
496close_pending_accepts(Queue) :- 497 ( thread_get_message(Queue, Msg, [timeout(0)]) 498 -> close_client(Msg), 499 close_pending_accepts(Queue) 500 ; true 501 ). 502 503close_client(tcp_client(Client, _Goal, _0Peer)) => 504 debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]), 505 tcp_close_socket(Client). 506close_client(Msg) => 507 ( discard_client_hook(Msg) 508 -> true 509 ; print_message(warning, http_close_client(Msg)) 510 ).
520http_stop_server(Host:Port, Options) :- % e.g., localhost:4000 521 ground(Host), 522 !, 523 http_stop_server(Port, Options). 524http_stop_server(Port, _Options) :- 525 http_workers(Port, 0), % checks Port is ground 526 current_server(Port, _, Thread, Queue, _Scheme, _Start), 527 retractall(queue_options(Queue, _)), 528 debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]), 529 thread_signal(Thread, throw(http_stop)), 530 catch(connect(localhost:Port), _, true), 531 thread_join(Thread, _0Status), 532 debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]), 533 message_queue_destroy(Queue). 534 535connect(Address) :- 536 setup_call_cleanup( 537 tcp_socket(Socket), 538 tcp_connect(Socket, Address), 539 tcp_close_socket(Socket)).
547http_enough_workers(Queue, _Why, _Peer) :- 548 message_queue_property(Queue, waiting(_0)), 549 !, 550 debug(http(scheduler), '~D waiting for work; ok', [_0]). 551http_enough_workers(Queue, Why, Peer) :- 552 message_queue_property(Queue, size(Size)), 553 ( enough(Size, Why) 554 -> debug(http(scheduler), '~D in queue; ok', [Size]) 555 ; current_server(Port, _, _, Queue, _, _), 556 Data = _{ port:Port, 557 reason:Why, 558 peer:Peer, 559 waiting:Size, 560 queue:Queue 561 }, 562 debug(http(scheduler), 'Asking to reschedule: ~p', [Data]), 563 catch(http:schedule_workers(Data), 564 Error, 565 print_message(error, Error)) 566 -> true 567 ; true 568 ). 569 570enough(0, _). 571enough(1, keep_alive). % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to thread dedicated to
dynamic worker-pool management.
601 /******************************* 602 * WORKER QUEUE OPERATIONS * 603 *******************************/
610create_workers(Options) :- 611 option(workers(N), Options, 5), 612 option(queue(Queue), Options), 613 catch(message_queue_create(Queue), _, true), 614 atom_concat(Queue, '_', AliasBase), 615 create_workers(1, N, Queue, AliasBase, Options), 616 assert(queue_options(Queue, Options)). 617 618create_workers(I, N, _, _, _) :- 619 I > N, 620 !. 621create_workers(I, N, Queue, AliasBase, Options) :- 622 gensym(AliasBase, Alias), 623 thread_create(http_worker(Options), Id, 624 [ alias(Alias) 625 | Options 626 ]), 627 assertz(queue_worker(Queue, Id)), 628 I2 is I + 1, 629 create_workers(I2, N, Queue, AliasBase, Options).
637resize_pool(Queue, Size) :-
638 findall(W, queue_worker(Queue, W), Workers),
639 length(Workers, Now),
640 ( Now < Size
641 -> queue_options(Queue, Options),
642 atom_concat(Queue, '_', AliasBase),
643 I0 is Now+1,
644 create_workers(I0, Size, Queue, AliasBase, Options)
645 ; Now == Size
646 -> true
647 ; Now > Size
648 -> Excess is Now - Size,
649 thread_self(Me),
650 forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
651 forall(between(1, Excess, _), thread_get_message(quitted(_)))
652 ).
If the message quit(Sender)
is read from the queue, the worker
stops.
663http_worker(Options) :- 664 debug(http(scheduler), 'New worker', []), 665 prolog_listen(this_thread_exit, done_worker), 666 option(queue(Queue), Options), 667 option(max_idle_time(MaxIdle), Options, infinite), 668 thread_repeat_wait(get_work(Queue, Message, MaxIdle)), 669 debug(http(worker), 'Waiting for a job ...', []), 670 debug(http(worker), 'Got job ~p', [Message]), 671 ( Message = quit(Sender) 672 -> !, 673 thread_self(Self), 674 thread_detach(Self), 675 ( Sender == idle 676 -> true 677 ; retract(queue_worker(Queue, Self)), 678 thread_send_message(Sender, quitted(Self)) 679 ) 680 ; open_client(Message, Queue, Goal, In, Out, 681 Options, ClientOptions), 682 ( catch(http_process(Goal, In, Out, ClientOptions), 683 Error, true) 684 -> true 685 ; Error = goal_failed(http_process/4) 686 ), 687 ( var(Error) 688 -> fail 689 ; current_message_level(Error, Level), 690 print_message(Level, Error), 691 memberchk(peer(Peer), ClientOptions), 692 close_connection(Peer, In, Out), 693 fail 694 ) 695 ). 696 697get_work(Queue, Message, infinite) :- 698 !, 699 thread_get_message(Queue, Message). 700get_work(Queue, Message, MaxIdle) :- 701 ( thread_get_message(Queue, Message, [timeout(MaxIdle)]) 702 -> true 703 ; Message = quit(idle) 704 ).
713open_client(requeue(In, Out, Goal, ClOpts), 714 _, Goal, In, Out, Opts, ClOpts) :- 715 !, 716 memberchk(peer(Peer), ClOpts), 717 option(keep_alive_timeout(KeepAliveTMO), Opts, 2), 718 check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out). 719open_client(Message, Queue, Goal, In, Out, Opts, 720 [ pool(client(Queue, Goal, In, Out)), 721 timeout(Timeout) 722 | Options 723 ]) :- 724 catch(open_client(Message, Goal, In, Out, Options, Opts), 725 E, report_error(E)), 726 option(timeout(Timeout), Opts, 60), 727 ( debugging(http(connection)) 728 -> memberchk(peer(Peer), Options), 729 debug(http(connection), 'Opened connection from ~p', [Peer]) 730 ; true 731 ).
737open_client(Message, Goal, In, Out, ClientOptions, Options) :- 738 open_client_hook(Message, Goal, In, Out, ClientOptions, Options), 739 !. 740open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out, 741 [ peer(Peer), 742 protocol(http) 743 ], _) :- 744 tcp_open_socket(Socket, In, Out). 745 746report_error(E) :- 747 print_message(error, E), 748 fail.
757check_keep_alive_connection(In, TMO, Peer, In, Out) :-
758 stream_property(In, timeout(Old)),
759 set_stream(In, timeout(TMO)),
760 debug(http(keep_alive), 'Waiting for keep-alive ...', []),
761 catch(peek_code(In, Code), E, true),
762 ( var(E), % no exception
763 Code \== -1 % no end-of-file
764 -> set_stream(In, timeout(Old)),
765 debug(http(keep_alive), '\tre-using keep-alive connection', [])
766 ; ( Code == -1
767 -> debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
768 ; debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
769 ),
770 close_connection(Peer, In, Out),
771 fail
772 ).
781done_worker :- 782 thread_self(Self), 783 thread_detach(Self), 784 retract(queue_worker(Queue, Self)), 785 thread_property(Self, status(Status)), 786 !, 787 ( catch(recreate_worker(Status, Queue), _, fail) 788 -> print_message(informational, 789 httpd_restarted_worker(Self)) 790 ; done_status_message_level(Status, Level), 791 print_message(Level, 792 httpd_stopped_worker(Self, Status)) 793 ). 794done_worker :- % received quit(Sender) 795 thread_self(Self), 796 thread_property(Self, status(Status)), 797 done_status_message_level(Status, Level), 798 print_message(Level, 799 httpd_stopped_worker(Self, Status)). 800 801done_status_message_level(true, silent) :- !. 802done_status_message_level(exception('$aborted'), silent) :- !. 803done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
818recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :- 819 halt(2). 820recreate_worker(exception(Error), Queue) :- 821 recreate_on_error(Error), 822 queue_options(Queue, Options), 823 atom_concat(Queue, '_', AliasBase), 824 create_workers(1, 1, Queue, AliasBase, Options). 825 826recreate_on_error('$aborted'). 827recreate_on_error(time_limit_exceeded).
836:- multifile 837 message_level/2. 838 839message_level(error(io_error(read, _), _), silent). 840message_level(error(socket_error(epipe,_), _), silent). 841message_level(error(http_write_short(_Obj,_Written), _), silent). 842message_level(error(timeout_error(read, _), _), informational). 843message_level(keep_alive_timeout, silent). 844 845current_message_level(Term, Level) :- 846 ( message_level(Term, Level) 847 -> true 848 ; Level = error 849 ).
857http_requeue(Header) :- 858 requeue_header(Header, ClientOptions), 859 memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions), 860 memberchk(peer(Peer), ClientOptions), 861 http_enough_workers(Queue, keep_alive, Peer), 862 thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)), 863 !. 864http_requeue(Header) :- 865 debug(http(error), 'Re-queue failed: ~p', [Header]), 866 fail. 867 868requeue_header([], []). 869requeue_header([H|T0], [H|T]) :- 870 requeue_keep(H), 871 !, 872 requeue_header(T0, T). 873requeue_header([_|T0], T) :- 874 requeue_header(T0, T). 875 876requeue_keep(pool(_)). 877requeue_keep(peer(_)). 878requeue_keep(protocol(_)).
885http_process(Goal, In, Out, Options) :- 886 debug(http(server), 'Running server goal ~p on ~p -> ~p', 887 [Goal, In, Out]), 888 option(timeout(TMO), Options, 60), 889 set_stream(In, timeout(TMO)), 890 set_stream(Out, timeout(TMO)), 891 http_wrapper(Goal, In, Out, Connection, 892 [ request(Request) 893 | Options 894 ]), 895 next(Connection, Request). 896 897next(Connection, Request) :- 898 next_(Connection, Request), !. 899next(Connection, Request) :- 900 print_message(warning, goal_failed(next(Connection,Request))). 901 902next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :- 903 !, 904 memberchk(pool(client(_Queue, _Goal, In, Out)), Request), 905 ( catch(call(SwitchGoal, In, Out), E, 906 ( print_message(error, E), 907 fail)) 908 -> true 909 ; http_close_connection(Request) 910 ). 911next_(spawned(ThreadId), _) :- 912 !, 913 debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]). 914next_(Connection, Request) :- 915 downcase_atom(Connection, 'keep-alive'), 916 http_requeue(Request), 917 !. 918next_(_, Request) :- 919 http_close_connection(Request).
926http_close_connection(Request) :-
927 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
928 memberchk(peer(Peer), Request),
929 close_connection(Peer, In, Out).
936close_connection(Peer, In, Out) :-
937 debug(http(connection), 'Closing connection from ~p', [Peer]),
938 catch(close(In, [force(true)]), _, true),
939 catch(close(Out, [force(true)]), _, true).
If a pool does not exist, this predicate calls the multifile hook create_pool/1 to create it. If this predicate succeeds the operation is retried.
957http_spawn(Goal, Options) :- 958 select_option(pool(Pool), Options, ThreadOptions), 959 !, 960 current_output(CGI), 961 catch(thread_create_in_pool(Pool, 962 wrap_spawned(CGI, Goal), Id, 963 [ detached(true) 964 | ThreadOptions 965 ]), 966 Error, 967 true), 968 ( var(Error) 969 -> http_spawned(Id) 970 ; Error = error(resource_error(threads_in_pool(_)), _) 971 -> throw(http_reply(busy)) 972 ; Error = error(existence_error(thread_pool, Pool), _), 973 create_pool(Pool) 974 -> http_spawn(Goal, Options) 975 ; throw(Error) 976 ). 977http_spawn(Goal, Options) :- 978 current_output(CGI), 979 thread_create(wrap_spawned(CGI, Goal), Id, 980 [ detached(true) 981 | Options 982 ]), 983 http_spawned(Id). 984 985wrap_spawned(CGI, Goal) :- 986 set_output(CGI), 987 http_wrap_spawned(Goal, Request, Connection), 988 next(Connection, Request).
998create_pool(Pool) :- 999 E = error(permission_error(create, thread_pool, Pool), _), 1000 catch(http:create_pool(Pool), E, true). 1001create_pool(Pool) :- 1002 print_message(informational, httpd(created_pool(Pool))), 1003 thread_pool_create(Pool, 10, []). 1004 1005 1006 /******************************* 1007 * WAIT POLICIES * 1008 *******************************/ 1009 1010:- meta_predicate 1011 thread_repeat_wait( ).
repeat, thread_idle(Goal)
, choosing whether to use a
long
or short
idle time based on the average firing rate.1018thread_repeat_wait(Goal) :- 1019 new_rate_mma(5, 1000, State), 1020 repeat, 1021 update_rate_mma(State, MMA), 1022 long(MMA, IsLong), 1023 ( IsLong == brief 1024 -> call(Goal) 1025 ; thread_idle(Goal, IsLong) 1026 ). 1027 1028long(MMA, brief) :- 1029 MMA < 0.05, 1030 !. 1031long(MMA, short) :- 1032 MMA < 1, 1033 !. 1034long(_, long).
1048new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :- 1049 current_prolog_flag(max_tagged_integer, MaxI), 1050 get_time(Base). 1051 1052update_rate_mma(State, MMAr) :- 1053 State = rstate(Base, Last, MaxI, Resolution, N, MMA0), 1054 get_time(Now), 1055 Stamp is round((Now-Base)*Resolution), 1056 ( Stamp > MaxI 1057 -> nb_setarg(1, State, Now), 1058 nb_setarg(2, State, 0) 1059 ; true 1060 ), 1061 Diff is Stamp-Last, 1062 nb_setarg(2, State, Stamp), 1063 MMA is round(((N-1)*MMA0+Diff)/N), 1064 nb_setarg(6, State, MMA), 1065 MMAr is MMA/float(Resolution). 1066 1067 1068 /******************************* 1069 * MESSAGES * 1070 *******************************/ 1071 1072:- multifile 1073 prolog:message/3. 1074 1075prologmessage(httpd_started_server(Port, Options)) --> 1076 [ 'Started server at '-[] ], 1077 http_root(Port, Options). 1078prologmessage(httpd_stopped_worker(Self, Status)) --> 1079 [ 'Stopped worker ~p: ~p'-[Self, Status] ]. 1080prologmessage(httpd_restarted_worker(Self)) --> 1081 [ 'Replaced aborted worker ~p'-[Self] ]. 1082prologmessage(httpd(created_pool(Pool))) --> 1083 [ 'Created thread-pool ~p of size 10'-[Pool], nl, 1084 'Create this pool at startup-time or define the hook ', nl, 1085 'http:create_pool/1 to avoid this message and create a ', nl, 1086 'pool that fits the usage-profile.' 1087 ]. 1088 1089http_root(Address, Options) --> 1090 { landing_page(Address, URI, Options) }, 1091 [ '~w'-[URI] ]. 1092 1093landing_page(Host:Port, URI, Options) :- 1094 !, 1095 must_be(atom, Host), 1096 must_be(integer, Port), 1097 http_server_property(Port, scheme(Scheme)), 1098 ( default_port(Scheme, Port) 1099 -> format(atom(Base), '~w://~w', [Scheme, Host]) 1100 ; format(atom(Base), '~w://~w:~w', [Scheme, Host, Port]) 1101 ), 1102 entry_page(Base, URI, Options). 1103landing_page(unix_socket(Path), URI, _Options) :- 1104 !, 1105 format(string(URI), 'Unix domain socket "~w"', [Path]). 1106landing_page(Port, URI, Options) :- 1107 landing_page(localhost:Port, URI, Options). 1108 1109default_port(http, 80). 1110default_port(https, 443). 1111 1112entry_page(Base, URI, Options) :- 1113 option(entry_page(Entry), Options), 1114 !, 1115 uri_resolve(Entry, Base, URI). 1116entry_page(Base, URI, _) :- 1117 http_absolute_location(root(.), Entry, []), 1118 uri_resolve(Entry, Base, URI)
Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially uses a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */