Skip to content

Commit

Permalink
Add first batch of pull request rework
Browse files Browse the repository at this point in the history
- update Application section in README
- remove param name in app.go
- add error checks in processor/block.go
- move vars from model to transact logic
- move newAsset to transact
- use ID for well-known initialisms
- move randomelement, randomnint and differentelement to transact
- remove AssertDefined
- blockTxIdsJoinedByComma: use standard library to join elements
- return nil, instead of []byte{}
- remove go routine in listen.go
- move cache to parser
- inline processor in listen.go

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
  • Loading branch information
twoGiants committed Jan 20, 2025
1 parent 69c521f commit 32ed4f6
Show file tree
Hide file tree
Showing 22 changed files with 362 additions and 488 deletions.
3 changes: 3 additions & 0 deletions off_chain_data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ The client application provides several "commands" that can be invoked using the
- **getAllAssets**: Retrieve the current details of all assets recorded on the ledger. See:
- TypeScript: [application-typescript/src/getAllAssets.ts](application-typescript/src/getAllAssets.ts)
- Java: [application-java/app/src/main/java/GetAllAssets.java](application-java/app/src/main/java/GetAllAssets.java)
- Go: [application-go/getAllAssets.go](application-go/getAllAssets.go)
- **listen**: Listen for block events, and use them to replicate ledger updates in an off-chain data store. See:
- TypeScript: [application-typescript/src/listen.ts](application-typescript/src/listen.ts)
- Java: [application-java/app/src/main/java/Listen.java](application-java/app/src/main/java/Listen.java)
- Go: [application-go/listen.go](application-go/listen.go)
- **transact**: Submit a set of transactions to create, modify and delete assets. See:
- TypeScript: [application-typescript/src/transact.ts](application-typescript/src/transact.ts)
- Java: [application-java/app/src/main/java/Transact.java](application-java/app/src/main/java/Transact.java)
- Go: [application-go/transact.go](application-go/transact.go)

To keep the sample code concise, the **listen** command writes ledger updates to an output file named `store.log` in the current working directory (which for the Java sample is the `application-java/app` directory). A real implementation could write ledger updates directly to an off-chain data store of choice. You can inspect the information captured in this file as you run the sample.

Expand Down
2 changes: 1 addition & 1 deletion off_chain_data/application-go/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"google.golang.org/grpc"
)

var allCommands = map[string]func(clientConnection *grpc.ClientConn){
var allCommands = map[string]func(*grpc.ClientConn){
"getAllAssets": getAllAssets,
"transact": transact,
"listen": listen,
Expand Down
2 changes: 1 addition & 1 deletion off_chain_data/application-go/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package main
import (
"crypto/x509"
"fmt"
"offChainData/utils"
"offchaindata/utils"
"os"
"path"
"time"
Expand Down
2 changes: 1 addition & 1 deletion off_chain_data/application-go/contract/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (atb *AssetTransferBasic) DeleteAsset(id string) error {
func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) {
result, err := atb.contract.Evaluate("GetAllAssets")
if err != nil {
return []byte{}, fmt.Errorf("in GetAllAssets: %w", err)
return nil, fmt.Errorf("in GetAllAssets: %w", err)
}
return result, nil
}
31 changes: 0 additions & 31 deletions off_chain_data/application-go/contract/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,10 @@
*/
package contract

import (
"offChainData/utils"

"github.com/google/uuid"
)

var (
colors = []string{"red", "green", "blue"}
Owners = []string{"alice", "bob", "charlie"}
)

const (
maxInitialValue = 1000
maxInitialSize = 10
)

type Asset struct {
ID string `json:"ID"`
Color string `json:"Color"`
Size uint64 `json:"Size"`
Owner string `json:"Owner"`
AppraisedValue uint64 `json:"AppraisedValue"`
}

func NewAsset() Asset {
id, err := uuid.NewRandom()
if err != nil {
panic(err)
}

return Asset{
ID: id.String(),
Color: utils.RandomElement(colors),
Size: uint64(utils.RandomInt(maxInitialSize) + 1),
Owner: utils.RandomElement(Owners),
AppraisedValue: uint64(utils.RandomInt(maxInitialValue) + 1),
}
}
2 changes: 1 addition & 1 deletion off_chain_data/application-go/getAllAssets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"bytes"
"encoding/json"
"fmt"
atb "offChainData/contract"
atb "offchaindata/contract"

"github.com/hyperledger/fabric-gateway/pkg/client"
"google.golang.org/grpc"
Expand Down
2 changes: 1 addition & 1 deletion off_chain_data/application-go/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module offChainData
module offchaindata

go 1.22.0

Expand Down
279 changes: 249 additions & 30 deletions off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package main
import (
"context"
"fmt"
"offChainData/parser"
"offChainData/processor"
"offChainData/store"
"offChainData/utils"
"offchaindata/parser"
"offchaindata/store"
"offchaindata/utils"
"os"
"os/signal"
"sync"
"slices"
"strings"
"syscall"

"github.com/hyperledger/fabric-gateway/pkg/client"
Expand Down Expand Up @@ -62,32 +62,251 @@ func listen(clientConnection *grpc.ClientConn) {
panic(err)
}

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

for blockProto := range blocks {
select {
case <-ctx.Done():
return
default:
blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto),
checkpointer,
store.ApplyWritesToOffChainStore,
channelName,
)

if err := blockProcessor.Process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
for blockProto := range blocks {
aBlockProcessor := blockProcessor{
parser.ParseBlock(blockProto),
checkpointer,
store.ApplyWritesToOffChainStore,
channelName,
}
}()

wg.Wait()
if err := aBlockProcessor.process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}

fmt.Println("\nShutting down listener gracefully...")
}

type blockProcessor struct {
parsedBlock *parser.Block
checkpointer *client.FileCheckpointer
writeToStore store.Writer
channelName string
}

func (b *blockProcessor) process() error {
funcName := "Process"

fmt.Println("\nReceived block", b.parsedBlock.Number())

validTransactions, err := b.validTransactions()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}

for _, validTransaction := range validTransactions {
aTransaction := transactionProcessor{
b.parsedBlock.Number(),
validTransaction,
// TODO use pointer to parent and get blockNumber, store and channelName from parent
b.writeToStore,
b.channelName,
}
if err := aTransaction.process(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}

channelHeader, err := validTransaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionID := channelHeader.GetTxId()
if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
}

if err := b.checkpointer.CheckpointBlock(b.parsedBlock.Number()); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}

return nil
}

