Skip to content

Commit

Permalink
integrated opentelemetry into application_insights code (#191)
Browse files Browse the repository at this point in the history
Co-authored-by: Josh Bradley <[email protected]>
  • Loading branch information
KennyZhang1 and jgbradley1 authored Oct 26, 2024
1 parent fe3c103 commit 587cb92
Show file tree
Hide file tree
Showing 10 changed files with 2,775 additions and 1,158 deletions.
2,401 changes: 1,281 additions & 1,120 deletions backend/poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ authors = [
"Douglas Orbaker <[email protected]>",
"Chris Sanchez <[email protected]>",
"Shane Solomon <[email protected]>",
"Kenny Zhang <[email protected]>"
]
readme = "README.md"
license = "MIT"
Expand Down Expand Up @@ -51,9 +52,8 @@ httpx = ">=0.25.2"
kubernetes = ">=29.0.0"
networkx = ">=3.2.1"
nltk = "*"
opencensus = ">=0.11.4"
opencensus-context = ">=0.1.3"
opencensus-ext-azure = ">=1.1.13"
azure-monitor-opentelemetry-exporter = "*"
opentelemetry-sdk = ">=1.27.0"
pandas = ">=2.2.1"
pyaml-env = ">=1.2.1"
pyarrow = ">=15.0.0"
Expand Down
4 changes: 2 additions & 2 deletions backend/src/api/query_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def global_search_streaming(request: GraphRequest):
else:
# Current investigations show that community level 1 is the most useful for global search. Set this as the default value
COMMUNITY_LEVEL = 1

for index_name in sanitized_index_names:
validate_index_file_exist(index_name, COMMUNITY_REPORT_TABLE)
validate_index_file_exist(index_name, ENTITIES_TABLE)
Expand Down Expand Up @@ -249,7 +249,7 @@ async def local_search_streaming(request: GraphRequest):
NODES_TABLE = "output/create_final_nodes.parquet"
RELATIONSHIPS_TABLE = "output/create_final_relationships.parquet"
TEXT_UNITS_TABLE = "output/create_final_text_units.parquet"

if isinstance(request.community_level, int):
COMMUNITY_LEVEL = request.community_level
else:
Expand Down
2 changes: 1 addition & 1 deletion backend/src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class GraphRequest(BaseModel):
class GraphResponse(BaseModel):
result: Any
context_data: Any


class GraphDataResponse(BaseModel):
nodes: int
Expand Down
30 changes: 23 additions & 7 deletions backend/src/reporting/application_insights_workflow_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,17 @@
Optional,
)

from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
from datashaper.workflow.workflow_callbacks import NoopWorkflowCallbacks
from opencensus.ext.azure.log_exporter import AzureLogHandler
from opentelemetry._logs import (
get_logger_provider,
set_logger_provider,
)
from opentelemetry.sdk._logs import (
LoggerProvider,
LoggingHandler,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor


class ApplicationInsightsWorkflowCallbacks(NoopWorkflowCallbacks):
Expand Down Expand Up @@ -74,15 +83,23 @@ def __init_logger(self, connection_string, max_logger_init_retries: int = 10):
unique_hash = hashlib.sha256(current_time.encode()).hexdigest()
self._logger_name = f"{self.__class__.__name__}-{unique_hash}"
if self._logger_name not in logging.Logger.manager.loggerDict:
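# attach azure monitor log exporter to the global logger provider
# NOTE(review): OpenTelemetry honors set_logger_provider() only on its first call per process;
# later calls are ignored with a warning, so re-running this path presumably adds another
# BatchLogRecordProcessor to the already-registered provider - confirm logs are not exported twice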
logger_provider = LoggerProvider()
set_logger_provider(logger_provider)
exporter = AzureMonitorLogExporter(connection_string=connection_string)
get_logger_provider().add_log_record_processor(
BatchLogRecordProcessor(
exporter=exporter,
schedule_delay_millis=60000,
)
)
# instantiate new logger
self._logger = logging.getLogger(self._logger_name)
self._logger.propagate = False
# remove any existing handlers
self._logger.handlers.clear()
# set up Azure Monitor
self._logger.addHandler(
AzureLogHandler(connection_string=connection_string)
)
# attach an OpenTelemetry LoggingHandler (which uses the global logger provider by default) to this logger
self._logger.addHandler(LoggingHandler())
# set logging level
self._logger.setLevel(logging.DEBUG)

Expand All @@ -91,8 +108,7 @@ def __init_logger(self, connection_string, max_logger_init_retries: int = 10):

def _format_details(self, details: Dict[str, Any] | None = None) -> Dict[str, Any]:
"""
Format the details dictionary to comply with the Application Insights structured.
Format the details dictionary to comply with the Application Insights structured
logging Property column standard.
Args:
Expand Down
19 changes: 13 additions & 6 deletions infra/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -343,23 +343,28 @@ deployAzureResources () {
--parameters "enablePrivateEndpoints=$ENABLE_PRIVATE_ENDPOINTS" \
--parameters "acrName=$CONTAINER_REGISTRY_NAME" \
--output json)
# errors in deployment may not be caught by exitIfCommandFailed function so we also check the output for errors
exitIfCommandFailed $? "Error deploying Azure resources..."
exitIfValueEmpty "$AZURE_DEPLOY_RESULTS" "Error deploying Azure resources..."
AZURE_OUTPUTS=$(jq -r .properties.outputs <<< $AZURE_DEPLOY_RESULTS)
exitIfCommandFailed $? "Error parsing outputs from Azure resource deployment..."
exitIfCommandFailed $? "Error parsing outputs from Azure deployment..."
exitIfValueEmpty "$AZURE_OUTPUTS" "Error parsing outputs from Azure deployment..."
assignAOAIRoleToManagedIdentity
}

validateSKUs() {
# Run SKU validation functions unless skip flag is set
if [ $2 = true ]; then
checkSKUAvailability $1
checkSKUQuotas $1
local location=$1
local validate_skus=$2
if [ $validate_skus = true ]; then
checkSKUAvailability $location
checkSKUQuotas $location
fi
}

checkSKUAvailability() {
# Function to validate that the required SKUs are not restricted for the given region
printf "Checking Location for SKU Availability... "
printf "Checking cloud region for VM sku availability... "
local location=$1
local sku_checklist=("standard_d4s_v5" "standard_d8s_v5" "standard_e8s_v5")
for sku in ${sku_checklist[@]}; do
Expand Down Expand Up @@ -682,6 +687,9 @@ startBanner
checkRequiredTools
populateParams $PARAMS_FILE

# Check SKU availability and quotas
validateSKUs $LOCATION $VALIDATE_SKUS_FLAG

# Create resource group
createResourceGroupIfNotExists $LOCATION $RESOURCE_GROUP

Expand All @@ -690,7 +698,6 @@ createSshkeyIfNotExists $RESOURCE_GROUP

# Deploy Azure resources
checkForApimSoftDelete
validateSKUs $LOCATION $VALIDATE_SKUS_FLAG
deployAzureResources

# Deploy the graphrag backend docker image to ACR
Expand Down
4 changes: 3 additions & 1 deletion infra/helm/graphrag/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ ingress:
service.beta.kubernetes.io/azure-load-balancer-internal: "true"

graphragConfig:
AI_SEARCH_AUDIENCE: ""
AI_SEARCH_URL: ""
APP_INSIGHTS_CONNECTION_STRING: ""
COSMOS_URI_ENDPOINT: ""
GRAPHRAG_API_BASE: ""
GRAPHRAG_API_VERSION: ""
Expand All @@ -40,7 +43,6 @@ graphragConfig:
GRAPHRAG_EMBEDDING_DEPLOYMENT_NAME: ""
REPORTERS: "blob,console,app_insights"
STORAGE_ACCOUNT_BLOB_URL: ""
AI_SEARCH_URL: ""

master:
name: "master"
Expand Down
32 changes: 24 additions & 8 deletions notebooks/1-Quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -327,20 +327,28 @@
"%%time\n",
"\n",
"\n",
"def global_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
"def global_search(\n",
" index_name: str | list[str], query: str, community_level: int\n",
") -> requests.Response:\n",
" \"\"\"Run a global query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
" url = endpoint + \"/query/global\"\n",
" # optional parameter: community level to query the graph at (default for global query = 1)\n",
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
" request = {\n",
" \"index_name\": index_name,\n",
" \"query\": query,\n",
" \"community_level\": community_level,\n",
" }\n",
" return requests.post(url, json=request, headers=headers)\n",
"\n",
"\n",
"# perform a global query\n",
"global_response = global_search(\n",
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=1\n",
" index_name=index_name,\n",
" query=\"Summarize the main topics found in this data\",\n",
" community_level=1,\n",
")\n",
"global_response_data = parse_query_response(global_response, return_context_data=True)\n",
"global_response_data\n"
"global_response_data"
]
},
{
Expand All @@ -361,17 +369,25 @@
"%%time\n",
"\n",
"\n",
"def local_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
"def local_search(\n",
" index_name: str | list[str], query: str, community_level: int\n",
") -> requests.Response:\n",
" \"\"\"Run a local query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
" url = endpoint + \"/query/local\"\n",
" # optional parameter: community level to query the graph at (default for local query = 2)\n",
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
" request = {\n",
" \"index_name\": index_name,\n",
" \"query\": query,\n",
" \"community_level\": community_level,\n",
" }\n",
" return requests.post(url, json=request, headers=headers)\n",
"\n",
"\n",
"# perform a local query\n",
"local_response = local_search(\n",
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=2\n",
" index_name=index_name,\n",
" query=\"Summarize the main topics found in this data\",\n",
" community_level=2,\n",
")\n",
"local_response_data = parse_query_response(local_response, return_context_data=True)\n",
"local_response_data"
Expand All @@ -380,7 +396,7 @@
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"display_name": "graphrag-venv",
"language": "python",
"name": "python3"
},
Expand Down
44 changes: 34 additions & 10 deletions notebooks/2-Advanced_Getting_Started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,17 @@
" return requests.get(url, headers=headers)\n",
"\n",
"\n",
"def global_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
"def global_search(\n",
" index_name: str | list[str], query: str, community_level: int\n",
") -> requests.Response:\n",
" \"\"\"Run a global query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
" url = endpoint + \"/query/global\"\n",
" # optional parameter: community level to query the graph at (default for global query = 1)\n",
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
" request = {\n",
" \"index_name\": index_name,\n",
" \"query\": query,\n",
" \"community_level\": community_level,\n",
" }\n",
" return requests.post(url, json=request, headers=headers)\n",
"\n",
"\n",
Expand All @@ -338,7 +344,11 @@
" \"\"\"Run a global query across one or more indexes and stream back the response\"\"\"\n",
" url = endpoint + \"/query/streaming/global\"\n",
" # optional parameter: community level to query the graph at (default for global query = 1)\n",
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
" request = {\n",
" \"index_name\": index_name,\n",
" \"query\": query,\n",
" \"community_level\": community_level,\n",
" }\n",
" context_list = []\n",
" with requests.post(url, json=request, headers=headers, stream=True) as r:\n",
" r.raise_for_status()\n",
Expand All @@ -358,11 +368,17 @@
" display(pd.DataFrame.from_dict(context_list[0][\"reports\"]).head(10))\n",
"\n",
"\n",
"def local_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
"def local_search(\n",
" index_name: str | list[str], query: str, community_level: int\n",
") -> requests.Response:\n",
" \"\"\"Run a local query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
" url = endpoint + \"/query/local\"\n",
" # optional parameter: community level to query the graph at (default for local query = 2)\n",
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
" request = {\n",
" \"index_name\": index_name,\n",
" \"query\": query,\n",
" \"community_level\": community_level,\n",
" }\n",
" return requests.post(url, json=request, headers=headers)\n",
"\n",
"\n",
Expand All @@ -372,7 +388,11 @@
" \"\"\"Run a local query across one or more indexes and stream back the response\"\"\"\n",
" url = endpoint + \"/query/streaming/local\"\n",
" # optional parameter: community level to query the graph at (default for local query = 2)\n",
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
" request = {\n",
" \"index_name\": index_name,\n",
" \"query\": query,\n",
" \"community_level\": community_level,\n",
" }\n",
" context_list = []\n",
" with requests.post(url, json=request, headers=headers, stream=True) as r:\n",
" r.raise_for_status()\n",
Expand Down Expand Up @@ -746,7 +766,9 @@
"%%time\n",
"# pass in a single index name as a string; to query across multiple indexes, set index_name=[myindex1, myindex2]\n",
"global_response = global_search(\n",
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=1\n",
" index_name=index_name,\n",
" query=\"Summarize the main topics found in this data\",\n",
" community_level=1,\n",
")\n",
"# print the result and save context data in a variable\n",
"global_response_data = parse_query_response(global_response, return_context_data=True)\n",
Expand All @@ -769,7 +791,9 @@
"outputs": [],
"source": [
"global_search_streaming(\n",
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=1\n",
" index_name=index_name,\n",
" query=\"Summarize the main topics found in this data\",\n",
" community_level=1,\n",
")"
]
},
Expand Down Expand Up @@ -797,7 +821,7 @@
"local_response = local_search(\n",
" index_name=index_name,\n",
" query=\"Who are the primary actors in these communities?\",\n",
" community_level=2\n",
" community_level=2,\n",
")\n",
"# print the result and save context data in a variable\n",
"local_response_data = parse_query_response(local_response, return_context_data=True)\n",
Expand All @@ -822,7 +846,7 @@
"local_search_streaming(\n",
" index_name=index_name,\n",
" query=\"Who are the primary actors in these communities?\",\n",
" community_level=2\n",
" community_level=2,\n",
")"
]
},
Expand Down
Loading

0 comments on commit 587cb92

Please sign in to comment.