Kinesis Video Streams (KVS) Auto-Recovering RTSP → KVS Pipeline


Overview

This script runs a GStreamer pipeline that:

  • Pulls an RTSP stream from a camera (e.g., TP-Link Tapo C246),
  • Re-encodes it to H.264 baseline,
  • Streams it live to AWS Kinesis Video Streams using kvssink,
  • Automatically refreshes expiring IAM role credentials (from EC2 IMDSv2),
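  • Restarts the pipeline shortly before the credentials expire (a pipeline that crashed in the meantime is started again at that point),
  • Aims for near-zero downtime: the old pipeline is stopped and a new one is started immediately, writing to the same KVS stream.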

Ideal for 24/7 security camera streaming on EC2 with IAM roles.


Full Script: run-kvs-pipeline.sh

#!/usr/bin/env bash
set -euo pipefail

# ------------------------------------------------------------------
# CONFIGURATION (edit only these lines)
# ------------------------------------------------------------------
ROLE_NAME="LiveStreamRole"                # IAM role attached to EC2
STREAM_NAME="TAPO-C246D-Stream"           # KVS stream name
RTSP_URL="rtsp://cctv-dual-tapo:squirrel@192.168.9.241:554/stream2"
AWS_REGION="eu-west-1"
METADATA_URL="http://169.254.169.254/latest"
TOKEN_TTL_SECONDS=21600                   # 6 hours (max for IMDSv2)
REFRESH_BEFORE_EXPIRY=300                 # Refresh 5 min early
# ------------------------------------------------------------------

# ---------- Helper: get a fresh token ----------
get_token() {
    curl -sX PUT "$METADATA_URL/api/token" \
         -H "X-aws-ec2-metadata-token-ttl-seconds: $TOKEN_TTL_SECONDS"
}

# ---------- Helper: get credentials for the role ----------
get_credentials() {
    local token="$1"
    curl -s -H "X-aws-ec2-metadata-token: $token" \
         "$METADATA_URL/meta-data/iam/security-credentials/$ROLE_NAME"
}

# ---------- Export fresh env vars ----------
export_credentials() {
    local creds_json="$1"
    export AWS_ACCESS_KEY_ID=$(echo "$creds_json" | jq -r .AccessKeyId)
    export AWS_SECRET_ACCESS_KEY=$(echo "$creds_json" | jq -r .SecretAccessKey)
    export AWS_SESSION_TOKEN=$(echo "$creds_json" | jq -r .Token)
    export AWS_REGION="$AWS_REGION"
    export CRED_EXPIRY_EPOCH=$(date -d "$(echo "$creds_json" | jq -r .Expiration)" +%s)
}

# ---------- Build the GStreamer command ----------
build_gst_cmd() {
    cat <<EOF
GST_DEBUG=kvssink:2 \
gst-launch-1.0 -e rtspsrc location="$RTSP_URL" latency=0 name=src \
  src. ! application/x-rtp,media=video,encoding-name=H264 ! rtph264depay ! h264parse ! avdec_h264 \
       ! videoconvert \
       ! x264enc tune=zerolatency bitrate=2000 speed-preset=superfast key-int-max=30 \
                 ! video/x-h264,profile=baseline \
       ! kvssink stream-name="$STREAM_NAME" aws-region="$AWS_REGION" storage-size=512
EOF
}

# ---------- Start (or restart) the pipeline ----------
start_pipeline() {
    echo "[$(date)] Starting pipeline with fresh credentials (expire $(date -d "@$CRED_EXPIRY_EPOCH"))"
    bash -c "$(build_gst_cmd)" &
    GST_PID=$!
    echo $GST_PID > /tmp/kvs_pipeline.pid
}

# ---------- Main credential-refresh loop ----------
main() {
    TOKEN=$(get_token)
    CREDS=$(get_credentials "$TOKEN")
    export_credentials "$CREDS"
    start_pipeline

    while true; do
        NOW=$(date +%s)
        REFRESH_AT=$(( CRED_EXPIRY_EPOCH - REFRESH_BEFORE_EXPIRY ))

        SLEEP_SECONDS=$(( REFRESH_AT - NOW ))
        if (( SLEEP_SECONDS > 0 )); then
            echo "[$(date)] Sleeping $SLEEP_SECONDS s until credential refresh..."
            sleep "$SLEEP_SECONDS"
        fi

        # Refresh credentials
        TOKEN=$(get_token)
        CREDS=$(get_credentials "$TOKEN")
        export_credentials "$CREDS"

        # Graceful restart: stop the old pipeline (if it is still alive) before starting the new one
        if kill -0 "$GST_PID" 2>/dev/null; then
            echo "[$(date)] Sending SIGTERM to old pipeline (PID $GST_PID)"
            kill "$GST_PID" || true
            wait "$GST_PID" 2>/dev/null || true
        fi

        start_pipeline
    done
}

trap 'echo "[$(date)] Received signal, cleaning up..."; kill $(cat /tmp/kvs_pipeline.pid) 2>/dev/null || true; exit' SIGINT SIGTERM

main
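
The main loop above only wakes at the credential-refresh time, so a pipeline that exits earlier stays down until then. One possible way to notice an early exit is to poll the process while waiting. The following is a minimal sketch under that assumption; the function name `wait_for_deadline_or_exit` and the poll interval are hypothetical and not part of the original script.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not in the original script): block until either the
# deadline (epoch seconds) passes or the watched process is gone.
# Returns 0 when the deadline was reached, 1 when the process exited early.
wait_for_deadline_or_exit() {
    local pid="$1"
    local deadline="$2"
    local poll_seconds="${3:-5}"
    while [ "$(date +%s)" -lt "$deadline" ]; do
        if ! kill -0 "$pid" 2>/dev/null; then
            return 1    # process no longer exists: caller should restart it
        fi
        sleep "$poll_seconds"
    done
    return 0            # deadline reached: caller should refresh credentials
}
```

In `main`, the plain `sleep "$SLEEP_SECONDS"` could be swapped for `wait_for_deadline_or_exit "$GST_PID" "$REFRESH_AT" || echo "pipeline exited early"`. The `||` guard matters because the script runs under `set -e`; in both cases the loop then continues to the refresh-and-restart step.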

How It Works

| Component | Function |
| --- | --- |
| IMDSv2 Token | Fetches a session token valid for 6 hours |
| IAM Role Credentials | Gets the temporary AccessKeyId, SecretAccessKey, and Token (session token) |
| Credential Expiry Tracking | Parses the Expiration field → Unix timestamp |
| Auto-Refresh Loop | Wakes up 5 min before expiry |
| Graceful Restart | Sends SIGTERM to the old pipeline → waits for it to exit → new pipeline starts |
| Minimal Downtime | The new pipeline writes to the same KVS stream, so the interruption is limited to the restart itself |
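
The expiry tracking and refresh timing in the table reduce to one date conversion and one subtraction. The sketch below works through that arithmetic with made-up timestamps; the function name `seconds_until_refresh` is hypothetical, and `date -d` assumes GNU date, as the script itself does.

```shell
#!/usr/bin/env bash
# Hypothetical helper: seconds to sleep before refreshing, never negative.
#   $1 = credential expiry (epoch seconds)
#   $2 = current time (epoch seconds)
#   $3 = safety margin in seconds (REFRESH_BEFORE_EXPIRY in the script)
seconds_until_refresh() {
    local expiry="$1"
    local now="$2"
    local margin="$3"
    local remaining=$(( expiry - margin - now ))
    if [ "$remaining" -lt 0 ]; then
        remaining=0
    fi
    echo "$remaining"
}

# Illustrative (made-up) values: credentials expire at 06:00, it is now 00:58.
expiry_epoch=$(date -u -d "2024-01-01T06:00:00Z" +%s)   # ISO 8601 -> epoch
now_epoch=$(date -u -d "2024-01-01T00:58:00Z" +%s)
seconds_until_refresh "$expiry_epoch" "$now_epoch" 300   # prints 17820
```

Clamping at zero covers the case where the metadata service still returns credentials that are already inside the 5-minute margin.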

Prerequisites

| Requirement | How to Verify |
| --- | --- |
| EC2 instance with IAM role LiveStreamRole | aws sts get-caller-identity |
| gst-launch-1.0, gstreamer1.0-plugins-*, kvssink | gst-launch-1.0 --version, gst-inspect-1.0 kvssink |
| jq, curl | jq --version, curl --version |
| RTSP camera reachable | ffplay rtsp://... |

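Install missing packages (Ubuntu/Debian); kvssink is not among them and has to be built separately from the Amazon Kinesis Video Streams C++ Producer SDK: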

sudo apt update
sudo apt install -y gstreamer1.0-tools gstreamer1.0-plugins-base \
                    gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
                    gstreamer1.0-plugins-ugly gstreamer1.0-libav jq curl
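
Before the first run it can help to check all prerequisites in one pass. The following is a minimal sketch of such a preflight check; the function name `missing_tools` is hypothetical, and the check assumes that `gst-inspect-1.0` exits non-zero when an element is not found.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every command from the argument list
# that cannot be found on PATH.
missing_tools() {
    local tool
    for tool in "$@"; do
        if ! command -v "$tool" >/dev/null 2>&1; then
            echo "$tool"
        fi
    done
}

# Commands the script calls directly, plus gst-inspect-1.0 to probe for kvssink.
missing=$(missing_tools curl jq gst-launch-1.0 gst-inspect-1.0)
if [ -n "$missing" ]; then
    echo "Missing commands:" $missing
elif ! gst-inspect-1.0 kvssink >/dev/null 2>&1; then
    echo "GStreamer is installed, but the kvssink element was not found"
else
    echo "All prerequisites found"
fi
```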

How to Use It

# 1. Save the script
cat > run-kvs-pipeline.sh << 'EOF'
# [PASTE FULL SCRIPT HERE]
EOF

chmod +x run-kvs-pipeline.sh

# 2. Run it (as a user that can reach the IMDSv2 endpoint)
nohup ./run-kvs-pipeline.sh > kvs.log 2>&1 &   # start in background
tail -f kvs.log                               # show the log live

One-Liner (Start + Live Logs)

nohup ./run-kvs-pipeline.sh > kvs.log 2>&1 & tail -f kvs.log

Log Interpretation

| Log Message | Meaning |
| --- | --- |
| Starting pipeline with fresh credentials... | Pipeline (re)started |
| Sleeping 17820 s until credential refresh... | Will refresh in ~5 hours |
| Sending SIGTERM to old pipeline... | Graceful restart in progress |
| postReadCallback(): Wrote X bytes | Normal debug output (can be silenced) |
| PERSISTED | Fragment successfully saved in KVS |
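
For a quick health check, the messages above can be counted instead of read line by line. The sketch below assumes the log was written to a file such as kvs.log; the function name `summarize_kvs_log` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count pipeline (re)starts and persisted fragments
# in a log file produced by the script.
summarize_kvs_log() {
    local log_file="$1"
    local starts
    local persisted
    # grep -c prints the number of matching lines; it exits non-zero when
    # that number is 0, hence the "|| true" guard for use under "set -e".
    starts=$(grep -c "Starting pipeline with fresh credentials" "$log_file" || true)
    persisted=$(grep -c "PERSISTED" "$log_file" || true)
    echo "pipeline starts: $starts, persisted fragments: $persisted"
}
```

Calling `summarize_kvs_log kvs.log` prints one summary line; a start count that keeps growing while the persisted count stays flat would suggest the pipeline restarts without delivering data.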