From 32ed4f6af5b890cc7aa23b199b871959ca0b8024 Mon Sep 17 00:00:00 2001 From: Stanislav Jakuschevskij Date: Mon, 20 Jan 2025 15:00:05 +0100 Subject: [PATCH] Add first batch of pull request rework - update Application section in README - remove param name in app.go - add error checks in processor/block.go - move vars from model to transact logic - move newAsset to transact - use ID for well-known initialisms - move randomelement, randomnint and differentelement to transact - remove AssertDefined - blockTxIdsJoinedByComma: use standard library to join elements - return nil, instead of []byte{} - remove go routine in listen.go - move cache to parser - inline processor in listen.go Signed-off-by: Stanislav Jakuschevskij --- off_chain_data/README.md | 3 + off_chain_data/application-go/app.go | 2 +- off_chain_data/application-go/connect.go | 2 +- .../application-go/contract/contract.go | 2 +- .../application-go/contract/model.go | 31 -- off_chain_data/application-go/getAllAssets.go | 2 +- off_chain_data/application-go/go.mod | 2 +- off_chain_data/application-go/listen.go | 279 ++++++++++++++++-- off_chain_data/application-go/parser/block.go | 44 +-- .../application-go/parser/block_test.go | 10 +- off_chain_data/application-go/parser/cache.go | 25 ++ .../utils_test.go => parser/cache_test.go} | 13 +- .../parser/endorserTransaction.go | 16 +- .../parser/namespaceReadWriteSet.go | 3 +- .../application-go/parser/payload.go | 10 +- .../application-go/processor/block.go | 147 --------- .../application-go/processor/transaction.go | 104 ------- .../processor/txIdNotFoundError.go | 32 -- .../application-go/store/flatFille.go | 2 +- off_chain_data/application-go/store/model.go | 2 +- off_chain_data/application-go/transact.go | 58 +++- off_chain_data/application-go/utils/utils.go | 61 ---- 22 files changed, 362 insertions(+), 488 deletions(-) create mode 100644 off_chain_data/application-go/parser/cache.go rename off_chain_data/application-go/{utils/utils_test.go => parser/cache_test.go} (87%) delete mode 100644 off_chain_data/application-go/processor/block.go delete mode 100644 off_chain_data/application-go/processor/transaction.go delete mode 100644 off_chain_data/application-go/processor/txIdNotFoundError.go diff --git a/off_chain_data/README.md b/off_chain_data/README.md index 22e1948f80..33bc646851 100644 --- a/off_chain_data/README.md +++ b/off_chain_data/README.md @@ -19,12 +19,15 @@ The client application provides several "commands" that can be invoked using the - **getAllAssets**: Retrieve the current details of all assets recorded on the ledger. See: - TypeScript: [application-typescript/src/getAllAssets.ts](application-typescript/src/getAllAssets.ts) - Java: [application-java/app/src/main/java/GetAllAssets.java](application-java/app/src/main/java/GetAllAssets.java) + - Go: [application-go/getAllAssets.go](application-go/getAllAssets.go) - **listen**: Listen for block events, and use them to replicate ledger updates in an off-chain data store. See: - TypeScript: [application-typescript/src/listen.ts](application-typescript/src/listen.ts) - Java: [application-java/app/src/main/java/Listen.java](application-java/app/src/main/java/Listen.java) + - Go: [application-go/listen.go](application-go/listen.go) - **transact**: Submit a set of transactions to create, modify and delete assets. See: - TypeScript: [application-typescript/src/transact.ts](application-typescript/src/transact.ts) - Java: [application-java/app/src/main/java/Transact.java](application-java/app/src/main/java/Transact.java) + - Go: [application-go/transact.go](application-go/transact.go) To keep the sample code concise, the **listen** command writes ledger updates to an output file named `store.log` in the current working directory (which for the Java sample is the `application-java/app` directory). A real implementation could write ledger updates directly to an off-chain data store of choice. You can inspect the information captured in this file as you run the sample. diff --git a/off_chain_data/application-go/app.go b/off_chain_data/application-go/app.go index 07d46dd8cc..2a8339f1fe 100644 --- a/off_chain_data/application-go/app.go +++ b/off_chain_data/application-go/app.go @@ -15,7 +15,7 @@ import ( "google.golang.org/grpc" ) -var allCommands = map[string]func(clientConnection *grpc.ClientConn){ +var allCommands = map[string]func(*grpc.ClientConn){ "getAllAssets": getAllAssets, "transact": transact, "listen": listen, diff --git a/off_chain_data/application-go/connect.go b/off_chain_data/application-go/connect.go index 728e9888dd..d8a0863cb0 100644 --- a/off_chain_data/application-go/connect.go +++ b/off_chain_data/application-go/connect.go @@ -9,7 +9,7 @@ package main import ( "crypto/x509" "fmt" - "offChainData/utils" + "offchaindata/utils" "os" "path" "time" diff --git a/off_chain_data/application-go/contract/contract.go b/off_chain_data/application-go/contract/contract.go index 59e27135c3..c63ae10dfb 100644 --- a/off_chain_data/application-go/contract/contract.go +++ b/off_chain_data/application-go/contract/contract.go @@ -65,7 +65,7 @@ func (atb *AssetTransferBasic) DeleteAsset(id string) error { func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) { result, err := atb.contract.Evaluate("GetAllAssets") if err != nil { - return []byte{}, fmt.Errorf("in GetAllAssets: %w", err) + return nil, fmt.Errorf("in GetAllAssets: %w", err) } return result, nil } diff --git a/off_chain_data/application-go/contract/model.go b/off_chain_data/application-go/contract/model.go index 70c4e688d2..e159475d82 100644 --- a/off_chain_data/application-go/contract/model.go +++ b/off_chain_data/application-go/contract/model.go @@ -5,22 +5,6 @@ */ package contract -import ( - "offChainData/utils" - - "github.com/google/uuid" -) - -var ( - colors = []string{"red", "green", "blue"} - Owners = []string{"alice", "bob", "charlie"} -) - -const ( - maxInitialValue = 1000 - maxInitialSize = 10 -) - type Asset struct { ID string `json:"ID"` Color string `json:"Color"` @@ -28,18 +12,3 @@ type Asset struct { Owner string `json:"Owner"` AppraisedValue uint64 `json:"AppraisedValue"` } - -func NewAsset() Asset { - id, err := uuid.NewRandom() - if err != nil { - panic(err) - } - - return Asset{ - ID: id.String(), - Color: utils.RandomElement(colors), - Size: uint64(utils.RandomInt(maxInitialSize) + 1), - Owner: utils.RandomElement(Owners), - AppraisedValue: uint64(utils.RandomInt(maxInitialValue) + 1), - } -} diff --git a/off_chain_data/application-go/getAllAssets.go b/off_chain_data/application-go/getAllAssets.go index c2ccf2c70f..10e2d5455c 100644 --- a/off_chain_data/application-go/getAllAssets.go +++ b/off_chain_data/application-go/getAllAssets.go @@ -10,7 +10,7 @@ import ( "bytes" "encoding/json" "fmt" - atb "offChainData/contract" + atb "offchaindata/contract" "github.com/hyperledger/fabric-gateway/pkg/client" "google.golang.org/grpc" diff --git a/off_chain_data/application-go/go.mod b/off_chain_data/application-go/go.mod index 83adb07dc7..69ed2296bf 100644 --- a/off_chain_data/application-go/go.mod +++ b/off_chain_data/application-go/go.mod @@ -1,4 +1,4 @@ -module offChainData +module offchaindata go 1.22.0 diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index af3116261d..61eda5eb1d 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -3,13 +3,13 @@ package main import ( "context" "fmt" - "offChainData/parser" - "offChainData/processor" - "offChainData/store" - "offChainData/utils" + "offchaindata/parser" + "offchaindata/store" + "offchaindata/utils" "os" "os/signal" - "sync" + "slices" + "strings" "syscall" "github.com/hyperledger/fabric-gateway/pkg/client" @@ -62,32 +62,251 @@ func listen(clientConnection *grpc.ClientConn) { panic(err) } - var wg sync.WaitGroup - wg.Add(1) - - go func() { - defer wg.Done() - - for blockProto := range blocks { - select { - case <-ctx.Done(): - return - default: - blockProcessor := processor.NewBlock( - parser.ParseBlock(blockProto), - checkpointer, - store.ApplyWritesToOffChainStore, - channelName, - ) - - if err := blockProcessor.Process(); err != nil { - fmt.Println("\033[31m[ERROR]\033[0m", err) - return - } - } + for blockProto := range blocks { + aBlockProcessor := blockProcessor{ + parser.ParseBlock(blockProto), + checkpointer, + store.ApplyWritesToOffChainStore, + channelName, } - }() - wg.Wait() + if err := aBlockProcessor.process(); err != nil { + fmt.Println("\033[31m[ERROR]\033[0m", err) + return + } + } + fmt.Println("\nShutting down listener gracefully...") } + +type blockProcessor struct { + parsedBlock *parser.Block + checkpointer *client.FileCheckpointer + writeToStore store.Writer + channelName string +} + +func (b *blockProcessor) process() error { + funcName := "Process" + + fmt.Println("\nReceived block", b.parsedBlock.Number()) + + validTransactions, err := b.validTransactions() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + for _, validTransaction := range validTransactions { + aTransaction := transactionProcessor{ + b.parsedBlock.Number(), + validTransaction, + // TODO use pointer to parent and get blockNumber, store and channelName from parent + b.writeToStore, + b.channelName, + } + if err := aTransaction.process(); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + channelHeader, err := validTransaction.ChannelHeader() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + transactionID := channelHeader.GetTxId() + if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + } + + if err := b.checkpointer.CheckpointBlock(b.parsedBlock.Number()); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + return nil +} + +func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) { + result := []*parser.Transaction{} + newTransactions, err := b.getNewTransactions() + if err != nil { + return nil, fmt.Errorf("in validTransactions: %w", err) + } + + for _, transaction := range newTransactions { + if transaction.IsValid() { + result = append(result, transaction) + } + } + return result, nil +} + +func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) { + funcName := "getNewTransactions" + + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + lastTransactionID := b.checkpointer.TransactionID() + if lastTransactionID == "" { + // No previously processed transactions within this block so all are new + return transactions, nil + } + + // Ignore transactions up to the last processed transaction ID + lastProcessedIndex, err := b.findLastProcessedIndex() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + return transactions[lastProcessedIndex+1:], nil +} + +func (b *blockProcessor) findLastProcessedIndex() (int, error) { + funcName := "findLastProcessedIndex" + + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + + blockTransactionIDs := []string{} + for _, transaction := range transactions { + channelHeader, err := transaction.ChannelHeader() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + blockTransactionIDs = append(blockTransactionIDs, channelHeader.GetTxId()) + } + + lastTransactionID := b.checkpointer.TransactionID() + lastProcessedIndex := -1 + for index, id := range blockTransactionIDs { + if id == lastTransactionID { + lastProcessedIndex = index + } + } + + if lastProcessedIndex < 0 { + return lastProcessedIndex, newTxIdNotFoundError( + lastTransactionID, + b.parsedBlock.Number(), + blockTransactionIDs, + ) + } + + return lastProcessedIndex, nil +} + +type transactionProcessor struct { + blockNumber uint64 + transaction *parser.Transaction + writeToStore store.Writer + channelName string +} + +func (t *transactionProcessor) process() error { + funcName := "process" + + channelHeader, err := t.transaction.ChannelHeader() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + transactionID := channelHeader.GetTxId() + + writes, err := t.writes() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + if len(writes) == 0 { + fmt.Println("Skipping read-only or system transaction", transactionID) + return nil + } + + fmt.Println("Process transaction", transactionID) + + if err := t.writeToStore(store.LedgerUpdate{ + BlockNumber: t.blockNumber, + TransactionID: transactionID, + Writes: writes, + }); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + return nil +} + +func (t *transactionProcessor) writes() ([]store.Write, error) { + funcName := "writes" + // TODO this entire code should live in the parser and just return the kvWrite which + // we then map to store.Write and return + channelHeader, err := t.transaction.ChannelHeader() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + t.channelName = channelHeader.GetChannelId() + + nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{} + for _, nsReadWriteSet := range nsReadWriteSets { + if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { + nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) + } + } + + writes := []store.Write{} + for _, readWriteSet := range nonSystemCCReadWriteSets { + namespace := readWriteSet.Namespace() + + kvReadWriteSet, err := readWriteSet.ReadWriteSet() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + for _, kvWrite := range kvReadWriteSet.GetWrites() { + writes = append(writes, store.Write{ + ChannelName: t.channelName, + Namespace: namespace, + Key: kvWrite.GetKey(), + IsDelete: kvWrite.GetIsDelete(), + Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output + }) + } + } + + return writes, nil +} + +func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { + systemChaincodeNames := []string{ + "_lifecycle", + "cscc", + "escc", + "lscc", + "qscc", + "vscc", + } + return slices.Contains(systemChaincodeNames, chaincodeName) +} + +type txIdNotFoundError struct { + txId string + blockNumber uint64 + blockTxIds []string +} + +func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError { + return &txIdNotFoundError{ + txId, blockNumber, blockTxIds, + } +} + +func (t *txIdNotFoundError) Error() string { + format := "checkpoint transaction ID %s not found in block %d containing transactions: %s" + return fmt.Sprintf(format, t.txId, t.blockNumber, strings.Join(t.blockTxIds, ", ")) +} diff --git a/off_chain_data/application-go/parser/block.go b/off_chain_data/application-go/parser/block.go index ec105eea0e..2105a3fa36 100644 --- a/off_chain_data/application-go/parser/block.go +++ b/off_chain_data/application-go/parser/block.go @@ -2,31 +2,25 @@ package parser import ( "fmt" - "offChainData/utils" "github.com/hyperledger/fabric-protos-go-apiv2/common" "google.golang.org/protobuf/proto" ) type Block struct { - block *common.Block - transactions []*Transaction + block *common.Block } func ParseBlock(block *common.Block) *Block { - return &Block{block, []*Transaction{}} + return &Block{block} } -func (b *Block) Number() (uint64, error) { - header, err := utils.AssertDefined(b.block.GetHeader(), "missing block header") - if err != nil { - return 0, fmt.Errorf("in Number: %w", err) - } - return header.GetNumber(), nil +func (b *Block) Number() uint64 { + return b.block.GetHeader().GetNumber() } func (b *Block) Transactions() ([]*Transaction, error) { - return utils.Cache(func() ([]*Transaction, error) { + return cache(func() ([]*Transaction, error) { funcName := "Transactions" envelopes, err := b.unmarshalEnvelopesFromBlockData() if err != nil { @@ -74,20 +68,11 @@ func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Pay func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) { funcName := "parse" - validationCodes, err := b.extractTransactionValidationCodes() - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } + validationCodes := b.block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER] result := []*payload{} for i, commonPayload := range commonPayloads { - statusCode, err := utils.AssertDefined( - validationCodes[i], - fmt.Sprint("missing validation code index", i), - ) - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } + statusCode := validationCodes[i] payload := parsePayload(commonPayload, int32(statusCode)) is, err := payload.isEndorserTransaction() @@ -102,21 +87,6 @@ func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) { return result, nil } -func (b *Block) extractTransactionValidationCodes() ([]byte, error) { - metadata, err := utils.AssertDefined( - b.block.GetMetadata(), - "missing block metadata", - ) - if err != nil { - return nil, fmt.Errorf("in extractTransactionValidationCodes: %w", err) - } - - return utils.AssertDefined( - metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER], - "missing transaction validation code", - ) -} - func (*Block) createTransactionsFrom(payloads []*payload) []*Transaction { result := []*Transaction{} for _, payload := range payloads { diff --git a/off_chain_data/application-go/parser/block_test.go b/off_chain_data/application-go/parser/block_test.go index b8546adf0a..3581b35ec5 100644 --- a/off_chain_data/application-go/parser/block_test.go +++ b/off_chain_data/application-go/parser/block_test.go @@ -4,7 +4,7 @@ import ( "encoding/json" "testing" - atb "offChainData/contract" + atb "offchaindata/contract" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" @@ -107,7 +107,13 @@ func Test_NamespaceReadWriteSetParsing(t *testing.T) { func nsReadWriteSetFake() (*rwset.NsReadWriteSet, string, atb.Asset) { expectedNamespace := "basic" - expectedAsset := atb.NewAsset() + expectedAsset := atb.Asset{ + ID: "id-1", + Color: "green", + Size: 8, + Owner: "Alice", + AppraisedValue: 346, + } result := &rwset.NsReadWriteSet{ Namespace: expectedNamespace, diff --git a/off_chain_data/application-go/parser/cache.go b/off_chain_data/application-go/parser/cache.go new file mode 100644 index 0000000000..5612117ee5 --- /dev/null +++ b/off_chain_data/application-go/parser/cache.go @@ -0,0 +1,25 @@ +package parser + +import ( + "fmt" +) + +// Wrap a function call with a cache. On first call the wrapped function is invoked to +// obtain a result. Subsequent calls return the cached result. +func cache[T any](f func() (T, error)) func() (T, error) { + var value T + var err error + var cached bool + + return func() (T, error) { + if !cached { + value, err = f() + if err != nil { + var zeroValue T + return zeroValue, fmt.Errorf("in cache: %w", err) + } + cached = true + } + return value, nil + } +} diff --git a/off_chain_data/application-go/utils/utils_test.go b/off_chain_data/application-go/parser/cache_test.go similarity index 87% rename from off_chain_data/application-go/utils/utils_test.go rename to off_chain_data/application-go/parser/cache_test.go index d806c6e40c..96db5639db 100644 --- a/off_chain_data/application-go/utils/utils_test.go +++ b/off_chain_data/application-go/parser/cache_test.go @@ -1,8 +1,7 @@ -package utils_test +package parser import ( "errors" - "offChainData/utils" "testing" ) @@ -13,7 +12,7 @@ func Test_cachePrimitiveFunctionResult(t *testing.T) { return 5, nil } - cachedFunc := utils.Cache(f) + cachedFunc := cache(f) result1, err := cachedFunc() if err != nil { t.Fatal("unexpected error:", err) @@ -38,13 +37,13 @@ func Test_whenCachedFunctionsErrors_returnError(t *testing.T) { return 0, errors.New(errorMsg) } - cachedFunc := utils.Cache(f) + cachedFunc := cache(f) _, err := cachedFunc() if err == nil { t.Fatal("expected error, but got", err) } - if err.Error() != errorMsg { + if err.Error() != "in cache: "+errorMsg { t.Fatal("expected error message to be 'error', but got", err) } } @@ -53,7 +52,7 @@ func Test_cacheWrappedPrimitiveFunctionResult(t *testing.T) { controlValue := 5 multiplyControlValueBy := func(n int) (int, error) { controlValue *= n; return controlValue, nil } - cachedFunc := utils.Cache(func() (int, error) { return multiplyControlValueBy(5) }) + cachedFunc := cache(func() (int, error) { return multiplyControlValueBy(5) }) result1, err := cachedFunc() if err != nil { t.Fatal("unexpected error:", err) @@ -80,7 +79,7 @@ func Test_cacheWrappedDataStructureResult(t *testing.T) { controlStruct := &GreetMe{helloTo: "Hello "} greet := func(name string) (*GreetMe, error) { controlStruct.helloTo += name; return controlStruct, nil } - cachedFunc := utils.Cache(func() (*GreetMe, error) { return greet("John Doe") }) + cachedFunc := cache(func() (*GreetMe, error) { return greet("John Doe") }) result1, err := cachedFunc() if err != nil { t.Fatal("unexpected error:", err) diff --git a/off_chain_data/application-go/parser/endorserTransaction.go b/off_chain_data/application-go/parser/endorserTransaction.go index 161775d695..138ef7d1ea 100644 --- a/off_chain_data/application-go/parser/endorserTransaction.go +++ b/off_chain_data/application-go/parser/endorserTransaction.go @@ -2,7 +2,6 @@ package parser import ( "fmt" - "offChainData/utils" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/peer" @@ -18,7 +17,7 @@ func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransactio } func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) { - return utils.Cache(func() ([]*readWriteSet, error) { + return cache(func() ([]*readWriteSet, error) { funcName := "readWriteSets" chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads() if err != nil { @@ -65,18 +64,7 @@ func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.Chainc func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) { result := []*peer.ChaincodeEndorsedAction{} for _, payload := range chaincodeActionPayloads { - chaincodeEndorsedAction, err := utils.AssertDefined( - payload.GetAction(), - "missing chaincode endorsed action", - ) - if err != nil { - return nil, fmt.Errorf("in extractChaincodeEndorsedActionsFrom: %w", err) - } - - result = append( - result, - chaincodeEndorsedAction, - ) + result = append(result, payload.GetAction()) } return result, nil } diff --git a/off_chain_data/application-go/parser/namespaceReadWriteSet.go b/off_chain_data/application-go/parser/namespaceReadWriteSet.go index 4fdd8a91a2..521fa061c0 100644 --- a/off_chain_data/application-go/parser/namespaceReadWriteSet.go +++ b/off_chain_data/application-go/parser/namespaceReadWriteSet.go @@ -2,7 +2,6 @@ package parser import ( "fmt" - "offChainData/utils" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" @@ -22,7 +21,7 @@ func (p *NamespaceReadWriteSet) Namespace() string { } func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) { - return utils.Cache(func() (*kvrwset.KVRWSet, error) { + return cache(func() (*kvrwset.KVRWSet, error) { result := kvrwset.KVRWSet{} if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil { return nil, fmt.Errorf("in ReadWriteSet: %w", err) diff --git a/off_chain_data/application-go/parser/payload.go b/off_chain_data/application-go/parser/payload.go index f8ba31056f..33619f7fe3 100644 --- a/off_chain_data/application-go/parser/payload.go +++ b/off_chain_data/application-go/parser/payload.go @@ -2,7 +2,6 @@ package parser import ( "fmt" - "offChainData/utils" "github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/peer" @@ -19,16 +18,11 @@ func parsePayload(commonPayload *common.Payload, statusCode int32) *payload { } func (p *payload) channelHeader() (*common.ChannelHeader, error) { - return utils.Cache(func() (*common.ChannelHeader, error) { + return cache(func() (*common.ChannelHeader, error) { funcName := "channelHeader" - header, err := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header") - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } - result := &common.ChannelHeader{} - if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil { + if err := proto.Unmarshal(p.commonPayload.GetHeader().GetChannelHeader(), result); err != nil { return nil, fmt.Errorf("in %s: %w", funcName, err) } diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go deleted file mode 100644 index 530bf23f57..0000000000 --- a/off_chain_data/application-go/processor/block.go +++ /dev/null @@ -1,147 +0,0 @@ -package processor - -import ( - "fmt" - "offChainData/parser" - "offChainData/store" - - "github.com/hyperledger/fabric-gateway/pkg/client" -) - -type block struct { - parsedBlock *parser.Block - checkpointer *client.FileCheckpointer - writeToStore store.Writer - channelName string -} - -func NewBlock( - parsedBlock *parser.Block, - checkpointer *client.FileCheckpointer, - writeToStore store.Writer, - channelName string, -) *block { - return &block{ - parsedBlock, - checkpointer, - writeToStore, - channelName, - } -} - -func (b *block) Process() error { - funcName := "Process" - - blockNumber, err := b.parsedBlock.Number() - if err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - - fmt.Println("\nReceived block", blockNumber) - - validTransactions, err := b.validTransactions() - if err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - - for _, validTransaction := range validTransactions { - aTransaction := transaction{ - blockNumber, - validTransaction, - // TODO use pointer to parent and get blockNumber, store and channelName from parent - b.writeToStore, - b.channelName, - } - if err := aTransaction.process(); err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - - channelHeader, err := validTransaction.ChannelHeader() - if err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - transactionId := channelHeader.GetTxId() - b.checkpointer.CheckpointTransaction(blockNumber, transactionId) - } - - b.checkpointer.CheckpointBlock(blockNumber) - - return nil -} - -func (b *block) validTransactions() ([]*parser.Transaction, error) { - result := []*parser.Transaction{} - newTransactions, err := b.getNewTransactions() - if err != nil { - return nil, fmt.Errorf("in validTransactions: %w", err) - } - - for _, transaction := range newTransactions { - if transaction.IsValid() { - result = append(result, transaction) - } - } - return result, nil -} - -func (b *block) getNewTransactions() ([]*parser.Transaction, error) { - funcName := "getNewTransactions" - - transactions, err := b.parsedBlock.Transactions() - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } - - lastTransactionId := b.checkpointer.TransactionID() - if lastTransactionId == "" { - // No previously processed transactions within this block so all are new - return transactions, nil - } - - // Ignore transactions up to the last processed transaction ID - lastProcessedIndex, err := b.findLastProcessedIndex() - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } - return transactions[lastProcessedIndex+1:], nil -} - -func (b *block) findLastProcessedIndex() (int, error) { - funcName := "findLastProcessedIndex" - - transactions, err := b.parsedBlock.Transactions() - if err != nil { - return 0, fmt.Errorf("in %s: %w", funcName, err) - } - - blockTransactionIds := []string{} - for _, transaction := range transactions { - channelHeader, err := transaction.ChannelHeader() - if err != nil { - return 0, fmt.Errorf("in %s: %w", funcName, err) - } - blockTransactionIds = append(blockTransactionIds, channelHeader.GetTxId()) - } - - lastTransactionId := b.checkpointer.TransactionID() - lastProcessedIndex := -1 - for index, id := range blockTransactionIds { - if id == lastTransactionId { - lastProcessedIndex = index - } - } - - if lastProcessedIndex < 0 { - blockNumber, err := b.parsedBlock.Number() - if err != nil { - return 0, fmt.Errorf("in %s: %w", funcName, err) - } - return lastProcessedIndex, newTxIdNotFoundError( - lastTransactionId, - blockNumber, - blockTransactionIds, - ) - } - - return lastProcessedIndex, nil -} diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go deleted file mode 100644 index ec9c960887..0000000000 --- a/off_chain_data/application-go/processor/transaction.go +++ /dev/null @@ -1,104 +0,0 @@ -package processor - -import ( - "fmt" - "offChainData/parser" - "offChainData/store" - "slices" -) - -type transaction struct { - blockNumber uint64 - transaction *parser.Transaction - writeToStore store.Writer - channelName string -} - -func (t *transaction) process() error { - funcName := "process" - - channelHeader, err := t.transaction.ChannelHeader() - if err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - transactionId := channelHeader.GetTxId() - - writes, err := t.writes() - if err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - - if len(writes) == 0 { - fmt.Println("Skipping read-only or system transaction", transactionId) - return nil - } - - fmt.Println("Process transaction", transactionId) - - if err := t.writeToStore(store.LedgerUpdate{ - BlockNumber: t.blockNumber, - TransactionId: transactionId, - Writes: writes, - }); err != nil { - return fmt.Errorf("in %s: %w", funcName, err) - } - - return nil -} - -func (t *transaction) writes() ([]store.Write, error) { - funcName := "writes" - // TODO this entire code should live in the parser and just return the kvWrite which - // we then map to store.Write and return - channelHeader, err := t.transaction.ChannelHeader() - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } - t.channelName = channelHeader.GetChannelId() - - nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } - - nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{} - for _, nsReadWriteSet := range nsReadWriteSets { - if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { - nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) - } - } - - writes := []store.Write{} - for _, readWriteSet := range nonSystemCCReadWriteSets { - namespace := readWriteSet.Namespace() - - kvReadWriteSet, err := readWriteSet.ReadWriteSet() - if err != nil { - return nil, fmt.Errorf("in %s: %w", funcName, err) - } - - for _, kvWrite := range kvReadWriteSet.GetWrites() { - writes = append(writes, store.Write{ - ChannelName: t.channelName, - Namespace: namespace, - Key: kvWrite.GetKey(), - IsDelete: kvWrite.GetIsDelete(), - Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output - }) - } - } - - return writes, nil -} - -func (t *transaction) isSystemChaincode(chaincodeName string) bool { - systemChaincodeNames := []string{ - "_lifecycle", - "cscc", - "escc", - "lscc", - "qscc", - "vscc", - } - return slices.Contains(systemChaincodeNames, chaincodeName) -} diff --git a/off_chain_data/application-go/processor/txIdNotFoundError.go b/off_chain_data/application-go/processor/txIdNotFoundError.go deleted file mode 100644 index 7a8f230a71..0000000000 --- a/off_chain_data/application-go/processor/txIdNotFoundError.go +++ /dev/null @@ -1,32 +0,0 @@ -package processor - -import "fmt" - -type txIdNotFoundError struct { - txId string - blockNumber uint64 - blockTxIds []string -} - -func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError { - return &txIdNotFoundError{ - txId, blockNumber, blockTxIds, - } -} - -func (t *txIdNotFoundError) Error() string { - format := "checkpoint transaction ID %s not found in block %d containing transactions: %s" - return fmt.Sprintf(format, t.txId, t.blockNumber, t.blockTxIdsJoinedByComma()) -} - -func (t *txIdNotFoundError) blockTxIdsJoinedByComma() string { - result := "" - for index, item := range t.blockTxIds { - if len(t.blockTxIds)-1 == index { - result += item - } else { - result += item + ", " - } - } - return result -} diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go index 81794b6cce..291111f33d 100644 --- a/off_chain_data/application-go/store/flatFille.go +++ b/off_chain_data/application-go/store/flatFille.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "math" - "offChainData/utils" + "offchaindata/utils" "os" "strconv" "strings" diff --git a/off_chain_data/application-go/store/model.go b/off_chain_data/application-go/store/model.go index a68efbea8c..8c1b71e837 100644 --- a/off_chain_data/application-go/store/model.go +++ b/off_chain_data/application-go/store/model.go @@ -6,7 +6,7 @@ type Writer = func(data LedgerUpdate) error // Ledger update made by a specific transaction. type LedgerUpdate struct { BlockNumber uint64 - TransactionId string + TransactionID string Writes []Write } diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go index 532aa09871..112ed51cbd 100644 --- a/off_chain_data/application-go/transact.go +++ b/off_chain_data/application-go/transact.go @@ -1,16 +1,20 @@ package main import ( + "crypto/rand" "fmt" + "math/big" "sync" - atb "offChainData/contract" - "offChainData/utils" + atb "offchaindata/contract" + "github.com/google/uuid" "github.com/hyperledger/fabric-gateway/pkg/client" "google.golang.org/grpc" ) +var owners = []string{"alice", "bob", "charlie"} + func transact(clientConnection *grpc.ClientConn) { id, options := newConnectOptions(clientConnection) gateway, err := client.Connect(id, options...) @@ -59,7 +63,7 @@ func (t *transactApp) run() { func (t *transactApp) transact() error { funcName := "transact" - anAsset := atb.NewAsset() + anAsset := newAsset() err := t.smartContract.CreateAsset(anAsset) if err != nil { @@ -68,8 +72,8 @@ func (t *transactApp) transact() error { fmt.Println("Created asset", anAsset.ID) // Transfer randomly 1 in 2 assets to a new owner. - if utils.RandomInt(2) == 0 { - newOwner := utils.DifferentElement(atb.Owners, anAsset.Owner) + if randomInt(2) == 0 { + newOwner := differentElement(owners, anAsset.Owner) oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner) if err != nil { return fmt.Errorf("in %s: %w", funcName, err) @@ -78,7 +82,7 @@ func (t *transactApp) transact() error { } // Delete randomly 1 in 4 created assets. - if utils.RandomInt(4) == 0 { + if randomInt(4) == 0 { err := t.smartContract.DeleteAsset(anAsset.ID) if err != nil { return fmt.Errorf("in %s: %w", funcName, err) @@ -87,3 +91,45 @@ func (t *transactApp) transact() error { } return nil } + +func newAsset() atb.Asset { + id, err := uuid.NewRandom() + if err != nil { + panic(err) + } + + return atb.Asset{ + ID: id.String(), + Color: randomElement([]string{"red", "green", "blue"}), + Size: uint64(randomInt(10) + 1), + Owner: randomElement(owners), + AppraisedValue: uint64(randomInt(1000) + 1), + } +} + +// Pick a random element from an array. +func randomElement(values []string) string { + result := values[randomInt(len(values))] + return result +} + +// Generate a random integer in the range 0 to max - 1. +func randomInt(max int) int { + result, err := rand.Int(rand.Reader, big.NewInt(int64(max))) + if err != nil { + panic(err) + } + + return int(result.Int64()) +} + +// Pick a random element from an array, excluding the current value. +func differentElement(values []string, currentValue string) string { + candidateValues := []string{} + for _, v := range values { + if v != currentValue { + candidateValues = append(candidateValues, v) + } + } + return randomElement(candidateValues) +} diff --git a/off_chain_data/application-go/utils/utils.go b/off_chain_data/application-go/utils/utils.go index c3714931ec..6743e68b95 100644 --- a/off_chain_data/application-go/utils/utils.go +++ b/off_chain_data/application-go/utils/utils.go @@ -1,70 +1,9 @@ package utils import ( - "crypto/rand" - "errors" - "fmt" - "math/big" "os" ) -// Pick a random element from an array. -func RandomElement(values []string) string { - result := values[RandomInt(len(values))] - return result -} - -// Generate a random integer in the range 0 to max - 1. -func RandomInt(max int) int { - result, err := rand.Int(rand.Reader, big.NewInt(int64(max))) - if err != nil { - panic(err) - } - - return int(result.Int64()) -} - -// Pick a random element from an array, excluding the current value. -func DifferentElement(values []string, currentValue string) string { - candidateValues := []string{} - for _, v := range values { - if v != currentValue { - candidateValues = append(candidateValues, v) - } - } - return RandomElement(candidateValues) -} - -// Return the value if it is defined; otherwise panics with an error message. -func AssertDefined[T any](value T, message string) (T, error) { - if any(value) == any(nil) { - var zeroValue T - return zeroValue, errors.New(message) - } - - return value, nil -} - -// Wrap a function call with a cache. On first call the wrapped function is invoked to -// obtain a result. Subsequent calls return the cached result. -func Cache[T any](f func() (T, error)) func() (T, error) { - var value T - var err error - var cached bool - - return func() (T, error) { - if !cached { - value, err = f() - if err != nil { - var zeroValue T - return zeroValue, fmt.Errorf("in Cache: %w", err) - } - cached = true - } - return value, nil - } -} - func EnvOrDefault(key, defaultValue string) string { result := os.Getenv(key) if result == "" {