Skip to content

Commit 6cb46d8

Browse files
committed
webrtc: support receiving 256kB messages
In experiments with js we've found that increasing the message size increases throughput. See: libp2p/specs#628 (comment) for details. This changes the protobuf reader for the stream to read 256kB messages. This also forces a change to the connection SCTP read buffer to be increased to about 2.5 MB, to support 1 message being buffered for 10 streams. This isn't enough to support larger messages. We most likely need to change the inferred SDP of the server to use 256kB maxMessageSize, and need some backwards compatible mechanism in the handshake to opt in to large messages. See: libp2p/specs#628 for details
1 parent 61f03f4 commit 6cb46d8

11 files changed

+131
-112
lines changed

go.mod

+5-5
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ require (
4747
github.com/multiformats/go-varint v0.0.7
4848
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
4949
github.com/pion/datachannel v1.5.10
50-
github.com/pion/ice/v4 v4.0.6
50+
github.com/pion/ice/v4 v4.0.8
5151
github.com/pion/logging v0.2.3
52-
github.com/pion/sctp v1.8.36
52+
github.com/pion/sctp v1.8.37
5353
github.com/pion/stun v0.6.1
54-
github.com/pion/webrtc/v4 v4.0.10
54+
github.com/pion/webrtc/v4 v4.0.14
5555
github.com/prometheus/client_golang v1.21.0
5656
github.com/prometheus/client_model v0.6.1
5757
github.com/quic-go/quic-go v0.50.0
@@ -93,8 +93,8 @@ require (
9393
github.com/pion/mdns/v2 v2.0.7 // indirect
9494
github.com/pion/randutil v0.1.0 // indirect
9595
github.com/pion/rtcp v1.2.15 // indirect
96-
github.com/pion/rtp v1.8.11 // indirect
97-
github.com/pion/sdp/v3 v3.0.10 // indirect
96+
github.com/pion/rtp v1.8.13 // indirect
97+
github.com/pion/sdp/v3 v3.0.11 // indirect
9898
github.com/pion/srtp/v3 v3.0.4 // indirect
9999
github.com/pion/stun/v3 v3.0.0 // indirect
100100
github.com/pion/transport/v2 v2.2.10 // indirect

go.sum

+10-10
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
216216
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
217217
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
218218
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
219-
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
220-
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
219+
github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY=
220+
github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
221221
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
222222
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
223223
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
@@ -229,12 +229,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
229229
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
230230
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
231231
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
232-
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
233-
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
234-
github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0=
235-
github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
236-
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
237-
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
232+
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
233+
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
234+
github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs=
235+
github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
236+
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
237+
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
238238
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
239239
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
240240
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
@@ -249,8 +249,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
249249
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
250250
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
251251
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
252-
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
253-
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
252+
github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg=
253+
github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk=
254254
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
255255
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
256256
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

p2p/transport/webrtc/connection.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
188188
dc.Close()
189189
return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err)
190190
}
191-
str := newStream(dc, rwc, func() { c.removeStream(streamID) })
191+
str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) })
192192
if err := c.addStream(str); err != nil {
193193
str.Reset()
194194
return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err)
@@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) {
201201
case <-c.ctx.Done():
202202
return nil, c.closeErr
203203
case dc := <-c.acceptQueue:
204-
str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) })
204+
str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) })
205205
if err := c.addStream(str); err != nil {
206206
str.Reset()
207207
return nil, err

p2p/transport/webrtc/listener.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (l *listener) setupConnection(
253253
if err != nil {
254254
return nil, err
255255
}
256-
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {})
256+
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil)
257257
// we do not yet know A's peer ID so accept any inbound
258258
remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true)
259259
if err != nil {

p2p/transport/webrtc/stream.go

+29-29
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,9 @@ import (
1515
)
1616

1717
const (
18-
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
19-
maxMessageSize = 16384
20-
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
21-
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
22-
// per stream is limited to avoid a single stream monopolizing the entire connection.
23-
maxSendBuffer = 2 * maxMessageSize
24-
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
25-
// data channel. We want a notification as soon as we can write 1 full sized message.
26-
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
27-
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
28-
// write on this stream.
29-
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
30-
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
31-
// send queue.
32-
maxTotalControlMessagesSize = 50
33-
18+
// maxSendMessageSize is the maximum message size of the Protobuf message we send / receive.
19+
// NOTE: Change `varintOverhead` if you change this.
20+
maxSendMessageSize = 16384
3421
// Proto overhead assumption is 5 bytes
3522
protoOverhead = 5
3623
// Varint overhead is assumed to be 2 bytes. This is safe since
@@ -40,9 +27,20 @@ const (
4027
// is less than or equal to 2 ^ 14, the varint will not be more than
4128
// 2 bytes in length.
4229
varintOverhead = 2
30+
31+
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
32+
// write on this stream.
33+
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
34+
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
35+
// send queue.
36+
maxTotalControlMessagesSize = 50
37+
4338
// maxFINACKWait is the maximum amount of time a stream will wait to read
4439
// FIN_ACK before closing the data channel
4540
maxFINACKWait = 10 * time.Second
41+
42+
// maxReceiveMessageSize is the maximum message size of the Protobuf message we receive.
43+
maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer
4644
)
4745

4846
type receiveState uint8
@@ -79,11 +77,12 @@ type stream struct {
7977
nextMessage *pb.Message
8078
receiveState receiveState
8179

82-
writer pbio.Writer // concurrent writes prevented by mx
83-
writeStateChanged chan struct{}
84-
sendState sendState
85-
writeDeadline time.Time
86-
writeError error
80+
writer pbio.Writer // concurrent writes prevented by mx
81+
writeStateChanged chan struct{}
82+
sendState sendState
83+
writeDeadline time.Time
84+
writeError error
85+
maxSendMessageSize int
8786

8887
controlMessageReaderOnce sync.Once
8988
// controlMessageReaderEndTime is the end time for reading FIN_ACK from the control
@@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{}
105104
func newStream(
106105
channel *webrtc.DataChannel,
107106
rwc datachannel.ReadWriteCloser,
107+
maxSendMessageSize int,
108108
onDone func(),
109109
) *stream {
110110
s := &stream{
111-
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
112-
writer: pbio.NewDelimitedWriter(rwc),
113-
writeStateChanged: make(chan struct{}, 1),
114-
id: *channel.ID(),
115-
dataChannel: rwc.(*datachannel.DataChannel),
116-
onDone: onDone,
111+
reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize),
112+
writer: pbio.NewDelimitedWriter(rwc),
113+
writeStateChanged: make(chan struct{}, 1),
114+
id: *channel.ID(),
115+
dataChannel: rwc.(*datachannel.DataChannel),
116+
onDone: onDone,
117+
maxSendMessageSize: maxSendMessageSize,
117118
}
118-
s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold)
119+
s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold()))
119120
s.dataChannel.OnBufferedAmountLow(func() {
120121
s.notifyWriteStateChanged()
121-
122122
})
123123
return s
124124
}