func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) {
result := []*parser.Transaction{}
newTransactions, err := b.getNewTransactions()
if err != nil {
return nil, fmt.Errorf("in validTransactions: %w", err)
}

for _, transaction := range newTransactions {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result, nil
}

func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) {
funcName := "getNewTransactions"

transactions, err := b.parsedBlock.Transactions()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

lastTransactionID := b.checkpointer.TransactionID()
if lastTransactionID == "" {
// No previously processed transactions within this block so all are new
return transactions, nil
}

// Ignore transactions up to the last processed transaction ID
lastProcessedIndex, err := b.findLastProcessedIndex()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return transactions[lastProcessedIndex+1:], nil
}

func (b *blockProcessor) findLastProcessedIndex() (int, error) {
funcName := "findLastProcessedIndex"

transactions, err := b.parsedBlock.Transactions()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}

blockTransactionIDs := []string{}
for _, transaction := range transactions {
channelHeader, err := transaction.ChannelHeader()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
blockTransactionIDs = append(blockTransactionIDs, channelHeader.GetTxId())
}

lastTransactionID := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIDs {
if id == lastTransactionID {
lastProcessedIndex = index
}
}

if lastProcessedIndex < 0 {
return lastProcessedIndex, newTxIdNotFoundError(
lastTransactionID,
b.parsedBlock.Number(),
blockTransactionIDs,
)
}

return lastProcessedIndex, nil
}

type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
writeToStore store.Writer
channelName string
}

func (t *transactionProcessor) process() error {
funcName := "process"

channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionID := channelHeader.GetTxId()

writes, err := t.writes()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}

if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionID)
return nil
}

fmt.Println("Process transaction", transactionID)

if err := t.writeToStore(store.LedgerUpdate{
BlockNumber: t.blockNumber,
TransactionID: transactionID,
Writes: writes,
}); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}

return nil
}

func (t *transactionProcessor) writes() ([]store.Write, error) {
funcName := "writes"
// TODO this entire code should live in the parser and just return the kvWrite which
// we then map to store.Write and return
channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
t.channelName = channelHeader.GetChannelId()

nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range nsReadWriteSets {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}

writes := []store.Write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()

kvReadWriteSet, err := readWriteSet.ReadWriteSet()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

for _, kvWrite := range kvReadWriteSet.GetWrites() {
writes = append(writes, store.Write{
ChannelName: t.channelName,
Namespace: namespace,
Key: kvWrite.GetKey(),
IsDelete: kvWrite.GetIsDelete(),
Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
}

return writes, nil
}

func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}

type txIdNotFoundError struct {
txId string
blockNumber uint64
blockTxIds []string
}

func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError {
return &txIdNotFoundError{
txId, blockNumber, blockTxIds,
}
}

func (t *txIdNotFoundError) Error() string {
format := "checkpoint transaction ID %s not found in block %d containing transactions: %s"
return fmt.Sprintf(format, t.txId, t.blockNumber, strings.Join(t.blockTxIds, ", "))
}
Loading

0 comments on commit 32ed4f6

Please sign in to comment.