Serverless Signaling Server with AWS ApiGateway, DynamoDB and Lambda

Hello,

We’ve started to use Yjs in our application. One of the more interesting challenges we faced was that our architecture is serverless, and we didn’t really want to start deploying servers and/or containers again after we had just happily moved away from them.

So instead, we took the example server component and made it run behind a WebSocket API on AWS API Gateway. The connections/rooms are stored in a DynamoDB table, and the actual handler is a single Lambda function:

import { ApiGatewayManagementApi, DynamoDB } from 'aws-sdk';

// Deployment configuration, read once from the Lambda environment.
const AWS_REGION = process.env.AWS_REGION;
const TOPICS_TABLE = process.env.TOPICS_TABLE;

// Single DynamoDB client shared by every function in this module.
const dynamoDb = new DynamoDB({
	region: AWS_REGION,
	apiVersion: '2012-08-10',
});

/**
 * Lazily iterates over every item of a DynamoDB `scan`, requesting follow-up
 * pages for as long as the response carries a `LastEvaluatedKey`.
 *
 * @param dynamoDb - Client used to issue the `scan` requests.
 * @param tableName - Name of the table to scan.
 * @param extraScanInput - Additional scan parameters merged into every page
 *   request. `TableName` and `ExclusiveStartKey` are owned by this function.
 * @returns An async generator yielding one attribute map per scanned item.
 */
export async function* scanItems(
	dynamoDb: DynamoDB,
	tableName: string,
	extraScanInput?: Omit<DynamoDB.ScanInput, 'TableName' | 'ExclusiveStartKey'>,
): AsyncGenerator<DynamoDB.AttributeMap, void, unknown> {
	let startKey: DynamoDB.Key | undefined;
	do {
		const result = await dynamoDb
			.scan({
				// Spread first: the paging parameters below must always win, even if a
				// caller slips them past the `Omit` type, otherwise paging would break.
				...extraScanInput,
				TableName: tableName,
				ExclusiveStartKey: startKey,
			})
			.promise();
		// A page may come back without `Items` (e.g. presumably for a count-only
		// scan); treat that as an empty page instead of crashing the iteration.
		for (const item of result.Items ?? []) {
			yield item;
		}
		startKey = result.LastEvaluatedKey;
	} while (startKey);
}

// Message structure and protocol flow taken from y-webrtc/bin/server.js
/**
 * Client request to join (`subscribe`) or leave (`unsubscribe`) a set of topics.
 */
interface YWebRtcSubscriptionMessage {
	type: 'subscribe' | 'unsubscribe';
	/** Topics to (un)subscribe; the message handler treats a missing list as empty. */
	topics?: string[];
}
/**
 * Ping request; the message handler answers it with `{ type: 'pong' }` sent
 * back to the same connection.
 */
interface YWebRtcPingMessage {
	type: 'ping';
}
/**
 * Message to fan out to every receiver of `topic`. The message handler forwards
 * the whole object unchanged, so any extra fields travel along with it.
 */
interface YWebRtcPublishMessage {
	type: 'publish';
	/** Target topic; the message handler skips publishing when this is missing. */
	topic?: string;
	// Arbitrary additional payload fields.
	[k: string]: any;
}

/**
 * Registers `connectionId` as a receiver of `topic` by adding it to the
 * `receivers` string set of the topic's item.
 *
 * Failures are logged and swallowed, so the returned promise never rejects
 * because of a failed update (it resolves to `undefined` in that case).
 */
async function subscribe(topic: string, connectionId: string) {
	const addReceiver = {
		TableName: TOPICS_TABLE,
		Key: { name: { S: topic } },
		UpdateExpression: 'ADD receivers :r',
		ExpressionAttributeValues: { ':r': { SS: [connectionId] } },
	};
	const pendingUpdate = dynamoDb.updateItem(addReceiver).promise();
	return pendingUpdate.catch(failure => {
		console.log(`Cannot update topic ${topic}: ${failure.message}`);
	});
}

/**
 * Removes `connectionId` from the `receivers` string set of the topic's item.
 *
 * Failures are logged and swallowed, so the returned promise never rejects
 * because of a failed update (it resolves to `undefined` in that case).
 */
