// Source: main.go (executable file, 155 lines, 3.25 KB), forked from linkpoolio/bridges.
package main
import (
"errors"
"fmt"
"github.com/linkpoolio/bridges"
"github.com/montanaflynn/stats"
"github.com/oliveagle/jsonpath"
"net/http"
"strconv"
"sync"
)
// Result is the payload handed back to Chainlink, where it is merged into
// `data`.
type Result struct {
	// AggregationType is the aggregation type that was requested.
	AggregationType string `json:"aggregationType"`
	// AggregateValue is the aggregated number rendered as a string.
	AggregateValue string `json:"aggregateValue"`
	// FailedAPICount is how many API requests produced an error.
	FailedAPICount int `json:"failedApiCount"`
	// APIErrors holds the error message of every failed API request.
	APIErrors []string `json:"apiErrors"`
}
// APIAggregator is a bridge that queries any number of public APIs returning
// numerical values and aggregates the results. Supported aggregation types:
//   - "mean" (also applied when `type` is any other value)
//   - "median"
//   - "mode"
//
// To use the bridge, supply:
//   - `api` []string List of APIs to query
//   - `paths` []string JSON paths used to parse the returned responses; the
//     path at index i is applied to the API at index i, so both lists must be
//     non-empty and of equal length
//   - `type` string Aggregation type to use
//
// For example:
//
//	{
//	    "api": [
//	        "https://www.bitstamp.net/api/v2/ticker/btcusd/",
//	        "https://api.pro.coinbase.com/products/btc-usd/ticker"
//	    ],
//	    "paths": ["$.last", "$.price"],
//	    "type": "median"
//	}
type APIAggregator struct{}
// Run is the bridge.Bridge Run implementation that returns the aggregated result.
//
// It reads the `api` and `paths` arrays from the request data, requests every
// API concurrently (pairing each API with the path at the same index), and
// aggregates the numbers that were successfully extracted using the `type`
// parameter.
//
// A failing API request does not fail the run on its own: it is counted in
// Result.FailedAPICount and its message is appended to Result.APIErrors.
//
// An error (with a nil result) is returned when the `api` and `paths` arrays
// are empty or differ in length, or when the aggregation itself fails.
func (cc *APIAggregator) Run(h *bridges.Helper) (interface{}, error) {
	apis := h.Data.Get("api").Array()
	paths := h.Data.Get("paths").Array()

	// Validate before allocating channels or touching the WaitGroup so the
	// failure path leaves nothing half set up.
	if len(apis) == 0 || len(apis) != len(paths) {
		return nil, errors.New("Invalid api and path array")
	}

	// Each request sends exactly one message on one of the two channels, so
	// buffering both to the request count means no sender can ever block.
	values := make(chan float64, len(apis))
	errs := make(chan error, len(apis))

	var wg sync.WaitGroup
	wg.Add(len(apis))
	for i, api := range apis {
		go performRequest(h, &wg, api.String(), paths[i].String(), values, errs)
	}
	wg.Wait()

	// All senders are done; closing lets the range loops below terminate.
	close(values)
	close(errs)

	r := Result{AggregationType: h.GetParam("type")}
	if failed := len(errs); failed > 0 {
		r.FailedAPICount = failed
		r.APIErrors = make([]string, 0, failed)
		for err := range errs {
			r.APIErrors = append(r.APIErrors, err.Error())
		}
	}

	aggValue, err := aggregateValues(r.AggregationType, values)
	if err != nil {
		return nil, fmt.Errorf("Error aggregating value: %s", err)
	}
	r.AggregateValue = aggValue
	return r, nil
}
// Opts is the bridge.Bridge implementation. It reports the bridge's name as
// "APIAggregator" and sets Lambda to true.
// NOTE(review): Lambda presumably enables running as a Lambda function —
// confirm against the bridges.Opts documentation.
func (cc *APIAggregator) Opts() *bridges.Opts {
	opts := bridges.Opts{
		Name:   "APIAggregator",
		Lambda: true,
	}
	return &opts
}
// main builds a bridges server hosting the APIAggregator bridge and starts it,
// passing 8080 to Start (presumably the listen port — confirm against the
// bridges server documentation).
func main() {
	server := bridges.NewServer(&APIAggregator{})
	server.Start(8080)
}
// performRequest fetches a single API and extracts one number from its
// response.
//
// It issues a GET to api through h.HTTPCall, looks up path in the decoded
// response with jsonpath, and converts the result to a float64 by formatting
// it with fmt.Sprint and parsing that text, so both JSON numbers and numeric
// strings are accepted.
//
// Exactly one message is sent per call: the parsed number on values, or the
// first error encountered on errs. wg.Done is always called on return.
// The sends are unconditional, so the caller is expected to provide channels
// that can accept them without blocking forever.
func performRequest(
	h *bridges.Helper,
	wg *sync.WaitGroup,
	api string,
	path string,
	values chan<- float64,
	errs chan<- error,
) {
	defer wg.Done()

	// Run the fetch/lookup/parse pipeline in one place so there is a single
	// decision point about which channel receives the outcome.
	number, failure := func() (float64, error) {
		var body interface{}
		if callErr := h.HTTPCall(http.MethodGet, api, &body); callErr != nil {
			return 0, callErr
		}
		raw, lookupErr := jsonpath.JsonPathLookup(body, path)
		if lookupErr != nil {
			return 0, lookupErr
		}
		return strconv.ParseFloat(fmt.Sprint(raw), 64)
	}()

	if failure != nil {
		errs <- failure
		return
	}
	values <- number
}
// aggregateValues drains the values channel and reduces the collected numbers
// to a single value formatted with fmt.Sprint.
//
// aggType selects the reduction:
//   - "mode": the first mode reported by stats.Mode; when stats.Mode reports
//     no mode, the first value received from the channel is used instead
//   - "median": stats.Median
//   - anything else: stats.Mean
//
// The channel must be closed by the caller, otherwise draining never ends.
// When the returned error is non-nil the returned string is not meaningful.
func aggregateValues(aggType string, values chan float64) (string, error) {
	var nums []float64
	for v := range values {
		nums = append(nums, v)
	}

	var (
		agg float64
		err error
	)
	switch aggType {
	case "mode":
		modes, modeErr := stats.Mode(nums)
		switch {
		case modeErr != nil:
			// Propagate the failure (e.g. no values at all) instead of
			// falling through to nums[0], which would panic on an empty slice.
			err = modeErr
		case len(modes) > 0:
			agg = modes[0]
		case len(nums) > 0:
			// No mode was reported: fall back to the first value received.
			agg = nums[0]
		default:
			// Defensive: never index an empty slice even if stats.Mode
			// accepted empty input without an error.
			err = fmt.Errorf("no values to aggregate")
		}
	case "median":
		agg, err = stats.Median(nums)
	default:
		agg, err = stats.Mean(nums)
	}
	return fmt.Sprint(agg), err
}