Skip to content

Commit

Permalink
Components: explicitly pass in subscriber name on .subscribe()
Browse files Browse the repository at this point in the history
  • Loading branch information
fwang committed Oct 31, 2024
1 parent 68c4f31 commit b4c3f50
Show file tree
Hide file tree
Showing 15 changed files with 495 additions and 151 deletions.
2 changes: 1 addition & 1 deletion examples/aws-bucket-topic-subscriber/sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export default $config({
},
async run() {
const topic = new sst.aws.SnsTopic("MyTopic");
topic.subscribe("subscriber.handler");
topic.subscribe("MySubscriber", "subscriber.handler");

const bucket = new sst.aws.Bucket("MyBucket");
bucket.subscribeTopic(topic.arn, {
Expand Down
2 changes: 1 addition & 1 deletion examples/aws-dynamo/sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default $config({
primaryIndex: { hashKey: "id" },
stream: "new-and-old-images",
});
table.subscribe("subscriber.handler", {
table.subscribe("MySubscriber", "subscriber.handler", {
filters: [
{
dynamodb: {
Expand Down
4 changes: 2 additions & 2 deletions examples/aws-kinesis-stream/sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ export default $config({
const stream = new sst.aws.KinesisStream("MyStream");

// Create a function subscribing to all events
stream.subscribe("subscriber.all");
stream.subscribe("AllSub", "subscriber.all");

// Create a function subscribing to events of `bar` type
stream.subscribe("subscriber.filtered", {
stream.subscribe("FilteredSub", "subscriber.filtered", {
filters: [
{
data: {
Expand Down
4 changes: 2 additions & 2 deletions examples/aws-topic/sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export default $config({
queue.subscribe("subscriber.handler");

const topic = new sst.aws.SnsTopic("MyTopic");
topic.subscribe("subscriber.handler", {});
topic.subscribeQueue(queue.arn);
topic.subscribe("MySubscriber1", "subscriber.handler", {});
topic.subscribeQueue("MySubscriber2", queue.arn);

const app = new sst.aws.Function("MyApp", {
handler: "publisher.handler",
Expand Down
7 changes: 5 additions & 2 deletions examples/internal/playground/sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ export default $config({

function addEmail() {
const topic = new sst.aws.SnsTopic("MyTopic");
topic.subscribe("functions/email/index.notification");
topic.subscribe(
"MyTopicSubscriber",
"functions/email/index.notification"
);

const email = new sst.aws.Email("MyEmail", {
sender: "[email protected]",
Expand Down Expand Up @@ -168,7 +171,7 @@ export default $config({

function addTopic() {
const topic = new sst.aws.SnsTopic("MyTopic");
topic.subscribe("functions/topic/index.subscriber", {
topic.subscribe("MyTopicSubscriber", "functions/topic/index.subscriber", {
filter: {
color: ["red"],
},
Expand Down
64 changes: 31 additions & 33 deletions platform/src/components/aws/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,20 @@ export class Bus extends Component implements Link.Linkable {
/**
* Subscribe to this EventBus.
*
* @param name The name of the subscription.
* @param subscriber The function that'll be notified.
* @param args Configure the subscription.
*
* @example
*
* ```js
* bus.subscribe("src/subscriber.handler");
* bus.subscribe("MySubscription", "src/subscriber.handler");
* ```
*
* Add a pattern to the subscription.
*
* ```js
* bus.subscribe("src/subscriber.handler", {
* bus.subscribe("MySubscription", "src/subscriber.handler", {
* pattern: {
* source: ["my.source", "my.source2"],
* price_usd: [{numeric: [">=", 100]}]
Expand All @@ -243,7 +244,7 @@ export class Bus extends Component implements Link.Linkable {
* Customize the subscriber function.
*
* ```js
* bus.subscribe({
* bus.subscribe("MySubscription", {
* handler: "src/subscriber.handler",
* timeout: "60 seconds"
* });
Expand All @@ -256,11 +257,13 @@ export class Bus extends Component implements Link.Linkable {
* ```
*/
public subscribe(
name: string,
subscriber: Input<string | FunctionArgs | FunctionArn>,
args: BusSubscriberArgs = {},
) {
return Bus._subscribeFunction(
this.constructorName,
name,
this.nodes.bus.name,
this.nodes.bus.arn,
subscriber,
Expand All @@ -272,6 +275,7 @@ export class Bus extends Component implements Link.Linkable {
/**
* Subscribe to an EventBus that was not created in your app.
*
* @param name The name of the subscription.
* @param busArn The ARN of the EventBus to subscribe to.
* @param subscriber The function that'll be notified.
* @param args Configure the subscription.
Expand All @@ -287,13 +291,13 @@ export class Bus extends Component implements Link.Linkable {
* You can subscribe to it by passing in the ARN.
*
* ```js
* sst.aws.Bus.subscribe(busArn, "src/subscriber.handler");
* sst.aws.Bus.subscribe("MySubscription", busArn, "src/subscriber.handler");
* ```
*
* Add a pattern to the subscription.
*
* ```js
* sst.aws.Bus.subscribe(busArn, "src/subscriber.handler", {
* sst.aws.Bus.subscribe("MySubscription", busArn, "src/subscriber.handler", {
* pattern: {
* price_usd: [{numeric: [">=", 100]}]
* }
Expand All @@ -303,21 +307,23 @@ export class Bus extends Component implements Link.Linkable {
* Customize the subscriber function.
*
* ```js
* sst.aws.Bus.subscribe(busArn, {
* sst.aws.Bus.subscribe("MySubscription", busArn, {
* handler: "src/subscriber.handler",
* timeout: "60 seconds"
* });
* ```
*/
public static subscribe(
name: string,
busArn: Input<string>,
subscriber: Input<string | FunctionArgs | FunctionArn>,
args?: BusSubscriberArgs,
) {
return output(busArn).apply((busArn) => {
const busName = parseEventBusArn(busArn).busName;
return this._subscribeFunction(
logicalName(busName),
busName,
name,
busName,
busArn,
subscriber,
Expand All @@ -328,26 +334,16 @@ export class Bus extends Component implements Link.Linkable {

private static _subscribeFunction(
name: string,
subscriberName: string,
busName: Input<string>,
busArn: string | Output<string>,
subscriber: Input<string | FunctionArgs | FunctionArn>,
args: BusSubscriberArgs = {},
opts: ComponentResourceOptions = {},
) {
return all([subscriber, args]).apply(([subscriber, args]) => {
const suffix = logicalName(
hashStringToPrettyString(
[
typeof busArn === "string" ? busArn : outputId,
JSON.stringify(args.pattern ?? {}),
typeof subscriber === "string" ? subscriber : subscriber.handler,
].join(""),
6,
),
);

return output(args).apply((args) => {
return new BusLambdaSubscriber(
`${name}Subscriber${suffix}`,
`${name}Subscriber${subscriberName}`,
{
bus: { name: busName, arn: busArn },
subscriber,
Expand All @@ -359,9 +355,10 @@ export class Bus extends Component implements Link.Linkable {
}

/**
* Subscribe to this SNS Topic with an SQS Queue.
* Subscribe to this EventBus with an SQS Queue.
*
* @param queueArn The ARN of the queue that'll be notified.
* @param name The name of the subscription.
* @param queue The ARN of the queue or `Queue` component that'll be notified.
* @param args Configure the subscription.
*
* @example
Expand All @@ -372,16 +369,16 @@ export class Bus extends Component implements Link.Linkable {
* const queue = sst.aws.Queue("MyQueue");
* ```
*
* You can subscribe to this topic with it.
* You can subscribe to this bus with it.
*
* ```js title="sst.config.ts"
* topic.subscribeQueue(queue.arn);
* bus.subscribeQueue(queue);
* ```
*
* Add a filter to the subscription.
*
* ```js title="sst.config.ts"
* topic.subscribeQueue(queue.arn, {
* bus.subscribeQueue(queue, {
* filter: {
* price_usd: [{numeric: [">=", 100]}]
* }
Expand All @@ -404,31 +401,32 @@ export class Bus extends Component implements Link.Linkable {
}

/**
* Subscribe to an existing SNS Topic with a previously created SQS Queue.
* Subscribe to an existing EventBus with a previously created SQS Queue.
*
* @param topicArn The ARN of the SNS Topic to subscribe to.
* @param queueArn The ARN of the queue that'll be notified.
* @param name The name of the subscription.
* @param busArn The ARN of the EventBus to subscribe to.
* @param queue The ARN of the queue or `Queue` component that'll be notified.
* @param args Configure the subscription.
*
* @example
*
* For example, let's say you have an existing SNS Topic and SQS Queue with the following ARNs.
* For example, let's say you have an existing EventBus and SQS Queue with the following ARNs.
*
* ```js title="sst.config.ts"
* const topicArn = "arn:aws:sns:us-east-1:123456789012:MyTopic";
* const busArn = "arn:aws:events:us-east-1:123456789012:event-bus/MyBus";
* const queueArn = "arn:aws:sqs:us-east-1:123456789012:MyQueue";
* ```
*
* You can subscribe to the topic with the queue.
* You can subscribe to the bus with the queue.
*
* ```js title="sst.config.ts"
* sst.aws.SnsTopic.subscribeQueue(topicArn, queueArn);
* sst.aws.Bus.subscribeQueue(busArn, queueArn);
* ```
*
* Add a filter to the subscription.
*
* ```js title="sst.config.ts"
* sst.aws.SnsTopic.subscribeQueue(topicArn, queueArn, {
* sst.aws.Bus.subscribeQueue(busArn, queueArn, {
* filter: {
* price_usd: [{numeric: [">=", 100]}]
* }
Expand Down
18 changes: 10 additions & 8 deletions platform/src/components/aws/dynamo-lambda-subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import {
ComponentResourceOptions,
Input,
Output,
output,
} from "@pulumi/pulumi";
import { ComponentResourceOptions, Input, output } from "@pulumi/pulumi";
import { Component, transform } from "../component";
import { Function, FunctionArgs } from "./function";
import { FunctionArgs } from "./function";
import { DynamoSubscriberArgs } from "./dynamo";
import { lambda } from "@pulumi/aws";
import { FunctionBuilder, functionBuilder } from "./helpers/function-builder";
Expand All @@ -25,6 +20,13 @@ export interface Args extends DynamoSubscriberArgs {
* The subscriber function.
*/
subscriber: Input<string | FunctionArgs>;
/**
* In early versions of SST, parent were forgotten to be set for resources in components.
* This flag is used to disable the automatic setting of the parent to prevent breaking
* changes.
* @internal
*/
disableParent?: boolean;
}

/**
Expand Down Expand Up @@ -94,7 +96,7 @@ export class DynamoLambdaSubscriber extends Component {
: undefined,
startingPosition: "LATEST",
},
{},
{ parent: args.disableParent ? undefined : self },
),
);
}
Expand Down
Loading

0 comments on commit b4c3f50

Please sign in to comment.