async function unsubscribe(topic: string, connectionId: string) {
	const removeReceiver = {
		TableName: TOPICS_TABLE,
		Key: { name: { S: topic } },
		UpdateExpression: 'DELETE receivers :r',
		ExpressionAttributeValues: { ':r': { SS: [connectionId] } },
	};
	const pendingUpdate = dynamoDb.updateItem(removeReceiver).promise();
	return pendingUpdate.catch(failure => {
		console.log(`Cannot update topic ${topic}: ${failure.message}`);
	});
}

/**
 * Looks up the connection ids currently stored as receivers of `topic`.
 *
 * Never rejects: a failed lookup is logged and reported as an empty list.
 *
 * @param topic - Topic name, used as the `name` key of the item to read.
 * @returns The `receivers` string set of the topic, or `[]` when the topic has
 *   no item, no `receivers` attribute, or the attribute is not a string set.
 */
async function getReceivers(topic: string) {
	try {
		const { Item: item } = await dynamoDb
			.getItem({
				TableName: TOPICS_TABLE,
				Key: { name: { S: topic } },
			})
			.promise();
		// Guard every level: callers iterate the result, so it must always be an array.
		return item?.receivers?.SS ?? [];
	} catch (err) {
		// Rejections are not guaranteed to be Error instances (or even objects).
		const reason = err instanceof Error ? err.message : String(err);
		console.log(`Cannot get topic ${topic}: ${reason}`);
		return [];
	}
}

/**
 * Processes one signaling message received from `connectionId`.
 *
 * - `subscribe` / `unsubscribe`: updates the subscription of the connection for
 *   every listed topic.
 * - `publish`: forwards the message unchanged to every receiver of its topic.
 * - `ping`: replies with `{ type: 'pong' }` to the sender.
 *
 * Falsy messages, messages without a `type`, and unknown types are ignored.
 * The message originates from client-supplied JSON, so its shape is validated
 * at runtime: a `topics` value that is not an array is treated as empty,
 * non-string entries are skipped, and `publish` requires a non-empty string
 * `topic`.
 *
 * @param connectionId - Connection the message was received from.
 * @param message - The parsed message.
 * @param send - Delivers a message to the given connection id.
 * @returns A promise that settles once all triggered operations have finished.
 */
async function handleYWebRtcMessage(
	connectionId: string,
	message:
		| YWebRtcSubscriptionMessage
		| YWebRtcPublishMessage
		| YWebRtcPingMessage,
	send: (receiver: string, message: any) => Promise<void>,
) {
	const promises: Promise<unknown>[] = [];

	// The declared types are only a hint for client-supplied data; keep just the
	// entries that really are strings.
	const validTopics = (candidate: unknown): string[] =>
		Array.isArray(candidate)
			? candidate.filter(
					(entry): entry is string => typeof entry === 'string',
				)
			: [];

	if (message && message.type) {
		switch (message.type) {
			case 'subscribe':
				for (const topic of validTopics(message.topics)) {
					promises.push(subscribe(topic, connectionId));
				}
				break;
			case 'unsubscribe':
				for (const topic of validTopics(message.topics)) {
					promises.push(unsubscribe(topic, connectionId));
				}
				break;
			case 'publish':
				if (typeof message.topic === 'string' && message.topic) {
					const receivers = await getReceivers(message.topic);
					for (const receiver of receivers) {
						promises.push(send(receiver, message));
					}
				}
				break;
			case 'ping':
				promises.push(send(connectionId, { type: 'pong' }));
				break;
		}
	}

	await Promise.all(promises);
}

/**
 * Handles a newly opened connection. No state is recorded at this point; the
 * connection id is only written to the log.
 */
function handleConnect(connectionId: string) {
	const logLine = `Connected: ${connectionId}`;
	console.log(logLine);
}

/**
 * Drops `connectionId` from every topic it is still registered on.
 *
 * There is no index from connection to topics, so the whole topics table is
 * scanned and each topic listing the connection gets an unsubscribe. This is
 * costly: the work grows with the number of topics in the table.
 */
async function handleDisconnect(connectionId: string) {
	console.log(`Disconnected: ${connectionId}`);
	const pendingRemovals = [];
	for await (const topicItem of scanItems(dynamoDb, TOPICS_TABLE)) {
		const members = topicItem.receivers?.SS;
		const isMember = members != null && members.includes(connectionId);
		if (!isMember) {
			continue;
		}
		// Kick off the removal right away; completion is awaited after the scan.
		pendingRemovals.push(unsubscribe(topicItem.name.S, connectionId));
	}

	await Promise.all(pendingRemovals);
}

