Skip to content

Commit

Permalink
Kendra Index fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
spugachev committed Oct 11, 2023
1 parent 96d8dc3 commit a2be3e6
Show file tree
Hide file tree
Showing 18 changed files with 263 additions and 85 deletions.
2 changes: 2 additions & 0 deletions lib/chatbot-api/functions/api-handler/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from routes.sessions import router as sessions_router
from routes.semantic_search import router as semantic_search_router
from routes.documents import router as documents_router
from routes.kendra import router as kendra_router

tracer = Tracer()
logger = Logger()
Expand All @@ -43,6 +44,7 @@
app.include_router(sessions_router)
app.include_router(semantic_search_router)
app.include_router(documents_router)
app.include_router(kendra_router)


@app.exception_handler(genai_core.types.CommonError)
Expand Down
40 changes: 40 additions & 0 deletions lib/chatbot-api/functions/api-handler/routes/kendra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import genai_core.parameters
import genai_core.kendra
from pydantic import BaseModel
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler.api_gateway import Router

tracer = Tracer()
router = Router()
logger = Logger()


class KendraDataSynchRequest(BaseModel):
    """JSON body accepted by the Kendra data-sync POST route."""

    # Workspace id forwarded to genai_core.kendra.start_kendra_data_sync.
    workspaceId: str


@router.get("/rag/engines/kendra/indexes")
@tracer.capture_method
def kendra_indexes():
    """Handle GET /rag/engines/kendra/indexes.

    Returns:
        The value of ``genai_core.kendra.get_kendra_indexes()`` wrapped in
        the ``{"ok": True, "data": ...}`` response envelope.
    """
    return {"ok": True, "data": genai_core.kendra.get_kendra_indexes()}


@router.post("/rag/engines/kendra/data-sync")
@tracer.capture_method
def kendra_data_sync():
    """Handle POST /rag/engines/kendra/data-sync.

    The JSON body of the current event is parsed into a
    ``KendraDataSynchRequest`` and its ``workspaceId`` is handed to
    ``genai_core.kendra.start_kendra_data_sync``.

    Returns:
        ``{"ok": True, "data": True}`` once the sync call has returned.
    """
    payload: dict = router.current_event.json_body
    sync_request = KendraDataSynchRequest(**payload)
    target_workspace = sync_request.workspaceId

    genai_core.kendra.start_kendra_data_sync(workspace_id=target_workspace)

    return {"ok": True, "data": True}


@router.get("/rag/engines/kendra/data-sync/<workspace_id>")
@tracer.capture_method
def kendra_is_syncing(workspace_id: str):
    """Handle GET /rag/engines/kendra/data-sync/<workspace_id>.

    Args:
        workspace_id: Path parameter identifying the workspace to inspect.

    Returns:
        The result of ``genai_core.kendra.kendra_is_syncing`` wrapped in the
        ``{"ok": True, "data": ...}`` response envelope.
    """
    return {
        "ok": True,
        "data": genai_core.kendra.kendra_is_syncing(workspace_id=workspace_id),
    }
19 changes: 0 additions & 19 deletions lib/chatbot-api/functions/api-handler/routes/rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,3 @@ def engines():
]

return {"ok": True, "data": ret_value}


@router.get("/rag/engines/kendra/indexes")
@tracer.capture_method
def kendra_indexes():
    """Serve GET /rag/engines/kendra/indexes.

    Returns:
        ``{"ok": True, "data": ...}`` where ``data`` is whatever
        ``genai_core.kendra.get_kendra_indexes()`` returned.
    """
    available = genai_core.kendra.get_kendra_indexes()
    response = {"ok": True, "data": available}

    return response


@router.post("/rag/engines/kendra/data-sync")
@tracer.capture_method
def kendra_data_sync():
    """Serve POST /rag/engines/kendra/data-sync.

    Builds a ``KendraDataSynchRequest`` from the event's JSON body and
    passes its ``workspaceId`` to ``genai_core.kendra.start_kendra_data_sync``.

    Returns:
        ``{"ok": True, "data": True}`` after the sync call returns.
    """
    body: dict = router.current_event.json_body
    parsed = KendraDataSynchRequest(**body)

    genai_core.kendra.start_kendra_data_sync(workspace_id=parsed.workspaceId)

    return {"ok": True, "data": True}
