Push Realtime Streaming Data to Clients using Amazon API Gateway’s WebSocket API


Realizing the value of real-time data insights, many businesses today have implemented some kind of streaming data capture and analysis mechanism. One popular way is Amazon Kinesis, which provides both streaming data ingestion and real-time analytics. But in most cases, the raw data, its processed version and the valuable insights derived from it simply end up in a database, waiting to be queried by some system that can act on them. Even if that other system is up all the time, it is probably fetching the insights periodically from wherever the streaming analytics wrote them, so it only sees a new result on its next poll.

What if we could take it up a notch? What if the analyzer itself, as soon as it has the results, could push them to whoever needs them, IN REALTIME? What if your mobile and web apps never had to call the server for data? They would simply tell the server what kind of data they're interested in, and the server would send it to them as soon as it's available. Imagine how awesome that would be: no more long polling, no more unnecessary delays, a truly real-time system.

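This article describes one way of achieving this, using Amazon Kinesis, AWS Lambda and an API Gateway WebSocket API. Let's take a specific example. Say you have GPS coordinates from sensors in vehicles around the world streaming into Kinesis in real time. Say you run them through Kinesis Analytics, perhaps to check whether each vehicle is inside a geofence you care about. Now say a mobile app is tracking one of these vehicles in real time, and Kinesis Analytics is pushing its output directly to the app. The system would look something like this: the vehicles stream GPS data into Kinesis, Kinesis Analytics processes it and hands its output to a Lambda function, and that Lambda pushes the results through the WebSocket API to every app that has asked for that vehicle.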
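Everything downstream depends on what each GPS record looks like. Here is a minimal sketch of a hypothetical vehicle-side producer, assuming a Kinesis data stream whose name you choose and illustrative field names (`VehicleID`, `Latitude`, `Longitude`, `Timestamp`) that are not part of the original setup. It takes the Kinesis client as a parameter rather than creating one itself.

```python
import json
from datetime import datetime, timezone


def send_position(kinesis, stream_name, vehicle_id, latitude, longitude, timestamp=None):
    """Put one GPS reading into a Kinesis data stream.

    `kinesis` is a boto3 Kinesis client (or any object with the same
    put_record method). The record's field names are illustrative only.
    """
    record = {
        "VehicleID": vehicle_id,
        "Latitude": latitude,
        "Longitude": longitude,
        # Default to "now" in UTC if the caller doesn't supply a timestamp
        "Timestamp": timestamp or datetime.now(timezone.utc).isoformat(),
    }
    # Using the vehicle ID as the partition key sends all of one vehicle's
    # readings to the same shard, so they are kept in order.
    return kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=str(vehicle_id),
    )
```

On a real device you would pass `boto3.client('kinesis')`. Injecting the client keeps the function easy to exercise locally.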

Let's walk through how to create such a setup. First, create the WebSocket API in API Gateway.
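One way to script this step is with boto3's `apigatewayv2` client. The following is a minimal sketch, assuming a stage named `production` (a hypothetical name). It also derives the `https://` callback URL, which is the value the pushing Lambda later needs in `WEBSOCKET_API_ENDPOINT`. The stage has nothing to serve until routes are added and deployed.

```python
def create_websocket_api(apigw, name, stage_name="production"):
    """Create a WebSocket API plus an (initially empty) stage.

    `apigw` is a boto3 'apigatewayv2' client. Returns the API ID, the wss://
    URL clients connect to, and the https:// callback URL used as the
    endpoint_url of the 'apigatewaymanagementapi' client.
    """
    api = apigw.create_api(
        Name=name,
        ProtocolType="WEBSOCKET",
        # Required for WebSocket APIs; only $connect/$disconnect are used here
        RouteSelectionExpression="$request.body.action",
    )
    apigw.create_stage(ApiId=api["ApiId"], StageName=stage_name)

    # ApiEndpoint looks like wss://{api-id}.execute-api.{region}.amazonaws.com
    endpoint = api["ApiEndpoint"]
    host = endpoint.split("://", 1)[1]
    return {
        "api_id": api["ApiId"],
        "client_url": f"{endpoint}/{stage_name}",
        "callback_url": f"https://{host}/{stage_name}",
    }
```

Call it as `create_websocket_api(boto3.client('apigatewayv2'), 'vehicle-tracker')`. The API name is, again, just an example.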

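Now, create a Python Lambda function that stores each client's connection ID, along with the vehicle that client wants to track, in a DynamoDB table whose name is read from the CONNECTIONS_TABLE_NAME environment variable: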

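import os
import boto3

dynamodb = boto3.client('dynamodb')
TABLE_NAME = os.environ['CONNECTIONS_TABLE_NAME']

def lambda_handler(event, context):
    # API Gateway assigns every WebSocket client a unique connection ID
    connectionId = event['requestContext']['connectionId']
    eventType = event['requestContext']['eventType']

    if eventType == 'CONNECT':
        # Clients pick a vehicle with ?VehicleID=... on the connect URL;
        # queryStringParameters is None when there is no query string at all
        params = event.get('queryStringParameters') or {}
        vehicle = params.get('VehicleID')
        if not vehicle:
            # A non-2xx response from $connect rejects the connection
            return { 'statusCode': 400, 'body': 'VehicleID query parameter is required.' }
        dynamodb.put_item(
            TableName = TABLE_NAME,
            Item = {
                'ConnectionID': { 'S': str(connectionId) },
                'VehicleID': { 'S': str(vehicle) }
            }
        )
        return { 'statusCode': 200, 'body': 'Connected.' }

    if eventType == 'DISCONNECT':
        # Forget the connection so no more data is pushed to it
        dynamodb.delete_item(
            TableName = TABLE_NAME,
            Key = { 'ConnectionID': { 'S': str(connectionId) } }
        )
        return { 'statusCode': 200, 'body': 'Disconnected.' }

    # Only $connect and $disconnect are routed to this function
    return { 'statusCode': 400, 'body': 'Unsupported event type.' }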
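The handler assumes a DynamoDB table keyed on `ConnectionID` already exists. Here is a minimal sketch of creating it with boto3. The global secondary index on `VehicleID` (and its name, `VehicleID-index`) is an assumption, not part of the original setup. Its purpose is to let the pushing Lambda find every connection that is tracking a given vehicle without scanning the table.

```python
def create_connections_table(dynamodb, table_name, index_name="VehicleID-index"):
    """Create the table the $connect/$disconnect Lambda writes to.

    `dynamodb` is a boto3 'dynamodb' client. ConnectionID is the partition
    key, matching the handler's put_item/delete_item calls. The VehicleID
    index is an assumption that supports lookups by vehicle.
    """
    return dynamodb.create_table(
        TableName=table_name,
        # Only key attributes (table or index) need to be declared here
        AttributeDefinitions=[
            {"AttributeName": "ConnectionID", "AttributeType": "S"},
            {"AttributeName": "VehicleID", "AttributeType": "S"},
        ],
        KeySchema=[{"AttributeName": "ConnectionID", "KeyType": "HASH"}],
        GlobalSecondaryIndexes=[{
            "IndexName": index_name,
            "KeySchema": [{"AttributeName": "VehicleID", "KeyType": "HASH"}],
            # KEYS_ONLY still carries ConnectionID, which is all the pusher needs
            "Projection": {"ProjectionType": "KEYS_ONLY"},
        }],
        # On-demand capacity, so no ProvisionedThroughput settings are required
        BillingMode="PAY_PER_REQUEST",
    )
```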

Then attach this Lambda to the WebSocket API's $connect and $disconnect routes.
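Scripted with boto3, attaching the routes takes four steps. First, create a Lambda proxy integration. Then point both routes at that integration. Next, grant API Gateway permission to invoke the function. Finally, deploy the stage. The following is a minimal sketch under those assumptions. The permission's statement ID is an arbitrary name chosen here.

```python
def attach_lambda_routes(apigw, lambda_client, api_id, stage_name,
                         region, account_id, function_arn):
    """Wire one Lambda to $connect and $disconnect, then deploy the stage.

    `apigw` is a boto3 'apigatewayv2' client, `lambda_client` a boto3
    'lambda' client.
    """
    # Lambda proxy integration: API Gateway passes the whole event through
    uri = (f"arn:aws:apigateway:{region}:lambda:path/2015-03-31/"
           f"functions/{function_arn}/invocations")
    integration = apigw.create_integration(
        ApiId=api_id,
        IntegrationType="AWS_PROXY",
        IntegrationMethod="POST",
        IntegrationUri=uri,
    )
    target = f"integrations/{integration['IntegrationId']}"
    for route_key in ("$connect", "$disconnect"):
        apigw.create_route(ApiId=api_id, RouteKey=route_key, Target=target)

    # Allow this API (any stage, any route) to invoke the function
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=f"apigateway-{api_id}",
        Action="lambda:InvokeFunction",
        Principal="apigateway.amazonaws.com",
        SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*",
    )
    # Publish the new routes to the stage
    return apigw.create_deployment(ApiId=api_id, StageName=stage_name)
```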

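Now all that's left is to make the Lambda invoked by Kinesis Analytics push data to the WebSocket clients. It does this through the API Gateway Management API, so its execution role needs the execute-api:ManageConnections permission: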

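import os, json, boto3

# WEBSOCKET_API_ENDPOINT is the API's callback URL:
# https://{api-id}.execute-api.{region}.amazonaws.com/{stage}
api = boto3.client('apigatewaymanagementapi',
    endpoint_url = os.environ['WEBSOCKET_API_ENDPOINT'])

def push(connectionId, data):
    # Send one message to one connected client, serialized as JSON bytes
    api.post_to_connection(
        ConnectionId = str(connectionId),
        Data = json.dumps(data).encode('utf-8')
    )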
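Putting the pieces together, here is a minimal sketch of the whole analytics Lambda. It rests on several assumptions not stated in the original. It assumes a Kinesis Data Analytics (SQL) application with this Lambda as its output destination: the event's `records` carry a `recordId` and base64-encoded `data`, and the function reports each record back as `Ok`. It also assumes the output rows are JSON with a `VehicleID` column and that the connections table has the `VehicleID-index` global secondary index. Connections that have gone away without a `$disconnect` are cleaned up when `post_to_connection` raises `GoneException`.

```python
import base64
import json
import os


def connections_for_vehicle(dynamodb, table_name, index_name, vehicle_id):
    """Yield every ConnectionID tracking vehicle_id, following query pagination."""
    kwargs = {
        "TableName": table_name,
        "IndexName": index_name,
        "KeyConditionExpression": "VehicleID = :v",
        "ExpressionAttributeValues": {":v": {"S": vehicle_id}},
    }
    while True:
        page = dynamodb.query(**kwargs)
        for item in page.get("Items", []):
            yield item["ConnectionID"]["S"]
        if "LastEvaluatedKey" not in page:
            return
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]


def push_analytics_output(event, dynamodb, api, table_name,
                          index_name="VehicleID-index"):
    """Fan each analytics output row out to the clients tracking its vehicle."""
    results = []
    for record in event["records"]:
        # Output rows arrive base64-encoded; assumed to be JSON with a VehicleID
        row = json.loads(base64.b64decode(record["data"]))
        message = json.dumps(row).encode("utf-8")
        # Materialize the list first so deletes below don't race the query
        targets = list(connections_for_vehicle(
            dynamodb, table_name, index_name, str(row["VehicleID"])))
        for connection_id in targets:
            try:
                api.post_to_connection(ConnectionId=connection_id, Data=message)
            except api.exceptions.GoneException:
                # The client is no longer connected; drop its stale entry
                dynamodb.delete_item(
                    TableName=table_name,
                    Key={"ConnectionID": {"S": connection_id}})
        # Any other error propagates, failing the invocation so it can be retried
        results.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": results}


def lambda_handler(event, context):
    # boto3 is imported here so the functions above can be tested without AWS
    import boto3
    api = boto3.client("apigatewaymanagementapi",
                       endpoint_url=os.environ["WEBSOCKET_API_ENDPOINT"])
    return push_analytics_output(event, boto3.client("dynamodb"), api,
                                 os.environ["CONNECTIONS_TABLE_NAME"])
```

Keeping the AWS clients as parameters of `push_analytics_output` is a design choice for testability. The thin `lambda_handler` is the only part that touches real AWS.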

That’s it! The Analytics Lambda will now start pushing data to whoever is connected to the WebSocket API.

Harish KM is a Cloud Evangelist and a Full Stack Engineer at QloudX. Harish is very passionate about cloud native solutions and using the best tools for projects. This means that he is an expert in a multitude of application languages and is up to date with all the new offerings and services from cloud providers, especially AWS.