/**
 * Lambda entry point for the WebSocket API.
 *
 * Dispatches on the route key: `$connect` and `$disconnect` go to the
 * connection handlers, `$default` parses the body as JSON and hands it to the
 * signaling message handler. Any other route key is accepted without action.
 *
 * @returns `502` when the topics table is not configured, `500` (with the
 *   error message as body) when handling throws, `200` otherwise.
 */
export async function handler(
	event: HttpV2WebsocketEvent,
): Promise<HttpV2Response> {
	if (!TOPICS_TABLE) {
		return { statusCode: 502, body: 'Not configured' };
	}

	const context = event.requestContext;

	// The AWS "simple chat" example builds the endpoint from requestContext.domainName/stage, which breaks with custom
	// domain names. This is an internal (AWS->AWS) call anyway, so the execute-api host name is used instead.
	const managementEndpoint = `https://${context.apiId}.execute-api.${AWS_REGION}.amazonaws.com/${context.stage}`;
	const apigwManagementApi = new ApiGatewayManagementApi({
		apiVersion: '2018-11-29',
		endpoint: managementEndpoint,
	});

	// Delivers `message` to a connection; never rejects because of a failed post.
	const send = async (connectionId: string, message: any) => {
		try {
			const payload = {
				ConnectionId: connectionId,
				Data: JSON.stringify(message),
			};
			await apigwManagementApi.postToConnection(payload).promise();
		} catch (err) {
			if (err.statusCode !== 410) {
				// Log, but otherwise ignore: There's not much we can do, really.
				console.log(`Error when sending to ${connectionId}: ${err.message}`);
				return;
			}
			// 410: the connection is gone, so clean up its subscriptions.
			console.log(`Found stale connection, deleting ${connectionId}`);
			await handleDisconnect(connectionId);
		}
	};

	try {
		const route = context.routeKey;
		if (route === '$connect') {
			handleConnect(context.connectionId);
		} else if (route === '$disconnect') {
			await handleDisconnect(context.connectionId);
		} else if (route === '$default') {
			await handleYWebRtcMessage(
				context.connectionId,
				JSON.parse(event.body),
				send,
			);
		}

		return { statusCode: 200 };
	} catch (err) {
		console.log(`Error ${context.connectionId}`, err);
		return { statusCode: 500, body: err.message };
	}
}

We use CloudFormation for deployment, and our resources related to the signaling server look roughly like this:

# DynamoDB table holding the topics. Its only key is the string hash key `name`,
# which the Lambda code above fills with the topic name.
YWebRtcTopicsTable:
  Type: AWS::DynamoDB::Table
  Properties:
    KeySchema:
      - AttributeName: name
        KeyType: HASH
    AttributeDefinitions:
      - AttributeName: name
        AttributeType: S
    # Fixed provisioned capacity: 5 read and 5 write capacity units.
    ProvisionedThroughput:
      ReadCapacityUnits: 5
      WriteCapacityUnits: 5
    Tags:
      - Key: Name
        Value: y-webrtc-topics-table

# WebSocket API that fronts the signaling Lambda.
YWebRtcApi:
  Type: AWS::ApiGatewayV2::Api
  Properties:
    Name: ywebrtc-api
    ProtocolType: WEBSOCKET
    # Routes are selected by the `type` field of the message body.
    # NOTE(review): this template defines no route keyed on a message type, so
    # client messages are expected to reach the `$default` route — confirm.
    RouteSelectionExpression: "$request.body.type"

# Lambda proxy integration (AWS_PROXY) shared by all routes of the API. It
# targets YWebRtcSignalingLambdaFunction, which is not defined in this snippet.
YWebRtcIntegration:
  Type: AWS::ApiGatewayV2::Integration
  Properties:
    ApiId: !Ref YWebRtcApi
    Description: y-webrtc signaling integration
    IntegrationType: AWS_PROXY
    IntegrationUri: !Sub arn:${AWS::Partition}:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${YWebRtcSignalingLambdaFunction.Arn}/invocations

# `$connect` route, unauthenticated, forwarded to the Lambda integration.
YWebRtcConnectRoute:
  Type: AWS::ApiGatewayV2::Route
  Properties:
    ApiId: !Ref YWebRtcApi
    RouteKey: $connect
    AuthorizationType: NONE
    OperationName: ConnectRoute
    # Resolves to "integrations/<integration id>".
    Target: !Join
      - '/'
      - - 'integrations'
        - !Ref YWebRtcIntegration
