Skip to content

Commit

Permalink
fix: bump pubsub package with fix for deadlock
Browse files Browse the repository at this point in the history
refs troian/pubsub#3

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Jan 29, 2025
1 parent 8b0f59b commit 33af360
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 19 deletions.
36 changes: 21 additions & 15 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"
provider "github.com/akash-network/akash-api/go/provider/v1"
"github.com/boz/go-lifecycle"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/desertbit/timer"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
tpubsub "github.com/troian/pubsub"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/libs/log"
tpubsub "github.com/troian/pubsub"

dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
Expand Down Expand Up @@ -486,23 +485,23 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat
invupch := make(chan ctypes.Inventory, 1)

invch := is.clients.inventory.ResultChan()
var reserveChLocal <-chan inventoryRequest
var reservech <-chan inventoryRequest

resumeProcessingReservations := func() {
reserveChLocal = is.reservech
reservech = is.reservech
}

t := timer.NewStoppedTimer()

updateIPs := func() {
if is.clients.ip != nil {
reserveChLocal = nil
reservech = nil
if runch == nil {
t.Stop()
runch = is.runCheck(rctx, state)
}
} else if reserveChLocal == nil && state.inventory != nil {
reserveChLocal = is.reservech
} else if reservech == nil && state.inventory != nil {
reservech = is.reservech
}
}

Expand All @@ -516,7 +515,6 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat
default:
}
}

loop:
for {
select {
Expand Down Expand Up @@ -566,7 +564,7 @@ loop:
}
case <-t.C:
updateIPs()
case req := <-reserveChLocal:
case req := <-reservech:
is.handleRequest(req, state)
case req := <-is.lookupch:
// lookup registration
Expand Down Expand Up @@ -612,14 +610,21 @@ loop:
inventoryRequestsCounter.WithLabelValues("unreserve", "not-found").Inc()
req.ch <- inventoryResponse{err: errReservationNotFound}
case responseCh := <-is.statusch:
responseCh <- is.getStatus(state)
select {
case responseCh <- is.getStatus(state):
default:
}
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
case responseCh := <-is.statusV1ch:
resp, err := is.getStatusV1(state)
responseCh <- invSnapshotResp{
select {
case responseCh <- invSnapshotResp{
res: resp,
err: err,
}:
default:
}

if err == nil {
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
} else {
Expand All @@ -631,11 +636,11 @@ loop:
}

select {
case invupch <- inv:
case <-invupch:
default:
<-invupch
invupch <- inv
}

invupch <- inv
case inv := <-invupch:
currinv = inv.Dup()
state.inventory = inv
Expand Down Expand Up @@ -692,6 +697,7 @@ loop:
if err != nil {
continue
}

bus.Pub(inv, []string{ptypes.PubSubTopicInventoryStatus}, tpubsub.WithRetain())
}

Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/tendermint/tendermint v0.34.27
github.com/troian/pubsub v0.1.1
github.com/troian/pubsub v0.1.2
github.com/vektra/mockery/v2 v2.40.2
go.uber.org/zap v1.27.0
golang.org/x/net v0.30.0
Expand All @@ -51,7 +51,10 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.2
)

retract v0.6.0
retract (
v0.6.0
v0.6.5
)

replace (
// use cosmos fork of keyring
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1495,8 +1495,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/troian/hid v0.13.2 h1:O7PWZQm5YGyg0nVvknFVLVrNTPillz4ZXvxJOtoyteE=
github.com/troian/hid v0.13.2/go.mod h1:n6adloQ1876oEXZr6fFsthy4FDHxwJhh7QYQspm30Ds=
github.com/troian/pubsub v0.1.1 h1:huc5qneo0rtSKKsrkroyyMu+b8bw0talql2tt7GXl98=
github.com/troian/pubsub v0.1.1/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q=
github.com/troian/pubsub v0.1.2 h1:XPS8Y5nawdNRyPyhfFw/SFhPKO1SadY83ZlOVlaxadE=
github.com/troian/pubsub v0.1.2/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
Expand Down

0 comments on commit 33af360

Please sign in to comment.