Push Realtime Streaming Data to Clients using Amazon API Gateway’s WebSocket API


Posted: January 29th, 2020

Realizing the value of realtime data insights, many businesses today have implemented some kind of streaming data capture & analysis mechanism. One popular option is Amazon Kinesis, which provides both streaming data ingestion & realtime analytics. But in most cases, the raw data, its processed version & the valuable insights derived from it simply end up in a database, waiting to be queried by some other system that can act on them. Even if that other system is up all the time, it’s probably fetching the insights periodically from wherever the streaming analytics writes them.

What if we could take it up a notch? What if the analyzer itself, as soon as it has the results, could push them to whoever needs them, IN REALTIME? What if your mobile & web apps never had to call the server for data? They would simply let the server know what kind of data they’re interested in, & the server would send it to them as soon as it’s available. Imagine how awesome that would be. No more long polling, no more unnecessary delays: a true realtime system.

This article describes one way of achieving this, using Amazon Kinesis, AWS Lambda & a WebSocket API. Let’s take a specific example. Say you have GPS coordinates from sensors in vehicles around the world, streaming into Kinesis in realtime. Say you run them through Kinesis Analytics, maybe to check whether they fall inside a geofence of interest to you. Now say a mobile app is tracking one of these vehicles in realtime & Kinesis Analytics is pushing its output directly to the app. In such a system, data flows from the vehicle sensors into Kinesis, through Kinesis Analytics to a Lambda, & from there through the WebSocket API to the app.

Let’s walk through how to create such a setup. First, create a WebSocket API in Amazon API Gateway.
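
The console is enough for this step. For a scripted setup, a minimal boto3 sketch might look like the one below. It assumes the `create_api` call of the `apigatewayv2` client; the API name `vehicle-tracking` and the route selection expression are illustrative choices, not part of the original setup.

```python
def create_websocket_api(client, name="vehicle-tracking"):
    """Create a WebSocket API and return its ID and wss:// endpoint.

    `client` is expected to be boto3.client('apigatewayv2').
    The default name is a placeholder chosen for this example.
    """
    response = client.create_api(
        Name=name,
        ProtocolType="WEBSOCKET",
        # Required for WebSocket APIs. It only affects custom routes,
        # which this article does not use.
        RouteSelectionExpression="$request.body.action",
    )
    return response["ApiId"], response["ApiEndpoint"]
```

Calling `create_websocket_api(boto3.client('apigatewayv2'))` with valid AWS credentials should return the new API’s ID and endpoint. The API still has to be deployed to a stage before clients can connect.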

Now, create a Python Lambda with this code:

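import os
import boto3

# Low-level DynamoDB client, created once and reused across invocations.
dynamodb = boto3.client('dynamodb')

def lambda_handler(event, context):
    connectionId = event['requestContext']['connectionId']
    eventType = event['requestContext']['eventType']

    if eventType == 'CONNECT':
        # Clients subscribe by connecting with ?VehicleID=<id> in the URL.
        # queryStringParameters is absent or None when no query string is sent.
        params = event.get('queryStringParameters') or {}
        vehicle = params.get('VehicleID')
        if vehicle is None:
            return { 'statusCode': 400, 'body': 'VehicleID is required.' }
        # Remember which vehicle this connection wants updates for.
        dynamodb.put_item(
            TableName = os.environ['CONNECTIONS_TABLE_NAME'],
            Item = {
                'ConnectionID': { 'S': str(connectionId) },
                'VehicleID': { 'S': str(vehicle) }
            }
        )
        return { 'statusCode': 200, 'body': 'Connected.' }

    if eventType == 'DISCONNECT':
        # Forget the connection so nothing is pushed to it any more.
        dynamodb.delete_item(
            TableName = os.environ['CONNECTIONS_TABLE_NAME'],
            Key = { 'ConnectionID': { 'S': str(connectionId) } }
        )
        return { 'statusCode': 200, 'body': 'Disconnected.' }

    # Any other event type is not handled by this Lambda.
    return { 'statusCode': 400, 'body': 'Unsupported event type.' }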

Then attach this Lambda to the WebSocket API’s $connect & $disconnect routes.
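
If you prefer to script this step, the sketch below shows one possible way to do it with the `apigatewayv2` client’s `create_integration` and `create_route` calls. The function name and its parameters are made up for this example; `lambda_arn` is assumed to be the ARN of the Lambda above.

```python
def attach_lambda_to_routes(client, api_id, lambda_arn, region):
    """Point $connect and $disconnect at one Lambda via a proxy integration.

    `client` is expected to be boto3.client('apigatewayv2').
    Returns a dict mapping each route key to its new route ID.
    """
    integration = client.create_integration(
        ApiId=api_id,
        IntegrationType="AWS_PROXY",
        # API Gateway invokes a Lambda through this service-specific URI.
        IntegrationUri=(
            f"arn:aws:apigateway:{region}:lambda:path/2015-03-31"
            f"/functions/{lambda_arn}/invocations"
        ),
    )
    target = f"integrations/{integration['IntegrationId']}"

    route_ids = {}
    for route_key in ("$connect", "$disconnect"):
        route = client.create_route(
            ApiId=api_id, RouteKey=route_key, Target=target
        )
        route_ids[route_key] = route["RouteId"]
    return route_ids
```

Two things are left out of this sketch: API Gateway needs permission to invoke the Lambda (for example via the Lambda `add_permission` call), and the API has to be redeployed for the new routes to take effect.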

Now all that’s left is to make the Lambda that Kinesis Analytics invokes push data to the WebSocket clients:

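import os, json, boto3

# The endpoint is the API's HTTPS connection URL, in the form
# https://{api-id}.execute-api.{region}.amazonaws.com/{stage}
api = boto3.client('apigatewaymanagementapi',
    endpoint_url = os.environ['WEBSOCKET_API_ENDPOINT'])

def push(connectionId, data):
    # Assumes data is JSON-serializable; it is sent to the client as UTF-8 bytes.
    api.post_to_connection(
        ConnectionId = str(connectionId),
        Data = json.dumps(data).encode('utf-8')
    )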
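
The snippet above sends to a single connection, so the Analytics Lambda still has to find out which connections are watching a given vehicle. The following is a minimal sketch of that fan-out, not the original implementation. It assumes the connections table written by the first Lambda, uses a filtered DynamoDB scan for the lookup, and removes connections that API Gateway reports as gone. The function names are made up for this example.

```python
def find_connections(dynamodb, table_name, vehicle_id):
    """Return the IDs of all connections subscribed to one vehicle.

    Uses a filtered scan because the table is keyed on ConnectionID;
    a secondary index on VehicleID would scale better.
    """
    connection_ids = []
    kwargs = {
        "TableName": table_name,
        "FilterExpression": "VehicleID = :v",
        "ExpressionAttributeValues": {":v": {"S": vehicle_id}},
    }
    while True:
        page = dynamodb.scan(**kwargs)
        connection_ids += [item["ConnectionID"]["S"] for item in page["Items"]]
        if "LastEvaluatedKey" not in page:
            return connection_ids
        # Scans are paginated; continue from where the last page stopped.
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]


def push_to_vehicle_watchers(api, dynamodb, table_name, vehicle_id, payload):
    """Send payload (bytes) to every watcher and return the delivery count."""
    delivered = 0
    for connection_id in find_connections(dynamodb, table_name, vehicle_id):
        try:
            api.post_to_connection(ConnectionId=connection_id, Data=payload)
            delivered += 1
        except api.exceptions.GoneException:
            # The client went away without a clean disconnect; drop its row.
            dynamodb.delete_item(
                TableName=table_name,
                Key={"ConnectionID": {"S": connection_id}},
            )
    return delivered
```

Here `api` is the `apigatewaymanagementapi` client and `dynamodb` is a `boto3.client('dynamodb')`. Passing the clients in as arguments keeps the functions easy to try out with stand-in objects before deploying.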

That’s it! The Analytics Lambda will now start pushing data to whoever is connected to the WebSocket API.
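
On the client side, subscribing means opening the WebSocket with the vehicle of interest in the query string, since that is where the connection Lambda reads `VehicleID` from. A small sketch of building that URL follows; the helper name and the sample values in the usage note are hypothetical.

```python
from urllib.parse import urlencode


def build_connect_url(api_id, region, stage, vehicle_id):
    """Build the wss:// URL a client opens to subscribe to one vehicle."""
    # urlencode escapes characters that are not safe in a query string.
    query = urlencode({"VehicleID": vehicle_id})
    return f"wss://{api_id}.execute-api.{region}.amazonaws.com/{stage}?{query}"
```

For example, `build_connect_url('abc123', 'us-east-1', 'prod', 'truck-42')` yields a URL that any WebSocket client can open. Pushed updates then arrive as messages on that connection, with no further requests from the client.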
