-
Notifications
You must be signed in to change notification settings - Fork 493
/
subscriptions.go
100 lines (88 loc) · 3.21 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package graphql
import (
"context"
"errors"
qerrors "github.com/graph-gophers/graphql-go/errors"
"github.com/graph-gophers/graphql-go/internal/common"
"github.com/graph-gophers/graphql-go/internal/exec"
"github.com/graph-gophers/graphql-go/internal/exec/resolvable"
"github.com/graph-gophers/graphql-go/internal/exec/selected"
"github.com/graph-gophers/graphql-go/internal/query"
"github.com/graph-gophers/graphql-go/internal/validation"
"github.com/graph-gophers/graphql-go/introspection"
)
// Subscribe returns a response channel for the given subscription with the schema's
// resolver. It returns an error if the schema was created without a resolver.
// If the context gets cancelled, the response channel will be closed and no
// further resolvers will be called. The context error will be returned as soon
// as possible (not immediately).
func (s *Schema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) {
if !s.res.SubscriptionResolver.IsValid() {
return nil, errors.New("schema created without resolver, can not subscribe")
}
if _, ok := s.schema.RootOperationTypes["subscription"]; !ok {
return nil, errors.New("no subscriptions are offered by the schema")
}
return s.subscribe(ctx, queryString, operationName, variables, s.res), nil
}
func (s *Schema) subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}, res *resolvable.Schema) <-chan interface{} {
doc, qErr := query.Parse(queryString)
if qErr != nil {
return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qErr}})
}
validationFinish := s.validationTracer.TraceValidation(ctx)
errs := validation.Validate(s.schema, doc, variables, s.maxDepth)
validationFinish(errs)
if len(errs) != 0 {
return sendAndReturnClosed(&Response{Errors: errs})
}
op, err := getOperation(doc, operationName)
if err != nil {
return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qerrors.Errorf("%s", err)}})
}
r := &exec.Request{
Request: selected.Request{
Doc: doc,
Vars: variables,
Schema: s.schema,
},
Limiter: make(chan struct{}, s.maxParallelism),
Tracer: s.tracer,
Logger: s.logger,
PanicHandler: s.panicHandler,
SubscribeResolverTimeout: s.subscribeResolverTimeout,
}
varTypes := make(map[string]*introspection.Type)
for _, v := range op.Vars {
t, err := common.ResolveType(v.Type, s.schema.Resolve)
if err != nil {
return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{err}})
}
varTypes[v.Name.Name] = introspection.WrapType(t)
}
if op.Type == query.Query || op.Type == query.Mutation {
data, errs := r.Execute(ctx, res, op)
return sendAndReturnClosed(&Response{Data: data, Errors: errs})
}
responses := r.Subscribe(ctx, res, op)
c := make(chan interface{})
go func() {
Loop:
for resp := range responses {
select {
case c <- &Response{Data: resp.Data, Errors: resp.Errors}:
continue
case <-ctx.Done():
break Loop
}
}
close(c)
}()
return c
}
func sendAndReturnClosed(resp *Response) chan interface{} {
c := make(chan interface{}, 1)
c <- resp
close(c)
return c
}