Skip to content

Commit

Permalink
Merge pull request #2977 from rockwotj/schema
Browse files Browse the repository at this point in the history
snowflake: support init_statements
  • Loading branch information
rockwotj authored Nov 3, 2024
2 parents 754fd8a + 8ccf219 commit 7ea878e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 16 deletions.
37 changes: 31 additions & 6 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ output:
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
mapping: "" # No default (optional)
init_statement: | # No default (optional)
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
batching:
count: 0
byte_size: 0
Expand Down Expand Up @@ -77,6 +79,8 @@ output:
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
mapping: "" # No default (optional)
init_statement: | # No default (optional)
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
batching:
count: 0
byte_size: 0
Expand Down Expand Up @@ -153,18 +157,19 @@ pipeline:
password: "${TODO}"
output:
snowflake_streaming:
# By default there is only a single channel per output table allowed
# if we want to have multiple Redpanda Connect streams writing data
# then we need a unique channel prefix per stream. We'll use the host
# name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"
account: "MYSNOW-ACCOUNT"
user: MYUSER
role: ACCOUNTADMIN
database: "MYDATABASE"
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"
# By default there is only a single channel per output table allowed
# if we want to have multiple Redpanda Connect streams writing data
# then we need a unique channel prefix per stream. We'll use the host
# name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"```
```
--
HTTP Sidecar to push data to Snowflake::
Expand Down Expand Up @@ -198,7 +203,8 @@ output:
database: "MYDATABASE"
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"```
private_key_file: "my/private/key.p8"
```
--
======
Expand Down Expand Up @@ -310,6 +316,25 @@ A bloblang mapping to execute on each message.
*Type*: `string`
=== `init_statement`
Optional SQL statements to execute immediately upon the first connection. This is a useful way to initialize tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
*Type*: `string`
```yml
# Examples
init_statement: |2
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
init_statement: |2
ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
ALTER TABLE t1 ADD COLUMN a2 NUMBER;
```
=== `batching`
Allows you to configure a xref:configuration:batching.adoc[batching policy].
Expand Down
64 changes: 56 additions & 8 deletions internal/impl/snowflake/output_snowflake_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"context"
"crypto/rsa"
"fmt"
"strings"
"sync"

"github.com/redpanda-data/benthos/v4/public/bloblang"
Expand All @@ -31,6 +30,7 @@ const (
ssoFieldKey = "private_key"
ssoFieldKeyFile = "private_key_file"
ssoFieldKeyPass = "private_key_pass"
ssoFieldInitStatement = "init_statement"
ssoFieldBatching = "batching"
ssoFieldChannelPrefix = "channel_prefix"
ssoFieldMapping = "mapping"
Expand Down Expand Up @@ -83,6 +83,14 @@ You can monitor the output batch size using the `+"`snowflake_compressed_output_
service.NewStringField(ssoFieldKeyFile).Description("The file to load the private RSA key from. This should be a `.p8` PEM encoded file. Either this or `private_key` must be specified.").Optional(),
service.NewStringField(ssoFieldKeyPass).Description("The RSA key passphrase if the RSA key is encrypted.").Optional().Secret(),
service.NewBloblangField(ssoFieldMapping).Description("A bloblang mapping to execute on each message.").Optional(),
service.NewStringField(ssoFieldInitStatement).Description(`
Optional SQL statements to execute immediately upon the first connection. This is a useful way to initialize tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
`).Optional().Example(`
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
`).Example(`
ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
ALTER TABLE t1 ADD COLUMN a2 NUMBER;
`),
service.NewBatchPolicyField(ssoFieldBatching),
service.NewOutputMaxInFlightField(),
service.NewStringField(ssoFieldChannelPrefix).
Expand Down Expand Up @@ -122,18 +130,19 @@ pipeline:
password: "${TODO}"
output:
snowflake_streaming:
# By default there is only a single channel per output table allowed
# if we want to have multiple Redpanda Connect streams writing data
# then we need a unique channel prefix per stream. We'll use the host
# name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"
account: "MYSNOW-ACCOUNT"
user: MYUSER
role: ACCOUNTADMIN
database: "MYDATABASE"
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"
# By default there is only a single channel per output table allowed
# if we want to have multiple Redpanda Connect streams writing data
# then we need a unique channel prefix per stream. We'll use the host
# name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"`,
`,
).
Example(
"HTTP Sidecar to push data to Snowflake",
Expand Down Expand Up @@ -163,7 +172,8 @@ output:
database: "MYDATABASE"
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"`,
private_key_file: "my/private/key.p8"
`,
)
}

Expand Down Expand Up @@ -268,6 +278,35 @@ func newSnowflakeStreamer(
// stream to write to a single table.
channelPrefix = fmt.Sprintf("Redpanda_Connect_%s.%s.%s", db, schema, table)
}
var initStatementsFn func(context.Context) error
if conf.Contains(ssoFieldInitStatement) {
initStatements, err := conf.FieldString(ssoFieldInitStatement)
if err != nil {
return nil, err
}
initStatementsFn = func(ctx context.Context) error {
c, err := streaming.NewRestClient(account, user, mgr.EngineVersion(), channelPrefix, rsaKey, mgr.Logger())
if err != nil {
return err
}
defer c.Close()
_, err = c.RunSQL(ctx, streaming.RunSQLRequest{
Statement: initStatements,
			// Currently we set a timeout of 30 seconds so that we don't have to handle async operations
// that need polling to wait until they finish (results are made async when execution is longer
// than 45 seconds).
Timeout: 30,
Database: db,
Schema: schema,
Role: role,
			// Automatically determine the number of statements
Parameters: map[string]string{
"MULTI_STATEMENT_COUNT": "0",
},
})
return err
}
}
client, err := streaming.NewSnowflakeServiceClient(
context.Background(),
streaming.ClientOptions{
Expand All @@ -277,7 +316,7 @@ func newSnowflakeStreamer(
PrivateKey: rsaKey,
Logger: mgr.Logger(),
ConnectVersion: mgr.EngineVersion(),
Application: strings.TrimPrefix(channelPrefix, "Redpanda_Connect_"),
Application: channelPrefix,
})
if err != nil {
return nil, err
Expand All @@ -293,6 +332,7 @@ func newSnowflakeStreamer(
buildTime: mgr.Metrics().NewTimer("snowflake_build_output_latency_ns"),
uploadTime: mgr.Metrics().NewTimer("snowflake_upload_latency_ns"),
compressedOutput: mgr.Metrics().NewCounter("snowflake_compressed_output_size_bytes"),
initStatementsFn: initStatementsFn,
}
return o, nil
}
Expand All @@ -309,6 +349,7 @@ type snowflakeStreamerOutput struct {
channelPrefix, db, schema, table string
mapping *bloblang.Executor
logger *service.Logger
initStatementsFn func(context.Context) error
}

func (o *snowflakeStreamerOutput) openNewChannel(ctx context.Context) (*streaming.SnowflakeIngestionChannel, error) {
Expand Down Expand Up @@ -336,6 +377,13 @@ func (o *snowflakeStreamerOutput) openChannel(ctx context.Context, name string,
}

func (o *snowflakeStreamerOutput) Connect(ctx context.Context) error {
if o.initStatementsFn != nil {
if err := o.initStatementsFn(ctx); err != nil {
return err
}
// We've already executed our init statement, we don't need to do that anymore
o.initStatementsFn = nil
}
// Precreate a single channel so we know stuff works, otherwise we'll create them on demand.
c, err := o.openNewChannel(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/snowflake/streaming/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func NewRestClient(account, user, version, app string, privateKey *rsa.PrivateKe
privateKey: privateKey,
userAgent: userAgent,
logger: logger,
app: url.QueryEscape(app),
app: url.QueryEscape("Redpanda_Connect_" + strings.TrimPrefix(app, "Redpanda_Connect_")),
cachedJWT: typed.NewAtomicValue(""),
authRefreshLoop: periodic.New(
time.Hour-(2*time.Minute),
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/snowflake/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl
opts.Account,
opts.User,
opts.ConnectVersion,
"Redpanda_Connect_"+opts.Application,
opts.Application,
opts.PrivateKey,
opts.Logger,
)
Expand Down

0 comments on commit 7ea878e

Please sign in to comment.