# **Kinesis Video Streams (KVS) Auto-Recovering RTSP → KVS Pipeline** --- ## **Overview** This script runs a GStreamer pipeline that: * Pulls an RTSP stream from a camera (e.g., TP-Link Tapo C246), * Re-encodes it to H.264 baseline, * Streams it live to AWS Kinesis Video Streams using kvssink, * Automatically refreshes expiring IAM role credentials (from EC2 IMDSv2), * Restarts the pipeline seamlessly before credentials expire or on crash, * Ensures zero downtime — KVS sees a continuous stream. Ideal for 24/7 security camera streaming on EC2 with IAM roles. --- ## Full Script: run-kvs-pipeline.sh ``` #!/usr/bin/env bash set -euo pipefail # ------------------------------------------------------------------ # CONFIGURATION (edit only these lines) # ------------------------------------------------------------------ ROLE_NAME="LiveStreamRole" # IAM role attached to EC2 STREAM_NAME="TAPO-C246D-Stream" # KVS stream name RTSP_URL="rtsp://cctv-dual-tapo:squirrel@192.168.9.241:554/stream2" AWS_REGION="eu-west-1" METADATA_URL="http://169.254.169.254/latest" TOKEN_TTL_SECONDS=21600 # 6 hours (max for IMDSv2) REFRESH_BEFORE_EXPIRY=300 # Refresh 5 min early # ------------------------------------------------------------------ # ---------- Helper: get a fresh token ---------- get_token() { curl -sX PUT "$METADATA_URL/api/token" \ -H "X-aws-ec2-metadata-token-ttl-seconds: $TOKEN_TTL_SECONDS" } # ---------- Helper: get credentials for the role ---------- get_credentials() { local token="$1" curl -s -H "X-aws-ec2-metadata-token: $token" \ "$METADATA_URL/meta-data/iam/security-credentials/$ROLE_NAME" } # ---------- Export fresh env vars ---------- export_credentials() { local creds_json="$1" export AWS_ACCESS_KEY_ID=$(echo "$creds_json" | jq -r .AccessKeyId) export AWS_SECRET_ACCESS_KEY=$(echo "$creds_json" | jq -r .SecretAccessKey) export AWS_SESSION_TOKEN=$(echo "$creds_json" | jq -r .Token) export AWS_REGION="$AWS_REGION" export CRED_EXPIRY_EPOCH=$(date -d "$(echo "$creds_json" | jq -r .Expiration)" +%s) } # ---------- Build the GStreamer command ---------- build_gst_cmd() { cat < /tmp/kvs_pipeline.pid } # ---------- Main credential-refresh loop ---------- main() { TOKEN=$(get_token) CREDS=$(get_credentials "$TOKEN") export_credentials "$CREDS" start_pipeline while true; do NOW=$(date +%s) REFRESH_AT=$(( CRED_EXPIRY_EPOCH - REFRESH_BEFORE_EXPIRY )) SLEEP_SECONDS=$(( REFRESH_AT - NOW )) if (( SLEEP_SECONDS > 0 )); then echo "[$(date)] Sleeping $SLEEP_SECONDS s until credential refresh..." sleep "$SLEEP_SECONDS" fi # Refresh credentials TOKEN=$(get_token) CREDS=$(get_credentials "$TOKEN") export_credentials "$CREDS" # Graceful restart if [[ -f /tmp/kvs_pipeline.pid ]] && kill -0 $(cat /tmp/kvs_pipeline.pid) 2>/dev/null; then echo "[$(date)] Sending SIGTERM to old pipeline (PID $(cat /tmp/kvs_pipeline.pid))" kill $(cat /tmp/kvs_pipeline.pid) || true wait $(cat /tmp/kvs_pipeline.pid) 2>/dev/null || true fi start_pipeline done } trap 'echo "[$(date)] Received signal, cleaning up..."; kill $(cat /tmp/kvs_pipeline.pid) 2>/dev/null || true; exit' SIGINT SIGTERM main ``` --- ## **How It Works** | Component | Function | | :---- | :---- | | IMDSv2 Token | Fetches a session token valid for 6 hours | | IAM Role Credentials | Gets temporary AccessKeyId, SecretKey, SessionToken | | Credential Expiry Tracking | Parses Expiration field → Unix timestamp | | Auto-Refresh Loop | Wakes up 5 min before expiry | | Graceful Restart | Sends SIGTERM → kvssink flushes final fragment → new pipeline starts | | Zero Downtime | KVS sees continuous fragment numbers → no stream interruption | --- ## Prerequisites | Requirement | How to Verify | | :---- | :---- | | EC2 instance with IAM role LiveStreamRole | aws sts get-caller-identity | | gst-launch-1.0, gstreamer1.0-plugins-\*, kvssink | gst-launch-1.0 \--version | | jq, curl | jq \--version, curl \--version | | RTSP camera reachable | ffplay rtsp://... | **Install missing packages (Ubuntu/Debian):** ``` sudo apt update sudo apt install -y gstreamer1.0-tools gstreamer1.0-plugins-base \ gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \ gstreamer1.0-plugins-ugly gstreamer1.0-libav jq curl ``` --- ## **How to Use It** ``` # 1. Save the script cat > run-kvs-pipeline.sh << 'EOF' # [PASTE FULL SCRIPT HERE] EOF chmod +x run-kvs-pipeline.sh ``` ``` # 2. Run it (as the same user that has access to the IMDSv2 endpoint) nohup ./run-kvs-pipeline.sh > kvs.log 2>&1 & # start in background tail -f kvs.log # show the log live ``` --- ### **One-Liner (Start \+ Live Logs)** ``` nohup ./run-kvs-pipeline.sh > kvs.log 2>&1 & tail -f kvs.log ``` --- ## **Log Interpretation** | Log Type | Meaning | | :---- | :---- | | Starting pipeline with fresh credentials... | Pipeline (re)started | | Sleeping 17820 s until credential refresh... | Will refresh in \~5 hours | | Sending SIGTERM to old pipeline... | Graceful restart in progress | | postReadCallback(): Wrote X bytes | Normal debug (can be silenced) | | PERSISTED | Fragment successfully saved in KVS |