1/* File: canny/redis_streams.pl 2 Author: Roy Ratcliffe 3 Created: Sep 24 2022 4 Purpose: Canny Redis Streams 5 6Copyright (c) 2022, Roy Ratcliffe, Northumberland, United Kingdom 7 8Permission is hereby granted, free of charge, to any person obtaining a 9copy of this software and associated documentation files (the 10"Software"), to deal in the Software without restriction, including 11without limitation the rights to use, copy, modify, merge, publish, 12distribute, sublicense, and/or sell copies of the Software, and to 13permit persons to whom the Software is furnished to do so, subject to 14the following conditions: 15 16 The above copyright notice and this permission notice shall be 17 included in all copies or substantial portions of the Software. 18 19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 20OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 21MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 22IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 23CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 24TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 25SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 26 27*/ 28 29:- module(canny_redis_streams, 30 [ xrange/4, % +Redis,+Key,-Entries,+Options 31 xread/4, % +Redis,+Streams,-Reads,+Options 32 xread_call/5, % +Redis,+Streams,:Goal,-Fields,+Options 33 xread_call/6 % +Redis,+Streams,:Goal,?Tag,-Fields,+Options 34 ]). 35:- autoload(library(option), [option/3, option/2]). 36:- autoload(library(lists), [append/3]). 37:- autoload(library(redis), [redis/3]). 38 39:- use_module(redis). 40 41:- meta_predicate 42 xread_call( , , , , ), 43 xread_call( , , , , , ).
-
and +
respectively or in reverse if rev(true)
included in Options list;
the plus stream identifier stands for the maximum identifier, or
the newest, whereas the minus identifier stands for the oldest.
Option count(Count)
limits the number of entries to read by
Count items.
The following always unifies Entries with []
.
xrange(Server, Key, Entries, [start(+)]). xrange(Server, Key, Entries, [rev(true), start(-)]).
60xrange(Redis, Key, Entries, Options) :- 61 option(rev(Rev), Options, false), 62 rev(Rev, XRange, StartDefault, EndDefault), 63 option(start(Start), Options, StartDefault), 64 option(end(End), Options, EndDefault), 65 ( option(count(Count), Options) 66 -> Arguments = [count, Count] 67 ; Arguments = [] 68 ), 69 Command =.. [XRange, Key, Start, End|Arguments], 70 redis(Redis, Command, Entries). 71 72rev(false, xrange, -, +). 73rev(true, xrevrange, +, -).
block(Milliseconds)
specifies a non-zero blocking delay.
85xread(Redis, Streams, Reads, Options) :-
86 redis_keys_and_stream_ids(Streams, _, Keys, StreamIds),
87 append(Keys, StreamIds, Arguments___),
88 Arguments__ = [streams|Arguments___],
89 ( option(block(Block), Options)
90 -> Arguments_ = [block, Block|Arguments__]
91 ; Arguments_ = Arguments__
92 ),
93 ( option(count(Count), Options)
94 -> Arguments = [count, Count|Arguments_]
95 ; Arguments = Arguments_
96 ),
97 Command =.. [xread|Arguments],
98 redis(Redis, Command, Reads).
110xread_call(Redis, Streams, Goal, Fields, Options) :- 111 xread(Redis, Streams, Reads, Options), 112 redis_last_streams(Reads, _, Streams_), 113 xread_call_(Redis, Streams.put(Streams_), Goal, Reads, Fields, Options). 114 115xread_call_(_Redis, _Streams, Goal, Reads, Fields, _Options) :- 116 redis_stream_read(Reads, Key, StreamId, Fields), 117 call(Goal, Key, StreamId, Fields), 118 !. 119xread_call_(Redis, Streams, Goal, _Reads, Fields, Options) :- 120 ( option(threshold(Threshold), Options) 121 -> dict_pairs(Streams, _, Pairs), 122 maplist(xread_call__, Pairs, RedisTimes), 123 max_list(RedisTimes, RedisTime), 124 RedisTime < Threshold 125 ; true 126 ), 127 xread_call(Redis, Streams, Goal, Fields, Options). 128 129xread_call__(_Key-StreamId, RedisTime) :- 130 redis_stream_id(StreamId, RedisTime, _Seq). 131 132xread_call(Redis, Streams, Goal, Tag, Fields, Options) :- 133 xread(Redis, Streams, Reads, Options), 134 redis_last_streams(Reads, _, Streams_), 135 xread_call_(Redis, Streams.put(Streams_), Goal, Reads, Tag, Fields, Options). 136 137xread_call_(_Redis, _Streams, Goal, Reads, Tag, Fields, _Options) :- 138 redis_stream_read(Reads, Key, StreamId, Tag, Fields), 139 call(Goal, Key, StreamId, Tag, Fields), 140 !. 141xread_call_(Redis, Streams, Goal, _Reads, Tag, Fields, Options) :- 142 ( option(threshold(Threshold), Options) 143 -> dict_pairs(Streams, _, Pairs), 144 maplist(xread_call__, Pairs, RedisTimes), 145 max_list(RedisTimes, RedisTime), 146 RedisTime < Threshold 147 ; true 148 ), 149 xread_call(Redis, Streams, Goal, Tag, Fields, Options)