Skip to content

Commit 17276c6

Browse files
committed
add support for pubsub emulator
1 parent d940685 commit 17276c6

File tree

4 files changed

+66
-2
lines changed

4 files changed

+66
-2
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,5 +78,5 @@ elasticsink = ["elasticsearch", "tokio"]
7878
fingerprint = ["murmur3"]
7979
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
8080
redissink = ["redis", "tokio"]
81-
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web"]
81+
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"]
8282
rabbitmqsink = ["lapin", "tokio"]

daemon.toml

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
[source]
2+
type = "N2N"
3+
address = ["Tcp","node-mainnet-multiaz.jpgs-2uc.demeter.run:3000"]
4+
magic = "mainnet"
5+
# Because we are using blockfrost we had to delay the incoming blocks because
6+
# blockfrost would not have the txs indexed yet.
7+
# min_depth = 2
8+
9+
# include_block_cbor = false
10+
# include_block_end_events = false
11+
# include_block_details = true
12+
13+
[[filters]]
14+
type = "Selection"
15+
16+
[filters.check]
17+
predicate = "all_of"
18+
19+
[[filters.check.argument]]
20+
predicate = "variant_in"
21+
argument = ["Transaction"]
22+
23+
[[filters.check.argument]]
24+
predicate = "policy_equals"
25+
argument = "8eb03958e1c5e0fbf2e8d4473050a92d178e66f9ff0c9089f07d7440"
26+
27+
[sink]
28+
type = "Webhook"
29+
url = "http://localhost:9091"
30+
authorization = "user:pass"
31+
timeout = 30000
32+
error_policy = "Continue"
33+
34+
[source.mapper]
35+
include_block_end_events = false
36+
include_transaction_details = true
37+
include_transaction_end_events = false
38+
include_block_cbor = false
39+
include_byron_ebb = false

src/sinks/gcp_pubsub/run.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{collections::HashMap, sync::Arc};
22

3+
use google_cloud_gax::conn::Environment;
34
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
45
use google_cloud_pubsub::{
56
client::{Client, ClientConfig},
@@ -48,6 +49,9 @@ pub fn writer_loop(
4849
retry_policy: &retry::Policy,
4950
ordering_key: &str,
5051
attributes: &GenericKV,
52+
emulator: bool,
53+
emulator_endpoint: &Option<String>,
54+
emulator_project_id: &Option<String>,
5155
utils: Arc<Utils>,
5256
) -> Result<(), crate::Error> {
5357
let rt = tokio::runtime::Builder::new_current_thread()
@@ -56,7 +60,16 @@ pub fn writer_loop(
5660
.build()?;
5761

5862
let publisher: Publisher = rt.block_on(async {
59-
let client = Client::new(ClientConfig::default()).await?;
63+
let client_config = if emulator {
64+
ClientConfig {
65+
project_id: Some(emulator_project_id.clone().unwrap_or_default()),
66+
environment: Environment::Emulator(emulator_endpoint.clone().unwrap_or_default()),
67+
..Default::default()
68+
}
69+
} else {
70+
ClientConfig::default().with_auth().await?
71+
};
72+
let client = Client::new(client_config).await?;
6073
let topic = client.topic(topic_name);
6174
Result::<_, crate::Error>::Ok(topic.new_publisher(None))
6275
})?;

src/sinks/gcp_pubsub/setup.rs

+12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ pub struct Config {
1616
pub retry_policy: Option<retry::Policy>,
1717
pub ordering_key: Option<String>,
1818
pub attributes: Option<GenericKV>,
19+
pub emulator: Option<bool>,
20+
pub emulator_endpoint: Option<String>,
21+
pub emulator_project_id: Option<String>,
1922

2023
#[warn(deprecated)]
2124
pub credentials: Option<String>,
@@ -24,6 +27,12 @@ pub struct Config {
2427
impl SinkProvider for WithUtils<Config> {
2528
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
2629
let topic_name = self.inner.topic.to_owned();
30+
let mut use_emulator = self.inner.emulator.unwrap_or(false);
31+
let emulator_endpoint = self.inner.emulator_endpoint.to_owned();
32+
let emulator_project_id = self.inner.emulator_project_id.to_owned();
33+
if use_emulator && (emulator_endpoint.is_none() || emulator_project_id.is_none()) {
34+
use_emulator = false;
35+
}
2736

2837
let error_policy = self
2938
.inner
@@ -47,6 +56,9 @@ impl SinkProvider for WithUtils<Config> {
4756
&retry_policy,
4857
&ordering_key,
4958
&attributes,
59+
use_emulator,
60+
&emulator_endpoint,
61+
&emulator_project_id,
5062
utils,
5163
)
5264
.expect("writer loop failed");

0 commit comments

Comments
 (0)