diff --git a/api/server/server.go b/api/server/server.go index 35c44150b..d66103eba 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -229,7 +229,7 @@ func NewServer(conf Config) (*Server, error) { if err != nil { return nil, fmt.Errorf("creating deal module: %s", err) } - wm, err := walletModule.New(clientBuilder, masterAddr, conf.WalletInitialFunds, conf.AutocreateMasterAddr, networkName) + wm, err := walletModule.New(clientBuilder, masterAddr, conf.WalletInitialFunds, conf.AutocreateMasterAddr, networkName, txndstr.Wrap(ds, "wallet")) if err != nil { return nil, fmt.Errorf("creating wallet module: %s", err) } diff --git a/ffs/integrationtest/manager/manager.go b/ffs/integrationtest/manager/manager.go index 3d91bc0b0..26175599b 100644 --- a/ffs/integrationtest/manager/manager.go +++ b/ffs/integrationtest/manager/manager.go @@ -92,7 +92,7 @@ func NewCustomFFSManager(t require.TestingT, ds datastore.TxnDatastore, cb lotus sched, err := scheduler.New(txndstr.Wrap(ds, "ffs/scheduler"), l, hl, cl, 10, time.Minute*10, nil, scheduler.GCConfig{AutoGCInterval: 0}) require.NoError(t, err) - wm, err := walletModule.New(cb, masterAddr, *big.NewInt(iWalletBal), false, "") + wm, err := walletModule.New(cb, masterAddr, *big.NewInt(iWalletBal), false, "", txndstr.Wrap(ds, "wallet")) require.NoError(t, err) manager, err := manager.New(ds, wm, dm, sched, false, true) diff --git a/ffs/manager/manager_test.go b/ffs/manager/manager_test.go index 1c425add0..069e2c603 100644 --- a/ffs/manager/manager_test.go +++ b/ffs/manager/manager_test.go @@ -194,7 +194,7 @@ func TestDefaultStorageConfig(t *testing.T) { } func newManager(clientBuilder lotus.ClientBuilder, ds datastore.TxnDatastore, masterAddr address.Address, ffsUseMasterAddr bool) (*Manager, func() error, error) { - wm, err := walletModule.New(clientBuilder, masterAddr, *big.NewInt(4000000000), false, "") + wm, err := walletModule.New(clientBuilder, masterAddr, *big.NewInt(4000000000), false, "", txndstr.Wrap(ds, "wallet")) if err != nil { return nil, func() error { return nil }, err } diff --git a/wallet/module/wallet.go b/wallet/module/wallet.go index d3b3be62d..f9d75b2fd 100644 --- a/wallet/module/wallet.go +++ b/wallet/module/wallet.go @@ -12,8 +12,11 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-datastore" logger "github.com/ipfs/go-log/v2" "github.com/textileio/powergate/lotus" + txndstr "github.com/textileio/powergate/txndstransform" + "github.com/textileio/powergate/wallet/sendstore" ) const ( @@ -31,15 +34,17 @@ type Module struct { iAmount *big.Int masterAddr address.Address networkName string + ss *sendstore.SendStore } // New creates a new wallet module. -func New(clientBuilder lotus.ClientBuilder, maddr address.Address, iam big.Int, autocreate bool, networkName string) (*Module, error) { +func New(clientBuilder lotus.ClientBuilder, maddr address.Address, iam big.Int, autocreate bool, networkName string, ds datastore.TxnDatastore) (*Module, error) { m := &Module{ clientBuilder: clientBuilder, iAmount: &iam, masterAddr: maddr, networkName: networkName, + ss: sendstore.New(txndstr.Wrap(ds, "sendstore")), } if maddr == address.Undef && autocreate { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -223,8 +228,16 @@ func (m *Module) SendFil(ctx context.Context, from string, to string, amount *bi } defer cls() - _, err = client.MpoolPushMessage(ctx, msg, nil) - return err + sm, err := client.MpoolPushMessage(ctx, msg, nil) + if err != nil { + return fmt.Errorf("pushing mpool message: %s", err) + } + + if _, err := m.ss.Put(sm.Cid(), f, t, amount); err != nil { + return fmt.Errorf("saving transaction: %s", err) + } + + return nil } // FundFromFaucet make a faucet call to fund the provided wallet address. diff --git a/wallet/sendstore/sendstore.go b/wallet/sendstore/sendstore.go new file mode 100644 index 000000000..02c009685 --- /dev/null +++ b/wallet/sendstore/sendstore.go @@ -0,0 +1,257 @@ +package sendstore + +import ( + "encoding/json" + "errors" + "fmt" + "math/big" + "strconv" + "time" + + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + logging "github.com/ipfs/go-log/v2" + "github.com/textileio/powergate/wallet" +) + +var ( + log = logging.Logger("wallet-sendstore") + + // ErrNotFound indicates the instance doesn't exist. + ErrNotFound = errors.New("not found") + + dsBaseEvent = datastore.NewKey("event").String() + dsBaseIndexFrom = datastore.NewKey("index/from").String() + dsBaseIndexTo = datastore.NewKey("index/to").String() +) + +// SendStore stores information about SendFil transactions. +type SendStore struct { + ds datastore.TxnDatastore +} + +// New creates a new SendStore. +func New(ds datastore.TxnDatastore) *SendStore { + return &SendStore{ + ds: ds, + } +} + +// Put saves a transaction. +func (s *SendStore) Put(cid cid.Cid, from, to address.Address, amount *big.Int) (*wallet.SendFilEvent, error) { + rec := &wallet.SendFilEvent{ + Cid: cid, + From: from, + To: to, + Amount: amount, + Time: time.Now(), + } + bytes, err := json.Marshal(rec) + if err != nil { + return nil, fmt.Errorf("marshaling json: %v", err) + } + + tx, err := s.ds.NewTransaction(false) + defer tx.Discard() + if err != nil { + return nil, fmt.Errorf("creating transaction: %v", err) + } + + dataKey := eventKey(cid) + + err = tx.Put(dataKey, bytes) + if err != nil { + return nil, fmt.Errorf("putting rec: %v", err) + } + + err = tx.Put(indexFromKey(cid, from, to, rec.Time, rec.Amount), dataKey.Bytes()) + if err != nil { + return nil, fmt.Errorf("putting from index: %v", err) + } + + err = tx.Put(indexToKey(cid, to, from, rec.Time, rec.Amount), dataKey.Bytes()) + if err != nil { + return nil, fmt.Errorf("putting to index: %v", err) + } + + err = tx.Commit() + if err != nil { + return nil, fmt.Errorf("committing transaction: %v", err) + } + return rec, nil +} + +// Get retrieves a send fil txn by cid. +func (s *SendStore) Get(cid cid.Cid) (*wallet.SendFilEvent, error) { + return s.get(eventKey(cid)) +} + +// All returns all SendFilEvents. +func (s *SendStore) All() ([]*wallet.SendFilEvent, error) { + return s.withIndexPrefix(dsBaseIndexFrom) +} + +// From returns all SendFilEvents sent from the specified address. +func (s *SendStore) From(address address.Address) ([]*wallet.SendFilEvent, error) { + return s.withIndexPrefix(indexFromPrefix(address)) +} + +// To returns all SendFilEvents sent to the specified address. +func (s *SendStore) To(address address.Address) ([]*wallet.SendFilEvent, error) { + return s.withIndexPrefix(indexToPrefix(address)) +} + +// FromTo returns all SendFilEvents sent from the specified address to the specified address. +func (s *SendStore) FromTo(from, to address.Address) ([]*wallet.SendFilEvent, error) { + return s.withIndexPrefix(indexFromToPrefix(from, to)) +} + +// Between returns all SendFilEvents between the specified addresses. +func (s *SendStore) Between(addr1, addr2 address.Address) ([]*wallet.SendFilEvent, error) { + res1, err := s.withIndexPrefix(indexFromToPrefix(addr1, addr2)) + if err != nil { + return nil, fmt.Errorf("getting events from addr1 to addr 2: %v", err) + } + res2, err := s.withIndexPrefix(indexFromToPrefix(addr2, addr1)) + if err != nil { + return nil, fmt.Errorf("getting events from addr1 to addr 2: %v", err) + } + return append(res1, res2...), nil +} + +func sortByTime(a, b query.Entry) int { + aTime, err := extractTime(a.Key) + if err != nil { + log.Errorf("extracting time from key a: %v", err) + return 0 + } + bTime, err := extractTime(b.Key) + if err != nil { + log.Errorf("extracting time from key b: %v", err) + return 0 + } + + if aTime > bTime { + return 1 + } else if bTime > aTime { + return -1 + } else { + return 0 + } +} + +func extractTime(key string) (int64, error) { + k := datastore.NewKey(key) + for _, namespace := range k.Namespaces() { + t := datastore.NamespaceType(namespace) + v := datastore.NamespaceValue(namespace) + if t == "time" { + return strconv.ParseInt(v, 10, 64) + } + } + return 0, fmt.Errorf("no time namespace type found") +} + +func (s *SendStore) withIndexPrefix(prefix string) ([]*wallet.SendFilEvent, error) { + q := query.Query{ + Prefix: prefix, + Orders: []query.Order{query.OrderByFunction(sortByTime)}, + } + res, err := s.ds.Query(q) + if err != nil { + return nil, fmt.Errorf("querying datastore: %s", err) + } + defer func() { + if err := res.Close(); err != nil { + log.Errorf("closing allWithIndexPrefix index query result: %s", err) + } + }() + var events []*wallet.SendFilEvent + for r := range res.Next() { + if r.Error != nil { + return nil, fmt.Errorf("iter next: %s", r.Error) + } + eventKey := datastore.NewKey(string(r.Value)) + event, err := s.get(eventKey) + if err != nil { + return nil, fmt.Errorf("getting event: %v", err) + } + events = append(events, event) + } + return events, nil +} + +func (s *SendStore) get(key datastore.Key) (*wallet.SendFilEvent, error) { + bytes, err := s.ds.Get(key) + if err != nil { + return nil, fmt.Errorf("getting event bytes from ds: %v", err) + } + event := &wallet.SendFilEvent{} + err = json.Unmarshal(bytes, event) + if err != nil { + return nil, fmt.Errorf("unmarshaling bytes into event: %v", err) + } + return event, nil +} + +func eventKey(cid cid.Cid) datastore.Key { + return datastore.KeyWithNamespaces([]string{ + dsBaseEvent, + cid.String(), + }) +} + +func indexFromPrefix(from address.Address) string { + return datastore.KeyWithNamespaces([]string{ + dsBaseIndexFrom, + kvStr("from", from.String()), + }).String() +} + +func indexToPrefix(to address.Address) string { + return datastore.KeyWithNamespaces([]string{ + dsBaseIndexTo, + kvStr("to", to.String()), + }).String() +} + +func indexFromToPrefix(from, to address.Address) string { + return datastore.KeyWithNamespaces([]string{ + indexFromPrefix(from), + kvStr("to", to.String()), + }).String() +} + +func indexToFromPrefix(to, from address.Address) string { + return datastore.KeyWithNamespaces([]string{ + indexToPrefix(to), + kvStr("from", from.String()), + }).String() +} + +func indexFromKey(cid cid.Cid, from, to address.Address, time time.Time, amt *big.Int) datastore.Key { + return datastore.KeyWithNamespaces([]string{ + indexFromToPrefix(from, to), + kvStr("time", fmt.Sprintf("%d", time.UnixNano())), + kvStr("amt", amt.String()), + cid.String(), + }) +} + +func indexToKey(cid cid.Cid, to, from address.Address, time time.Time, amt *big.Int) datastore.Key { + return datastore.KeyWithNamespaces([]string{ + indexToFromPrefix(to, from), + kvStr("time", fmt.Sprintf("%d", time.UnixNano())), + kvStr("amt", amt.String()), + cid.String(), + }) +} + +func kvStr(t, v string) string { + if t == "" { + return v + } + return fmt.Sprintf("%s:%s", t, v) +} diff --git a/wallet/sendstore/sendstore_test.go b/wallet/sendstore/sendstore_test.go new file mode 100644 index 000000000..87960cdf9 --- /dev/null +++ b/wallet/sendstore/sendstore_test.go @@ -0,0 +1,160 @@ +package sendstore + +import ( + "math/big" + "testing" + "time" + + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + "github.com/textileio/powergate/tests" + "github.com/textileio/powergate/util" + "github.com/textileio/powergate/wallet" +) + +var createAddress = address.NewForTestGetter() + +func TestPut(t *testing.T) { + t.Parallel() + s := create(t) + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", address.TestAddress, address.TestAddress2, "100") +} + +func TestGet(t *testing.T) { + t.Parallel() + s := create(t) + txn := requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", address.TestAddress, address.TestAddress2, "100") + res, err := s.Get(txn.Cid) + require.NoError(t, err) + require.Equal(t, txn.Cid, res.Cid) + require.Equal(t, txn.Amount, res.Amount) + require.Equal(t, txn.From, res.From) + require.Equal(t, txn.To, res.To) + require.True(t, res.Time.Equal(txn.Time)) +} + +func TestAll(t *testing.T) { + t.Parallel() + s := create(t) + addr1 := createAddress() + addr2 := createAddress() + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", addr1, addr2, "100") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8v", addr1, addr2, "200") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8w", addr1, addr2, "300") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8x", addr2, addr1, "400") + res, err := s.All() + require.NoError(t, err) + require.Len(t, res, 4) + requireSorted(t, res) +} + +func TestFrom(t *testing.T) { + t.Parallel() + s := create(t) + addr1 := createAddress() + addr2 := createAddress() + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", addr1, addr2, "100") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8v", addr1, addr2, "200") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8w", addr1, addr2, "300") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8x", addr2, addr1, "400") + res, err := s.From(addr1) + require.NoError(t, err) + require.Len(t, res, 3) + requireSorted(t, res) +} + +func TestTo(t *testing.T) { + t.Parallel() + s := create(t) + addr1 := createAddress() + addr2 := createAddress() + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", addr1, addr2, "100") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8v", addr1, addr2, "200") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8w", addr1, addr2, "300") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8x", addr2, addr1, "400") + res, err := s.To(addr2) + require.NoError(t, err) + require.Len(t, res, 3) + requireSorted(t, res) +} + +func TestFromTo(t *testing.T) { + t.Parallel() + s := create(t) + addr1 := createAddress() + addr2 := createAddress() + addr3 := createAddress() + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", addr1, addr2, "100") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8v", addr1, addr2, "200") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8w", addr1, addr2, "300") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8x", addr2, addr1, "400") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8y", addr3, addr1, "500") + res, err := s.FromTo(addr1, addr2) + require.NoError(t, err) + require.Len(t, res, 3) + requireSorted(t, res) + res, err = s.FromTo(addr2, addr1) + require.NoError(t, err) + require.Len(t, res, 1) + requireSorted(t, res) +} + +func TestBetween(t *testing.T) { + t.Parallel() + s := create(t) + addr1 := createAddress() + addr2 := createAddress() + addr3 := createAddress() + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u", addr1, addr2, "100") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8v", addr1, addr2, "200") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8w", addr1, addr2, "300") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8x", addr2, addr1, "400") + requirePut(t, s, "QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8y", addr3, addr1, "500") + res, err := s.Between(addr1, addr2) + require.NoError(t, err) + require.Len(t, res, 4) + requireSorted(t, res) +} + +func requireCid(t *testing.T, cid string) cid.Cid { + c, err := util.CidFromString(cid) + require.NoError(t, err) + return c +} + +func requireBigInt(t *testing.T, s string) *big.Int { + res, ok := new(big.Int).SetString(s, 10) + require.True(t, ok) + return res +} + +func requirePut(t *testing.T, s *SendStore, cid string, from, to address.Address, amt string) *wallet.SendFilEvent { + c := requireCid(t, cid) + a := requireBigInt(t, amt) + txn, err := s.Put(c, from, to, a) + require.NoError(t, err) + require.Equal(t, c, txn.Cid) + require.Equal(t, a, txn.Amount) + require.Equal(t, from, txn.From) + require.Equal(t, to, txn.To) + require.True(t, txn.Time.Before(time.Now())) + return txn +} + +func requireSorted(t *testing.T, events []*wallet.SendFilEvent) { + var last *wallet.SendFilEvent + for _, event := range events { + if last != nil { + require.True(t, last.Time.Before(event.Time), "%v is not before %v", last.Time, event.Time) + } + last = event + } +} + +func create(t *testing.T) *SendStore { + ds := tests.NewTxMapDatastore() + store := New(ds) + require.NotNil(t, store) + return store +} diff --git a/wallet/type.go b/wallet/type.go index 2c96fba62..99bdcc1d0 100644 --- a/wallet/type.go +++ b/wallet/type.go @@ -3,6 +3,10 @@ package wallet import ( "context" "math/big" + "time" + + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" ) // Module provides wallet management access to a Filecoin client. @@ -13,3 +17,12 @@ type Module interface { SendFil(ctx context.Context, from string, to string, amount *big.Int) error FundFromFaucet(ctx context.Context, addr string) error } + +// SendFilEvent is a record created when calling SendFil. +type SendFilEvent struct { + Cid cid.Cid + From address.Address + To address.Address + Amount *big.Int + Time time.Time +}