amqpï
Portable AMQP 0-9-1 (Advanced Message Queuing Protocol) client
implementation. This library uses the sockets library and supports
all backend Prolog systems supported by that library: ECLiPSe, GNU
Prolog, SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM.
https://www.rabbitmq.com/amqp-0-9-1-reference.html
AMQP 0-9-1 is a binary wire-level protocol for message-oriented middleware. It is the de facto standard for message broker interoperability and is supported by RabbitMQ, Apache Qpid, Apache ActiveMQ, and other brokers.
API documentationï
Open the ../../apis/library_index.html#amqp link in a web browser.
Loadingï
To load this library, load the loader.lgt file:
| ?- logtalk_load(amqp(loader)).
Testingï
To test this library predicates, load the tester.lgt file:
| ?- logtalk_load(amqp(tester)).
Note: Integration tests require a running AMQP 0-9-1 server (e.g., RabbitMQ). RabbitMQ AMQP listens on port 5672 by default with guest/guest credentials.
Featuresï
Full AMQP 0-9-1 protocol support with binary frame encoding/decoding
Connection management with heartbeat negotiation
Automatic reconnection with configurable retry attempts and delays
Connection pooling with automatic connection management
Multiple concurrent channels over a single connection
Exchange operations: declare, delete, bind, unbind
Queue operations: declare, delete, bind, unbind, purge
Basic messaging: publish, consume, get, ack, nack, reject
Quality of service (QoS) with prefetch settings
Transaction support: tx.select, tx.commit, tx.rollback
Publisher confirms (RabbitMQ extension)
SASL PLAIN authentication
Custom message properties and headers
Content properties: content-type, delivery-mode, priority, etc.
Usageï
Connecting to an AMQP serverï
Basic connection to RabbitMQ with default settings:
?- amqp::connect(localhost, 5672, Connection, []).
Connection with custom credentials and virtual host:
?- amqp::connect(localhost, 5672, Connection, [
username('myuser'),
password('mypassword'),
virtual_host('/myvhost'),
heartbeat(30)
]).
Connection with automatic reconnection enabled:
?- amqp::connect(localhost, 5672, Connection, [
reconnect(true),
reconnect_attempts(5),
reconnect_delay(2)
]).
This will attempt to connect up to 5 times with a 2 second delay between
attempts. If all attempts fail, an amqp_error(reconnect_failed)
error is thrown. The reconnection options are:
reconnect(Boolean)- Enable automatic reconnection (default:false)reconnect_attempts(N)- Maximum number of connection attempts (default:3)reconnect_delay(Seconds)- Delay between attempts in seconds (default:1)
Opening a channelï
AMQP operations require a channel. Open one on the connection:
?- amqp::channel_open(Connection, 1, Channel).
You can open multiple channels (with different numbers) on a single connection.
Declaring exchanges and queuesï
Declare a direct exchange:
?- amqp::exchange_declare(Channel, 'my.exchange', [
type(direct),
durable(true)
]).
Declare a queue:
?- amqp::queue_declare(Channel, 'my.queue', [
durable(true),
auto_delete(false)
]).
Declare a queue with server-generated name:
?- amqp::queue_declare(Channel, Queue, [exclusive(true)]).
% Queue will be unified with the generated name
Bind a queue to an exchange:
?- amqp::queue_bind(Channel, 'my.queue', 'my.exchange', [
routing_key('my.routing.key')
]).
Publishing messagesï
Publish a simple message:
?- amqp::basic_publish(Channel, 'my.exchange', 'Hello, World!', [
routing_key('my.routing.key')
]).
Publish with properties:
?- amqp::basic_publish(Channel, 'my.exchange', '{"data": "json"}', [
routing_key('my.routing.key'),
content_type('application/json'),
delivery_mode(2), % persistent
correlation_id('abc123'),
reply_to('reply.queue')
]).
Publish with custom headers:
?- amqp::basic_publish(Channel, 'my.exchange', 'Message', [
routing_key('my.key'),
headers([
'x-custom-header'-longstr('value'),
'x-priority'-int(5)
])
]).
Consuming messagesï
Start a consumer:
?- amqp::basic_consume(Channel, 'my.queue', [
consumer_tag('my-consumer'),
no_ack(false)
]).
Receive messages:
?- amqp::receive(Channel, Message, [timeout(5000)]).
Extract message data:
?- amqp::message_body(Message, Body),
amqp::message_delivery_tag(Message, DeliveryTag),
amqp::message_property(Message, content_type, ContentType).
Acknowledge a message:
?- amqp::basic_ack(Channel, DeliveryTag, []).
Reject a message:
?- amqp::basic_reject(Channel, DeliveryTag, [requeue(true)]).
Synchronous getï
Get a single message synchronously:
?- amqp::basic_get(Channel, 'my.queue', [no_ack(false)]).
Quality of Serviceï
Set prefetch count to limit unacknowledged messages:
?- amqp::basic_qos(Channel, [prefetch_count(10)]).
Transactionsï
Enable transactions on a channel:
?- amqp::tx_select(Channel).
Publish messages within a transaction:
?- amqp::basic_publish(Channel, 'exchange', 'Msg1', [routing_key('key')]),
amqp::basic_publish(Channel, 'exchange', 'Msg2', [routing_key('key')]),
amqp::tx_commit(Channel).
Rollback a transaction:
?- amqp::tx_rollback(Channel).
Publisher Confirms (RabbitMQ extension)ï
Enable publisher confirms:
?- amqp::confirm_select(Channel).
Closing connectionsï
Close a channel:
?- amqp::channel_close(Channel).
Close the connection:
?- amqp::close(Connection).
Connection Poolingï
The library provides connection pooling through the amqp_pool
category. To create a connection pool, define an object that imports
this category:
Defining a poolï
:- object(my_pool,
imports(amqp_pool)).
:- end_object.
Initializing the poolï
Initialize the pool with configuration options:
?- my_pool::initialize([
host(localhost),
port(5672),
min_size(2),
max_size(10),
connection_options([
username('guest'),
password('guest'),
virtual_host('/')
])
]).
Pool configuration options:
host(Host)- AMQP server hostname (default:localhost)port(Port)- AMQP server port (default:5672)min_size(N)- Minimum connections to maintain (default:1)max_size(N)- Maximum connections allowed (default:10)connection_options(Options)- Options passed toamqp::connect/4(default:[])
Acquiring and releasing connectionsï
Manually acquire and release connections:
?- my_pool::acquire(Connection),
amqp::channel_open(Connection, 1, Channel),
% ... use the channel ...
amqp::channel_close(Channel),
my_pool::release(Connection).
Using with_connection/1ï
Use with_connection/1 for automatic connection management:
?- my_pool::with_connection(do_work).
do_work(Connection) :-
amqp::channel_open(Connection, 1, Channel),
amqp::basic_publish(Channel, '', 'Hello!', [routing_key('my.queue')]),
amqp::channel_close(Channel).
The connection is automatically released even if the goal fails or throws an exception.
Pool statisticsï
Get pool statistics:
?- my_pool::stats(stats(Available, InUse, Total, MinSize, MaxSize)).
Resizing the poolï
Resize the pool at runtime:
?- my_pool::resize(5, 20).
Destroying the poolï
Close all connections and clear pool state:
?- my_pool::destroy.
Creating pools dynamicallyï
Pools can also be created at runtime using create_object/4:
?- create_object(dynamic_pool, [imports(amqp_pool)], [], []),
dynamic_pool::initialize([host(localhost), port(5672)]).
Binary Frame Encodingï
AMQP 0-9-1 is a binary protocol. The library provides low-level encoding and decoding predicates for working with raw frames:
Frame structureï
An AMQP frame consists of:
Type (1 byte): 1=method, 2=header, 3=body, 8=heartbeat
Channel (2 bytes): Channel number (0 for connection frames)
Size (4 bytes): Payload size
Payload (Size bytes): Frame-specific data
Frame end (1 byte): 0xCE marker
Encoding/decoding framesï
Encode a frame to bytes:
?- amqp::encode_frame(Frame, Bytes).
Decode bytes to a frame:
?- amqp::decode_frame(Bytes, Frame).
Data Typesï
The library handles AMQP data types automatically:
octet: 8-bit unsigned integer
short: 16-bit unsigned integer (big-endian)
long: 32-bit unsigned integer (big-endian)
longlong: 64-bit unsigned integer (big-endian)
shortstr: Short string (length <= 255)
longstr: Long string
table: Field table (key-value pairs)
array: Field array
Field values in tables use type tags:
bool(true/false)- Booleanbyte(V)- Signed 8-bitshort(V)- Signed 16-bitint(V)- Signed 32-bitlong(V)- Signed 64-bitfloat(V)- 32-bit floatdouble(V)- 64-bit floatlongstr(V)- Long stringtable(Pairs)- Nested tablearray(Values)- Arraytimestamp(V)- Timestampvoid- No value
Float/Double Encoding/Decodingï
The library implements proper IEEE 754 encoding and decoding for floating-point values:
Float (32-bit IEEE 754 single precision):
Format: 1 sign bit + 8 exponent bits (bias 127) + 23 mantissa bits
Encoded as 4 bytes in big-endian order
Used for
float(Value)field values
Double (64-bit IEEE 754 double precision):
Format: 1 sign bit + 11 exponent bits (bias 1023) + 52 mantissa bits
Encoded as 8 bytes in big-endian order
Used for
double(Value)field values
Special values representation:
The library represents IEEE 754 special values using Prolog compound terms:
@infinity- Positive infinity@negative_infinity- Negative infinity@not_a_number- NaN (Not a Number)
These special values are automatically encoded to and decoded from their standard IEEE 754 binary representations.
Known limitation:
Most Prolog backends cannot distinguish between -0.0 and 0.0
when comparing floating-point values. While the library correctly
encodes and decodes both positive and negative zero according to IEEE
754, these values will typically compare as equal in Prolog arithmetic
operations. This limitation is unlikely to affect typical AMQP messaging
applications.
Error Handlingï
The library throws structured errors:
amqp_error(connection_failed)- TCP connection failedamqp_error(auth_failed)- Authentication failedamqp_error(protocol_error(Msg))- Protocol violationamqp_error(channel_error(Msg))- Channel-level erroramqp_error(exchange_error(Msg))- Exchange operation failedamqp_error(queue_error(Msg))- Queue operation failedamqp_error(basic_error(Msg))- Basic operation failedamqp_error(tx_error(Msg))- Transaction error
Comparison with STOMPï
AMQP 0-9-1 and STOMP are both messaging protocols, but differ significantly:
Feature |
AMQP 0-9-1 |
STOMP |
|---|---|---|
Protocol type |
Binary |
Text |
Complexity |
High |
Low |
Exchange types |
Multiple |
Broker-dependent |
Routing |
Flexible |
Simple |
Transactions |
Native |
Native |
QoS |
Native prefetch |
Limited |
Performance |
Higher |
Lower |
Use AMQP when you need:
Fine-grained routing control
High performance
Advanced message patterns
Exchange-based routing
Use STOMP when you need:
Simple text-based protocol
Easy debugging
Protocol simplicity
Future Workï
Logtalk protocols for messaging patterns (request/reply, pub/sub, etc.)
Categories for common message transformations
Async message handlers
Known Limitations and Issuesï
SSL/TLS connections not yet supported (use stunnel or similar)
Heartbeat sending must be done manually via
send_heartbeat/1Most Prolog backends cannot distinguish between
-0.0and0.0(see the âFloat/Double Encoding/Decodingâ section above for details)GNU Prolog support is partial as it doesnât currently support null characters in atoms
AMQP 1.0 vs AMQP 0-9-1ï
This library implements AMQP 0-9-1 only. Despite the similar name, AMQP 1.0 is a fundamentally different protocol and is not supported by this library. The two versions are not wire-compatible and have different conceptual models:
Aspect |
AMQP 0-9-1 |
AMQP 1.0 |
|---|---|---|
Design |
Broker-centric |
Peer-to-peer |
Routing |
Exchanges + queues |
Link-based addressing |
Frame structure |
Custom binary framing |
Layered performatives |
Data encoding |
Custom type system |
CBOR-like encoding |
Key differences:
AMQP 0-9-1 uses exchanges, bindings, and queues as core protocol concepts
AMQP 1.0 abstracts these as broker-specific ânodesâ and uses links
Frame encoding, handshake, and message format are completely different
Minimal code could be shared between the two implementations
AMQP 1.0 support would require a separate library with its own distinct API reflecting the link-based model, rather than the exchange/queue model of AMQP 0-9-1.
Practical note: RabbitMQ and most message brokers continue to primarily use AMQP 0-9-1, making this library suitable for the vast majority of use cases.