.. _library_stomp:
stomp
=====
Portable STOMP 1.2 (Simple Text Orientated Messaging Protocol) client
implementation. This library uses the sockets library and supports
all backend Prolog systems supported by that library: ECLiPSe, GNU
Prolog, SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM.
This library implements the STOMP 1.2 protocol specification:
https://stomp.github.io/stomp-specification-1.2.html
STOMP 1.2 is backwards compatible with STOMP 1.1 with minor differences in frame line endings and message acknowledgment.
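Supported acknowledgment modes: ``auto``, ``client``, and ``client-individual``.
Supported transaction commands: ``BEGIN``, ``COMMIT``, and ``ABORT``.

Open the `../../apis/library_index.html#stomp <../../apis/library_index.html#stomp>`__ link in a web browser.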
To load this library, load the ``loader.lgt`` file:
::
| ?- logtalk_load(stomp(loader)).
To test the predicates of this library, load the ``tester.lgt`` file:
::
| ?- logtalk_load(stomp(tester)).
Note: Integration tests require a running STOMP server (e.g., RabbitMQ
with the STOMP plugin, ActiveMQ, or Apache Apollo). They are usually run
using RabbitMQ with the STOMP plugin. As RabbitMQ uses ``/`` as its
default virtual host, the tests assume this and pass the ``host('/')``
option when calling the ``connect/4`` predicate (otherwise the
connection would fail with a
"Virtual host 'localhost' access denied" error message). To enable
the RabbitMQ STOMP plugin:
::
$ rabbitmq-plugins enable rabbitmq_stomp
RabbitMQ STOMP listens on port 61613 by default.
Connecting to a STOMP server
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Basic connection, assuming RabbitMQ with its default ``/`` virtual host:
::
?- stomp::connect(localhost, 61613, Connection, [host('/')]).
Connection with authentication:
::
?- stomp::connect(localhost, 61613, Connection, [
       login('user'),
       passcode('password')
   ]).
Connection with heartbeat:
::
?- stomp::connect(localhost, 61613, Connection, [
       heartbeat(10000, 10000)
   ]).
Connection with virtual host:
::
?- stomp::connect(localhost, 61613, Connection, [
       host('/my-vhost')
   ]).
Sending messages
~~~~~~~~~~~~~~~~
Send a simple text message:
::
?- stomp::send(Connection, '/queue/test', 'Hello, World!', []).
Send a message with content type:
::
?- stomp::send(Connection, '/queue/test', '{"key":"value"}', [
       content_type('application/json')
   ]).
Send with custom headers:
::
?- stomp::send(Connection, '/queue/test', 'Message', [
       header('priority', '9'),
       header('persistent', 'true')
   ]).
Send within a transaction:
::
?- stomp::begin_transaction(Connection, 'tx-001', []),
   stomp::send(Connection, '/queue/test', 'Message 1', [transaction('tx-001')]),
   stomp::send(Connection, '/queue/test', 'Message 2', [transaction('tx-001')]),
   stomp::commit_transaction(Connection, 'tx-001', []).
Subscribing to destinations
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Subscribe with auto acknowledgment (default):
::
?- stomp::subscribe(Connection, '/queue/test', 'sub-0', []).
Subscribe with client acknowledgment:
::
?- stomp::subscribe(Connection, '/queue/test', 'sub-0', [
       ack(client)
   ]).
Subscribe with client-individual acknowledgment:
::
?- stomp::subscribe(Connection, '/topic/events', 'sub-1', [
       ack(client_individual)
   ]).
Receiving messages
~~~~~~~~~~~~~~~~~~
Poll for a message (non-blocking):
::
?- stomp::receive(Connection, Frame, [timeout(0)]).
Wait for a message with timeout (milliseconds):
::
?- stomp::receive(Connection, Frame, [timeout(5000)]).
Process a received ``MESSAGE`` frame:
::
?- stomp::receive(Connection, Frame, []),
   stomp::frame_command(Frame, 'MESSAGE'),
   stomp::frame_header(Frame, 'destination', Destination),
   stomp::frame_header(Frame, 'message-id', MessageId),
   stomp::frame_body(Frame, Body).
Acknowledging messages
~~~~~~~~~~~~~~~~~~~~~~
For ``client`` or ``client-individual`` acknowledgment modes:
::
?- stomp::receive(Connection, Frame, []),
   stomp::frame_header(Frame, 'ack', AckId),
   stomp::ack(Connection, AckId, []).
Negative acknowledgment:
::
?- stomp::nack(Connection, AckId, []).
Acknowledgment within a transaction:
::
?- stomp::ack(Connection, AckId, [transaction('tx-001')]).
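
The receive and acknowledge steps above can be combined into a small
consumer loop. The following is a minimal sketch, not part of the
library: the ``stomp_consumer_demo`` object and its ``drain/2``,
``next_frame/3``, and ``handle/2`` predicates are hypothetical names.
It assumes that ``receive/3`` signals a timeout either by failing or by
throwing ``error(stomp_error(timeout), _)``, and that ``frame_header/3``
fails when the requested header is absent; both assumptions should be
checked against the API documentation.

::

   :- object(stomp_consumer_demo).

       :- public(drain/2).

       % drain(+Connection, +TimeoutMs)
       % Receives frames until none arrives within TimeoutMs milliseconds,
       % acknowledging every MESSAGE frame that carries an ack header.
       drain(Connection, TimeoutMs) :-
           (   next_frame(Connection, TimeoutMs, Frame) ->
               handle(Connection, Frame),
               drain(Connection, TimeoutMs)
           ;   true
           ).

       % Assumption: a timeout is reported either by failure or by a
       % stomp_error(timeout) exception; both are treated as "no frame".
       next_frame(Connection, TimeoutMs, Frame) :-
           catch(
               stomp::receive(Connection, Frame, [timeout(TimeoutMs)]),
               error(stomp_error(timeout), _),
               fail
           ).

       % Assumption: frame_header/3 fails when the header is missing.
       handle(Connection, Frame) :-
           (   stomp::frame_command(Frame, 'MESSAGE'),
               stomp::frame_header(Frame, 'ack', AckId) ->
               stomp::ack(Connection, AckId, [])
           ;   true    % non-MESSAGE frame, or no ack header (auto mode)
           ).

   :- end_object.

With a subscription in ``client`` or ``client-individual`` mode already
active, a call such as ``stomp_consumer_demo::drain(Connection, 1000)``
would process the pending messages and return once the queue stays
quiet for one second.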
Unsubscribing
~~~~~~~~~~~~~
::
?- stomp::unsubscribe(Connection, 'sub-0', []).
Transactions
~~~~~~~~~~~~
Begin a transaction:
::
?- stomp::begin_transaction(Connection, 'tx-001', []).
Commit a transaction:
::
?- stomp::commit_transaction(Connection, 'tx-001', []).
Abort a transaction:
::
?- stomp::abort_transaction(Connection, 'tx-001', []).
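
The three transaction predicates are typically used together so that a
batch of sends is either committed as a whole or aborted. The following
is a hedged sketch rather than library code: the ``stomp_tx_demo``
object and its ``send_all/4`` and ``send_each/4`` predicates are
hypothetical names, and the sketch assumes that problems while sending
are reported as exceptions (a plain failure of ``send/4`` would leave
the transaction open).

::

   :- object(stomp_tx_demo).

       :- public(send_all/4).

       % send_all(+Connection, +Destination, +Bodies, +TransactionId)
       % Sends every body inside one transaction; aborts the transaction
       % and rethrows if any send raises an exception.
       send_all(Connection, Destination, Bodies, TxId) :-
           stomp::begin_transaction(Connection, TxId, []),
           catch(
               send_each(Bodies, Connection, Destination, TxId),
               Error,
               (   stomp::abort_transaction(Connection, TxId, []),
                   throw(Error)
               )
           ),
           stomp::commit_transaction(Connection, TxId, []).

       send_each([], _, _, _).
       send_each([Body| Bodies], Connection, Destination, TxId) :-
           stomp::send(Connection, Destination, Body, [transaction(TxId)]),
           send_each(Bodies, Connection, Destination, TxId).

   :- end_object.

For example, ``stomp_tx_demo::send_all(Connection, '/queue/test', ['Message 1', 'Message 2'], 'tx-001')``
mirrors the transactional send shown earlier, with the abort path added.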
Disconnecting
~~~~~~~~~~~~~
Graceful disconnect with receipt confirmation:
::
?- stomp::disconnect(Connection, []).
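
The individual calls from the preceding sections can be chained into a
complete round trip. This is an illustrative sketch only: the
``stomp_roundtrip_demo`` object and its ``run/0`` predicate are
hypothetical, and the sketch assumes a RabbitMQ server on
``localhost:61613`` with the default ``/`` virtual host and that the
first frame received is the ``MESSAGE`` frame just sent. It does no
cleanup if a step fails or throws.

::

   :- object(stomp_roundtrip_demo).

       :- public(run/0).

       % Connect, subscribe, send one message, receive and acknowledge
       % it, then unsubscribe and disconnect.
       run :-
           stomp::connect(localhost, 61613, Connection, [host('/')]),
           stomp::subscribe(Connection, '/queue/test', 'sub-0', [ack(client)]),
           stomp::send(Connection, '/queue/test', 'Hello, World!', []),
           stomp::receive(Connection, Frame, [timeout(5000)]),
           stomp::frame_command(Frame, 'MESSAGE'),
           stomp::frame_body(Frame, Body),
           stomp::frame_header(Frame, 'ack', AckId),
           stomp::ack(Connection, AckId, []),
           stomp::unsubscribe(Connection, 'sub-0', []),
           stomp::disconnect(Connection, []),
           write(Body), nl.

   :- end_object.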
Heartbeat handling
~~~~~~~~~~~~~~~~~~
Send a heartbeat manually (if needed):
::
?- stomp::send_heartbeat(Connection).
Check connection health:
::
?- stomp::connection_alive(Connection).
Working with frames
~~~~~~~~~~~~~~~~~~~
Get frame command:
::
?- stomp::frame_command(Frame, Command).
Get frame header value:
::
?- stomp::frame_header(Frame, HeaderName, Value).
Get frame body:
::
?- stomp::frame_body(Frame, Body).
Get all frame headers:
::
?- stomp::frame_headers(Frame, Headers).
API Summary
-----------
Connection management
~~~~~~~~~~~~~~~~~~~~~
- ``connect(+Host, +Port, -Connection, +Options)`` - Connect to server
- ``disconnect(+Connection, +Options)`` - Disconnect from server
- ``connection_alive(+Connection)`` - Check if connection is alive
Messaging
~~~~~~~~~
- ``send(+Connection, +Destination, +Body, +Options)`` - Send message
- ``subscribe(+Connection, +Destination, +Id, +Options)`` - Subscribe
- ``unsubscribe(+Connection, +Id, +Options)`` - Unsubscribe
- ``receive(+Connection, -Frame, +Options)`` - Receive frame
Acknowledgment
~~~~~~~~~~~~~~
- ``ack(+Connection, +Id, +Options)`` - Acknowledge message
- ``nack(+Connection, +Id, +Options)`` - Negative acknowledge
.. _transactions-1:
Transactions
~~~~~~~~~~~~
- ``begin_transaction(+Connection, +TransactionId, +Options)`` - Begin
- ``commit_transaction(+Connection, +TransactionId, +Options)`` - Commit
- ``abort_transaction(+Connection, +TransactionId, +Options)`` - Abort
Heartbeat
~~~~~~~~~
- ``send_heartbeat(+Connection)`` - Send heartbeat EOL
Frame inspection
~~~~~~~~~~~~~~~~

- ``frame_command(+Frame, -Command)`` - Get frame command
- ``frame_header(+Frame, +Name, -Value)`` - Get header value
- ``frame_headers(+Frame, -Headers)`` - Get all headers
- ``frame_body(+Frame, -Body)`` - Get frame body

Option terms accepted in the ``Options`` arguments (which options apply
depends on the predicate):

- ``login(Login)`` - Username for authentication
- ``passcode(Passcode)`` - Password for authentication
- ``host(VirtualHost)`` - Virtual host name (default: from ``Host``
  parameter)
- ``heartbeat(ClientMs, ServerMs)`` - Heartbeat timing in milliseconds
- ``content_type(MimeType)`` - MIME type of the body
- ``content_length(Length)`` - Byte length of body (auto-calculated if
  omitted)
- ``transaction(TransactionId)`` - Include in transaction
- ``receipt(ReceiptId)`` - Request receipt from server
- ``header(Name, Value)`` - Add custom header
- ``ack(Mode)`` - Acknowledgment mode: ``auto``, ``client``, or
  ``client_individual``
- ``timeout(Milliseconds)`` - Wait timeout; ``0`` for non-blocking, ``-1`` for
  infinite

Most predicates throw ``error(stomp_error(Reason), Context)`` on
failure. Common error reasons:

- ``connection_failed`` - Could not establish TCP connection
- ``protocol_error(Message)`` - Server sent ERROR frame
- ``not_connected`` - Connection was closed
- ``invalid_frame`` - Malformed frame received
- ``timeout`` - Operation timed out
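
Because errors are reported as ``error(stomp_error(Reason), Context)``
exceptions, callers can wrap library calls in the standard ``catch/3``
control construct. The following is a minimal sketch under stated
assumptions: the ``stomp_error_demo`` object and its ``safe_connect/3``
and ``reason_text/2`` predicates are hypothetical helpers, not part of
the library, and the description texts are simply copied from the list
of error reasons above.

::

   :- object(stomp_error_demo).

       :- public(safe_connect/3).
       :- public(reason_text/2).

       % safe_connect(+Host, +Port, -Result)
       % Result is ok(Connection) on success, or failed(Reason) when the
       % library throws a stomp_error(Reason) exception.
       safe_connect(Host, Port, Result) :-
           catch(
               (   stomp::connect(Host, Port, Connection, [host('/')]),
                   Result = ok(Connection)
               ),
               error(stomp_error(Reason), _Context),
               Result = failed(Reason)
           ).

       % reason_text(+Reason, -Text)
       % Maps the documented error reasons to short descriptions.
       reason_text(connection_failed, 'Could not establish TCP connection').
       reason_text(protocol_error(_), 'Server sent ERROR frame').
       reason_text(not_connected, 'Connection was closed').
       reason_text(invalid_frame, 'Malformed frame received').
       reason_text(timeout, 'Operation timed out').

   :- end_object.

A caller could then branch on the result, for example
``stomp_error_demo::safe_connect(localhost, 61613, Result)`` followed by
a check for ``failed(Reason)`` and a lookup with ``reason_text/2``.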