Skip to content

Commit

Permalink
Enabled setting for use JSON API in GCS emulator, allow import even n…
Browse files Browse the repository at this point in the history
…o publicHost is set to GCS emulator
  • Loading branch information
totem3 committed Apr 7, 2024
1 parent e74d4e4 commit 8bf5a73
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ func (h *jobsInsertHandler) importFromGCS(ctx context.Context, r *jobsInsertRequ
opts = append(
opts,
option.WithEndpoint(fmt.Sprintf("%s/storage/v1/", host)),
storage.WithJSONReads(),
option.WithoutAuthentication(),
)
}
Expand Down
135 changes: 135 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,141 @@ func TestImportFromGCS(t *testing.T) {
}
}

func TestImportFromGCSEmulatorWithoutPublicHost(t *testing.T) {
const (
projectID = "test"
datasetID = "dataset1"
tableID = "table_a"
host = "127.0.0.1"
bucketName = "test-bucket"
sourceName = "path/to/data.json"
)

ctx := context.Background()
bqServer, err := server.New(server.TempStorage)
if err != nil {
t.Fatal(err)
}
project := types.NewProject(
projectID,
types.NewDataset(
datasetID,
types.NewTable(
tableID,
[]*types.Column{
types.NewColumn("id", types.INT64),
types.NewColumn("value", types.INT64),
},
nil,
),
),
)
if err := bqServer.Load(server.StructSource(project)); err != nil {
t.Fatal(err)
}

testServer := bqServer.TestServer()
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
for i := 0; i < 3; i++ {
if err := enc.Encode(map[string]interface{}{
"id": i + 1,
"value": i + 10,
}); err != nil {
t.Fatal(err)
}
}
storageServer, err := fakestorage.NewServerWithOptions(fakestorage.Options{
InitialObjects: []fakestorage.Object{
{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: bucketName,
Name: sourceName,
Size: int64(len(buf.Bytes())),
},
Content: buf.Bytes(),
},
},
Host: host,
Scheme: "http",
})
if err != nil {
t.Fatal(err)
}

storageServerURL := storageServer.URL()
u, err := url.Parse(storageServerURL)
if err != nil {
t.Fatal(err)
}
storageEmulatorHost := fmt.Sprintf("http://%s:%s", host, u.Port())
t.Setenv("STORAGE_EMULATOR_HOST", storageEmulatorHost)

defer func() {
testServer.Close()
bqServer.Stop(ctx)
storageServer.Stop()
}()

client, err := bigquery.NewClient(
ctx,
projectID,
option.WithEndpoint(testServer.URL),
option.WithoutAuthentication(),
)
if err != nil {
t.Fatal(err)
}
defer client.Close()

gcsSourceURL := fmt.Sprintf("gs://%s/%s", bucketName, sourceName)
gcsRef := bigquery.NewGCSReference(gcsSourceURL)
gcsRef.SourceFormat = bigquery.JSON
gcsRef.AutoDetect = true
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
t.Fatal(err)
}
status, err := job.Wait(ctx)
if err != nil {
t.Fatal(err)
}
if status.Err() != nil {
t.Fatal(status.Err())
}

query := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", datasetID, tableID))
it, err := query.Read(ctx)
if err != nil {
t.Fatal(err)
}

type row struct {
ID int64
Value int64
}
var rows []*row
for {
var r row
if err := it.Next(&r); err != nil {
if err == iterator.Done {
break
}
t.Fatal(err)
}
rows = append(rows, &r)
}
if diff := cmp.Diff([]*row{
{ID: 1, Value: 10},
{ID: 2, Value: 11},
{ID: 3, Value: 12},
}, rows); diff != "" {
t.Errorf("(-want +got):\n%s", diff)
}
}

func TestImportWithWildcardFromGCS(t *testing.T) {
const (
projectID = "test"
Expand Down

0 comments on commit 8bf5a73

Please sign in to comment.