| Did you know ... | Search Documentation: |
| Pack logtalk -- logtalk-3.98.0/library/amqp/NOTES.md |
This file is part of Logtalk https://logtalk.org/ SPDX-FileCopyrightText: 1998-2026 Paulo Moura <pmoura@logtalk.org> SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
amqp
Portable AMQP 0-9-1 (Advanced Message Queuing Protocol) client
implementation. This library uses the sockets library and supports all
backend Prolog systems supported by that library: ECLiPSe, GNU Prolog,
SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM.
https://www.rabbitmq.com/amqp-0-9-1-reference.html
AMQP 0-9-1 is a binary wire-level protocol for message-oriented middleware. It is the de facto standard for message broker interoperability and is supported by RabbitMQ, Apache Qpid, Apache ActiveMQ, and other brokers.
Open the [../../apis/library_index.html#amqp](../../apis/library_index.html#amqp) link in a web browser.
To load this library, load the loader.lgt file:
| ?- logtalk_load(amqp(loader)).
To test this library predicates, load the tester.lgt file:
| ?- logtalk_load(amqp(tester)).
Note: Integration tests require a running AMQP 0-9-1 server (e.g., RabbitMQ). RabbitMQ AMQP listens on port 5672 by default with guest/guest credentials.
Basic connection to RabbitMQ with default settings:
?- amqp::connect(localhost, 5672, Connection, []).
Connection with custom credentials and virtual host:
?- amqp::connect(localhost, 5672, Connection, [
username('myuser'),
password('mypassword'),
virtual_host('/myvhost'),
heartbeat(30)
]).
Connection with automatic reconnection enabled:
?- amqp::connect(localhost, 5672, Connection, [
reconnect(true),
reconnect_attempts(5),
reconnect_delay(2)
]).
This will attempt to connect up to 5 times with a 2 second delay between
attempts. If all attempts fail, an amqp_error(reconnect_failed) error
is thrown. The reconnection options are:
reconnect(Boolean) - Enable automatic reconnection (default: false)reconnect_attempts(N) - Maximum number of connection attempts (default: 3)reconnect_delay(Seconds) - Delay between attempts in seconds (default: 1)AMQP operations require a channel. Open one on the connection:
?- amqp::channel_open(Connection, 1, Channel).
You can open multiple channels (with different numbers) on a single connection.
Declare a direct exchange:
?- amqp::exchange_declare(Channel, 'my.exchange', [
type(direct),
durable(true)
]).
Declare a queue:
?- amqp::queue_declare(Channel, 'my.queue', [
durable(true),
auto_delete(false)
]).
Declare a queue with server-generated name:
?- amqp::queue_declare(Channel, Queue, [exclusive(true)]). % Queue will be unified with the generated name
Bind a queue to an exchange:
?- amqp::queue_bind(Channel, 'my.queue', 'my.exchange', [
routing_key('my.routing.key')
]).
Publish a simple message:
?- amqp::basic_publish(Channel, 'my.exchange', 'Hello, World!', [
routing_key('my.routing.key')
]).
Publish with properties:
?- amqp::basic_publish(Channel, 'my.exchange', '{"data": "json"}', [
routing_key('my.routing.key'),
content_type('application/json'),
delivery_mode(2), % persistent
correlation_id('abc123'),
reply_to('reply.queue')
]).
Publish with custom headers:
?- amqp::basic_publish(Channel, 'my.exchange', 'Message', [
routing_key('my.key'),
headers([
'x-custom-header'-longstr('value'),
'x-priority'-int(5)
])
]).
Start a consumer:
?- amqp::basic_consume(Channel, 'my.queue', [
consumer_tag('my-consumer'),
no_ack(false)
]).
Receive messages:
?- amqp::receive(Channel, Message, [timeout(5000)]).
Extract message data:
?- amqp::message_body(Message, Body), amqp::message_delivery_tag(Message, DeliveryTag), amqp::message_property(Message, content_type, ContentType).
Acknowledge a message:
?- amqp::basic_ack(Channel, DeliveryTag, []).
Reject a message:
?- amqp::basic_reject(Channel, DeliveryTag, [requeue(true)]).
Get a single message synchronously:
?- amqp::basic_get(Channel, 'my.queue', [no_ack(false)]).
Set prefetch count to limit unacknowledged messages:
?- amqp::basic_qos(Channel, [prefetch_count(10)]).
Enable transactions on a channel:
?- amqp::tx_select(Channel).
Publish messages within a transaction:
?- amqp::basic_publish(Channel, 'exchange', 'Msg1', [routing_key('key')]),
amqp::basic_publish(Channel, 'exchange', 'Msg2', [routing_key('key')]),
amqp::tx_commit(Channel).
Rollback a transaction:
?- amqp::tx_rollback(Channel).
Enable publisher confirms:
?- amqp::confirm_select(Channel).
Close a channel:
?- amqp::channel_close(Channel).
Close the connection:
?- amqp::close(Connection).
The library provides connection pooling through the amqp_pool category.
To create a connection pool, define an object that imports this category:
:- object(my_pool,
imports(amqp_pool)).
:- end_object.
Initialize the pool with configuration options:
?- my_pool::initialize([
host(localhost),
port(5672),
min_size(2),
max_size(10),
connection_options([
username('guest'),
password('guest'),
virtual_host('/')
])
]).
Pool configuration options:
host(Host) - AMQP server hostname (default: localhost)port(Port) - AMQP server port (default: 5672)min_size(N) - Minimum connections to maintain (default: 1)max_size(N) - Maximum connections allowed (default: 10)connection_options(Options) - Options passed to `amqp::connect/4` (default: [])Manually acquire and release connections:
?- my_pool::acquire(Connection), amqp::channel_open(Connection, 1, Channel), % ... use the channel ... amqp::channel_close(Channel), my_pool::release(Connection).
Use with_connection/1 for automatic connection management:
?- my_pool::with_connection(do_work).
do_work(Connection) :-
amqp::channel_open(Connection, 1, Channel),
amqp::basic_publish(Channel, '', 'Hello!', [routing_key('my.queue')]),
amqp::channel_close(Channel).
The connection is automatically released even if the goal fails or throws an exception.
Get pool statistics:
?- my_pool::stats(stats(Available, InUse, Total, MinSize, MaxSize)).
Resize the pool at runtime:
?- my_pool::resize(5, 20).
Close all connections and clear pool state:
?- my_pool::destroy.
Pools can also be created at runtime using create_object/4:
?- create_object(dynamic_pool, [imports(amqp_pool)], [], []), dynamic_pool::initialize([host(localhost), port(5672)]).
AMQP 0-9-1 is a binary protocol. The library provides low-level encoding and decoding predicates for working with raw frames:
An AMQP frame consists of:
Encode a frame to bytes:
?- amqp::encode_frame(Frame, Bytes).
Decode bytes to a frame:
?- amqp::decode_frame(Bytes, Frame).
The library handles AMQP data types automatically:
bool(true/false) - Booleanbyte(V) - Signed 8-bitshort(V) - Signed 16-bitint(V) - Signed 32-bitlong(V) - Signed 64-bitfloat(V) - 32-bit floatdouble(V) - 64-bit floatlongstr(V) - Long stringtable(Pairs) - Nested tablearray(Values) - Arraytimestamp(V) - Timestampvoid - No valueThe library implements proper IEEE 754 encoding and decoding for floating-point values:
Float (32-bit IEEE 754 single precision):
float(Value) field values
Double (64-bit IEEE 754 double precision):double(Value) field values
Special values representation:
The library represents IEEE 754 special values using Prolog compound terms:
Known limitation:
Most Prolog backends cannot distinguish between -0.0 and 0.0 when comparing
floating-point values. While the library correctly encodes and decodes both
positive and negative zero according to IEEE 754, these values will typically
compare as equal in Prolog arithmetic operations. This limitation is unlikely
to affect typical AMQP messaging applications.
The library throws structured errors:
amqp_error(connection_failed) - TCP connection failedamqp_error(auth_failed) - Authentication failedamqp_error(protocol_error(Msg)) - Protocol violationamqp_error(channel_error(Msg)) - Channel-level erroramqp_error(exchange_error(Msg)) - Exchange operation failedamqp_error(queue_error(Msg)) - Queue operation failedamqp_error(basic_error(Msg)) - Basic operation failedamqp_error(tx_error(Msg)) - Transaction errorAMQP 0-9-1 and STOMP are both messaging protocols, but differ significantly:
| Feature | AMQP 0-9-1 | STOMP |
|---|---|---|
| Protocol type | Binary | Text |
| Complexity | High | Low |
| Exchange types | Multiple | Broker-dependent |
| Routing | Flexible | Simple |
| Transactions | Native | Native |
| QoS | Native prefetch | Limited |
| Performance | Higher | Lower |
Use AMQP when you need:
-0.0 and 0.0 (see the "Float/Double Encoding/Decoding" section above for details)This library implements AMQP 0-9-1 only. Despite the similar name, AMQP 1.0 is a fundamentally different protocol and is not supported by this library. The two versions are not wire-compatible and have different conceptual models:
| Aspect | AMQP 0-9-1 | AMQP 1.0 |
|---|---|---|
| Design | Broker-centric | Peer-to-peer |
| Routing | Exchanges + queues | Link-based addressing |
| Frame structure | Custom binary framing | Layered performatives |
| Data encoding | Custom type system | CBOR-like encoding |
Key differences:
Practical note: RabbitMQ and most message brokers continue to primarily use AMQP 0-9-1, making this library suitable for the vast majority of use cases.