Skip to content

Commit

Permalink
Fix job materialization when destination table does not yet exist (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
ohaibbq authored Jul 4, 2024
1 parent 76384ba commit 9c7bc44
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
16 changes: 16 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,22 @@ func (h *jobsInsertHandler) Handle(ctx context.Context, r *jobsInsertRequest) (*
if err != nil {
return nil, err
}
destinationDataset := r.project.Dataset(tableRef.DatasetId)
if destinationDataset == nil {
return nil, fmt.Errorf("failed to find destination dataset: %s", tableRef.DatasetId)
}
destinationTable := destinationDataset.Table(tableRef.TableId)
destinationTableExists := destinationTable != nil
if !destinationTableExists {
_, err := createTableMetadata(ctx, tx, r.server, r.project, destinationDataset, tableDef.ToBigqueryV2(r.project.ID, tableRef.DatasetId))
if err != nil {
return nil, fmt.Errorf("failed to create table: %w", err)
}
serverErr := r.server.contentRepo.CreateTable(ctx, tx, tableDef.ToBigqueryV2(r.project.ID, tableRef.DatasetId))
if serverErr != nil {
return nil, fmt.Errorf("failed to create table: %w", serverErr)
}
}
if err := r.server.contentRepo.AddTableData(ctx, tx, tableRef.ProjectId, tableRef.DatasetId, tableDef); err != nil {
return nil, fmt.Errorf("failed to add table data: %w", err)
}
Expand Down
72 changes: 72 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,78 @@ func TestQuery(t *testing.T) {
}
}

func TestQueryWithDestination(t *testing.T) {
ctx := context.Background()

bqServer, err := server.New(server.TempStorage)
if err != nil {
t.Fatal(err)
}
if err := bqServer.Load(
server.YAMLSource(filepath.Join("testdata", "data.yaml")),
); err != nil {
t.Fatal(err)
}
testServer := bqServer.TestServer()
defer func() {
testServer.Close()
bqServer.Stop(ctx)
}()

const (
projectName = "test"
)

client, err := bigquery.NewClient(
ctx,
projectName,
option.WithEndpoint(testServer.URL),
option.WithoutAuthentication(),
)
if err != nil {
t.Fatal(err)
}
defer client.Close()

query := client.Query("SELECT id FROM dataset1.table_a")
query.QueryConfig.Dst = &bigquery.Table{
ProjectID: projectName,
DatasetID: "dataset1",
TableID: "table_a_materialized",
}
it, err := query.Read(ctx)
if err != nil {
t.Fatal(err)
}
for {
var row []bigquery.Value
if err := it.Next(&row); err != nil {
if err == iterator.Done {
break
}
t.Fatal(err)
}
t.Log("row = ", row)
}

query = client.Query("SELECT id FROM dataset1.table_a_materialized")

it, err = query.Read(ctx)
if err != nil {
t.Fatal(err)
}
for {
var row []bigquery.Value
if err := it.Next(&row); err != nil {
if err == iterator.Done {
break
}
t.Fatal(err)
}
t.Log("row = ", row)
}
}

type TableSchema struct {
Int int
Str string
Expand Down

0 comments on commit 9c7bc44

Please sign in to comment.