160 lines
5.5 KiB
Python
160 lines
5.5 KiB
Python
import json
|
|
import os
|
|
import boto3
|
|
|
|
from datetime import datetime, timezone
|
|
from decimal import Decimal
|
|
|
|
now = datetime.utcnow()
|
|
dynamodb = boto3.resource("dynamodb")
|
|
table = dynamodb.Table("cctv-people-analytics-table")
|
|
region = "eu-west-1"
|
|
|
|
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
|
|
from requests_aws4auth import AWS4Auth
|
|
|
|
service = 'aoss'
|
|
credentials = boto3.Session().get_credentials().get_frozen_credentials()
|
|
print(credentials.access_key, credentials.secret_key, credentials.token)
|
|
|
|
auth = AWSV4SignerAuth(credentials, region, service)
|
|
|
|
auth = AWS4Auth(
|
|
credentials.access_key,
|
|
credentials.secret_key,
|
|
region,
|
|
service,
|
|
session_token=credentials.token # ✅ include session token!
|
|
)
|
|
|
|
host = "5xmv01a4621tqq8agz99.eu-west-1.aoss.amazonaws.com"
|
|
index = now.strftime("cctv-people-analytics-%Y-%m-%d-%H")
|
|
url = f"{host}/{index}/_doc"
|
|
|
|
client = OpenSearch(
|
|
hosts=[{'host': host, 'port': 443}],
|
|
http_auth=auth,
|
|
use_ssl=True,
|
|
verify_certs=True,
|
|
connection_class=RequestsHttpConnection,
|
|
pool_maxsize=20,
|
|
)
|
|
|
|
if not client.indices.exists(index=index):
|
|
client.indices.create(index=index)
|
|
|
|
def normalize_timestamp(ts: str) -> str:
|
|
"""
|
|
Normalize timestamp into ISO 8601 with UTC (Z).
|
|
Handles:
|
|
- %Y-%m-%dT%H-%M-%S (bad format with dashes)
|
|
- %Y-%m-%dT%H:%M:%S (already correct)
|
|
- Epoch numbers (int/float)
|
|
"""
|
|
if isinstance(ts, (int, float)): # epoch
|
|
return datetime.fromtimestamp(ts/1000, tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
|
|
|
if isinstance(ts, str):
|
|
# Already ISO with colons
|
|
try:
|
|
dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
|
|
return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
|
|
except ValueError:
|
|
pass
|
|
|
|
# Bad format with dashes
|
|
try:
|
|
dt = datetime.strptime(ts, "%Y-%m-%dT%H-%M-%S")
|
|
return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
|
|
except ValueError:
|
|
pass
|
|
|
|
# Fallback: return as-is
|
|
return str(ts)
|
|
|
|
def epoch_converter(ts):
|
|
"""
|
|
Normalize timestamp into epoch milliseconds (integer).
|
|
"""
|
|
if isinstance(ts, (int, float)):
|
|
# already epoch
|
|
return int(ts)
|
|
if isinstance(ts, str):
|
|
try:
|
|
# Try ISO format
|
|
dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
|
|
except ValueError:
|
|
# Bad format with dashes
|
|
dt = datetime.strptime(ts, "%Y-%m-%dT%H-%M-%S")
|
|
# Convert to UTC epoch milliseconds
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return int(dt.timestamp() * 1000)
|
|
return int(ts) # fallback
|
|
|
|
def lambda_handler(event, context):
|
|
print(event)
|
|
dict_stream_camid = {
|
|
'CCTV-Stream': 'iMbYu7fVoHRP',
|
|
'TAPO-C246D-Stream': 'piYD7HWDKAub',
|
|
'TAPO-C500-Stream': 'EZ4DnbJjqFvk',
|
|
'HIKVision-2MP-Dome-1-Stream': 'x4j8eVyrF6mO',
|
|
'HIKVision-2MP-Dome-2-Stream': 'NnIXfhJeqX6i',
|
|
'HIKVision-2MP-Bullet-1-Stream': 'vP2wVuCEN5vi',
|
|
'HIKVision-2MP-Bullet-2-Stream': 'SnPNqUVLHinu',
|
|
'Dahua-2MP-Dome-1-Stream': 'YMpY5kku8IXg',
|
|
'Dahua-2MP-Dome-2-Stream': 'yyzi5SqLTftn',
|
|
'Dahua-2MP-Bullet-1-Stream': 'YTOYlpGjviLB',
|
|
'Dahua-2MP-Bullet-2-Stream': 'cLVBa3vAV4yf'
|
|
}
|
|
|
|
notification = json.loads(event['Records'][0]['Sns']['Message'])
|
|
if notification['eventNamespace']['type'] == 'LABEL_DETECTED':
|
|
stream_arn = notification['inputInformation']['kinesisVideo']['streamArn']
|
|
camera_id = dict_stream_camid[stream_arn.split('/')[1]]
|
|
|
|
for label in notification['labels']:
|
|
label_id = label['id']
|
|
detect_timestamp = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
|
|
frame_timestamp = label['videoMapping']['kinesisVideoMapping']['serverTimestamp']
|
|
frame_url = str.replace(label['frameImageUri'], 's3://cctv-stream-results/', 'https://cctv-stream-results.s3.eu-west-1.amazonaws.com/')
|
|
label_name = label['name']
|
|
confidence = label['confidence']
|
|
|
|
to_upload_dynamodb = {
|
|
'camera_id': camera_id,
|
|
'label_id': label_id,
|
|
'detect_timestamp': normalize_timestamp(detect_timestamp),
|
|
'frame_timestamp': normalize_timestamp(frame_timestamp),
|
|
'frame_url': frame_url,
|
|
'label_name': label_name,
|
|
'confidence': Decimal(str(confidence)),
|
|
'rekognition_response': json.dumps(label)
|
|
}
|
|
print()
|
|
response= table.put_item(Item=to_upload_dynamodb)
|
|
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
|
|
print("Item successfully written!")
|
|
|
|
# headers = { "Content-Type": "application/json" }
|
|
# response = requests.post(url, auth=awsauth, json=to_upload, headers=headers)
|
|
# print(response.text, response.status_code)
|
|
to_upload_es = {
|
|
'label_id': label_id,
|
|
'@timestamp': normalize_timestamp(detect_timestamp),
|
|
'frame_timestamp': normalize_timestamp(frame_timestamp),
|
|
'frame_url': frame_url,
|
|
'label_name': label_name,
|
|
'confidence': Decimal(str(confidence))
|
|
}
|
|
|
|
####
|
|
response = client.index(
|
|
index = index,
|
|
body = to_upload_es
|
|
)
|
|
print(response)
|
|
|
|
return {
|
|
'statusCode': 200,
|
|
'body': json.dumps('Processing cctv results success'),
|
|
} |