p2p/transport/webrtc/stream_test.go

+55-44
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package libp2pwebrtc
33
import (
44
"crypto/rand"
55
"errors"
6+
"fmt"
67
"io"
78
"os"
89
"sync/atomic"
@@ -148,8 +149,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
148149
client, server := getDetachedDataChannels(t)
149150

150151
var clientDone, serverDone atomic.Bool
151-
clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) })
152-
serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) })
152+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) })
153+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) })
153154

154155
// send a foobar from the client
155156
n, err := clientStr.Write([]byte("foobar"))
@@ -194,8 +195,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
194195
func TestStreamPartialReads(t *testing.T) {
195196
client, server := getDetachedDataChannels(t)
196197

197-
clientStr := newStream(client.dc, client.rwc, func() {})
198-
serverStr := newStream(server.dc, server.rwc, func() {})
198+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
199+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
199200

200201
_, err := serverStr.Write([]byte("foobar"))
201202
require.NoError(t, err)
@@ -217,8 +218,8 @@ func TestStreamPartialReads(t *testing.T) {
217218
func TestStreamSkipEmptyFrames(t *testing.T) {
218219
client, server := getDetachedDataChannels(t)
219220

220-
clientStr := newStream(client.dc, client.rwc, func() {})
221-
serverStr := newStream(server.dc, server.rwc, func() {})
221+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
222+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
222223

223224
for i := 0; i < 10; i++ {
224225
require.NoError(t, serverStr.writer.WriteMsg(&pb.Message{}))
@@ -252,7 +253,7 @@ func TestStreamSkipEmptyFrames(t *testing.T) {
252253
func TestStreamReadReturnsOnClose(t *testing.T) {
253254
client, _ := getDetachedDataChannels(t)
254255

255-
clientStr := newStream(client.dc, client.rwc, func() {})
256+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
256257
errChan := make(chan error, 1)
257258
go func() {
258259
_, err := clientStr.Read([]byte{0})
@@ -275,8 +276,8 @@ func TestStreamResets(t *testing.T) {
275276
client, server := getDetachedDataChannels(t)
276277

277278
var clientDone, serverDone atomic.Bool
278-
clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) })
279-
serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) })
279+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) })
280+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) })
280281

