Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SendFil Logs and API #724

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func NewServer(conf Config) (*Server, error) {
if err != nil {
return nil, fmt.Errorf("creating deal module: %s", err)
}
wm, err := walletModule.New(clientBuilder, masterAddr, conf.WalletInitialFunds, conf.AutocreateMasterAddr, networkName)
wm, err := walletModule.New(clientBuilder, masterAddr, conf.WalletInitialFunds, conf.AutocreateMasterAddr, networkName, txndstr.Wrap(ds, "wallet"))
if err != nil {
return nil, fmt.Errorf("creating wallet module: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion ffs/integrationtest/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewCustomFFSManager(t require.TestingT, ds datastore.TxnDatastore, cb lotus
sched, err := scheduler.New(txndstr.Wrap(ds, "ffs/scheduler"), l, hl, cl, 10, time.Minute*10, nil, scheduler.GCConfig{AutoGCInterval: 0})
require.NoError(t, err)

wm, err := walletModule.New(cb, masterAddr, *big.NewInt(iWalletBal), false, "")
wm, err := walletModule.New(cb, masterAddr, *big.NewInt(iWalletBal), false, "", txndstr.Wrap(ds, "wallet"))
require.NoError(t, err)

manager, err := manager.New(ds, wm, dm, sched, false, true)
Expand Down
2 changes: 1 addition & 1 deletion ffs/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestDefaultStorageConfig(t *testing.T) {
}

func newManager(clientBuilder lotus.ClientBuilder, ds datastore.TxnDatastore, masterAddr address.Address, ffsUseMasterAddr bool) (*Manager, func() error, error) {
wm, err := walletModule.New(clientBuilder, masterAddr, *big.NewInt(4000000000), false, "")
wm, err := walletModule.New(clientBuilder, masterAddr, *big.NewInt(4000000000), false, "", txndstr.Wrap(ds, "wallet"))
if err != nil {
return nil, func() error { return nil }, err
}
Expand Down
19 changes: 16 additions & 3 deletions wallet/module/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-datastore"
logger "github.com/ipfs/go-log/v2"
"github.com/textileio/powergate/lotus"
txndstr "github.com/textileio/powergate/txndstransform"
"github.com/textileio/powergate/wallet/sendstore"
)

const (
Expand All @@ -31,15 +34,17 @@ type Module struct {
iAmount *big.Int
masterAddr address.Address
networkName string
ss *sendstore.SendStore
}

// New creates a new wallet module.
func New(clientBuilder lotus.ClientBuilder, maddr address.Address, iam big.Int, autocreate bool, networkName string) (*Module, error) {
func New(clientBuilder lotus.ClientBuilder, maddr address.Address, iam big.Int, autocreate bool, networkName string, ds datastore.TxnDatastore) (*Module, error) {
m := &Module{
clientBuilder: clientBuilder,
iAmount: &iam,
masterAddr: maddr,
networkName: networkName,
ss: sendstore.New(txndstr.Wrap(ds, "sendstore")),
}
if maddr == address.Undef && autocreate {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down Expand Up @@ -223,8 +228,16 @@ func (m *Module) SendFil(ctx context.Context, from string, to string, amount *bi
}
defer cls()

_, err = client.MpoolPushMessage(ctx, msg, nil)
return err
sm, err := client.MpoolPushMessage(ctx, msg, nil)
if err != nil {
return fmt.Errorf("pushing mpool message: %s", err)
}

if _, err := m.ss.Put(sm.Cid(), f, t, amount); err != nil {
return fmt.Errorf("saving transaction: %s", err)
}

return nil
}

// FundFromFaucet make a faucet call to fund the provided wallet address.
Expand Down
257 changes: 257 additions & 0 deletions wallet/sendstore/sendstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package sendstore

import (
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"time"

"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
"github.com/textileio/powergate/wallet"
)

var (
log = logging.Logger("wallet-sendstore")

// ErrNotFound indicates the instance doesn't exist.
ErrNotFound = errors.New("not found")

dsBaseEvent = datastore.NewKey("event").String()
dsBaseIndexFrom = datastore.NewKey("index/from").String()
dsBaseIndexTo = datastore.NewKey("index/to").String()
)

// SendStore stores information about SendFil transactions.
type SendStore struct {
ds datastore.TxnDatastore
}

// New creates a new SendStore.
func New(ds datastore.TxnDatastore) *SendStore {
return &SendStore{
ds: ds,
}
}

// Put saves a transaction.
func (s *SendStore) Put(cid cid.Cid, from, to address.Address, amount *big.Int) (*wallet.SendFilEvent, error) {
rec := &wallet.SendFilEvent{
Cid: cid,
From: from,
To: to,
Amount: amount,
Time: time.Now(),
}
bytes, err := json.Marshal(rec)
if err != nil {
return nil, fmt.Errorf("marshaling json: %v", err)
}

tx, err := s.ds.NewTransaction(false)
defer tx.Discard()
if err != nil {
return nil, fmt.Errorf("creating transaction: %v", err)
}

dataKey := eventKey(cid)

err = tx.Put(dataKey, bytes)
if err != nil {
return nil, fmt.Errorf("putting rec: %v", err)
}

err = tx.Put(indexFromKey(cid, from, to, rec.Time, rec.Amount), dataKey.Bytes())
if err != nil {
return nil, fmt.Errorf("putting from index: %v", err)
}

err = tx.Put(indexToKey(cid, to, from, rec.Time, rec.Amount), dataKey.Bytes())
if err != nil {
return nil, fmt.Errorf("putting to index: %v", err)
}

err = tx.Commit()
if err != nil {
return nil, fmt.Errorf("committing transaction: %v", err)
}
return rec, nil
}

// Get retrieves a send fil txn by cid.
func (s *SendStore) Get(cid cid.Cid) (*wallet.SendFilEvent, error) {
return s.get(eventKey(cid))
}

// All returns all SendFilEvents.
func (s *SendStore) All() ([]*wallet.SendFilEvent, error) {
return s.withIndexPrefix(dsBaseIndexFrom)
}

// From returns all SendFilEvents sent from the specified address.
func (s *SendStore) From(address address.Address) ([]*wallet.SendFilEvent, error) {
return s.withIndexPrefix(indexFromPrefix(address))
}

// To returns all SendFilEvents sent to the specified address.
func (s *SendStore) To(address address.Address) ([]*wallet.SendFilEvent, error) {
return s.withIndexPrefix(indexToPrefix(address))
}

// FromTo returns all SendFilEvents sent from the specified address to the specified address.
func (s *SendStore) FromTo(from, to address.Address) ([]*wallet.SendFilEvent, error) {
return s.withIndexPrefix(indexFromToPrefix(from, to))
}

// Between returns all SendFilEvents between the specified addresses.
func (s *SendStore) Between(addr1, addr2 address.Address) ([]*wallet.SendFilEvent, error) {
res1, err := s.withIndexPrefix(indexFromToPrefix(addr1, addr2))
if err != nil {
return nil, fmt.Errorf("getting events from addr1 to addr 2: %v", err)
}
res2, err := s.withIndexPrefix(indexFromToPrefix(addr2, addr1))
if err != nil {
return nil, fmt.Errorf("getting events from addr1 to addr 2: %v", err)
}
return append(res1, res2...), nil
}

func sortByTime(a, b query.Entry) int {
aTime, err := extractTime(a.Key)
if err != nil {
log.Errorf("extracting time from key a: %v", err)
return 0
}
bTime, err := extractTime(b.Key)
if err != nil {
log.Errorf("extracting time from key b: %v", err)
return 0
}

if aTime > bTime {
return 1
} else if bTime > aTime {
return -1
} else {
return 0
}
}

func extractTime(key string) (int64, error) {
k := datastore.NewKey(key)
for _, namespace := range k.Namespaces() {
t := datastore.NamespaceType(namespace)
v := datastore.NamespaceValue(namespace)
if t == "time" {
return strconv.ParseInt(v, 10, 64)
}
}
return 0, fmt.Errorf("no time namespace type found")
}

func (s *SendStore) withIndexPrefix(prefix string) ([]*wallet.SendFilEvent, error) {
q := query.Query{
Prefix: prefix,
Orders: []query.Order{query.OrderByFunction(sortByTime)},
}
res, err := s.ds.Query(q)
if err != nil {
return nil, fmt.Errorf("querying datastore: %s", err)
}
defer func() {
if err := res.Close(); err != nil {
log.Errorf("closing allWithIndexPrefix index query result: %s", err)
}
}()
var events []*wallet.SendFilEvent
for r := range res.Next() {
if r.Error != nil {
return nil, fmt.Errorf("iter next: %s", r.Error)
}
eventKey := datastore.NewKey(string(r.Value))
event, err := s.get(eventKey)
if err != nil {
return nil, fmt.Errorf("getting event: %v", err)
}
events = append(events, event)
}
return events, nil
}

func (s *SendStore) get(key datastore.Key) (*wallet.SendFilEvent, error) {
bytes, err := s.ds.Get(key)
if err != nil {
return nil, fmt.Errorf("getting event bytes from ds: %v", err)
}
event := &wallet.SendFilEvent{}
err = json.Unmarshal(bytes, event)
if err != nil {
return nil, fmt.Errorf("unmarshaling bytes into event: %v", err)
}
return event, nil
}

func eventKey(cid cid.Cid) datastore.Key {
return datastore.KeyWithNamespaces([]string{
dsBaseEvent,
cid.String(),
})
}

func indexFromPrefix(from address.Address) string {
return datastore.KeyWithNamespaces([]string{
dsBaseIndexFrom,
kvStr("from", from.String()),
}).String()
}

func indexToPrefix(to address.Address) string {
return datastore.KeyWithNamespaces([]string{
dsBaseIndexTo,
kvStr("to", to.String()),
}).String()
}

func indexFromToPrefix(from, to address.Address) string {
return datastore.KeyWithNamespaces([]string{
indexFromPrefix(from),
kvStr("to", to.String()),
}).String()
}

func indexToFromPrefix(to, from address.Address) string {
return datastore.KeyWithNamespaces([]string{
indexToPrefix(to),
kvStr("from", from.String()),
}).String()
}

func indexFromKey(cid cid.Cid, from, to address.Address, time time.Time, amt *big.Int) datastore.Key {
return datastore.KeyWithNamespaces([]string{
indexFromToPrefix(from, to),
kvStr("time", fmt.Sprintf("%d", time.UnixNano())),
kvStr("amt", amt.String()),
cid.String(),
})
}

func indexToKey(cid cid.Cid, to, from address.Address, time time.Time, amt *big.Int) datastore.Key {
return datastore.KeyWithNamespaces([]string{
indexToFromPrefix(to, from),
kvStr("time", fmt.Sprintf("%d", time.UnixNano())),
kvStr("amt", amt.String()),
cid.String(),
})
}

func kvStr(t, v string) string {
if t == "" {
return v
}
return fmt.Sprintf("%s:%s", t, v)
}
Loading