Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
Add support for parca profilestore API
Browse files Browse the repository at this point in the history
  • Loading branch information
simonswine committed Sep 13, 2022
1 parent b1583e4 commit 05b4713
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 3 deletions.
45 changes: 45 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (

"github.com/bufbuild/connect-go"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/parca-dev/parca/pkg/scrape"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -147,6 +149,8 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push

p.Normalize()

level.Warn(d.logger).Log("msg", "received sample", "labels", firemodel.LabelPairsString(series.Labels), "type", p.StringTable[p.SampleType[0].Type])

// reuse the data buffer if possible
size := p.SizeVT()
if cap(data) < size {
Expand Down Expand Up @@ -309,3 +313,44 @@ func TokenFor(tenantID, labels string) uint32 {
_, _ = h.Write([]byte(labels))
return h.Sum32()
}

func (d *Distributor) ParcaProfileStore() parcastorev1.ProfileStoreServiceServer {
return &ParcaProfileStore{
distributor: d,
}
}

type ParcaProfileStore struct {
parcastorev1.UnimplementedProfileStoreServiceServer
distributor *Distributor
}

func (s *ParcaProfileStore) WriteRaw(ctx context.Context, req *parcastorev1.WriteRawRequest) (*parcastorev1.WriteRawResponse, error) {
nReq := &pushv1.PushRequest{
Series: make([]*pushv1.RawProfileSeries, len(req.Series)),
}
for idxSeries, series := range req.Series {
nReq.Series[idxSeries] = &pushv1.RawProfileSeries{
Samples: make([]*pushv1.RawSample, len(series.Samples)),
Labels: make([]*commonv1.LabelPair, len(series.Labels.Labels)),
}
for idx, l := range series.Labels.Labels {
nReq.Series[idxSeries].Labels[idx] = &commonv1.LabelPair{
Name: l.Name,
Value: l.Value,
}
}
for idx, s := range series.Samples {
nReq.Series[idxSeries].Samples[idx] = &pushv1.RawSample{
RawProfile: s.RawProfile,
}
}
level.Warn(s.distributor.logger).Log("msg", "converted parca sample", "labels", firemodel.LabelPairsString(nReq.Series[idxSeries].Labels))
}

if _, err := s.distributor.Push(ctx, connect.NewRequest(nReq)); err != nil {
return nil, err
}

return &parcastorev1.WriteRawResponse{}, nil
}
7 changes: 6 additions & 1 deletion pkg/fire/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
grpcgw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/discovery/dns"
Expand Down Expand Up @@ -107,7 +107,12 @@ func (f *Fire) initDistributor() (services.Service, error) {
// initialise direct pusher, this overwrites the default HTTP client
f.pusherClient = d

// register pusher
pushv1connect.RegisterPusherServiceHandler(f.Server.HTTP, d)

// register parca compatible profile store
parcastorev1.RegisterProfileStoreServiceServer(f.Server.GRPC, d.ParcaProfileStore())

return d, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/firedb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func TestHeadIngestRealProfiles(t *testing.T) {
profilePaths := []string{
"testdata/heap",
"testdata/profile",
"testdata/parca-agent",
}

head, err := NewHead(t.TempDir())
Expand Down
5 changes: 4 additions & 1 deletion pkg/firedb/locations.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func (*locationsHelper) addToRewriter(r *rewriter, elemRewriter idConversionTabl
}

func (*locationsHelper) rewrite(r *rewriter, l *profilev1.Location) error {
r.mappings.rewriteUint64(&l.MappingId)
// ignore mappingIDs of 0, as they indicate that it has already been symbolized.
if l.MappingId != 0 {
r.mappings.rewriteUint64(&l.MappingId)
}

for pos := range l.Line {
r.functions.rewriteUint64(&l.Line[pos].FunctionId)
Expand Down
1 change: 0 additions & 1 deletion pkg/firedb/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ func (*profilesHelper) key(s *schemav1.Profile) noKey {
}

func (*profilesHelper) addToRewriter(r *rewriter, elemRewriter idConversionTable) {
r.locations = elemRewriter
}

func (*profilesHelper) rewrite(r *rewriter, s *schemav1.Profile) error {
Expand Down
Binary file added pkg/firedb/testdata/parca-agent
Binary file not shown.

0 comments on commit 05b4713

Please sign in to comment.