Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Implement parca's ingestion API #220

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions pkg/debuginfo/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package debuginfo

import (
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

parcadebuginfov1 "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1"
)

type fakeDebugInfo struct {
parcadebuginfov1.UnimplementedDebugInfoServiceServer

logger log.Logger
}

func New(logger log.Logger) parcadebuginfov1.DebugInfoServiceServer {
return &fakeDebugInfo{
logger: logger,
}
}

// Exists returns true if the given build_id has debug info uploaded for it.
func (f *fakeDebugInfo) Exists(ctx context.Context, req *parcadebuginfov1.ExistsRequest) (*parcadebuginfov1.ExistsResponse, error) {
level.Warn(f.logger).Log("msg", "received exists request", "buildid", req.GetBuildId(), "hash", req.GetHash())

return &parcadebuginfov1.ExistsResponse{
Exists: false,
}, nil
}

// Upload ingests debug info for a given build_id
func (f *fakeDebugInfo) Upload(u parcadebuginfov1.DebugInfoService_UploadServer) error {
req, err := u.Recv()
if err != nil {
return err
}
level.Warn(f.logger).Log("msg", "received upload", "buildid", req.GetInfo().GetBuildId(), "hash", req.GetInfo().GetHash())

return nil
}

// Download returns the debug info for a given build_id.
func (_ *fakeDebugInfo) Download(*parcadebuginfov1.DownloadRequest, parcadebuginfov1.DebugInfoService_DownloadServer) error {
return nil
}
45 changes: 45 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (

"github.com/bufbuild/connect-go"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/parca-dev/parca/pkg/scrape"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -147,6 +149,8 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push

p.Normalize()

level.Warn(d.logger).Log("msg", "received sample", "labels", firemodel.LabelPairsString(series.Labels), "type", p.StringTable[p.SampleType[0].Type])

// reuse the data buffer if possible
size := p.SizeVT()
if cap(data) < size {
Expand Down Expand Up @@ -309,3 +313,44 @@ func TokenFor(tenantID, labels string) uint32 {
_, _ = h.Write([]byte(labels))
return h.Sum32()
}

func (d *Distributor) ParcaProfileStore() parcastorev1.ProfileStoreServiceServer {
return &ParcaProfileStore{
distributor: d,
}
}

type ParcaProfileStore struct {
parcastorev1.UnimplementedProfileStoreServiceServer
distributor *Distributor
}

func (s *ParcaProfileStore) WriteRaw(ctx context.Context, req *parcastorev1.WriteRawRequest) (*parcastorev1.WriteRawResponse, error) {
nReq := &pushv1.PushRequest{
Series: make([]*pushv1.RawProfileSeries, len(req.Series)),
}
for idxSeries, series := range req.Series {
nReq.Series[idxSeries] = &pushv1.RawProfileSeries{
Samples: make([]*pushv1.RawSample, len(series.Samples)),
Labels: make([]*commonv1.LabelPair, len(series.Labels.Labels)),
}
for idx, l := range series.Labels.Labels {
nReq.Series[idxSeries].Labels[idx] = &commonv1.LabelPair{
Name: l.Name,
Value: l.Value,
}
}
for idx, s := range series.Samples {
nReq.Series[idxSeries].Samples[idx] = &pushv1.RawSample{
RawProfile: s.RawProfile,
}
}
level.Warn(s.distributor.logger).Log("msg", "converted parca sample", "labels", firemodel.LabelPairsString(nReq.Series[idxSeries].Labels))
}

if _, err := s.distributor.Push(ctx, connect.NewRequest(nReq)); err != nil {
return nil, err
}

return &parcastorev1.WriteRawResponse{}, nil
}
10 changes: 9 additions & 1 deletion pkg/fire/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
grpcgw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
parcadebuginfov1 "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1"
parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/discovery/dns"
Expand All @@ -24,6 +25,7 @@ import (
"golang.org/x/net/http2/h2c"

"github.com/grafana/fire/pkg/agent"
"github.com/grafana/fire/pkg/debuginfo"
"github.com/grafana/fire/pkg/distributor"
"github.com/grafana/fire/pkg/firedb"
agentv1 "github.com/grafana/fire/pkg/gen/agent/v1"
Expand Down Expand Up @@ -107,7 +109,13 @@ func (f *Fire) initDistributor() (services.Service, error) {
// initialise direct pusher, this overwrites the default HTTP client
f.pusherClient = d

// register pusher
pushv1connect.RegisterPusherServiceHandler(f.Server.HTTP, d)

// register parca compatible profile store
parcastorev1.RegisterProfileStoreServiceServer(f.Server.GRPC, d.ParcaProfileStore())
parcadebuginfov1.RegisterDebugInfoServiceServer(f.Server.GRPC, debuginfo.New(f.logger))

return d, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/firedb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func TestHeadIngestRealProfiles(t *testing.T) {
profilePaths := []string{
"testdata/heap",
"testdata/profile",
"testdata/parca-agent",
}

head, err := NewHead(t.TempDir())
Expand Down
5 changes: 4 additions & 1 deletion pkg/firedb/locations.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func (*locationsHelper) addToRewriter(r *rewriter, elemRewriter idConversionTabl
}

func (*locationsHelper) rewrite(r *rewriter, l *profilev1.Location) error {
r.mappings.rewriteUint64(&l.MappingId)
// ignore mappingIDs of 0, as they indicate that it has already been symbolized.
if l.MappingId != 0 {
r.mappings.rewriteUint64(&l.MappingId)
}

for pos := range l.Line {
r.functions.rewriteUint64(&l.Line[pos].FunctionId)
Expand Down
1 change: 0 additions & 1 deletion pkg/firedb/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ func (*profilesHelper) key(s *schemav1.Profile) noKey {
}

func (*profilesHelper) addToRewriter(r *rewriter, elemRewriter idConversionTable) {
r.locations = elemRewriter
}

func (*profilesHelper) rewrite(r *rewriter, s *schemav1.Profile) error {
Expand Down
Binary file added pkg/firedb/testdata/parca-agent
Binary file not shown.