DynamoDB Streams and Lambda: Building Real-Time Data Pipelines
Your application just hit 100,000 users. Congratulations! Now product wants real-time analytics, cross-region replication, and event-driven notifications. Oh, and they need it yesterday.
Welcome to the world of change data capture (CDC). Traditional databases require polling, custom triggers, or expensive third-party tools. DynamoDB Streams? Built-in, serverless, and pairs perfectly with Lambda.
What Are DynamoDB Streams?
DynamoDB Streams captures every modification to your table (inserts, updates, and deletes) in near real time. Think of it as a time-ordered log of changes, automatically maintained by AWS.
Key features:
Ordered per item - changes to the same item appear in the order they were made
Exactly-once delivery - each change appears in the stream once, with no duplicates
24-hour retention - replay any change from the last day
Near real-time - typically under one second of latency
No impact on table capacity - stream reads don't consume the table's RCU/WCU
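Each change is delivered to consumers as a stream record. Roughly, an INSERT record for the activities table built below looks like this (the field names are the standard stream record attributes; the values are illustrative):
# Illustrative shape of a single stream record (values are made up)
sample_record = {
    "eventID": "f07f8ca4b0b26cb9c4e5e77e69f274ee",
    "eventName": "INSERT",  # INSERT, MODIFY, or REMOVE
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1700000000,
        "Keys": {"userId": {"S": "user-123"}, "timestamp": {"N": "1700000000000"}},
        "NewImage": {  # present because the stream view type includes new images
            "userId": {"S": "user-123"},
            "timestamp": {"N": "1700000000000"},
            "activityType": {"S": "login"},
        },
        "SequenceNumber": "111000000000000000000001",
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}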
Architecture Overview
Writes to the activities table land in its stream; Lambda polls the stream in batches and fans each change out to downstream consumers - an aggregates table, a replica table in another region, and an analytics pipeline built on Kinesis, Firehose, and S3.
Real-World Example: User Activity Aggregation
Let's build a system that tracks user activity in real-time:
Step 1: Enable DynamoDB Streams
// lib/user-activity-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
export class UserActivityStack extends cdk.Stack {
constructor(scope: cdk.App, id: string) {
super(scope, id);
// Activities table with stream enabled
const activitiesTable = new dynamodb.Table(this, 'UserActivities', {
tableName: 'user-activities',
partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'timestamp', type: dynamodb.AttributeType.NUMBER },
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, // Include before/after
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
pointInTimeRecovery: true,
});
// Aggregation table
const aggregatesTable = new dynamodb.Table(this, 'UserAggregates', {
tableName: 'user-aggregates',
partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
});
// Stream processor Lambda
const streamProcessor = new lambda.Function(this, 'StreamProcessor', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'stream_processor.handler',
code: lambda.Code.fromAsset('lambda'),
environment: {
AGGREGATES_TABLE: aggregatesTable.tableName,
},
timeout: cdk.Duration.seconds(30),
});
// Grant permissions
aggregatesTable.grantReadWriteData(streamProcessor);
// Add stream as event source
streamProcessor.addEventSource(
new lambdaEventSources.DynamoEventSource(activitiesTable, {
startingPosition: lambda.StartingPosition.LATEST,
batchSize: 100, // Process up to 100 records at once
maxBatchingWindow: cdk.Duration.seconds(5), // Wait up to 5s to fill batch
bisectBatchOnError: true, // Isolate failing records
retryAttempts: 3,
parallelizationFactor: 10, // Process up to 10 batches per shard concurrently
reportBatchItemFailures: true, // Enable partial batch responses
})
);
}
}
Using AWS SAM:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
UserActivitiesTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: user-activities
AttributeDefinitions:
- AttributeName: userId
AttributeType: S
- AttributeName: timestamp
AttributeType: N
KeySchema:
- AttributeName: userId
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
BillingMode: PAY_PER_REQUEST
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
StreamProcessorFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: lambda/
Handler: stream_processor.handler
Runtime: python3.12
Timeout: 30
Environment:
Variables:
AGGREGATES_TABLE: !Ref UserAggregatesTable
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref UserAggregatesTable
Events:
DynamoDBStream:
Type: DynamoDB
Properties:
Stream: !GetAtt UserActivitiesTable.StreamArn
StartingPosition: LATEST
BatchSize: 100
MaximumBatchingWindowInSeconds: 5
BisectBatchOnFunctionError: true
MaximumRetryAttempts: 3
ParallelizationFactor: 10
FunctionResponseTypes:
- ReportBatchItemFailures
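Either definition wires the table's stream to the processor. To generate stream events while testing, write a few activity items directly; a minimal producer sketch with boto3 (the record_activity helper and the item fields beyond the keys are my own naming):
# Sketch: write activity items that the stream will pick up
import time
import uuid

import boto3

activities = boto3.resource('dynamodb').Table('user-activities')

def record_activity(user_id: str, activity_type: str) -> None:
    """Put one activity item; the table's stream emits a matching INSERT record."""
    activities.put_item(
        Item={
            'userId': user_id,                     # partition key
            'timestamp': int(time.time() * 1000),  # sort key (NUMBER)
            'activityId': str(uuid.uuid4()),
            'activityType': activity_type,
            'status': 'completed',
        }
    )

record_activity('user-123', 'login')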
Step 2: Process Stream Events
# lambda/stream_processor.py
import json
import os
import boto3
from decimal import Decimal
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
tracer = Tracer()
dynamodb = boto3.resource('dynamodb')
aggregates_table = dynamodb.Table(os.environ['AGGREGATES_TABLE'])
@event_source(data_class=DynamoDBStreamEvent)
@tracer.capture_lambda_handler
def handler(event: DynamoDBStreamEvent, context: LambdaContext):
"""
Process DynamoDB stream events and update aggregates
"""
batch_item_failures = []
for record in event.records:
try:
process_record(record)
except Exception as e:
logger.exception(f"Failed to process record {record.dynamodb.sequence_number}")
# Report failure for retry
batch_item_failures.append({
"itemIdentifier": record.dynamodb.sequence_number
})
# Return failed items for partial batch response
return {"batchItemFailures": batch_item_failures}
def process_record(record):
"""Process individual DynamoDB stream record"""
event_name = record.event_name  # INSERT, MODIFY, or REMOVE
# Depending on the Powertools version, event_name may be an enum; normalize to a plain string
event_name = getattr(event_name, "value", event_name)
logger.info(f"Processing {event_name} event", extra={
"event_id": record.event_id,
"sequence_number": record.dynamodb.sequence_number
})
if event_name == "INSERT":
handle_insert(record.dynamodb.new_image)
elif event_name == "MODIFY":
handle_modify(record.dynamodb.new_image, record.dynamodb.old_image)
elif event_name == "REMOVE":
handle_remove(record.dynamodb.old_image)
def handle_insert(new_image):
"""Handle new activity insertion"""
user_id = new_image['userId']
activity_type = new_image['activityType']
# Update aggregate counters.
# Note: this assumes the activityCounts map already exists on the aggregate item;
# initialize it (e.g. to {}) when the aggregate is first created, otherwise the
# nested document path in the update will fail.
aggregates_table.update_item(
Key={'userId': user_id},
UpdateExpression="""
SET totalActivities = if_not_exists(totalActivities, :zero) + :one,
lastActivityTime = :timestamp,
activityCounts.#activityType =
if_not_exists(activityCounts.#activityType, :zero) + :one
""",
ExpressionAttributeNames={
'#activityType': activity_type
},
ExpressionAttributeValues={
':zero': 0,
':one': 1,
':timestamp': new_image['timestamp']
}
)
logger.info("Aggregate updated", extra={
"user_id": user_id,
"activity_type": activity_type
})
def handle_modify(new_image, old_image):
"""Handle activity modification"""
# Compare old vs new and update aggregates only when the status changed
if new_image.get('status') != old_image.get('status'):
user_id = new_image['userId']
aggregates_table.update_item(
Key={'userId': user_id},
UpdateExpression="SET lastModified = :timestamp",
ExpressionAttributeValues={
':timestamp': new_image['timestamp']
}
)
def handle_remove(old_image):
"""Handle activity deletion"""
user_id = old_image['userId']
activity_type = old_image['activityType']
# Decrement counters (assumes the aggregate item and its counters already exist)
aggregates_table.update_item(
Key={'userId': user_id},
UpdateExpression="""
SET totalActivities = totalActivities - :one,
activityCounts.#activityType = activityCounts.#activityType - :one
""",
ExpressionAttributeNames={
'#activityType': activity_type
},
ExpressionAttributeValues={
':one': 1
}
)
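After the processor runs, each user's rollup is a single item in user-aggregates. A quick sketch for inspecting it (the example output shape assumes the update expressions above):
# Sketch: read back a user's aggregate
import boto3

aggregates = boto3.resource('dynamodb').Table('user-aggregates')

item = aggregates.get_item(Key={'userId': 'user-123'}).get('Item', {})
# Expected shape after a few activities, e.g.:
# {'userId': 'user-123', 'totalActivities': Decimal('3'),
#  'lastActivityTime': Decimal('1700000000000'),
#  'activityCounts': {'login': Decimal('2'), 'purchase': Decimal('1')}}
print(item)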
Advanced Pattern: Cross-Region Replication
Build active-active multi-region architecture:
// Replicate to secondary region
const replicationFunction = new lambda.Function(this, 'ReplicationFunction', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'replicator.handler',
code: lambda.Code.fromAsset('lambda'),
environment: {
TARGET_TABLE_NAME: 'user-activities',
TARGET_REGION: 'eu-west-1',
},
timeout: cdk.Duration.seconds(30),
});
replicationFunction.addEventSource(
new lambdaEventSources.DynamoEventSource(activitiesTable, {
startingPosition: lambda.StartingPosition.LATEST,
batchSize: 100,
filters: [
// Only replicate newly inserted items that are flagged for replication
lambda.FilterCriteria.filter({
eventName: lambda.FilterRule.isEqual('INSERT'),
dynamodb: {
NewImage: {
replicationEnabled: {
BOOL: lambda.FilterRule.isEqual(true)
}
}
}
})
]
})
);
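For that filter to match, items have to carry a boolean replicationEnabled attribute when they are written; for example (a sketch, reusing the activities table from earlier):
# Sketch: only items written with replicationEnabled=True pass the stream filter
import time

import boto3

activities = boto3.resource('dynamodb').Table('user-activities')

activities.put_item(
    Item={
        'userId': 'user-123',
        'timestamp': int(time.time() * 1000),
        'activityType': 'purchase',
        'replicationEnabled': True,  # stored as a DynamoDB BOOL, which the filter checks
    }
)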
Replication handler:
# lambda/replicator.py
import os
import time
import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
logger = Logger()
# DynamoDB client for target region
target_region = os.environ['TARGET_REGION']
target_table_name = os.environ['TARGET_TABLE_NAME']
dynamodb = boto3.resource('dynamodb', region_name=target_region)
target_table = dynamodb.Table(target_table_name)
@event_source(data_class=DynamoDBStreamEvent)
def handler(event: DynamoDBStreamEvent, context):
"""
Replicate DynamoDB items to another region
"""
batch_failures = []
with target_table.batch_writer() as batch:
for record in event.records:
try:
if record.event_name in ["INSERT", "MODIFY"]:
item = record.dynamodb.new_image
# Add replication metadata
item['replicatedFrom'] = os.environ['AWS_REGION']
item['replicatedAt'] = int(time.time())
# Write to target table
batch.put_item(Item=item)
logger.info("Item replicated", extra={
"source_region": os.environ['AWS_REGION'],
"target_region": target_region,
"item_id": item.get('userId')
})
except Exception as e:
logger.exception("Replication failed")
batch_failures.append({
"itemIdentifier": record.dynamodb.sequence_number
})
return {"batchItemFailures": batch_failures}
Analytics Pipeline with Kinesis Firehose
Stream changes to S3 for analytics:
import * as iam from 'aws-cdk-lib/aws-iam';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as kinesisfirehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as s3 from 'aws-cdk-lib/aws-s3';
// S3 bucket for analytics
const analyticsBucket = new s3.Bucket(this, 'AnalyticsBucket', {
bucketName: 'user-activity-analytics',
lifecycleRules: [{
transitions: [{
storageClass: s3.StorageClass.GLACIER,
transitionAfter: cdk.Duration.days(90),
}],
}],
});
// Kinesis stream
const analyticsStream = new kinesis.Stream(this, 'AnalyticsStream', {
streamName: 'user-activity-analytics',
shardCount: 2,
});
// IAM role that lets Firehose read from the Kinesis stream and write to S3
const firehoseRole = new iam.Role(this, 'FirehoseRole', {
  assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});
analyticsStream.grantRead(firehoseRole);
analyticsBucket.grantWrite(firehoseRole);
// Firehose delivery stream
const deliveryStream = new kinesisfirehose.CfnDeliveryStream(this, 'DeliveryStream', {
deliveryStreamType: 'KinesisStreamAsSource',
kinesisStreamSourceConfiguration: {
kinesisStreamArn: analyticsStream.streamArn,
roleArn: firehoseRole.roleArn,
},
s3DestinationConfiguration: {
bucketArn: analyticsBucket.bucketArn,
prefix: 'activities/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
bufferingHints: {
sizeInMBs: 5,
intervalInSeconds: 300,
},
compressionFormat: 'GZIP',
},
});
// Lambda to push stream events to Kinesis
const analyticsForwarder = new lambda.Function(this, 'AnalyticsForwarder', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'analytics_forwarder.handler',
code: lambda.Code.fromAsset('lambda'),
environment: {
KINESIS_STREAM_NAME: analyticsStream.streamName,
},
});
analyticsStream.grantWrite(analyticsForwarder);
analyticsForwarder.addEventSource(
new lambdaEventSources.DynamoEventSource(activitiesTable, {
startingPosition: lambda.StartingPosition.LATEST,
batchSize: 500, // Higher batch for analytics
})
);
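The analytics_forwarder handler referenced above isn't shown; a minimal sketch that relays raw stream records into Kinesis (the partition key choice and JSON encoding are my assumptions):
# lambda/analytics_forwarder.py (sketch)
import json
import os

import boto3

kinesis = boto3.client('kinesis')
STREAM_NAME = os.environ['KINESIS_STREAM_NAME']

def handler(event, context):
    """Relay DynamoDB stream records into Kinesis for the Firehose -> S3 pipeline."""
    entries = [
        {
            # Partition by userId so one user's events land on the same shard
            'PartitionKey': record['dynamodb']['Keys']['userId']['S'],
            'Data': json.dumps(record).encode('utf-8'),
        }
        for record in event['Records']
    ]
    # put_records accepts up to 500 entries per call, matching the batch size above
    kinesis.put_records(StreamName=STREAM_NAME, Records=entries)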
Handling Idempotency
DynamoDB Streams delivers each change record exactly once, but Lambda retries (after errors or timeouts) mean your code can still see the same record more than once:
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
# Idempotency state lives in a dedicated DynamoDB table
persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency-table")
config = IdempotencyConfig(event_key_jmespath="dynamodb.SequenceNumber")
@idempotent_function(
    data_keyword_argument="record",
    persistence_store=persistence_layer,
    config=config,
)
def process_record(record: dict):
    """
    Idempotent - safe to retry.
    The same stream record (keyed by its sequence number) is only processed once.
    """
    update_aggregates(record)
    send_notification(record)
# In the handler, pass the raw record as a keyword argument:
#   for record in event["Records"]:
#       process_record(record=record)
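The persistence layer needs a table to store its idempotency records. A sketch of creating one with boto3, following Powertools' documented default schema (partition key id, TTL on the expiration attribute); the table name matches the snippet above:
# Sketch: create the idempotency table Powertools expects
import boto3

ddb = boto3.client('dynamodb')

ddb.create_table(
    TableName='idempotency-table',
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',
)
ddb.get_waiter('table_exists').wait(TableName='idempotency-table')
ddb.update_time_to_live(
    TableName='idempotency-table',
    TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True},
)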
Monitoring and Debugging
Key CloudWatch Metrics
from aws_lambda_powertools.metrics import Metrics, MetricUnit
metrics = Metrics(namespace="UserActivityPipeline")  # a namespace is required (or set POWERTOOLS_METRICS_NAMESPACE)
@metrics.log_metrics
def handler(event, context):
records_processed = len(event['Records'])
metrics.add_metric(
name="StreamRecordsProcessed",
unit=MetricUnit.Count,
value=records_processed
)
# Track processing lag
oldest_record_age = calculate_oldest_record_age(event['Records'])
metrics.add_metric(
name="StreamProcessingLag",
unit=MetricUnit.Seconds,
value=oldest_record_age
)
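calculate_oldest_record_age isn't a built-in; one way to derive it from each record's ApproximateCreationDateTime (epoch seconds) is sketched below. Lambda also publishes an IteratorAge metric for stream event sources, which is usually the first thing to alarm on.
import time

def calculate_oldest_record_age(records) -> float:
    """Age in seconds of the oldest record in the batch."""
    oldest_created = min(r['dynamodb']['ApproximateCreationDateTime'] for r in records)
    return time.time() - oldest_created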
Debug with Local DynamoDB Streams
# Run DynamoDB Local with streams
docker run -p 8000:8000 amazon/dynamodb-local -jar DynamoDBLocal.jar -inMemory -sharedDb
# Create table with stream
aws dynamodb create-table \
--table-name user-activities \
--attribute-definitions AttributeName=userId,AttributeType=S AttributeName=timestamp,AttributeType=N \
--key-schema AttributeName=userId,KeyType=HASH AttributeName=timestamp,KeyType=RANGE \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
--endpoint-url http://localhost:8000
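To confirm events are flowing locally, read the stream directly with the DynamoDB Streams API; a sketch against the local endpoint that reads only the first shard (dummy credentials and any region are fine for DynamoDB Local):
# Sketch: inspect the local table's stream
import boto3

endpoint = 'http://localhost:8000'
ddb = boto3.client('dynamodb', endpoint_url=endpoint, region_name='us-east-1')
streams = boto3.client('dynamodbstreams', endpoint_url=endpoint, region_name='us-east-1')

stream_arn = ddb.describe_table(TableName='user-activities')['Table']['LatestStreamArn']
shard = streams.describe_stream(StreamArn=stream_arn)['StreamDescription']['Shards'][0]

iterator = streams.get_shard_iterator(
    StreamArn=stream_arn,
    ShardId=shard['ShardId'],
    ShardIteratorType='TRIM_HORIZON',  # start from the oldest available record
)['ShardIterator']

records = streams.get_records(ShardIterator=iterator)['Records']
print(f"{len(records)} records in the stream")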
Best Practices
Enable partial batch responses - Prevent full batch retry on single failure
Use bisect on error - Isolate problematic records
Implement idempotency - Handle duplicate processing gracefully
Monitor iterator age - Alert on processing lag (an alarm sketch follows this list)
Set appropriate batch size - Balance throughput vs latency
Use parallelization factor - Process multiple batches concurrently
Filter events early - Reduce unnecessary Lambda invocations
Handle schema evolution - Support old and new record formats
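For the iterator-age practice above: Lambda reports an IteratorAge metric (in milliseconds) for stream event sources. A sketch of an alarm on it with boto3; the function name and thresholds are placeholders:
# Sketch: alarm when the stream processor falls more than a minute behind
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='stream-processor-iterator-age',
    Namespace='AWS/Lambda',
    MetricName='IteratorAge',
    Dimensions=[{'Name': 'FunctionName', 'Value': 'StreamProcessor'}],  # placeholder name
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=3,
    Threshold=60000,  # one minute of lag, in milliseconds
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
)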
Conclusion
DynamoDB Streams + Lambda unlocks powerful real-time data pipelines: aggregations, replication, analytics, and event-driven workflows - all without managing infrastructure.
Start with a simple aggregation, add monitoring, then scale to cross-region replication and analytics. Your data is always moving; make it work for you.
Building with DynamoDB Streams? Share your use cases in the comments!