281282
// send a foobar from the client
282283
_, err := clientStr.Write([]byte("foobar"))
@@ -311,8 +312,8 @@ func TestStreamResets(t *testing.T) {
311312
func TestStreamReadDeadlineAsync(t *testing.T) {
312313
client, server := getDetachedDataChannels(t)
313314

314-
clientStr := newStream(client.dc, client.rwc, func() {})
315-
serverStr := newStream(server.dc, server.rwc, func() {})
315+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
316+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
316317

317318
timeout := 100 * time.Millisecond
318319
if os.Getenv("CI") != "" {
@@ -342,8 +343,8 @@ func TestStreamReadDeadlineAsync(t *testing.T) {
342343
func TestStreamWriteDeadlineAsync(t *testing.T) {
343344
client, server := getDetachedDataChannels(t)
344345

345-
clientStr := newStream(client.dc, client.rwc, func() {})
346-
serverStr := newStream(server.dc, server.rwc, func() {})
346+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
347+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
347348
_ = serverStr
348349

349350
b := make([]byte, 1024)
@@ -372,8 +373,8 @@ func TestStreamWriteDeadlineAsync(t *testing.T) {
372373
func TestStreamReadAfterClose(t *testing.T) {
373374
client, server := getDetachedDataChannels(t)
374375

375-
clientStr := newStream(client.dc, client.rwc, func() {})
376-
serverStr := newStream(server.dc, server.rwc, func() {})
376+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
377+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
377378

378379
serverStr.Close()
379380
b := make([]byte, 1)
@@ -384,8 +385,8 @@ func TestStreamReadAfterClose(t *testing.T) {
384385

385386
client, server = getDetachedDataChannels(t)
386387

387-
clientStr = newStream(client.dc, client.rwc, func() {})
388-
serverStr = newStream(server.dc, server.rwc, func() {})
388+
clientStr = newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
389+
serverStr = newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
389390

390391
serverStr.Reset()
391392
b = make([]byte, 1)
@@ -399,8 +400,8 @@ func TestStreamCloseAfterFINACK(t *testing.T) {
399400
client, server := getDetachedDataChannels(t)
400401

401402
done := make(chan bool, 1)
402-
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
403-
serverStr := newStream(server.dc, server.rwc, func() {})
403+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
404+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
404405

405406
go func() {
406407
err := clientStr.Close()
@@ -427,8 +428,8 @@ func TestStreamFinAckAfterStopSending(t *testing.T) {
427428
client, server := getDetachedDataChannels(t)
428429

429430
done := make(chan bool, 1)
430-
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
431-
serverStr := newStream(server.dc, server.rwc, func() {})
431+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
432+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
432433

433434
go func() {
434435
clientStr.CloseRead()
@@ -460,8 +461,8 @@ func TestStreamConcurrentClose(t *testing.T) {
460461

461462
start := make(chan bool, 2)
462463
done := make(chan bool, 2)
463-
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
464-
serverStr := newStream(server.dc, server.rwc, func() { done <- true })
464+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
465+
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { done <- true })
465466

466467
go func() {
467468
start <- true
@@ -495,7 +496,7 @@ func TestStreamResetAfterClose(t *testing.T) {
495496
client, server := getDetachedDataChannels(t)
496497

497498
done := make(chan bool, 2)
498-
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
499+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
499500
clientStr.Close()
500501

501502
select {
@@ -520,7 +521,7 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) {
520521
client, server := getDetachedDataChannels(t)
521522

522523
done := make(chan bool, 1)
523-
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
524+
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
524525

525526
clientStr.Close()
526527

@@ -540,24 +541,34 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) {
540541
}
541542

542543
func TestStreamChunking(t *testing.T) {
543-
client, server := getDetachedDataChannels(t)
544-
545-
clientStr := newStream(client.dc, client.rwc, func() {})
546-
serverStr := newStream(server.dc, server.rwc, func() {})
547-
548-
const N = (16 << 10) + 1000
549-
go func() {
550-
data := make([]byte, N)
551-
_, err := clientStr.Write(data)
552-
require.NoError(t, err)
553-
}()
554-
555-
data := make([]byte, N)
556-
n, err := serverStr.Read(data)
557-
require.NoError(t, err)
558-
require.LessOrEqual(t, n, 16<<10)
544+
for _, msgSize := range []int{16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10} {
545+
t.Run(fmt.Sprintf("msgSize=%d", msgSize), func(t *testing.T) {
546+
client, server := getDetachedDataChannels(t)
547+
defer client.dc.Close()
548+
defer server.dc.Close()
549+
550+
clientStr := newStream(client.dc, client.rwc, msgSize, func() {})
551+
// server should read large messages even if it can only send 16 kB messages.
552+
serverStr := newStream(server.dc, server.rwc, 16<<10, func() {})
553+
554+
N := msgSize + 1000
555+
input := make([]byte, N)
556+
_, err := rand.Read(input)
557+
require.NoError(t, err)
558+
go func() {
559+
_, err = clientStr.Write(input)
560+
require.NoError(t, err)
561+
}()
559562

560-
nn, err := serverStr.Read(data)
561-
require.NoError(t, err)
562-
require.Equal(t, nn+n, N)
563+
data := make([]byte, N)
564+
n, err := serverStr.Read(data)
565+
require.NoError(t, err)
566+
require.LessOrEqual(t, n, msgSize)
567+
// shouldn't be much less than msgSize
568+
require.GreaterOrEqual(t, n, msgSize-100)
569+
_, err = serverStr.Read(data[n:])
570+
require.NoError(t, err)
571+
require.Equal(t, input, data)
572+
})
573+
}
563574
}

0 commit comments

Comments
 (0)