10 changes: 8 additions & 2 deletions lib/chatbot-api/functions/api-handler/routes/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class CreateWorkspaceKendraRequest(BaseModel):
kind: str
name: str
kendraIndexId: str
useAllData: bool


@router.get("/workspaces")
Expand Down Expand Up @@ -271,11 +272,15 @@ def _create_workspace_kendra(request: CreateWorkspaceKendraRequest, config: dict
raise genai_core.types.CommonError("Kendra index not found")

return genai_core.workspaces.create_workspace_kendra(
workspace_name=workspace_name, kendra_index=kendra_index
workspace_name=workspace_name,
kendra_index=kendra_index,
use_all_data=request.useAllData,
)


def _convert_workspace(workspace: dict):
kendra_index_external = workspace.get("kendra_index_external")

return {
"id": workspace["workspace_id"],
"name": workspace["name"],
Expand All @@ -297,7 +302,8 @@ def _convert_workspace(workspace: dict):
"documents": workspace.get("documents"),
"sizeInBytes": workspace.get("size_in_bytes"),
"kendraIndexId": workspace.get("kendra_index_id"),
"kendraIndexExternal": workspace.get("kendra_index_external"),
"kendraIndexExternal": kendra_index_external,
"kendraUseAllData": workspace.get("kendra_use_all_data", kendra_index_external),
"createdAt": workspace.get("created_at"),
"updatedAt": workspace.get("updated_at"),
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def query_workspace_aurora(
)

items = []
vector_search_records = []
keyword_search_records = []
with AuroraConnection() as cursor:
if metric == "cosine":
cursor.execute(
Expand Down Expand Up @@ -110,7 +112,8 @@ def query_workspace_aurora(
raise Exception("Unknown metric")

vector_search_records = cursor.fetchall()
vector_search_records = _convert_records("vector_search", vector_search_records)
vector_search_records = _convert_records(
"vector_search", vector_search_records)
items.extend(vector_search_records)

if hybrid_search:
Expand Down
39 changes: 37 additions & 2 deletions lib/shared/layers/python-sdk/python/genai_core/kendra/data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import genai_core.workspaces
from .client import get_kendra_client_for_index

DEFAULT_KENDRA_S3_DATA_SOURCE_ID = os.environ.get("DEFAULT_KENDRA_S3_DATA_SOURCE_ID")
DEFAULT_KENDRA_S3_DATA_SOURCE_ID = os.environ.get(
"DEFAULT_KENDRA_S3_DATA_SOURCE_ID")


def start_kendra_data_sync(workspace_id: str):
workspace = genai_core.workspaces.get_workspace(workspace_id=workspace_id)

if not workspace:
raise genai_core.types.CommonError(f"Workspace {workspace_id} not found")
raise genai_core.types.CommonError(
f"Workspace {workspace_id} not found")

if workspace["engine"] != "kendra":
raise genai_core.types.CommonError(
Expand All @@ -30,3 +32,36 @@ def start_kendra_data_sync(workspace_id: str):
)

print(response)


def kendra_is_syncing(workspace_id: str):
    """Tell whether a sync job is in progress for a Kendra workspace.

    The workspace is looked up first. For a workspace flagged with
    ``kendra_index_external`` the answer is ``False`` and Kendra is not
    queried. Otherwise up to five entries of the sync-job history of the
    data source named by ``DEFAULT_KENDRA_S3_DATA_SOURCE_ID`` are inspected.

    Args:
        workspace_id: Identifier of the workspace to check.

    Returns:
        ``True`` if any returned history entry has status ``SYNCING`` or
        ``SYNCING_INDEXING``; ``False`` otherwise.

    Raises:
        genai_core.types.CommonError: If the workspace does not exist or its
            engine is not ``kendra``.
    """
    workspace = genai_core.workspaces.get_workspace(workspace_id=workspace_id)

    if not workspace:
        raise genai_core.types.CommonError(
            f"Workspace {workspace_id} not found")

    engine = workspace["engine"]
    if engine != "kendra":
        raise genai_core.types.CommonError(
            f"Workspace {workspace_id} is not a kendra workspace"
        )

    # This function only queries the default S3 data source, so it answers
    # False for an external index without calling Kendra.
    if workspace["kendra_index_external"]:
        return False

    index_id = workspace["kendra_index_id"]
    client = get_kendra_client_for_index(index_id)

    sync_jobs = client.list_data_source_sync_jobs(
        IndexId=index_id,
        Id=DEFAULT_KENDRA_S3_DATA_SOURCE_ID,
        MaxResults=5,
    )

    in_progress_statuses = ("SYNCING", "SYNCING_INDEXING")

    # any() stops at the first in-progress job, like the original break.
    return any(
        job["Status"] in in_progress_statuses for job in sync_jobs["History"]
    )
18 changes: 16 additions & 2 deletions lib/shared/layers/python-sdk/python/genai_core/kendra/query.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import os
import re
import genai_core.types
from typing import List
from .client import get_kendra_client_for_index

s3_pattern = re.compile(r"(s3-|s3\.)?(.*)\.amazonaws\.com")


def query_workspace_kendra(
workspace_id: str, workspace: dict, query: str, limit: int, full_response: bool
):
kendra_index_id = workspace.get("kendra_index_id")
kendra_index_external = workspace.get("kendra_index_external", True)
kendra_use_all_data = workspace.get("kendra_use_all_data", False)

if not kendra_index_id:
raise genai_core.types.CommonError(
f"Could not find kendra index for workspace {workspace_id}"
Expand All @@ -17,7 +22,7 @@ def query_workspace_kendra(
kendra = get_kendra_client_for_index(kendra_index_id)
limit = max(1, min(100, limit))

if kendra_index_external:
if kendra_index_external or kendra_use_all_data:
result = kendra.retrieve(
IndexId=kendra_index_id, QueryText=query, PageSize=limit, PageNumber=1
)
Expand Down Expand Up @@ -52,7 +57,12 @@ def _convert_records(source: str, workspace_id: str, records: List[dict]):
converted_records = []
for record in records:
document_uri = record["DocumentURI"]
path = os.path.basename(document_uri)
is_s3 = s3_pattern.match(document_uri)
if is_s3:
path = os.path.basename(document_uri)
else:
path = document_uri

title = record.get("DocumentTitle")
content = record.get("Content")

Expand All @@ -61,6 +71,10 @@ def _convert_records(source: str, workspace_id: str, records: List[dict]):
for attribute in document_attributes:
if attribute["Key"] == "document_type":
document_type = attribute["Value"]["StringValue"]
break

if not document_type:
document_type = "file" if is_s3 else "website"

converted = {
"chunk_id": record.get("Id"),
Expand Down
16 changes: 13 additions & 3 deletions lib/shared/layers/python-sdk/python/genai_core/opensearch/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def query_workspace_open_search(
vector_search_limit = 25
keyword_search_limit = 25

vector_search_records = []
keyword_search_records = []

selected_model = genai_core.embeddings.get_embeddings_model(
embeddings_model_provider, embeddings_model_name
)
Expand All @@ -42,7 +45,8 @@ def query_workspace_open_search(
vector_search_records = vector_query(
client, index_name, query_embeddings, vector_search_limit
)
vector_search_records = _convert_records("vector_search", vector_search_records)
vector_search_records = _convert_records(
"vector_search", vector_search_records)
items.extend(vector_search_records)

if hybrid_search:
Expand Down Expand Up @@ -167,12 +171,18 @@ def vector_query(client, index_name: str, vector: List[float], size: int = 25):

response = client.search(index=index_name, body=query, size=size)

return response["hits"]["hits"]
ret_value = response["hits"]["hits"]
ret_value = ret_value if ret_value is not None else []

return ret_value


def keyword_query(client, index_name: str, text: str, size: int = 25):
query = {"query": {"match": {"content": text}}}

response = client.search(index=index_name, body=query, size=size)

return response["hits"]["hits"]
ret_value = response["hits"]["hits"]
ret_value = ret_value if ret_value is not None else []

return ret_value
6 changes: 5 additions & 1 deletion lib/shared/layers/python-sdk/python/genai_core/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,14 @@ def create_workspace_open_search(
}


def create_workspace_kendra(workspace_name: str, kendra_index: dict):
def create_workspace_kendra(
workspace_name: str, kendra_index: dict, use_all_data: bool
):
workspace_id = str(uuid.uuid4())
timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
kendra_index_id = kendra_index["id"]
kendra_index_external = kendra_index["external"]
use_all_data = use_all_data if not kendra_index_external else True

item = {
"workspace_id": workspace_id,
Expand All @@ -242,6 +245,7 @@ def create_workspace_kendra(workspace_name: str, kendra_index: dict):
"status": "submitted",
"kendra_index_id": kendra_index_id,
"kendra_index_external": kendra_index_external,
"kendra_use_all_data": use_all_data,
"documents": 0,
"vectors": 0,
"size_in_bytes": 0,
Expand Down
10 changes: 10 additions & 0 deletions lib/user-interface/react-app/src/common/api-client/api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { WorkspacesClient } from "./workspaces-client";
import { SessionsClient } from "./sessions-client";
import { SemanticSearchClient } from "./semantic-search-client";
import { DocumentsClient } from "./documents-client";
import { KendraClient } from "./kendra-client";

export class ApiClient {
private _healthClient: HealthClient | undefined;
Expand All @@ -19,6 +20,7 @@ export class ApiClient {
private _sessionsClient: SessionsClient | undefined;
private _semanticSearchClient: SemanticSearchClient | undefined;
private _documentsClient: DocumentsClient | undefined;
private _kendraClient: KendraClient | undefined;

public get health() {
if (!this._healthClient) {
Expand Down Expand Up @@ -92,5 +94,13 @@ export class ApiClient {
return this._documentsClient;
}

public get kendra() {
if (!this._kendraClient) {
this._kendraClient = new KendraClient(this._appConfig);
}

return this._kendraClient;
}

constructor(protected _appConfig: AppConfig) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { ApiResult, KendraIndexItem } from "../types";
import { ApiClientBase } from "./api-client-base";

/**
 * API client for the Kendra routes under `/rag/engines/kendra`.
 *
 * Each method resolves to the parsed JSON body of the response. If getting
 * the headers, the request itself, or parsing the body throws, the method
 * resolves to the value produced by `this.error(...)` instead.
 */
export class KendraClient extends ApiClientBase {
  /** Issues a request to `/rag/engines/kendra/indexes` (default method). */
  async getKendraIndexes(): Promise<ApiResult<KendraIndexItem[]>> {
    try {
      const headers = await this.getHeaders();
      const result = await fetch(
        this.getApiUrl("/rag/engines/kendra/indexes"),
        {
          headers,
        }
      );

      // `await` is required here: a bare `return result.json()` hands the
      // promise back unresolved, so a rejected body parse would bypass the
      // catch below and never reach `this.error`.
      return await result.json();
    } catch (error) {
      return this.error(error);
    }
  }

  /**
   * Issues POST `/rag/engines/kendra/data-sync` with `{ workspaceId }` as
   * the JSON body.
   *
   * @param workspaceId Identifier of the workspace to sync.
   */
  async startKendraDataSync(workspaceId: string): Promise<ApiResult<void>> {
    try {
      const headers = await this.getHeaders();
      const result = await fetch(
        this.getApiUrl("/rag/engines/kendra/data-sync"),
        {
          headers,
          method: "POST",
          body: JSON.stringify({ workspaceId }),
        }
      );

      // Awaited so body-parse failures are handled by the catch below.
      return await result.json();
    } catch (error) {
      return this.error(error);
    }
  }

  /**
   * Issues GET `/rag/engines/kendra/data-sync/<workspaceId>`.
   *
   * @param workspaceId Identifier of the workspace to check.
   */
  async kendraIsSyncing(workspaceId: string): Promise<ApiResult<boolean>> {
    try {
      const headers = await this.getHeaders();
      const result = await fetch(
        this.getApiUrl(`/rag/engines/kendra/data-sync/${workspaceId}`),
        {
          headers,
          method: "GET",
        }
      );

      // Awaited so body-parse failures are handled by the catch below.
      return await result.json();
    } catch (error) {
      return this.error(error);
    }
  }
}
Loading

0 comments on commit a2be3e6

Please sign in to comment.