Skip to content

Commit

Permalink
feat: google cloud storage (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac authored Jan 28, 2025
1 parent c141415 commit adb4d67
Show file tree
Hide file tree
Showing 172 changed files with 12,217 additions and 7,565 deletions.
7 changes: 6 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ APP_STORAGE_WRITE_URL="http://local.hasura.dev:8080"
AZURE_STORAGE_ENDPOINT=http://local.hasura.dev:10000
AZURE_STORAGE_DEFAULT_BUCKET=azure-test
AZURE_STORAGE_ACCOUNT_NAME=local
AZURE_STORAGE_ACCOUNT_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
AZURE_STORAGE_ACCOUNT_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==

GOOGLE_STORAGE_DEFAULT_BUCKET=gcp-bucket
GOOGLE_STORAGE_ENDPOINT=http://gcp-storage-emulator:4443/storage/v1/
GOOGLE_STORAGE_PUBLIC_HOST=http://localhost:10010
GOOGLE_PROJECT_ID=test-local-project
21 changes: 21 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- 8080:8080
volumes:
- ./tests/configuration:/etc/connector:ro
- ./tests/certs/service_account.json:/service_account.json
extra_hosts:
- local.hasura.dev=host-gateway
environment:
Expand All @@ -26,6 +27,11 @@ services:
AZURE_STORAGE_DEFAULT_BUCKET: $AZURE_STORAGE_DEFAULT_BUCKET
AZURE_STORAGE_ACCOUNT_NAME: $AZURE_STORAGE_ACCOUNT_NAME
AZURE_STORAGE_ACCOUNT_KEY: $AZURE_STORAGE_ACCOUNT_KEY
GOOGLE_STORAGE_DEFAULT_BUCKET: $GOOGLE_STORAGE_DEFAULT_BUCKET
GOOGLE_STORAGE_ENDPOINT: $GOOGLE_STORAGE_ENDPOINT
GOOGLE_STORAGE_PUBLIC_HOST: $GOOGLE_STORAGE_PUBLIC_HOST
GOOGLE_PROJECT_ID: $GOOGLE_PROJECT_ID
GOOGLE_STORAGE_CREDENTIALS_FILE: /service_account.json
HASURA_LOG_LEVEL: debug
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
OTEL_METRICS_EXPORTER: prometheus
Expand Down Expand Up @@ -68,6 +74,21 @@ services:
environment:
AZURITE_ACCOUNTS: "${AZURE_STORAGE_ACCOUNT_NAME}:${AZURE_STORAGE_ACCOUNT_KEY}"

# https://github.com/fsouza/fake-gcs-server
gcp-storage-emulator:
image: fsouza/fake-gcs-server:1.52.1
command:
[
"-scheme",
"http",
"-public-host",
"http://localhost:10010",
"-log-level",
"debug",
]
ports:
- "10010:4443"

volumes:
minio_data:
s3_data:
3 changes: 2 additions & 1 deletion connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hasura/ndc-sdk-go/connector"
"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/hasura/ndc-storage/configuration/version"
"github.com/hasura/ndc-storage/connector/functions"
"github.com/hasura/ndc-storage/connector/storage"
"github.com/hasura/ndc-storage/connector/types"
Expand Down Expand Up @@ -72,7 +73,7 @@ func (c *Connector) ParseConfiguration(ctx context.Context, configurationDir str
func (c *Connector) TryInitState(ctx context.Context, configuration *types.Configuration, metrics *connector.TelemetryState) (*types.State, error) {
logger := connector.GetLogger(ctx)

manager, err := storage.NewManager(ctx, configuration.Clients, logger)
manager, err := storage.NewManager(ctx, configuration.Clients, logger, version.BuildVersion)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func TestConnector(t *testing.T) {

azureBlobEndpoint := "http://local.hasura.dev:10000"
azureAccountName := "local"
azureAccountKey := "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
Expand All @@ -27,6 +26,10 @@ func TestConnector(t *testing.T) {
t.Setenv("AZURE_STORAGE_ACCOUNT_NAME", azureAccountName)
t.Setenv("AZURE_STORAGE_ACCOUNT_KEY", azureAccountKey)
t.Setenv("AZURE_STORAGE_CONNECTION_STRING", fmt.Sprintf("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s;BlobEndpoint=%s", azureAccountName, azureAccountKey, azureBlobEndpoint))
t.Setenv("GOOGLE_STORAGE_DEFAULT_BUCKET", "gcp-bucket")
t.Setenv("GOOGLE_PROJECT_ID", "test-local-project")
t.Setenv("GOOGLE_STORAGE_ENDPOINT", "http://localhost:10010/storage/v1/")
t.Setenv("GOOGLE_STORAGE_CREDENTIALS_FILE", "../tests/certs/service_account.json")

for _, dir := range []string{"01-setup", "02-get", "03-cleanup"} {
ndctest.TestConnector(t, &Connector{}, ndctest.TestConnectorOptions{
Expand Down
168 changes: 55 additions & 113 deletions connector/functions/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package functions
import (
"context"

"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/hasura/ndc-storage/connector/functions/internal"
"github.com/hasura/ndc-storage/connector/storage/common"
Expand All @@ -19,147 +20,88 @@ func ProcedureCreateStorageBucket(ctx context.Context, state *types.State, args
}

// FunctionStorageBuckets list all buckets.
func FunctionStorageBuckets(ctx context.Context, state *types.State, args *common.ListStorageBucketArguments) ([]common.StorageBucketInfo, error) {
request := internal.ObjectPredicate{}

if err := request.EvalSelection(utils.CommandSelectionFieldFromContext(ctx)); err != nil {
return nil, err
}

return state.Storage.ListBuckets(ctx, args.ClientID, common.BucketOptions{
IncludeTags: request.Include.Tags,
NumThreads: state.Concurrency.Query,
})
}

// FunctionStorageBucketExists checks if a bucket exists.
func FunctionStorageBucketExists(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
return state.Storage.BucketExists(ctx, args)
}

// ProcedureRemoveStorageBucket removes a bucket, bucket should be empty to be successfully removed.
func ProcedureRemoveStorageBucket(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
if err := state.Storage.RemoveBucket(ctx, args); err != nil {
return false, err
func FunctionStorageBuckets(ctx context.Context, state *types.State, args *common.ListStorageBucketArguments) (common.StorageBucketListResults, error) {
if args.MaxResults <= 0 {
return common.StorageBucketListResults{}, schema.UnprocessableContentError("maxResults must be larger than 0", nil)
}

return true, nil
}

// ProcedureSetStorageBucketTags sets tags to a bucket.
func ProcedureSetStorageBucketTags(ctx context.Context, state *types.State, args *common.SetStorageBucketTaggingArguments) (bool, error) {
if err := state.Storage.SetBucketTagging(ctx, args); err != nil {
return false, err
request, err := internal.EvalObjectPredicate(common.StorageBucketArguments{}, "", args.Where, types.QueryVariablesFromContext(ctx))
if err != nil {
return common.StorageBucketListResults{}, err
}

return true, nil
}

// FunctionStorageBucketPolicy gets access permissions on a bucket or a prefix.
func FunctionStorageBucketPolicy(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (string, error) {
return state.Storage.GetBucketPolicy(ctx, args)
}

// FunctionStorageBucketNotification gets notification configuration on a bucket.
func FunctionStorageBucketNotification(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.NotificationConfig, error) {
return state.Storage.GetBucketNotification(ctx, args)
}

// ProcedureSetStorageBucketNotification sets a new notification configuration on a bucket.
func ProcedureSetStorageBucketNotification(ctx context.Context, state *types.State, args *common.SetBucketNotificationArguments) (bool, error) {
if err := state.Storage.SetBucketNotification(ctx, args); err != nil {
return false, err
if !request.IsValid {
return common.StorageBucketListResults{
Buckets: []common.StorageBucket{},
}, nil
}

return true, nil
}

// ProcedureSetStorageBucketLifecycle sets lifecycle on bucket or an object prefix.
func ProcedureSetStorageBucketLifecycle(ctx context.Context, state *types.State, args *common.SetStorageBucketLifecycleArguments) (bool, error) {
err := state.Storage.SetBucketLifecycle(ctx, args)
if err != nil {
return false, err
if err := request.EvalSelection(utils.CommandSelectionFieldFromContext(ctx)); err != nil {
return common.StorageBucketListResults{}, err
}

return true, nil
}

// FunctionStorageBucketLifecycle gets lifecycle on a bucket or a prefix.
func FunctionStorageBucketLifecycle(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.BucketLifecycleConfiguration, error) {
return state.Storage.GetBucketLifecycle(ctx, args)
}

// ProcedureSetStorageBucketEncryption sets default encryption configuration on a bucket.
func ProcedureSetStorageBucketEncryption(ctx context.Context, state *types.State, args *common.SetStorageBucketEncryptionArguments) (bool, error) {
err := state.Storage.SetBucketEncryption(ctx, args)
if err != nil {
return false, err
predicate := request.BucketPredicate.CheckPostPredicate
if !request.BucketPredicate.HasPostPredicate() {
predicate = nil
}

return true, nil
}

// FunctionStorageBucketEncryption gets default encryption configuration set on a bucket.
func FunctionStorageBucketEncryption(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.ServerSideEncryptionConfiguration, error) {
return state.Storage.GetBucketEncryption(ctx, args)
}

// ProcedureSetObjectLockConfig sets object lock configuration in given bucket. mode, validity and unit are either all set or all nil.
func ProcedureSetStorageObjectLockConfig(ctx context.Context, state *types.State, args *common.SetStorageObjectLockArguments) (bool, error) {
err := state.Storage.SetObjectLockConfig(ctx, args)
result, err := state.Storage.ListBuckets(ctx, request.ClientID, &common.ListStorageBucketsOptions{
Prefix: request.BucketPredicate.GetPrefix(),
MaxResults: args.MaxResults,
StartAfter: args.StartAfter,
Include: common.BucketIncludeOptions{
Tags: request.Include.Tags,
Versioning: request.Include.Versions,
Lifecycle: request.Include.Lifecycle,
Encryption: request.Include.Encryption,
ObjectLock: request.Include.ObjectLock,
},
NumThreads: state.Concurrency.Query,
}, predicate)
if err != nil {
return false, err
return common.StorageBucketListResults{}, err
}

return true, nil
return *result, nil
}

// FunctionStorageObjectLockConfig gets object lock configuration of given bucket.
func FunctionStorageObjectLockConfig(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.StorageObjectLockConfig, error) {
return state.Storage.GetObjectLockConfig(ctx, args)
}
// FunctionStorageBucket gets a bucket by name.
func FunctionStorageBucket(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.StorageBucket, error) {
request := internal.PredicateEvaluator{}

// ProcedureEnableStorageBucketVersioning enables bucket versioning support.
func ProcedureEnableStorageBucketVersioning(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
if err := state.Storage.EnableVersioning(ctx, args); err != nil {
return false, err
}

return true, nil
}

// ProcedureSuspendStorageBucketVersioning disables bucket versioning support.
func ProcedureSuspendStorageBucketVersioning(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
if err := state.Storage.SuspendVersioning(ctx, args); err != nil {
return false, err
if err := request.EvalSelection(utils.CommandSelectionFieldFromContext(ctx)); err != nil {
return nil, err
}

return true, nil
return state.Storage.GetBucket(ctx, args, common.BucketOptions{
Include: common.BucketIncludeOptions{
Tags: request.Include.Tags,
Versioning: request.Include.Versions,
Lifecycle: request.Include.Lifecycle,
Encryption: request.Include.Encryption,
ObjectLock: request.Include.ObjectLock,
},
NumThreads: state.Concurrency.Query,
})
}

// FunctionStorageBucketVersioning gets versioning configuration set on a bucket.
func FunctionStorageBucketVersioning(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.StorageBucketVersioningConfiguration, error) {
return state.Storage.GetBucketVersioning(ctx, args)
// FunctionStorageBucketExists checks if a bucket exists.
func FunctionStorageBucketExists(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
return state.Storage.BucketExists(ctx, args)
}

// ProcedureSetStorageBucketReplication sets replication configuration on a bucket. Role can be obtained by first defining the replication target on MinIO
// to associate the source and destination buckets for replication with the replication endpoint.
func ProcedureSetStorageBucketReplication(ctx context.Context, state *types.State, args *common.SetStorageBucketReplicationArguments) (bool, error) {
if err := state.Storage.SetBucketReplication(ctx, args); err != nil {
// ProcedureRemoveStorageBucket removes a bucket, bucket should be empty to be successfully removed.
func ProcedureRemoveStorageBucket(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
if err := state.Storage.RemoveBucket(ctx, args); err != nil {
return false, err
}

return true, nil
}

// FunctionGetBucketReplication gets current replication config on a bucket.
func FunctionStorageBucketReplication(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (*common.StorageReplicationConfig, error) {
return state.Storage.GetBucketReplication(ctx, args)
}

// RemoveBucketReplication removes replication configuration on a bucket.
func ProcedureRemoveStorageBucketReplication(ctx context.Context, state *types.State, args *common.StorageBucketArguments) (bool, error) {
if err := state.Storage.RemoveBucketReplication(ctx, args); err != nil {
// ProcedureUpdateStorageBucket updates the bucket's configuration.
func ProcedureUpdateStorageBucket(ctx context.Context, state *types.State, args *common.UpdateBucketArguments) (bool, error) {
if err := state.Storage.UpdateBucket(ctx, args); err != nil {
return false, err
}

Expand Down
Loading

0 comments on commit adb4d67

Please sign in to comment.