# `$disconnect` route, unauthenticated, forwarded to the Lambda integration.
YWebRtcDisconnectRoute:
  Type: AWS::ApiGatewayV2::Route
  Properties:
    ApiId: !Ref YWebRtcApi
    RouteKey: $disconnect
    AuthorizationType: NONE
    OperationName: DisconnectRoute
    # Resolves to "integrations/<integration id>".
    Target: !Join
      - '/'
      - - 'integrations'
        - !Ref YWebRtcIntegration
# `$default` route, unauthenticated, forwarded to the Lambda integration.
YWebRtcDefaultRoute:
  Type: AWS::ApiGatewayV2::Route
  Properties:
    ApiId: !Ref YWebRtcApi
    RouteKey: $default
    AuthorizationType: NONE
    OperationName: DefaultRoute
    # Resolves to "integrations/<integration id>".
    Target: !Join
      - '/'
      - - 'integrations'
        - !Ref YWebRtcIntegration

# Deployment of the API. The explicit DependsOn makes it wait until all three
# routes have been created.
YWebRtcDeployment:
  Type: AWS::ApiGatewayV2::Deployment
  DependsOn:
    - YWebRtcConnectRoute
    - YWebRtcDisconnectRoute
    - YWebRtcDefaultRoute
  Properties:
    ApiId: !Ref YWebRtcApi

# Stage named `dev` that serves the deployment above.
YWebRtcApiStage:
  Type: AWS::ApiGatewayV2::Stage
  Properties:
    StageName: dev
    DeploymentId: !Ref YWebRtcDeployment
    ApiId: !Ref YWebRtcApi

# Allows API Gateway to invoke the signaling Lambda function.
YWebRtcInvokePermission:
  Type: AWS::Lambda::Permission
  Properties:
    Action: lambda:InvokeFunction
    FunctionName: !Ref YWebRtcSignalingLambdaFunction
    Principal: apigateway.amazonaws.com
    # Scope the grant to this WebSocket API (any stage and route). Without a
    # SourceArn, any API Gateway API could invoke the function.
    SourceArn: !Sub arn:${AWS::Partition}:execute-api:${AWS::Region}:${AWS::AccountId}:${YWebRtcApi}/*

# ACM certificate for `webrtc.<Domain>`, validated through DNS records in the
# given hosted zone. `Domain` and `HostedZoneId` are not defined in this
# snippet; presumably they are template parameters.
YWebRtcApiCertificate:
  Type: AWS::CertificateManager::Certificate
  Properties:
    DomainName: !Sub 'webrtc.${Domain}'
    DomainValidationOptions:
      - DomainName: !Sub 'webrtc.${Domain}'
        HostedZoneId: !Ref HostedZoneId
    ValidationMethod: DNS

# Custom domain `webrtc.<Domain>` for the API, using the certificate above and
# the TLS_1_2 security policy.
YWebRtcApiDomainName:
  Type: AWS::ApiGatewayV2::DomainName
  Properties:
    DomainName: !Sub 'webrtc.${Domain}'
    DomainNameConfigurations:
      - CertificateArn: !Ref YWebRtcApiCertificate
        SecurityPolicy: TLS_1_2

# Maps the custom domain to the API stage.
YWebRtcApiMapping:
  Type: AWS::ApiGatewayV2::ApiMapping
  Properties:
    ApiId: !Ref YWebRtcApi
    DomainName: !Sub 'webrtc.${Domain}'
    Stage: !Ref YWebRtcApiStage
  # The domain is referenced by name (not !Ref) above, so the ordering against
  # the domain-name resource has to be declared explicitly.
  DependsOn:
    - YWebRtcApiDomainName

# Route 53 alias A record pointing `webrtc.<Domain>` at the regional endpoint
# of the custom domain.
YWebRtcApiRecordSet:
  Type: AWS::Route53::RecordSet
  Properties:
    AliasTarget:
      DNSName: !GetAtt YWebRtcApiDomainName.RegionalDomainName
      HostedZoneId: !GetAtt YWebRtcApiDomainName.RegionalHostedZoneId
    # See https://forums.aws.amazon.com/thread.jspa?threadID=103919: The `HostedZoneName` requires the trailing dot.
    HostedZoneName: !Sub '${Domain}.'
    Name: !Sub 'webrtc.${Domain}'
    Type: A

This seems to work fine for us, and maybe it helps some other member here as well.

Cheers,
Andreas

1 Like