# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
library(future)
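# Note: "multiprocess" is deprecated in newer releases of the future package;
# plan(multisession) is the modern equivalent if you upgrade the package.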
plan(multiprocess)
library(reticulate)
library(jsonlite)
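# Load the AI for Earth helper tools: the Python SAS blob helper (via reticulate),
# task management, and Application Insights logging.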
use_virtualenv("ai4e_py_api")
source_python("/ai4e_api_tools/sas_blob.py")
source("/ai4e_api_tools/task_management/api_task.R")
source("/ai4e_api_tools/ai4e_app_insights.R")
write.table(paste0(FALSE), file = "running.txt")
# Helper function to write dataframes to csv
WriteBlob <- function(dataframe_to_write, container_uri, blob_name, include_row_names) {
sas_blob_helper = SasBlob()
tmp <- file(tempfile())
open(tmp, "w+")
write.csv(dataframe_to_write, file=tmp, row.names = include_row_names, fileEncoding="UTF-8")
seek(tmp, where=0)
data_to_save <- paste(readLines(tmp, n=-1), collapse="\n")
sas_blob_helper$write_blob_from_text(container_uri, blob_name, data_to_save)
close(tmp)
}
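# Example usage (illustrative only; the SAS container URI and blob path below are
# hypothetical placeholders):
#   results <- data.frame(id = 1:3, score = c(0.2, 0.5, 0.9))
#   WriteBlob(results, "https://<account>.blob.core.windows.net/<container>?<sas-token>",
#             "run_001/results.csv", include_row_names = FALSE)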
GetBlobFromContainer<-function(container_uri, blob_name){
sas_blob_helper = SasBlob()
input_data <- sas_blob_helper$get_blob_sas_uri(container_uri, blob_name)
return(input_data)
}
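# Example usage (illustrative; the URI is a placeholder). The helper returns a
# SAS-signed blob URL that read.csv() can read directly, as ProcessData does below:
#   obs_uri <- GetBlobFromContainer("https://<account>.blob.core.windows.net/<container>?<sas-token>",
#                                   "run_001/Observation.csv")
#   observations <- read.csv(obs_uri)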
# Primary working function
ProcessData<-function(taskId, config){
tryCatch({
# Update task status at any time
UpdateTaskStatus(taskId, 'running')
#INSERT_YOUR_MODEL_CALL_HERE
container_uri <- config$container_uri
run_id <- config$run_id
observations_csv <- GetBlobFromContainer(container_uri, paste(run_id, "Observation.csv", sep= "/"))
observations <- read.csv(observations_csv)
WriteBlob(observations, container_uri, paste(run_id, "output_dir/output_name.csv", sep= "/"), include_row_names=FALSE)
write.table(paste0(FALSE), file = "running.txt")
UpdateTaskStatus(taskId, 'completed')
}, error = function(err) {
print(paste0(err))
write.table(paste0(FALSE), file = "running.txt")
log_exception(paste0(err), taskId)
UpdateTaskStatus(taskId, paste("failed - ", err))
})
}
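# The config argument is the parsed JSON request body. Based on the fields used
# above, a minimal request body looks like this (the container URI is a placeholder):
# {
#   "container_uri": "https://<account>.blob.core.windows.net/<container>?<sas-token>",
#   "run_id": "run_001"
# }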
#* Test process
#* @post /test
function(req, res){
print("running")
task <- AddTask(req)
taskId <- task$uuid
sas_blob_helper = SasBlob()
is_processing <- read.table("running.txt")
# R is single-threaded, so we only process one response at a time.
# Parallel requests are handled by AKS auto-scaling.
if (is_processing == "TRUE")
{
log_warn("Too many requests are being processed.", taskId)
res$status <- 429 # Too many requests
res$body <- "Too many requests are being processed. Retry with a backoff."
return(res)
}
write.table(paste0(TRUE), file = "running.txt")
tryCatch({
body <- req$postBody
input_data <- fromJSON(body, simplifyDataFrame=TRUE)
promise <- future(ProcessData(taskId, input_data))
#ProcessData(taskId, input_data)
message <- paste0("Starting task: ", taskId, ". Output files will be placed in the ", input_data$run_id, " directory.")
directory <- input_data$run_id
data.frame(message, taskId, directory)
}, error = function(err) {
print(paste0(err))
write.table(paste0(FALSE), file = "running.txt")
log_exception(paste0(err), taskId)
UpdateTaskStatus(taskId, paste("failed - ", err))
res$status <- 400
res$body <- "Bad request. Please ensure the JSON request body is properly formatted."
res
})
}
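# A sketch of calling this endpoint from an R client, assuming the plumber app is
# served locally on port 8000 and the httr package is available (host, port, and
# any route prefix depend on your deployment; the container URI is a placeholder):
#   library(httr)
#   resp <- POST("http://localhost:8000/test",
#                body = '{"container_uri": "https://<account>.blob.core.windows.net/<container>?<sas-token>", "run_id": "run_001"}',
#                content_type_json())
#   content(resp)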
#* Get status of task by id
#* @param taskId The id of the task
#* @get /task/<taskId>
GetProcessDataTaskStatus<-function(taskId){
status <- GetTaskStatus(taskId)
return(status)
}
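# Example status poll using httr as above, assuming the same local host/port and a
# taskId value returned by the POST /test response:
#   status_resp <- GET(paste0("http://localhost:8000/task/", taskId))
#   content(status_resp)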
#* Provide healthcheck endpoint
#* @get /
GetHealthCheck<-function(){
return("OK")
}
# Please keep an empty last line at the end of this file; otherwise, you will see an error when starting the web server.