6.0 KiB
6.0 KiB
Kinesis Video Streams (KVS) Auto-Recovering RTSP → KVS Pipeline
Overview
This script runs a GStreamer pipeline that:
- Pulls an RTSP stream from a camera (e.g., TP-Link Tapo C246),
- Re-encodes it to H.264 baseline,
- Streams it live to AWS Kinesis Video Streams using kvssink,
- Automatically refreshes expiring IAM role credentials (from EC2 IMDSv2),
- Restarts the pipeline seamlessly before credentials expire or on crash,
- Ensures zero downtime — KVS sees a continuous stream.
Ideal for 24/7 security camera streaming on EC2 with IAM roles.
Full Script: run-kvs-pipeline.sh
#!/usr/bin/env bash
set -euo pipefail
# ------------------------------------------------------------------
# CONFIGURATION (edit only these lines)
# ------------------------------------------------------------------
ROLE_NAME="LiveStreamRole" # IAM role attached to EC2
STREAM_NAME="TAPO-C246D-Stream" # KVS stream name
RTSP_URL="rtsp://cctv-dual-tapo:squirrel@192.168.9.241:554/stream2"
AWS_REGION="eu-west-1"
METADATA_URL="http://169.254.169.254/latest"
TOKEN_TTL_SECONDS=21600 # 6 hours (max for IMDSv2)
REFRESH_BEFORE_EXPIRY=300 # Refresh 5 min early
# ------------------------------------------------------------------
# ---------- Helper: get a fresh token ----------
get_token() {
curl -sX PUT "$METADATA_URL/api/token" \
-H "X-aws-ec2-metadata-token-ttl-seconds: $TOKEN_TTL_SECONDS"
}
# ---------- Helper: get credentials for the role ----------
get_credentials() {
local token="$1"
curl -s -H "X-aws-ec2-metadata-token: $token" \
"$METADATA_URL/meta-data/iam/security-credentials/$ROLE_NAME"
}
# ---------- Export fresh env vars ----------
export_credentials() {
local creds_json="$1"
export AWS_ACCESS_KEY_ID=$(echo "$creds_json" | jq -r .AccessKeyId)
export AWS_SECRET_ACCESS_KEY=$(echo "$creds_json" | jq -r .SecretAccessKey)
export AWS_SESSION_TOKEN=$(echo "$creds_json" | jq -r .Token)
export AWS_REGION="$AWS_REGION"
export CRED_EXPIRY_EPOCH=$(date -d "$(echo "$creds_json" | jq -r .Expiration)" +%s)
}
# ---------- Build the GStreamer command ----------
build_gst_cmd() {
cat <<EOF
GST_DEBUG=kvssink:2 \
gst-launch-1.0 -e rtspsrc location="$RTSP_URL" latency=0 name=src \
src. ! application/x-rtp,media=video,encoding-name=H264 ! rtph264depay ! h264parse ! avdec_h264 \
! videoconvert \
! x264enc tune=zerolatency bitrate=2000 speed-preset=superfast key-int-max=30 \
! video/x-h264,profile=baseline \
! kvssink stream-name="$STREAM_NAME" aws-region="$AWS_REGION" storage-size=512
EOF
}
# ---------- Start (or restart) the pipeline ----------
start_pipeline() {
echo "[$(date)] Starting pipeline with fresh credentials (expire $(date -d "@$CRED_EXPIRY_EPOCH"))"
bash -c "$(build_gst_cmd)" &
GST_PID=$!
echo $GST_PID > /tmp/kvs_pipeline.pid
}
# ---------- Main credential-refresh loop ----------
main() {
TOKEN=$(get_token)
CREDS=$(get_credentials "$TOKEN")
export_credentials "$CREDS"
start_pipeline
while true; do
NOW=$(date +%s)
REFRESH_AT=$(( CRED_EXPIRY_EPOCH - REFRESH_BEFORE_EXPIRY ))
SLEEP_SECONDS=$(( REFRESH_AT - NOW ))
if (( SLEEP_SECONDS > 0 )); then
echo "[$(date)] Sleeping $SLEEP_SECONDS s until credential refresh..."
sleep "$SLEEP_SECONDS"
fi
# Refresh credentials
TOKEN=$(get_token)
CREDS=$(get_credentials "$TOKEN")
export_credentials "$CREDS"
# Graceful restart
if [[ -f /tmp/kvs_pipeline.pid ]] && kill -0 $(cat /tmp/kvs_pipeline.pid) 2>/dev/null; then
echo "[$(date)] Sending SIGTERM to old pipeline (PID $(cat /tmp/kvs_pipeline.pid))"
kill $(cat /tmp/kvs_pipeline.pid) || true
wait $(cat /tmp/kvs_pipeline.pid) 2>/dev/null || true
fi
start_pipeline
done
}
trap 'echo "[$(date)] Received signal, cleaning up..."; kill $(cat /tmp/kvs_pipeline.pid) 2>/dev/null || true; exit' SIGINT SIGTERM
main
How It Works
| Component | Function |
|---|---|
| IMDSv2 Token | Fetches a session token valid for 6 hours |
| IAM Role Credentials | Gets temporary AccessKeyId, SecretKey, SessionToken |
| Credential Expiry Tracking | Parses Expiration field → Unix timestamp |
| Auto-Refresh Loop | Wakes up 5 min before expiry |
| Graceful Restart | Sends SIGTERM → kvssink flushes final fragment → new pipeline starts |
| Zero Downtime | KVS sees continuous fragment numbers → no stream interruption |
Prerequisites
| Requirement | How to Verify |
|---|---|
| EC2 instance with IAM role LiveStreamRole | aws sts get-caller-identity |
| gst-launch-1.0, gstreamer1.0-plugins-*, kvssink | gst-launch-1.0 --version |
| jq, curl | jq --version, curl --version |
| RTSP camera reachable | ffplay rtsp://... |
Install missing packages (Ubuntu/Debian):
sudo apt update
sudo apt install -y gstreamer1.0-tools gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav jq curl
How to Use It
# 1. Save the script
cat > run-kvs-pipeline.sh << 'EOF'
# [PASTE FULL SCRIPT HERE]
EOF
chmod +x run-kvs-pipeline.sh
# 2. Run it (as the same user that has access to the IMDSv2 endpoint)
nohup ./run-kvs-pipeline.sh > kvs.log 2>&1 & # start in background
tail -f kvs.log # show the log live
One-Liner (Start + Live Logs)
nohup ./run-kvs-pipeline.sh > kvs.log 2>&1 & tail -f kvs.log
Log Interpretation
| Log Type | Meaning |
|---|---|
| Starting pipeline with fresh credentials... | Pipeline (re)started |
| Sleeping 17820 s until credential refresh... | Will refresh in ~5 hours |
| Sending SIGTERM to old pipeline... | Graceful restart in progress |
| postReadCallback(): Wrote X bytes | Normal debug (can be silenced) |
| PERSISTED | Fragment successfully saved in KVS |