-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite.go
79 lines (74 loc) · 1.95 KB
/
write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package automergendjsonsync
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"slices"
"github.com/automerge/automerge-go"
)
// SubscribeToReceivedChanges registers a new subscriber for changes received by the doc and returns
// the subscriber's channel together with a finish function.
//
// The returned channel has a buffer of one and is appended to the doc's list of subscriber channels
// while holding the doc's mutex. The finish function removes the channel from that list and closes
// it, also under the mutex; calling finish again after the channel has been removed does nothing.
func (b *SharedDoc) SubscribeToReceivedChanges() (chan bool, func()) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.channels == nil {
		b.channels = make([]chan bool, 0, 1)
	}
	hint := make(chan bool, 1)
	b.channels = append(b.channels, hint)

	finish := func() {
		b.mutex.Lock()
		defer b.mutex.Unlock()

		idx := slices.Index(b.channels, hint)
		if idx < 0 {
			// Already unsubscribed: the channel was removed and closed by an earlier call.
			return
		}
		b.channels = slices.Delete(b.channels, idx, idx+1)
		close(hint)
	}
	return hint, finish
}
// generateMessagesToWriter repeatedly asks state for sync messages and writes each one to writer as
// a single newline-terminated JSON document: an NdJson value with Event set to EventSync and Data set
// to the message bytes. After every successful write the writer is flushed if it implements
// http.Flusher.
//
// Once state.GenerateMessage reports that no message is available:
//   - if immediate is true, the function stops, closes writer if it implements io.Closer, and returns
//     nil (or the wrapped close error);
//   - otherwise it waits for a value on hintChannel and then generates again, or returns ctx.Err()
//     when ctx is done. In that case writer is not closed here.
//
// If hintChannel is closed, it is no longer selected on and the function waits only for ctx to be
// done. Without this, a closed channel would be permanently ready and the loop would spin.
//
// A marshal or write failure aborts immediately with a wrapped error. The number of messages, changes
// and bytes written is logged at info level when the function returns, whatever the outcome.
func generateMessagesToWriter(ctx context.Context, state *automerge.SyncState, hintChannel <-chan bool, writer io.Writer, immediate bool) error {
	log := Logger(ctx)
	sent, sentBytes, sentChanges := 0, 0, 0
	defer func() {
		log.InfoContext(ctx, "finished writing sync messages", slog.Int("sent-messages", sent), slog.Int("sent-changes", sentChanges), slog.Int("sent-bytes", sentBytes))
	}()

	// The writer's dynamic type cannot change, so resolve the optional flusher once.
	flusher, canFlush := writer.(http.Flusher)

	for {
		// Drain everything the sync state currently has to send.
		for {
			m, ok := state.GenerateMessage()
			if !ok {
				break
			}
			r, err := json.Marshal(&NdJson{Event: EventSync, Data: m.Bytes()})
			if err != nil {
				return fmt.Errorf("failed to marshal: %w", err)
			}
			r = append(r, '\n')
			n, err := writer.Write(r)
			if err != nil {
				return fmt.Errorf("failed to write message: %w", err)
			}
			changes := len(m.Changes())
			sent++
			sentBytes += n
			sentChanges += changes
			log.DebugContext(ctx, "wrote message", slog.Int("changes", changes), slog.Int("bytes", n), slog.Any("heads", LoggableChangeHashes(m.Heads())))
			if canFlush {
				flusher.Flush()
			}
		}
		if immediate {
			break
		}
		select {
		case _, open := <-hintChannel:
			if !open {
				// A closed channel is always ready to receive from; a nil channel never is.
				// Dropping it here avoids a busy loop and leaves ctx as the only wake-up.
				hintChannel = nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	if v, ok := writer.(io.Closer); ok {
		if err := v.Close(); err != nil {
			return fmt.Errorf("failed to close writer: %w", err)
		}
	}
	return nil
}