update to aws location services

This commit is contained in:
King Matthew Ochoa 2025-07-05 23:06:26 +08:00
parent ab068cac8b
commit 387b71e4e8

View File

@ -2,54 +2,90 @@ import logging
import json
from typing import Dict, Any
from http import HTTPStatus
import urllib.request
import boto3
from botocore.exceptions import ClientError
import os
REGION = os.environ['AWS_REGION']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Replace with the name of your AWS Location Place Index
PLACE_INDEX_NAME = 'helpngoindex'
def ensure_place_index_exists(index_name: str):
client = boto3.client('location', region_name=REGION)
# Check if index exists
try:
response = client.list_place_indexes()
if any(entry['IndexName'] == index_name for entry in response['Entries']):
print(f"✅ Place index '{index_name}' already exists.")
return
except ClientError as e:
print(f"❌ Error listing indexes: {e}")
return
# Create the index
try:
print(f"🆕 Creating place index '{index_name}' in region '{region}'...")
client.create_place_index(
IndexName=index_name,
DataSource='Here',
Description='Auto-created place index for geocoding',
PricingPlan='RequestBasedUsage'
)
except ClientError as e:
print(f"❌ Error creating index: {e}")
return
# Wait until ACTIVE
print("⏳ Waiting for index to become ACTIVE...")
while True:
try:
status = client.describe_place_index(IndexName=index_name)['Status']
print(f"Status: {status}")
if status == 'ACTIVE':
print(f"✅ Place index '{index_name}' is now ACTIVE.")
break
time.sleep(2)
except ClientError as e:
print(f"❌ Error checking status: {e}")
break
# Initialize the AWS Location client
location_client = boto3.client('location', region_name=REGION)
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
AWS Lambda handler for processing Bedrock agent requests and geocoding location names using Nominatim.
Args:
event (Dict[str, Any]): The Lambda event containing action details
context (Any): The Lambda context object
Returns:
Dict[str, Any]: Response formatted for Bedrock Agents
AWS Lambda handler for processing Bedrock agent requests and geocoding location names using AWS Location Service.
"""
try:
action_group = event['actionGroup']
function = event['function']
message_version = event.get('messageVersion', 1)
parameters = event.get('parameters', [])
print(parameters)
parameters_dict ={parameter["name"]:parameter["value"] for parameter in parameters}
parameters_dict = {parameter["name"]: parameter["value"] for parameter in parameters}
location_name = parameters_dict.get('location_name')
if not location_name:
raise KeyError("Missing required parameter: 'location_name'")
# Call Nominatim API to geocode the location
query = urllib.parse.urlencode({
"q": location_name,
"format": "json",
"limit": 1
})
headers = {
"User-Agent": "aws-lambda-geocoder/1.0"
}
url = f"https://nominatim.openstreetmap.org/search?{query}"
# Use AWS Location Service to geocode the location
ensure_place_index_exists(PLACE_INDEX_NAME)
response = location_client.search_place_index_for_text(
IndexName=PLACE_INDEX_NAME,
Text=location_name,
MaxResults=1
)
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as response:
response_data = response.read()
print(response_data)
results = json.loads(response_data)
results = response.get("Results", [])
if not results:
raise ValueError(f"No coordinates found for '{location_name}'")
lat = float(results[0]["lat"])
lon = float(results[0]["lon"])
# Extract coordinates [Longitude, Latitude]
lon, lat = results[0]['Place']['Geometry']['Point']
# Prepare Bedrock-compatible response
response_body = {
@ -80,6 +116,14 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
'statusCode': HTTPStatus.BAD_REQUEST,
'body': f'Error: {str(e)}'
}
except ClientError as e:
logger.error('AWS Location error: %s', str(e))
return {
'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR,
'body': f'AWS Location error: {str(e)}'
}
except Exception as e:
logger.error('Unexpected error: %s', str(e))
return {