Real-time chat with Redis Streams

Posted by Adam at April 10, 2018

docker docker-compose redis-unstable github streams typescript rxjs angular

This article explores the as-yet-unreleased Redis Streams, which unlock some powerful use cases. We’ll use Streams to build an end-to-end example of real-time chat rooms (think HipChat, Slack or IRC).

On GitHub

Check it out on GitHub: https://github.com/aprice-/redisstreamchat

Redis Streams

Redis Streams are a new data structure being developed for Redis that is all about time series data. They are similar to Redis Lists but with two major differences:

  1. You interact with them using timestamps instead of ordinal indexes
  2. Each entry in a stream can have multiple fields akin to a Redis Hash

A stream entry’s timestamp ID includes a sequence number (e.g. 1523418166062-0), so the order of events that occur in the same millisecond is preserved. You can query a time range of stream entries and add new entries at a specified time (or the current time). You can also block to wait for new events, as with the list blocking operations (BLPOP, BRPOP, BRPOPLPUSH).
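
Because an ID has two numeric parts, comparing IDs as plain strings can misorder them (sequence 10 would sort before sequence 9, for instance). The following is a minimal sketch of an ordering function, assuming IDs always have the form `<milliseconds>-<sequence>` with no leading zeros. The names `compareDecimal` and `compareStreamIds` are invented for this illustration and are not part of Redis or any client library.

```typescript
// Compare two non-negative decimal strings that have no leading zeros.
// A shorter string is a smaller number; equal lengths compare lexicographically.
function compareDecimal(a: string, b: string): number {
  if (a.length !== b.length) return a.length < b.length ? -1 : 1;
  if (a === b) return 0;
  return a < b ? -1 : 1;
}

// Order two stream IDs: first by millisecond timestamp, then by sequence number.
// Returns a negative number, zero, or a positive number, as Array.prototype.sort expects.
function compareStreamIds(a: string, b: string): number {
  const [aMs = "0", aSeq = "0"] = a.split("-");
  const [bMs = "0", bSeq = "0"] = b.split("-");
  return compareDecimal(aMs, bMs) || compareDecimal(aSeq, bSeq);
}

const ids = ["1523418166062-10", "1523418166062-9", "1523418166061-0"];
const sorted = ids.slice().sort(compareStreamIds);
console.log(sorted); // oldest entry first
```

Working on the digit strings directly avoids any loss of precision for very large sequence numbers.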

Stream commands

Adding events

XADD <key> [MAXLEN <max-length>] <*/id> <field> <value>...

Adds an event (a set of fields and values) to a stream, using either an auto-generated ID (*) or an explicit id. The optional MAXLEN enforces a “capped” stream by trimming the oldest entries.
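
To make the argument order concrete, here is a small sketch that assembles the arguments of an XADD call from an object of fields. `buildXAddArgs` and `XAddOptions` are hypothetical names for this illustration; how the resulting array is sent to Redis depends on your client library and is not shown.

```typescript
interface XAddOptions {
  id?: string;     // explicit entry ID; defaults to "*" (let Redis generate one)
  maxLen?: number; // optional cap on the stream length
}

// Build the argument list for: XADD <key> [MAXLEN <max-length>] <*/id> <field> <value>...
function buildXAddArgs(
  key: string,
  fields: { [name: string]: string },
  options: XAddOptions = {},
): string[] {
  const names = Object.keys(fields);
  if (names.length === 0) {
    throw new Error("XADD needs at least one field/value pair");
  }
  const args: string[] = ["XADD", key];
  if (options.maxLen !== undefined) {
    if (!Number.isInteger(options.maxLen) || options.maxLen < 0) {
      throw new Error("maxLen must be a non-negative integer");
    }
    args.push("MAXLEN", String(options.maxLen));
  }
  args.push(options.id !== undefined ? options.id : "*");
  for (const name of names) {
    const value = fields[name];
    if (value !== undefined) args.push(name, value);
  }
  return args;
}

console.log(buildXAddArgs("channel:welcome:log", { message: "Hi everyone!", userName: "adam" }));
```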

Reading events

XRANGE <key> <start/-> <end/+> [COUNT <count>]

Get multiple events from a stream in ascending order from start id or the beginning (-) to end id or the end (+) up to a maximum count.

XREVRANGE <key> <end/+> <start/-> [COUNT <count>]

Get multiple events from a stream in descending order from end id or the end (+) to start id or the beginning (-) up to a maximum count.

XREAD BLOCK <timeout> STREAMS <key>... <$/id>...

Blocks until an event is available on any of the given streams (by key), reading either only events added after the call starts ($) or events after a given id, or until the timeout (in milliseconds) elapses.

XLEN <key>

Get a count of the events in a stream.
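
One detail worth sketching when paging backwards through history with XREVRANGE: the end of the range is inclusive, so fetching the page before an already-seen entry means starting from the ID just below it. The helper below is an illustration under that assumption; `previousStreamId` is a made-up name, and it assumes both parts of the ID fit in a JavaScript safe integer.

```typescript
// Largest possible sequence number (64-bit unsigned), kept as a string
// because it does not fit in a JavaScript number.
const MAX_SEQUENCE = "18446744073709551615";

// Return the largest ID that sorts strictly before `id`,
// or null when `id` is "0-0" and nothing can precede it.
function previousStreamId(id: string): string | null {
  if (!/^\d+-\d+$/.test(id)) {
    throw new Error("Expected an ID of the form <ms>-<seq>, got: " + id);
  }
  const [msPart = "0", seqPart = "0"] = id.split("-");
  const ms = Number(msPart);
  const seq = Number(seqPart);
  if (!Number.isSafeInteger(ms) || !Number.isSafeInteger(seq)) {
    throw new Error("ID parts too large for this sketch: " + id);
  }
  if (seq > 0) return ms + "-" + (seq - 1);   // same millisecond, earlier sequence
  if (ms > 0) return (ms - 1) + "-" + MAX_SEQUENCE; // last possible ID of the previous millisecond
  return null;
}

// The next (older) page after having seen 1523499832690-0 would then be requested as:
//   XREVRANGE channel:welcome:log <previousStreamId("1523499832690-0")> - COUNT 100
console.log(previousStreamId("1523499832690-0"));
```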

Using Streams for Chat

With the goal of creating a real-time chat system supporting multiple rooms, we’ll use a stream to model each individual chat room’s messages and a set for its members.

For example, the Redis commands for modeling a chat room called “welcome” might look like:

// track join notification
> XADD channel:welcome:log * message "adam has joined the channel."
1523499744387-0

// add to members list
> SADD channel:welcome:members "adam"
(integer) 1

// get the list of channel members
> SMEMBERS channel:welcome:members
1) "adam"

// send chat message
> XADD channel:welcome:log * message "Hi everyone!" userName "adam"
1523499832690-0

// read new messages (issued from a second client that was already waiting when the message above was sent)
> XREAD BLOCK 5000 STREAMS channel:welcome:log $
1) 1) "channel:welcome:log"
   2) 1) 1) 1523499832690-0
         2) 1) "message"
            2) "Hi everyone!"
            3) "userName"
            4) "adam"
(1.67s)

// get latest 100 messages
> XREVRANGE channel:welcome:log + - COUNT 100
1) 1) 1523499832690-0
   2) 1) "message"
      2) "Hi everyone!"
      3) "userName"
      4) "adam"
2) 1) 1523499744387-0
   2) 1) "message"
      2) "adam has joined the channel."
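
The nested reply shown for XREAD above is awkward to consume directly, so application code usually flattens it first. Here is a minimal sketch of such a step, assuming the client hands the reply back as nested arrays of strings (and null on timeout). `parseXReadReply` and the type names are illustrative only and are not taken from the sample project.

```typescript
// Shape of a raw XREAD reply: [[streamKey, [[entryId, [field, value, ...]], ...]], ...]
type RawEntry = [string, string[]];
type RawStream = [string, RawEntry[]];

interface StreamEntry {
  stream: string;
  id: string;
  fields: { [name: string]: string };
}

// Flatten a raw XREAD reply into a list of entries with named fields.
function parseXReadReply(reply: RawStream[] | null): StreamEntry[] {
  const entries: StreamEntry[] = [];
  if (reply === null) return entries; // timeout: nothing was read
  for (const [stream, rawEntries] of reply) {
    for (const [id, flat] of rawEntries) {
      const fields: { [name: string]: string } = {};
      // field names and values alternate in the flat array
      for (let i = 0; i + 1 < flat.length; i += 2) {
        const name = flat[i];
        const value = flat[i + 1];
        if (name !== undefined && value !== undefined) fields[name] = value;
      }
      entries.push({ stream, id, fields });
    }
  }
  return entries;
}

const reply: RawStream[] = [
  ["channel:welcome:log", [["1523499832690-0", ["message", "Hi everyone!", "userName", "adam"]]]],
];
console.log(parseXReadReply(reply));
```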

The rest of the stack

With our Redis data storage strategy settled, next we need to build out the rest of a real-time web application architecture using Node.js.

The stack

To enable a real-time experience, we’ll use a combination of event-driven frameworks and technologies.

  • Angular single-page application in the browser
  • socket.io web socket server to handle real-time updates
  • Express HTTP API for everything that is not real-time
  • RxJS for managing streams
  • Redis for storage

What is RxJS?

RxJS is a library for reactive programming in JavaScript. It introduces the Observable, an abstraction for working with streams that is similar to a Promise. As a refresher, a Promise models an asynchronous operation that may not have completed yet and notifies you of one of two outcomes: completion or error (via then and catch).

On the other hand, an Observable models an asynchronous stream and notifies you of three states: the occurrence of a new event, and completion or error (like a Promise). This allows an Observable to be long-lived and propagate multiple events over time. Just like with a Promise there are many interesting ways to combine them to achieve a desired outcome.

Angular also primarily uses Observable rather than Promise for asynchronous operations.
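
To make the three notifications concrete, here is a deliberately tiny, hand-rolled model of the idea. It is not RxJS and does not reflect how RxJS is implemented; `ToyObservable`, `fromValues` and `map` are names made up for this sketch, which only shows that a subscriber can receive many next calls before a single complete (or error).

```typescript
// The three notifications an Observable can deliver.
interface Observer<T> {
  next(value: T): void;      // a new event occurred
  error(err: unknown): void; // the stream failed
  complete(): void;          // the stream ended normally
}

// A toy observable: calling it subscribes the observer and
// returns a function that would cancel the subscription.
type ToyObservable<T> = (observer: Observer<T>) => () => void;

// Emit each value in turn, then complete.
function fromValues<T>(values: T[]): ToyObservable<T> {
  return observer => {
    for (const value of values) observer.next(value);
    observer.complete();
    return () => {}; // nothing to cancel: emission above is synchronous
  };
}

// Combine: derive a new observable by transforming every event.
function map<T, R>(source: ToyObservable<T>, project: (value: T) => R): ToyObservable<R> {
  return observer =>
    source({
      next: value => observer.next(project(value)),
      error: err => observer.error(err),
      complete: () => observer.complete(),
    });
}

const log: string[] = [];
map(fromValues(["hi", "there"]), text => text.toUpperCase())({
  next: value => { log.push("next:" + value); },
  error: () => { log.push("error"); },
  complete: () => { log.push("complete"); },
});
console.log(log); // two next notifications followed by one complete
```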

Using RxJS to poll a Redis Stream

Polling a Redis Stream is accomplished by repeatedly issuing the XREAD command to block and wait for new messages. When a message is received, you handle it and note its ID so that the next read resumes from where you left off.
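
Stripped of any library, the loop just described can be sketched as follows. The `read` function is a hypothetical stand-in for a blocking XREAD on a single stream (resolving with new entries in ascending ID order, or an empty array on timeout), and `pollStream`, `PolledEntry` and `shouldStop` are likewise names invented for this illustration.

```typescript
interface PolledEntry {
  id: string;
  fields: { [name: string]: string };
}

// Hypothetical reader: wraps one blocking XREAD call for a single stream.
type ReadFn = (fromId: string) => Promise<PolledEntry[]>;

// Keep reading until shouldStop() returns true; resolves with the last position.
async function pollStream(
  read: ReadFn,
  onEntry: (entry: PolledEntry) => void,
  shouldStop: () => boolean,
): Promise<string> {
  let lastId = "$"; // "$" means: only entries added after the read starts
  while (!shouldStop()) {
    const entries = await read(lastId); // empty when the block timed out
    for (const entry of entries) {
      onEntry(entry);
      lastId = entry.id; // resume after this entry on the next read
    }
  }
  return lastId;
}
```

Switching from $ to a concrete ID as soon as one is known matters: each read with $ only sees entries added after that particular call starts, so anything written between two such calls would be missed. The RxJS version below expresses the same loop with expand.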

const pollCache = {}; // storage for a shared cache of polls

function pollNewMessages (channelName) {
    if (pollCache[channelName]) return pollCache[channelName]; // use the cache

    let seenId = null; // variable to hold our position in the stream
    let key = getMessagesKey(channelName); // derive the key (i.e. channel:welcome:log)
    let connection = redis.duplicate(); // create a new connection for polling this stream

    return pollCache[channelName] = Rx.Observable.of(null) // return and cache an Observable
        .expand(() => Rx.Observable.fromPromise(connection.xread(10000, {key, id: seenId})))
        // expand calls the provided function in a loop
        // XREAD will be called on our stream key with the latest ID repeatedly
        .filter(streams => streams) // do not process empty results (i.e. time out was reached)
        .flatMap(streams => streams) // process each stream returned individually
        .flatMap(stream => stream[1]) // process each event in each stream individually
        .map(streamEvent => parseChannelMessages(streamEvent, channelName)) // parse the event
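        .do(streamEvent => { // for each event
            // XREAD returns entries in ascending ID order, so each event is the latest seen;
            // this avoids comparing IDs as strings, which misorders e.g. "5-10" and "5-9"
            seenId = streamEvent.id; // record our position in the stream
        })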
        .finally(() => { // when the stream is cleaned up
            connection.quit(); // close the redis connection
            delete pollCache[channelName]; // remove it from the cache
        })
        .publish() // wrap the observable in a shared one
        .refCount(); // track subscriptions to it so it is cleaned up automatically
}

The end result is a cached, shareable, automatically cleaned-up, continuous read of a Redis stream.
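
The sharing and cleanup behaviour that pollCache, publish and refCount provide together can also be pictured without RxJS. The sketch below is an analogy rather than the sample's implementation: `RefCountedCache` is a hypothetical class that creates a resource on first use, shares it among callers, and disposes of it when the last caller releases it.

```typescript
interface SharedEntry<T> {
  value: T;
  refs: number; // how many callers currently hold this value
}

class RefCountedCache<T> {
  private entries = new Map<string, SharedEntry<T>>();
  private create: (key: string) => T;
  private dispose: (key: string, value: T) => void;

  constructor(create: (key: string) => T, dispose: (key: string, value: T) => void) {
    this.create = create;
    this.dispose = dispose;
  }

  has(key: string): boolean {
    return this.entries.has(key);
  }

  // Get the shared value for a key, creating it on first use.
  acquire(key: string): { value: T; release: () => void } {
    let entry = this.entries.get(key);
    if (entry === undefined) {
      entry = { value: this.create(key), refs: 0 };
      this.entries.set(key, entry);
    }
    entry.refs += 1;
    const shared = entry;
    let released = false;
    const release = () => {
      if (released) return; // releasing twice has no further effect
      released = true;
      shared.refs -= 1;
      if (shared.refs === 0) { // last holder gone: clean up
        this.entries.delete(key);
        this.dispose(key, shared.value);
      }
    };
    return { value: shared.value, release };
  }
}
```

In the analogy, acquire corresponds to subscribing to the poller for a channel, and dispose to closing the Redis connection and removing the cache entry.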

Using RxJS with socket.io

RxJS also makes handling the socket.io web sockets server simple.

let sockets = [];

let socketServer = socketio(server, {path: '/api/socket.io'});

Rx.Observable.fromEvent(socketServer, 'connection') // when a new connection occurs
    .flatMap(socket => { // for each connection
        sockets.push(socket); // keep list of sockets
        const part$ = Rx.Observable.fromEvent(socket, 'part'); // when user leaves a channel
        const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect') // user disconnects
            .do(() => sockets.splice(sockets.indexOf(socket), 1)); // remove socket

        return Rx.Observable.fromEvent(socket, 'join') // when user joins a channel
            .flatMap(channel => { // for the joined channel
                return data.pollNewMessages(channel) // get a poller
                    .takeUntil(part$.filter(c => c === channel)) // listen until channel is left
            })
            .do(message => socket.emit('message', message)) // emit each message on the socket
            .takeUntil(disconnect$); // stop listening if user disconnects
    })
    .finally(() => sockets.forEach((socket) => {socket.disconnect(true)})) // close remaining sockets
    .takeUntil(Rx.Observable.fromEvent(process, 'SIGINT')) // listen until server is stopped
    .subscribe(() => {}); // start processing

Now we have a socket server capable of creating channel pollers on the fly as users join and part, with cleanup included once every user has left a channel.

Handling events client-side in Angular

In the single-page application, we use an Angular Service to manage the web socket connection and track application state. Each room the user joins is represented by its own Channel object that the UI can interact with.

Connecting the web socket

socket = io.connect({path: '/api/socket.io'}); // connect the web socket
message = Observable.fromEvent<IMessage>(this.socket, 'message'); // socket received messages
reconnect = Observable.fromEvent(this.socket, 'reconnect'); // socket reconnections

Handling incoming messages for a channel

messages: IMessage[] = [];
latestSeenId: string;

this.channelService.message
  .filter(message => message.channel === this.name) // filter message stream to this channel
  .subscribe(message => {
    this.messages.push(message); // add to the end of the array
    if (this.channelService.current === this) { // if this channel has focus
      this.latestSeenId = message.id; // consider this message seen
    }
  });

Rendering the messages

<div class="row" *ngFor="let message of channelService.current?.messages; trackBy: trackByFn">
  <div class="col">
    <span class="text-info" *ngIf="message.userName">&lt;{{message.userName}}&gt;</span>
    <span [class.text-success]="message.join" [class.text-warning]="message.part">
      {{message.message}}
    </span>
  </div>
  <div class="col-3 col-md-3 col-lg-2 text-right">
    <span class="text-secondary mr-3 timestamp">
      {{message.id | humanRelativeDate}}
    </span>
  </div>
</div>

The HTTP API

For everything else, such as getting historical messages, listing the members of a channel, or joining and parting, the HTTP API is used. The user’s name is included on each request as an HTTP header. Authentication and other security measures are left to the reader to implement.

// join a channel
router.post('/join/:channel', async (req, res) => {
    let channel = req.params.channel;
    await data.join(channel, req.userName);
    res.send({success: true});
});

// leave a channel
router.post('/part/:channel', async (req, res) => {
    let channel = req.params.channel;
    await data.part(channel, req.userName);
    res.send({success: true});
});

// send a message to a channel
router.post('/channel/:channel', async (req, res) => {
    let channel = req.params.channel;
    let message = req.body.message;
    await data.send(channel, req.userName, message);
    res.send({success: true});
});

// get messages from a channel
router.get('/channel/:channel', async (req, res) => {
    let channel = req.params.channel;
    let before = req.query.before;
    let messages = await data.getMessages(channel, before);
    res.send({success: true, messages});
});

// get members of a channel
router.get('/channel/:channel/members', async (req, res) => {
    let channel = req.params.channel;
    let members = await data.getMembers(channel);
    res.send({success: true, members});
});

// get the user's name from the request
// (register this middleware before mounting the router so req.userName is set for every route)
app.use((req, res, next) => {
    req.userName = req.get('x-username');
    next();
});
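
From the client's point of view, calling these routes could look like the sketch below. It only builds request descriptions and sends nothing. The helper names, the `/api` base path used in the usage line, and the JSON request body are assumptions made for illustration; the sample project may mount the router elsewhere or parse bodies differently.

```typescript
interface ApiRequest {
  method: "GET" | "POST";
  url: string;
  headers: { [name: string]: string };
  body?: string;
}

// POST /channel/:channel with the message in the body and the user's name in a header.
function sendMessageRequest(baseUrl: string, channel: string, userName: string, message: string): ApiRequest {
  return {
    method: "POST",
    url: baseUrl + "/channel/" + encodeURIComponent(channel),
    headers: { "content-type": "application/json", "x-username": userName },
    body: JSON.stringify({ message }),
  };
}

// GET /channel/:channel, optionally asking only for messages before a given ID.
function historyRequest(baseUrl: string, channel: string, userName: string, before?: string): ApiRequest {
  const query = before === undefined ? "" : "?before=" + encodeURIComponent(before);
  return {
    method: "GET",
    url: baseUrl + "/channel/" + encodeURIComponent(channel) + query,
    headers: { "x-username": userName },
  };
}

console.log(sendMessageRequest("/api", "welcome", "adam", "Hi everyone!"));
```

Each description can then be handed to whatever HTTP client the application uses, for example Angular's HttpClient or fetch.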

Redis unstable

Streams haven’t been released officially yet and to use them you have to get Redis from the unstable branch. I’ve included a Dockerfile and docker-compose.yml set up to build and launch this for you.

It’s based on the alpine variant of the official Redis Docker image, but instead of downloading a release tarball, it clones the unstable branch with git.

Running the sample

The easiest way to run the sample and see it for yourself is using docker-compose:

$ git clone https://github.com/aprice-/redisstreamchat
$ cd redisstreamchat
$ docker-compose up -d

Now, navigate to http://localhost:3000/