Posted by Adam at April 10, 2018
docker docker-compose redis-unstable github streams typescript rxjs angularThis article is an exploration of as-yet-unreleased Redis Streams which unlock some powerful use cases. We’ll leverage Streams to build an end to end example of real-time chat rooms (think Hipchat, Slack or IRC).
Check it out on github: https://github.com/aprice-/redisstreamchat
Redis Streams are a new data structure being developed for Redis that is all about time series data. They are similar to Redis Lists but with two major differences:
A stream entry’s Timestamp ID includes a sequence (i.e. 1523418166062-0
) so the order of coincident events is preserved.
You can query a time range of stream entries and add new entries at a specified time (or the current time).
You can also block to wait for new events like with list blocking operations (BLPOP
, BRPOP
, BRPOPLPUSH
).
Adding events
XADD <key> [MAXLEN <max-length>] <*/id> <field> <value>...
Adds an event (field
s and value
s) to a stream as the latest (*) or at a given id
with an optional max-length
enforcing a “capped” stream.
Reading events
XRANGE <key> <start/-> <end/+> [COUNT <count>]
Get multiple events from a stream in ascending order from start
id or the beginning (-) to end
id or the end (+) up to a maximum count
.
XREVRANGE <key> <end/+> <start/-> [COUNT <count>]
Get multiple events from a stream in descending order from end
id or the end (+) to start
id or the beginning (-) up to a maximum count
.
XREAD BLOCK <timeout> STREAMS <key>... <$/id>...
Blocks until an event is available on any of the given streams (by key
) starting now ($) or from id
or timeout
is exceeded.
XLEN <key>
Get a count of the events in a stream.
With the goal of creating a real-time chat system supporting multiple rooms, we’ll use a stream to model each individual chat room’s messages and a set for its members.
For example, the Redis commands for modeling a chat room called “welcome” might look like:
// track join notification
> XADD channel:welcome:log * message "adam has joined the channel."
1523499744387-0
// add to members list
> SADD channel:welcome:members "adam"
(integer) 1
// get the list of channel members
> SMEMBERS channel:welcome:members
1) "adam"
// send chat message
> XADD channel:welcome:log * message "Hi everyone!" userName "adam"
1523499778907-0
// read new messages
> XREAD BLOCK 5000 STREAMS channel:welcome:log $
1) 1) "channel:welcome:log"
2) 1) 1) 1523499832690-0
2) 1) "message"
2) "Hi everyone!"
3) "userName"
4) "adam"
(1.67s)
// get latest 100 messages
> XREVRANGE channel:welcome:log + - COUNT 100
1) 1) 1523499832690-0
2) 1) "message"
2) "Hi everyone!"
3) "userName"
4) "adam"
2) 1) 1523499744387-0
2) 1) "message"
2) "adam has joined the channel."
With our Redis data storage strategy settled, next we need to build out the rest of a real-time web application architecture using Node.js.
To enable a real-time experience, we’ll use a combination of event-driven frameworks and technologies.
RxJS is a library for reactive programming in JavaScript.
RxJS introduces the Observable
concept to work with stream concepts and is similar to a Promise
.
As a refresher, a Promise
models an asynchronous operation that may not have yet completed and notifies you of two states: completion or error (via then
and catch
).
On the other hand, an Observable
models an asynchronous stream and notifies you of three states: the occurrence of a new event, and completion or error (like a Promise
).
This allows an Observable
to be long-lived and propagate multiple events over time.
Just like with a Promise
there are many interesting ways to combine them to achieve a desired outcome.
Angular also primarily uses Observable
rather than Promise
for asynchronous operations.
Polling a Redis Stream is accomplished by repeatedly using the XREAD
command to block and wait for new messages.
When a message is received, you handle it and note its ID to resume reading from the Stream from where you left off.
const pollCache = {}; // storage for a shared cache of polls
function pollNewMessages (channelName) {
if (pollCache[channelName]) return pollCache[channelName]; // use the cache
let seenId = null; // variable to hold our position in the stream
let key = getMessagesKey(channelName); // derive the key (i.e. channel:welcome:log)
let connection = redis.duplicate(); // create a new connection for polling this stream
return pollCache[channelName] = Rx.Observable.of(null) // return and cache an Observable
.expand(() => Rx.Observable.fromPromise(connection.xread(10000, {key, id: seenId})))
// expand calls the provided function in a loop
// XREAD will be called on our stream key with the latest ID repeatedly
.filter(streams => streams) // do not process empty results (i.e. time out was reached)
.flatMap(streams => streams) // process each stream returned individually
.flatMap(stream => stream[1]) // process each event in each stream individually
.map(streamEvent => parseChannelMessages(streamEvent, channelName)) // parse the event
.do(streamEvent => { // for each event
if (streamEvent.id > seenId || !seenId) { // if it is latest seen event
seenId = streamEvent.id; // record it as such
}
})
.finally(() => { // when the stream is cleaned up
connection.quit(); // close the redis connection
delete pollCache[channelName]; // remove it from the cache
})
.publish() // wrap the observable in a shared one
.refCount(); // track subscriptions to it so it is cleaned up automatically
}
The end result is cached, shareable, automatically cleaned up, scrolling read of a redis stream.
RxJS also makes handling the socket.io web sockets server simple.
let sockets = [];
let socketServer = socketio(server, {path: '/api/socket.io'});
Rx.Observable.fromEvent(socketServer, 'connection') // when a new connection occurs
.flatMap(socket => { // for each connection
sockets.push(socket); // keep list of sockets
const part$ = Rx.Observable.fromEvent(socket, 'part'); // when user leaves a channel
const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect') // user disconnects
.do(() => sockets.splice(sockets.indexOf(socket), 1)); // remove socket
return Rx.Observable.fromEvent(socket, 'join') // when user joins a channel
.flatMap(channel => { // for the joined channel
return data.pollNewMessages(channel) // get a poller
.takeUntil(part$.filter(c => c === channel)) // listen until channel is left
})
.do(message => socket.emit('message', message)) // emit each message on the socket
.takeUntil(disconnect$); // stop listening if user disconnects
})
.finally(() => sockets.forEach((socket) => {socket.destroy()})) // destroy sockets
.takeUntil(Rx.Observable.fromEvent(process, 'SIGINT')) // listen until server is stopped
.subscribe(() => {}); // start processing
Now we have a socket server capable of creating channel pollers on the fly as users join and part with cleanup included when every user has left a channel.
In the single-page application, we use an Angular Service
to manage the web socket connection and track application state.
Each room the user joins is represented by its own Channel
object that the UI can interact with.
socket = io.connect({path: '/api/socket.io'}); // connect the web socket
message = Observable.fromEvent<IMessage>(this.socket, 'message'); // socket received messages
reconnect = Observable.fromEvent(this.socket, 'reconnect'); // socket reconnections
messages: IMessage[] = [];
latestSeenId: string;
this.channelService.message
.filter(message => message.channel === this.name) // filter message stream to this channel
.subscribe(message => {
this.messages.push(message); // add to the end of the array
if (this.channelService.current == this) { // if this channel has focus
this.latestSeenId = message.id; // consider this message seen
}
});
<div class="row" *ngFor="let message of channelService.current?.messages; trackBy: trackByFn">
<div class="col">
<span class="text-info" *ngIf="message.userName"><{{message.userName}}></span>
<span [class.text-success]="message.join" [class.text-warning]="message.part">
{{message.message}}
</span>
</div>
<div class="col-3 col-md-3 col-lg-2 text-right">
<span class="text-secondary mr-3 timestamp">
{{message.id | humanRelativeDate}}
</span>
</div>
</div>
For everything else, like getting historical messages, the members of a channel or joining and parting, the HTTP API is used. The user’s name is included on each request as an HTTP header. Security or authentication is left to the reader to implement.
// join a channel
router.post('/join/:channel', async (req, res) => {
let channel = req.params.channel;
await data.join(channel, req.userName);
res.send({success: true});
});
// leave a channel
router.post('/part/:channel', async (req, res) => {
let channel = req.params.channel;
await data.part(channel, req.userName);
res.send({success: true});
});
// send a message to a channel
router.post('/channel/:channel', async (req, res) => {
let channel = req.params.channel;
let message = req.body.message;
await data.send(channel, req.userName, message);
res.send({success: true});
});
// get messages from a channel
router.get('/channel/:channel', async (req, res) => {
let channel = req.params.channel;
let before = req.query.before;
let messages = await data.getMessages(channel, before);
res.send({success: true, messages});
});
// get members of a channel
router.get('/channel/:channel/members', async (req, res) => {
let channel = req.params.channel;
let members = await data.getMembers(channel);
res.send({success: true, members});
});
// get the user's name from the request
app.use((req, res, next) => {
req.userName = req.get('x-username');
next();
});
Streams haven’t been released officially yet and to use them you have to get Redis from the unstable
branch.
I’ve included a Dockerfile
and docker-compose.yml
set up to build and launch this for you.
It’s based on the official Redis Docker image alpine variant but instead of downloading a tar release, it clones the unstable branch with git.
The easiest way to run the sample and see it for yourself is using docker-compose
:
$ git clone https://github.com/aprice-/redisstreamchat
$ cd redisstreamchat
$ docker-compose up -d
Now, navigate to port to http://localhost:3000/