Did you know ... Search Documentation:
Pack logtalk -- logtalk-3.98.0/library/stomp/NOTES.md

This file is part of Logtalk https://logtalk.org/ SPDX-FileCopyrightText: 1998-2026 Paulo Moura <pmoura@logtalk.org> SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

stomp

Portable STOMP 1.2 (Simple Text Orientated Messaging Protocol) client implementation. This library uses the sockets library and supports all backend Prolog systems supported by that library: ECLiPSe, GNU Prolog, SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM.

Protocol Version

This library implements the STOMP 1.2 protocol specification:

https://stomp.github.io/stomp-specification-1.2.html

STOMP 1.2 is backwards compatible with STOMP 1.1 with minor differences in frame line endings and message acknowledgment.

Features

  • Full STOMP 1.2 protocol support
  • Heartbeat negotiation and automatic heartbeat handling
  • Multiple concurrent subscriptions with unique IDs
  • All acknowledgment modes: auto, client, client-individual
  • Transaction support: BEGIN, COMMIT, ABORT
  • Graceful disconnect with receipt confirmation
  • Proper frame encoding/decoding with header escaping
  • User-defined headers for messages

API documentation

Open the [../../apis/library_index.html#stomp](../../apis/library_index.html#stomp) link in a web browser.

Loading

To load this library, load the loader.lgt file:

| ?- logtalk_load(stomp(loader)).

Testing

To test this library predicates, load the tester.lgt file:

| ?- logtalk_load(stomp(tester)).

Note: Integration tests require a running STOMP server (e.g., RabbitMQ with the STOMP plugin, ActiveMQ, or Apache Apollo). They are usually run using RabbitMQ with the STOMP plugin. As RabbitMQ uses / as its default virtual host, the tests assume this and use the host('/') option when calling the connect/4 predicate (otherwise the connection would fail with a "Virtual host 'localhost' access denied" error message). To enable the RabbitMQ's STOMP plugin:

$ rabbitmq-plugins enable rabbitmq_stomp

RabbitMQ STOMP listens on port 61613 by default.

Usage

Connecting to a STOMP server

Basic connection assuming RabbitMQ, which uses host('/') as shown:

?- stomp::connect(localhost, 61613, Connection, [host('/')]).

Connection with authentication:

?- stomp::connect(localhost, 61613, Connection, [
       login('user'),
       passcode('password')
   ]).

Connection with heartbeat:

?- stomp::connect(localhost, 61613, Connection, [
       heartbeat(10000, 10000)
   ]).

Connection with virtual host:

?- stomp::connect(localhost, 61613, Connection, [
       host('/my-vhost')
   ]).

Sending messages

Send a simple text message:

?- stomp::send(Connection, '/queue/test', 'Hello, World!', []).

Send a message with content type:

?- stomp::send(Connection, '/queue/test', '{"key":"value"}', [
       content_type('application/json')
   ]).

Send with custom headers:

?- stomp::send(Connection, '/queue/test', 'Message', [
       header('priority', '9'),
       header('persistent', 'true')
   ]).

Send within a transaction:

?- stomp::begin_transaction(Connection, 'tx-001', []),
   stomp::send(Connection, '/queue/test', 'Message 1', [transaction('tx-001')]),
   stomp::send(Connection, '/queue/test', 'Message 2', [transaction('tx-001')]),
   stomp::commit_transaction(Connection, 'tx-001', []).

Subscribing to destinations

Subscribe with auto acknowledgment (default):

?- stomp::subscribe(Connection, '/queue/test', 'sub-0', []).

Subscribe with client acknowledgment:

?- stomp::subscribe(Connection, '/queue/test', 'sub-0', [
       ack(client)
   ]).

Subscribe with client-individual acknowledgment:

?- stomp::subscribe(Connection, '/topic/events', 'sub-1', [
       ack(client_individual)
   ]).

Receiving messages

Poll for a message (non-blocking):

?- stomp::receive(Connection, Frame, [timeout(0)]).

Wait for a message with timeout (milliseconds):

?- stomp::receive(Connection, Frame, [timeout(5000)]).

Process received MESSAGE frame:

?- stomp::receive(Connection, Frame, []),
   stomp::frame_command(Frame, 'MESSAGE'),
   stomp::frame_header(Frame, 'destination', Destination),
   stomp::frame_header(Frame, 'message-id', MessageId),
   stomp::frame_body(Frame, Body).

Acknowledging messages

For client or client-individual acknowledgment modes:

?- stomp::receive(Connection, Frame, []),
   stomp::frame_header(Frame, 'ack', AckId),
   stomp::ack(Connection, AckId, []).

Negative acknowledgment:

?- stomp::nack(Connection, AckId, []).

Acknowledgment within a transaction:

?- stomp::ack(Connection, AckId, [transaction('tx-001')]).

Unsubscribing

?- stomp::unsubscribe(Connection, 'sub-0', []).

Transactions

Begin a transaction:

?- stomp::begin_transaction(Connection, 'tx-001', []).

Commit a transaction:

?- stomp::commit_transaction(Connection, 'tx-001', []).

Abort a transaction:

?- stomp::abort_transaction(Connection, 'tx-001', []).

Disconnecting

Graceful disconnect with receipt confirmation:

?- stomp::disconnect(Connection, []).

Heartbeat handling

Send a heartbeat (if needed manually):

?- stomp::send_heartbeat(Connection).

Check connection health:

?- stomp::connection_alive(Connection).

Working with frames

Get frame command:

?- stomp::frame_command(Frame, Command).

Get frame header value:

?- stomp::frame_header(Frame, HeaderName, Value).

Get frame body:

?- stomp::frame_body(Frame, Body).

Get all frame headers:

?- stomp::frame_headers(Frame, Headers).

API Summary

Connection management

  • connect(+Host, +Port, -Connection, +Options) - Connect to server
  • disconnect(+Connection, +Options) - Disconnect from server
  • connection_alive(+Connection) - Check if connection is alive

Messaging

  • send(+Connection, +Destination, +Body, +Options) - Send message
  • subscribe(+Connection, +Destination, +Id, +Options) - Subscribe
  • unsubscribe(+Connection, +Id, +Options) - Unsubscribe
  • receive(+Connection, -Frame, +Options) - Receive frame

Acknowledgment

  • ack(+Connection, +Id, +Options) - Acknowledge message
  • nack(+Connection, +Id, +Options) - Negative acknowledge

Transactions

  • begin_transaction(+Connection, +TransactionId, +Options) - Begin
  • commit_transaction(+Connection, +TransactionId, +Options) - Commit
  • abort_transaction(+Connection, +TransactionId, +Options) - Abort

Heartbeat

  • send_heartbeat(+Connection) - Send heartbeat EOL

Frame inspection

  • frame_command(+Frame, -Command) - Get frame command
  • frame_header(+Frame, +Name, -Value) - Get header value
  • frame_headers(+Frame, -Headers) - Get all headers
  • frame_body(+Frame, -Body) - Get frame body

Connection Options

  • login(Login) - Username for authentication
  • passcode(Passcode) - Password for authentication
  • host(VirtualHost) - Virtual host name (default: from Host parameter)
  • heartbeat(ClientMs, ServerMs) - Heartbeat timing in milliseconds

Send Options

  • content_type(MimeType) - MIME type of the body
  • content_length(Length) - Byte length of body (auto-calculated if omitted)
  • transaction(TransactionId) - Include in transaction
  • receipt(ReceiptId) - Request receipt from server
  • header(Name, Value) - Add custom header

Subscribe Options

  • ack(Mode) - Acknowledgment mode: auto, client, or client_individual

Receive Options

  • timeout(Milliseconds) - Wait timeout; 0 for non-blocking, -1 for infinite

Error Handling

Most predicates throw error(stomp_error(Reason), Context) on failure. Common error reasons:

  • connection_failed - Could not establish TCP connection
  • protocol_error(Message) - Server sent ERROR frame
  • not_connected - Connection was closed
  • invalid_frame - Malformed frame received
  • timeout - Operation timed out