forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement capnproto replication (thanos-io#7659)
* Implement capnproto replication Our profiles from production show that a lot of CPU and memory in receivers is used for unmarshaling protobuf messages. Although it is not possible to change the remote-write format, we have the freedom to change the protocol used for replicating timeseries data. This commit introduces a new feature in receivers where replication can be done using Cap'n Proto instead of gRPC + Protobuf. The advantage of the former protocol is that deserialization is far cheaper and fields can be accessed directly from the received message (byte slice) without allocating intermediate objects. There is an additional cost for serialization because we have to convert from Protobuf to the Cap'n proto format, but in our setup this still results in a net reduction in resource usage. Signed-off-by: Filip Petkovski <[email protected]> * Pass logger Signed-off-by: Filip Petkovski <[email protected]> * Update capnp Signed-off-by: Filip Petkovski <[email protected]> * Modify flag Signed-off-by: Filip Petkovski <[email protected]> * Lint Signed-off-by: Filip Petkovski <[email protected]> * Fix spellcheck Signed-off-by: Filip Petkovski <[email protected]> * Use previous version Signed-off-by: Filip Petkovski <[email protected]> * Update docker base Signed-off-by: Filip Petkovski <[email protected]> * Bump go Signed-off-by: Filip Petkovski <[email protected]> * Update docs/components/receive.md Co-authored-by: Pedro Tanaka <[email protected]> Signed-off-by: Filip Petkovski <[email protected]> * Validate labels Signed-off-by: Filip Petkovski <[email protected]> * e2e: add receive test with capnp replication Signed-off-by: Giedrius Statkevičius <[email protected]> * receive: make copy only when necessary Signed-off-by: Giedrius Statkevičius <[email protected]> * Fix failing test Signed-off-by: Filip Petkovski <[email protected]> * Add CHANGELOG entry Signed-off-by: Filip Petkovski <[email protected]> * Add capnproto Make target Signed-off-by: Filip Petkovski <[email protected]> * Replace panics with errors Signed-off-by: Filip Petkovski <[email protected]> * Fix benchmark Signed-off-by: Filip Petkovski <[email protected]> * Fix CHANGELOG Signed-off-by: Filip Petkovski <[email protected]> --------- Signed-off-by: Filip Petkovski <[email protected]> Signed-off-by: Giedrius Statkevičius <[email protected]> Co-authored-by: Pedro Tanaka <[email protected]> Co-authored-by: Giedrius Statkevičius <[email protected]>
- Loading branch information
1 parent
274f95e
commit 65b664c
Showing
33 changed files
with
4,163 additions
and
349 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT | ||
|
||
go 1.23.1 | ||
|
||
require capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af // capnpc-go |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0= | ||
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc= | ||
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU= | ||
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0= | ||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= | ||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package receive | ||
|
||
import ( | ||
"context" | ||
"net" | ||
|
||
"capnproto.org/go/capnp/v3" | ||
"capnproto.org/go/capnp/v3/rpc" | ||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/pkg/errors" | ||
|
||
"github.com/thanos-io/thanos/pkg/receive/writecapnp" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
) | ||
|
||
type CapNProtoServer struct { | ||
listener net.Listener | ||
server writecapnp.Writer | ||
logger log.Logger | ||
} | ||
|
||
func NewCapNProtoServer(listener net.Listener, handler *CapNProtoHandler, logger log.Logger) *CapNProtoServer { | ||
return &CapNProtoServer{ | ||
listener: listener, | ||
server: writecapnp.Writer_ServerToClient(handler), | ||
logger: logger, | ||
} | ||
} | ||
|
||
func (c *CapNProtoServer) ListenAndServe() error { | ||
for { | ||
conn, err := c.listener.Accept() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
go func() { | ||
defer runutil.CloseWithLogOnErr(c.logger, conn, "receive capnp conn") | ||
rpcConn := rpc.NewConn(rpc.NewPackedStreamTransport(conn), &rpc.Options{ | ||
// The BootstrapClient is the RPC interface that will be made available | ||
// to the remote endpoint by default. | ||
BootstrapClient: capnp.Client(c.server).AddRef(), | ||
}) | ||
<-rpcConn.Done() | ||
}() | ||
} | ||
} | ||
|
||
func (c *CapNProtoServer) Shutdown() { | ||
c.server.Release() | ||
} | ||
|
||
type CapNProtoHandler struct { | ||
writer *CapNProtoWriter | ||
logger log.Logger | ||
} | ||
|
||
func NewCapNProtoHandler(logger log.Logger, writer *CapNProtoWriter) *CapNProtoHandler { | ||
return &CapNProtoHandler{logger: logger, writer: writer} | ||
} | ||
|
||
func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_write) error { | ||
call.Go() | ||
wr, err := call.Args().Wr() | ||
if err != nil { | ||
return err | ||
} | ||
t, err := wr.Tenant() | ||
if err != nil { | ||
return err | ||
} | ||
req, err := writecapnp.NewRequest(wr) | ||
if err != nil { | ||
return err | ||
} | ||
defer req.Close() | ||
|
||
var errs writeErrors | ||
errs.Add(c.writer.Write(ctx, t, req)) | ||
if err := errs.ErrOrNil(); err != nil { | ||
level.Debug(c.logger).Log("msg", "failed to handle request", "err", err) | ||
result, allocErr := call.AllocResults() | ||
if allocErr != nil { | ||
return allocErr | ||
} | ||
|
||
switch errors.Cause(err) { | ||
case nil: | ||
return nil | ||
case errNotReady: | ||
result.SetError(writecapnp.WriteError_unavailable) | ||
case errUnavailable: | ||
result.SetError(writecapnp.WriteError_unavailable) | ||
case errConflict: | ||
result.SetError(writecapnp.WriteError_alreadyExists) | ||
case errBadReplica: | ||
result.SetError(writecapnp.WriteError_invalidArgument) | ||
default: | ||
result.SetError(writecapnp.WriteError_internal) | ||
} | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.