From 2a5f0bdba96e315af2b24e55782ce8952efe24ea Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Fri, 14 Feb 2025 21:56:30 +0100 Subject: [PATCH 1/3] feat(#3483): Use Kraft-based Kafka as default for internal messaging --- docker-compose.yml | 36 +++----- .../standalone/kafka/docker-compose.dev.yml | 26 +++--- .../standalone/kafka/docker-compose.yml | 31 ++++--- installer/compose/docker-compose.full.yml | 41 +++------ .../compose/docker-compose.quickstart.yml | 39 +++------ installer/compose/docker-compose.yml | 39 +++------ .../datalake/DataExplorerWidgetModel.java | 2 + .../grounding/KafkaTransportProtocol.java | 40 --------- .../manager/matching/ProtocolSelector.java | 4 +- .../manager/matching/v2/TestUtils.java | 2 +- .../streampipes/sdk/helpers/Protocols.java | 2 +- .../src/lib/model/gen/streampipes-model.ts | 85 +------------------ 12 files changed, 91 insertions(+), 256 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 50f7a71506..e15d439227 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -84,33 +84,26 @@ services: spnet: kafka: - image: fogsyio/kafka:2.2.0 + image: bitnami/kafka:3.9.0 hostname: kafka - depends_on: - - zookeeper environment: - # see: https://github.com/confluentinc/schema-registry/issues/648 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092 - KAFKA_LISTENERS: PLAINTEXT://:9092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock + - kafka3:/bitnami logging: *default-logging networks: spnet: - zookeeper: - image: fogsyio/zookeeper:3.4.13 - logging: *default-logging - volumes: - - zookeeper:/opt/zookeeper-3.4.13 - networks: - spnet: - influxdb: image: influxdb:2.6 environment: @@ -144,10 +137,9 @@ services: spnet: volumes: - kafka: + kafka3: files: couchdb: - zookeeper: influxdb: influxdb2: backend: diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml index 2410930d61..8ac1f2e8ba 100644 --- a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml +++ b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml @@ -17,16 +17,18 @@ services: kafka: ports: - "9094:9094" - depends_on: - - zookeeper environment: - # see: https://github.com/confluentinc/schema-registry/issues/648 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # Replace localhost with your external address if Kafka should be reachable from external systems. - KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 + # KRaft settings + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + # Listeners + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.yml b/installer/cli/deploy/standalone/kafka/docker-compose.yml index c3df0c7e0c..a4d689d2e1 100644 --- a/installer/cli/deploy/standalone/kafka/docker-compose.yml +++ b/installer/cli/deploy/standalone/kafka/docker-compose.yml @@ -15,24 +15,23 @@ services: kafka: - image: fogsyio/kafka:2.2.0 + image: bitnami/kafka:3.9.0 hostname: kafka - depends_on: - - zookeeper environment: - # see: https://github.com/confluentinc/schema-registry/issues/648 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092 - KAFKA_LISTENERS: PLAINTEXT://:9092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + # Listeners + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock + - kafka3:/bitnami logging: driver: "json-file" options: @@ -42,7 +41,7 @@ services: spnet: volumes: - kafka: + kafka3: networks: spnet: diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml index 2ad26d92cd..47d12fff2e 100644 --- a/installer/compose/docker-compose.full.yml +++ b/installer/compose/docker-compose.full.yml @@ -65,32 +65,22 @@ services: spnet: kafka: - image: fogsyio/kafka:2.2.0 + image: bitnami/kafka:3.9.0 hostname: kafka - depends_on: - - zookeeper environment: - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # Replace localhost with your external address if Kafka should be reachable from external systems. - KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock - logging: *default-logging - restart: unless-stopped - networks: - spnet: - - zookeeper: - image: fogsyio/zookeeper:3.4.13 - volumes: - - zookeeper:/opt/zookeeper-3.4.13 + - kafka3:/bitnami logging: *default-logging restart: unless-stopped networks: @@ -155,14 +145,11 @@ volumes: backend: connect: couchdb: - kafka: - zookeeper: + kafka3: influxdb: influxdb2: files: nginx: - - networks: spnet: diff --git a/installer/compose/docker-compose.quickstart.yml b/installer/compose/docker-compose.quickstart.yml index 175d931f0d..915c7901eb 100644 --- a/installer/compose/docker-compose.quickstart.yml +++ b/installer/compose/docker-compose.quickstart.yml @@ -65,32 +65,22 @@ services: spnet: kafka: - image: fogsyio/kafka:2.2.0 + image: bitnami/kafka:3.9.0 hostname: kafka - depends_on: - - zookeeper environment: - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # Replace localhost with your external address if Kafka should be reachable from external systems. - KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 - volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock - logging: *default-logging - restart: unless-stopped - networks: - spnet: - - zookeeper: - image: fogsyio/zookeeper:3.4.13 + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - - zookeeper:/opt/zookeeper-3.4.13 + - kafka3:/bitnami logging: *default-logging restart: unless-stopped networks: @@ -141,8 +131,7 @@ volumes: backend: connect: couchdb: - kafka: - zookeeper: + kafka3: influxdb: influxdb2: files: diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml index 6157972476..5c8a9e3c77 100644 --- a/installer/compose/docker-compose.yml +++ b/installer/compose/docker-compose.yml @@ -65,32 +65,22 @@ services: spnet: kafka: - image: fogsyio/kafka:2.2.0 + image: bitnami/kafka:3.9.0 hostname: kafka - depends_on: - - zookeeper environment: - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # Replace localhost with your external address if Kafka should be reachable from external systems. - KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 - volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock - logging: *default-logging - restart: unless-stopped - networks: - spnet: - - zookeeper: - image: fogsyio/zookeeper:3.4.13 + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - - zookeeper:/opt/zookeeper-3.4.13 + - kafka3:/bitnami logging: *default-logging restart: unless-stopped networks: @@ -131,8 +121,7 @@ volumes: backend: connect: couchdb: - kafka: - zookeeper: + kafka3: influxdb: influxdb2: files: diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java index 02b244d22c..8e302ae163 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java @@ -19,12 +19,14 @@ package org.apache.streampipes.model.datalake; import org.apache.streampipes.model.dashboard.DashboardEntity; +import org.apache.streampipes.model.shared.annotation.TsModel; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.HashMap; import java.util.Map; +@TsModel public class DataExplorerWidgetModel extends DashboardEntity { private String widgetId; diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java index 9fe5228abe..8af85bdd56 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java @@ -20,12 +20,6 @@ public class KafkaTransportProtocol extends TransportProtocol { - private static final long serialVersionUID = -4067982203807146257L; - - private String zookeeperHost; - - private int zookeeperPort; - private int kafkaPort; private Integer lingerMs; @@ -44,24 +38,12 @@ public class KafkaTransportProtocol extends TransportProtocol { public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic) { super(kafkaHost, new SimpleTopicDefinition(topic)); - this.zookeeperHost = kafkaHost; - this.zookeeperPort = kafkaPort; - this.kafkaPort = kafkaPort; - } - - public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic, String zookeeperHost, - int zookeeperPort) { - super(kafkaHost, new SimpleTopicDefinition(topic)); - this.zookeeperHost = zookeeperHost; - this.zookeeperPort = zookeeperPort; this.kafkaPort = kafkaPort; } public KafkaTransportProtocol(KafkaTransportProtocol other) { super(other); this.kafkaPort = other.getKafkaPort(); - this.zookeeperHost = other.getZookeeperHost(); - this.zookeeperPort = other.getZookeeperPort(); this.acks = other.getAcks(); this.batchSize = other.getBatchSize(); this.groupId = other.getGroupId(); @@ -74,34 +56,12 @@ public KafkaTransportProtocol(KafkaTransportProtocol other) { public KafkaTransportProtocol(String kafkaHost, Integer kafkaPort, WildcardTopicDefinition wildcardTopicDefinition) { super(kafkaHost, wildcardTopicDefinition); this.kafkaPort = kafkaPort; - this.zookeeperHost = kafkaHost; - this.zookeeperPort = kafkaPort; } public KafkaTransportProtocol() { super(); } - public static long getSerialVersionUID() { - return serialVersionUID; - } - - public String getZookeeperHost() { - return zookeeperHost; - } - - public void setZookeeperHost(String zookeeperHost) { - this.zookeeperHost = zookeeperHost; - } - - public int getZookeeperPort() { - return zookeeperPort; - } - - public void setZookeeperPort(int zookeeperPort) { - this.zookeeperPort = zookeeperPort; - } - public int getKafkaPort() { return kafkaPort; } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java index e1eb685f6d..e1eeb4f58f 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java @@ -114,9 +114,7 @@ private TransportProtocol kafkaTopic() { return new KafkaTransportProtocol( messagingSettings.getKafkaHost(), messagingSettings.getKafkaPort(), - outputTopic, - messagingSettings.getZookeeperHost(), - messagingSettings.getZookeeperPort() + outputTopic ); } diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java index 78d291a369..ddfee95cdc 100644 --- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java +++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java @@ -25,7 +25,7 @@ public class TestUtils { public static TransportProtocol kafkaProtocol() { - return new KafkaTransportProtocol("localhost", 9092, "abc", "localhost", 2181); + return new KafkaTransportProtocol("localhost", 9092, "abc"); } public static TransportProtocol jmsProtocol() { diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java index f9c25f11c8..f20e15f40f 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java @@ -36,7 +36,7 @@ public class Protocols { * containing URL and topic where data arrives. */ public static KafkaTransportProtocol kafka(String kafkaHost, Integer kafkaPort, String topic) { - return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic, kafkaHost, kafkaPort); + return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic); } /** diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 31f14e6b19..03013586f6 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2025-01-25 23:02:05. +// Generated using typescript-generator version 3.2.1263 on 2025-02-14 21:48:17. export class NamedStreamPipesEntity implements Storable { '@class': @@ -1188,61 +1188,6 @@ export class DashboardModel implements Storable { } } -export class DashboardWidgetModel extends DashboardEntity { - dashboardWidgetSettings: DashboardWidgetSettings; - pipelineId: string; - visualizationName: string; - widgetId: string; - widgetType: string; - - static fromData( - data: DashboardWidgetModel, - target?: DashboardWidgetModel, - ): DashboardWidgetModel { - if (!data) { - return data; - } - const instance = target || new DashboardWidgetModel(); - super.fromData(data, instance); - instance.dashboardWidgetSettings = DashboardWidgetSettings.fromData( - data.dashboardWidgetSettings, - ); - instance.pipelineId = data.pipelineId; - instance.visualizationName = data.visualizationName; - instance.widgetId = data.widgetId; - instance.widgetType = data.widgetType; - return instance; - } -} - -export class DashboardWidgetSettings { - config: StaticPropertyUnion[]; - requiredSchema: EventSchema; - widgetDescription: string; - widgetIconName: string; - widgetLabel: string; - widgetName: string; - - static fromData( - data: DashboardWidgetSettings, - target?: DashboardWidgetSettings, - ): DashboardWidgetSettings { - if (!data) { - return data; - } - const instance = target || new DashboardWidgetSettings(); - instance.config = __getCopyArrayFn(StaticProperty.fromDataUnion)( - data.config, - ); - instance.requiredSchema = EventSchema.fromData(data.requiredSchema); - instance.widgetDescription = data.widgetDescription; - instance.widgetIconName = data.widgetIconName; - instance.widgetLabel = data.widgetLabel; - instance.widgetName = data.widgetName; - return instance; - } -} - export class DataExplorerWidgetModel extends DashboardEntity { baseAppearanceConfig: { [index: string]: any }; dataConfig: { [index: string]: any }; @@ -2208,8 +2153,6 @@ export class KafkaTransportProtocol extends TransportProtocol { 'maxRequestSize': string; 'messageMaxBytes': string; 'offset': string; - 'zookeeperHost': string; - 'zookeeperPort': number; static 'fromData'( data: KafkaTransportProtocol, @@ -2228,8 +2171,6 @@ export class KafkaTransportProtocol extends TransportProtocol { instance.maxRequestSize = data.maxRequestSize; instance.messageMaxBytes = data.messageMaxBytes; instance.offset = data.offset; - instance.zookeeperHost = data.zookeeperHost; - instance.zookeeperPort = data.zookeeperPort; return instance; } } @@ -4139,30 +4080,6 @@ export class UserInfo { } } -export class VisualizablePipeline { - pipelineId: string; - pipelineName: string; - schema: EventSchema; - topic: string; - visualizationName: string; - - static fromData( - data: VisualizablePipeline, - target?: VisualizablePipeline, - ): VisualizablePipeline { - if (!data) { - return data; - } - const instance = target || new VisualizablePipeline(); - instance.pipelineId = data.pipelineId; - instance.pipelineName = data.pipelineName; - instance.schema = EventSchema.fromData(data.schema); - instance.topic = data.topic; - instance.visualizationName = data.visualizationName; - return instance; - } -} - export class WildcardTopicDefinition extends TopicDefinition { '@class': 'org.apache.streampipes.model.grounding.WildcardTopicDefinition'; 'wildcardTopicMappings': WildcardTopicMapping[]; From 71776d93506cce2447206a1ed69a93b2660e80c5 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Sat, 15 Feb 2025 12:14:08 +0100 Subject: [PATCH 2/3] Let Kafka consumer wait until rebalancing is finished --- .../kafka/adapter/KafkaProtocol.java | 8 +- .../messaging/kafka/SpKafkaConsumer.java | 111 ++++++++++-------- 2 files changed, 60 insertions(+), 59 deletions(-) diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java index 0e00e94b01..ce10fcb759 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java @@ -78,7 +78,6 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka"; - private Thread thread; private SpKafkaConsumer kafkaConsumer; public KafkaProtocol() { @@ -190,13 +189,9 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic())); this.kafkaConsumer = new SpKafkaConsumer(protocol, - config.getTopic(), - new BrokerEventProcessor(extractor.selectedParser(), collector), config.getConfigAppenders() ); - - thread = new Thread(this.kafkaConsumer); - thread.start(); + this.kafkaConsumer.connect(new BrokerEventProcessor(extractor.selectedParser(), collector)); } @Override @@ -215,7 +210,6 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor, } LOG.info("Kafka Adapter was sucessfully stopped"); - thread.interrupt(); } @Override diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java index 6b8f1bbc23..8413452765 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java @@ -41,6 +41,8 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; public class SpKafkaConsumer implements EventConsumer, Runnable, @@ -50,56 +52,25 @@ public class SpKafkaConsumer implements EventConsumer, Runnable, private InternalEventProcessor eventProcessor; private final KafkaTransportProtocol protocol; private volatile boolean isRunning; - private Boolean patternTopic = false; private List appenders = new ArrayList<>(); + private KafkaConsumer consumer; private static final Logger LOG = LoggerFactory.getLogger(SpKafkaConsumer.class); public SpKafkaConsumer(KafkaTransportProtocol protocol) { this.protocol = protocol; + this.topic = protocol.getTopicDefinition().getActualTopicName(); } public SpKafkaConsumer(KafkaTransportProtocol protocol, - String topic, - InternalEventProcessor eventProcessor) { - this.protocol = protocol; - this.topic = topic; - this.eventProcessor = eventProcessor; - this.isRunning = true; - } - - public SpKafkaConsumer(KafkaTransportProtocol protocol, - String topic, - InternalEventProcessor eventProcessor, List appenders) { - this(protocol, topic, eventProcessor); + this(protocol); this.appenders = appenders; } @Override public void run() { - - Properties props = makeProperties(protocol, appenders); - - LOG.info("Using kafka properties: {}", props.toString()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - if (!patternTopic) { - consumer.subscribe(Collections.singletonList(topic)); - } else { - topic = replaceWildcardWithPatternFormat(topic); - consumer.subscribe(Pattern.compile(topic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection partitions) { - // TODO - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - // TODO - } - }); - } Duration duration = Duration.of(100, ChronoUnit.MILLIS); while (isRunning) { ConsumerRecords records = consumer.poll(duration); @@ -108,40 +79,76 @@ public void onPartitionsAssigned(Collection partitions) { consumer.close(); } - private String replaceWildcardWithPatternFormat(String topic) { - topic = topic.replaceAll("\\.", "\\\\."); - return topic.replaceAll("\\*", ".*"); - } - - private Properties makeProperties(KafkaTransportProtocol protocol, - List appenders) { - return new ConsumerConfigFactory(protocol).buildProperties(appenders); - } - @Override public void connect(InternalEventProcessor eventProcessor) throws SpRuntimeException { - LOG.info("Kafka consumer: Connecting to " + protocol.getTopicDefinition().getActualTopicName()); - if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) { - this.patternTopic = true; - } + LOG.info("Kafka consumer: Connecting to {}", protocol.getTopicDefinition().getActualTopicName()); + var patternTopic = isPatternTopic(); this.eventProcessor = eventProcessor; - - this.topic = protocol.getTopicDefinition().getActualTopicName(); this.isRunning = true; + Properties props = makeProperties(protocol, appenders); + + consumer = new KafkaConsumer<>(props); + var latch = new CountDownLatch(1); + if (!patternTopic) { + consumer.subscribe(Collections.singletonList(topic), new RebalanceListener(latch)); + } else { + topic = replaceWildcardWithPatternFormat(topic); + consumer.subscribe(Pattern.compile(topic), new RebalanceListener(latch)); + } Thread thread = new Thread(this); thread.start(); + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + throw new SpRuntimeException("Timeout while waiting for partition assignment"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SpRuntimeException("Interrupted while waiting for partition assignment", e); + } } @Override public void disconnect() throws SpRuntimeException { - LOG.info("Kafka consumer: Disconnecting from " + topic); + LOG.info("Kafka consumer: Disconnecting from {}", topic); this.isRunning = false; - } @Override public boolean isConnected() { return isRunning; } + + private boolean isPatternTopic() { + return this.protocol.getTopicDefinition() instanceof WildcardTopicDefinition; + } + + private String replaceWildcardWithPatternFormat(String topic) { + topic = topic.replaceAll("\\.", "\\\\."); + return topic.replaceAll("\\*", ".*"); + } + + private Properties makeProperties(KafkaTransportProtocol protocol, + List appenders) { + return new ConsumerConfigFactory(protocol).buildProperties(appenders); + } + + private class RebalanceListener implements ConsumerRebalanceListener { + + private final CountDownLatch latch; + public RebalanceListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void onPartitionsRevoked(Collection collection) { + consumer.pause(collection); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + consumer.resume(partitions); + latch.countDown(); + } + } } From cf41d5ceb08635c496a80887a44ab16d3a359267 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Mon, 17 Feb 2025 23:38:37 +0100 Subject: [PATCH 3/3] Update helm charts --- installer/k8s/README.md | 16 +--- .../templates/core/backend-deployment.yaml | 7 -- .../external/kafka/kafka-deployment.yaml | 44 +++++------ .../zookeeper/zookeeper-deployment.yaml | 74 ------------------- .../external/zookeeper/zookeeper-pvc.yaml | 44 ----------- .../external/zookeeper/zookeeper-service.yaml | 29 -------- installer/k8s/values.yaml | 12 --- 7 files changed, 23 insertions(+), 203 deletions(-) delete mode 100644 installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml delete mode 100644 installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml delete mode 100644 installer/k8s/templates/external/zookeeper/zookeeper-service.yaml diff --git a/installer/k8s/README.md b/installer/k8s/README.md index a093c40d11..7693cd2667 100644 --- a/installer/k8s/README.md +++ b/installer/k8s/README.md @@ -225,7 +225,7 @@ rm -rf ${HOME}/streampipes-k8s | Parameter Name | Description | Value | |---------------------------------------------|------------------------------------------------------------------------------------------|-------------| | external.kafka.appName | Kafka application name | "kafka" | -| external.kafka.version | Kafka version | 2.2.0 | +| external.kafka.version | Kafka version | 3.9.0 | | external.kafka.port | Port for the Kafka service | 9092 | | external.kafka.external.hostname | Name which will be advertised to external clients. Clients which use (default) port 9094 | "localhost" | | external.kafka.service.name | Name of the Kafka service | "kafka" | @@ -237,20 +237,6 @@ rm -rf ${HOME}/streampipes-k8s | external.kafka.persistence.pvName | Name of the Kafka PersistentVolume | "kafka-pv" | | -#### Zookeeper common parameters - -| Parameter Name | Description | Value | -|-------------------------------------------------|---------------------------------------------|-----------------| -| external.zookeeper.appName | ZooKeeper application name | "zookeeper" | -| external.zookeeper.version | ZooKeeper version | 3.4.13 | -| external.zookeeper.port | Port for the ZooKeeper service | 2181 | -| external.zookeeper.service.name | Name of the ZooKeeper service | "zookeeper" | -| external.zookeeper.service.port | TargetPort of the ZooKeeper service | 2181 | -| external.zookeeper.persistence.storageClassName | Storage class name for ZooKeeper PVs | "hostpath" | -| external.zookeeper.persistence.storageSize | Size of the ZooKeeper PV | "1Gi" | -| external.zookeeper.persistence.claimName | Name of the ZooKeeper PersistentVolumeClaim | "zookeeper-pvc" | -| external.zookeeper.persistence.pvName | Name of the ZooKeeper PersistentVolume | "zookeeper-pv" | - #### Pulsar common parameters | Parameter Name | Description | Value | diff --git a/installer/k8s/templates/core/backend-deployment.yaml b/installer/k8s/templates/core/backend-deployment.yaml index 96d09ee9c1..8103e20ac6 100644 --- a/installer/k8s/templates/core/backend-deployment.yaml +++ b/installer/k8s/templates/core/backend-deployment.yaml @@ -59,13 +59,6 @@ spec: value: "{{ .Values.external.kafka.service.name }}" - name: SP_KAFKA_PORT value: "{{ .Values.external.kafka.service.port }}" - - name: SP_ZOOKEEPER_HOST - value: "{{ .Values.external.zookeeper.service.name }}" - - name: SP_ZOOKEEPER_PORT - value: "{{ .Values.external.zookeeper.service.port }}" - {{- end }} - {{- if eq .Values.preferredBroker "nats" }} - value: "nats" - name: SP_NATS_HOST value: "{{ .Values.external.nats.service.name }}" - name: SP_NATS_PORT diff --git a/installer/k8s/templates/external/kafka/kafka-deployment.yaml b/installer/k8s/templates/external/kafka/kafka-deployment.yaml index 76ac96f049..659d56d148 100644 --- a/installer/k8s/templates/external/kafka/kafka-deployment.yaml +++ b/installer/k8s/templates/external/kafka/kafka-deployment.yaml @@ -29,17 +29,13 @@ spec: app: {{ .Values.external.kafka.appName }} spec: restartPolicy: {{ .Values.restartPolicy }} - initContainers: - - name: init-wait - image: alpine - command: ["sh", "-c", "for i in $(seq 1 300); do nc -zvw1 {{ .Values.external.zookeeper.service.name }} {{ .Values.external.zookeeper.service.port }} && exit 0 || sleep 3; done; exit 1"] volumes: - name: {{ .Values.external.kafka.persistence.pvName }} persistentVolumeClaim: claimName: {{ .Values.external.kafka.persistence.claimName }} containers: - name: {{ .Values.external.kafka.appName }} - image: fogsyio/kafka:{{ .Values.external.kafka.version }} + image: bitnami/kafka:{{ .Values.external.kafka.version }} imagePullPolicy: {{ .Values.pullPolicy }} ports: - containerPort: {{ .Values.external.kafka.port }} @@ -47,25 +43,29 @@ spec: - mountPath: "/kafka" name: {{ .Values.external.kafka.persistence.pvName }} env: - # Known issue with kafka running in kubernetes: - # https://github.com/wurstmeister/kafka-docker/issues/122 - - name: KAFKA_PORT - value: "{{ .Values.external.kafka.port }}" - - name: KAFKA_ZOOKEEPER_CONNECT - value: "zookeeper:{{ .Values.external.zookeeper.port }}" - - name: KAFKA_LISTENERS - value: "PLAINTEXT://:{{ .Values.external.kafka.port }},OUTSIDE://:9094" - - name: KAFKA_ADVERTISED_LISTENERS - value: "PLAINTEXT://kafka:{{ .Values.external.kafka.port }},OUTSIDE://{{ .Values.external.kafka.external.hostname }}:9094" - - name: KAFKA_INTER_BROKER_LISTENER_NAME + - name: KAFKA_CFG_NODE_ID + value: "0" + - name: KAFKA_CFG_PROCESS_ROLES + value: "controller,broker" + - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS + value: "0@kafka:9093" + - name: KAFKA_CFG_LISTENERS + value: "PLAINTEXT://:{{ .Values.external.kafka.port }},CONTROLLER://:9093,OUTSIDE://:9094" + - name: KAFKA_CFG_ADVERTISED_LISTENERS + value: "PLAINTEXT://{{ .Values.external.kafka.port }}:9092,OUTSIDE://{{ .Values.external.kafka.external.hostname }}:9094" + - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT" + - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME value: "PLAINTEXT" - - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP - value: "PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT" - - name: KAFKA_MESSAGE_MAX_BYTES + - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES + value: "CONTROLLER" + - name: KAFKA_CFG_PORT + value: "{{ .Values.external.kafka.port }}" + - name: KAFKA_CFG_MESSAGE_MAX_BYTES value: "5000012" - - name: KAFKA_FETCH_MESSAGE_MAX_BYTES + - name: KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES value: "5000012" - - name: KAFKA_REPLICA_FETCH_MAX_BYTES + - name: KAFKA_CFG_REPLICA_FETCH_MAX_BYTES value: "10000000" livenessProbe: tcpSocket: @@ -85,4 +85,4 @@ spec: initialDelaySeconds: {{ .Values.initialDelaySeconds }} periodSeconds: {{ .Values.periodSeconds }} failureThreshold: {{ .Values.failureThreshold }} -{{- end }} \ No newline at end of file +{{- end }} diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml b/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml deleted file mode 100644 index 07314cdeb3..0000000000 --- a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml +++ /dev/null @@ -1,74 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{{- if eq .Values.preferredBroker "kafka" }} -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ .Values.external.zookeeper.appName }} -spec: - selector: - matchLabels: - app: {{ .Values.external.zookeeper.appName }} - replicas: 1 - template: - metadata: - labels: - app: {{ .Values.external.zookeeper.appName }} - spec: - restartPolicy: {{ .Values.restartPolicy }} - volumes: - - name: {{ .Values.external.zookeeper.persistence.pvName }} - persistentVolumeClaim: - claimName: {{ .Values.external.zookeeper.persistence.claimName }} - containers: - #TODO: wurstmeister/zookeeper:latest is running ZK 3.4.13. Once this - # changes, the mount path needs to be adapted - - name: {{ .Values.external.zookeeper.appName }} - image: fogsyio/zookeeper:{{ .Values.external.zookeeper.version }} - imagePullPolicy: {{ .Values.pullPolicy }} - ports: - - containerPort: {{ .Values.external.zookeeper.port }} - volumeMounts: - - mountPath: "/opt/zookeeper-{{ .Values.external.zookeeper.version }}/data" - name: {{ .Values.external.zookeeper.persistence.pvName }} - livenessProbe: - exec: - command: - - sh - - -c - - echo ruok | nc localhost {{ .Values.external.zookeeper.port }} - initialDelaySeconds: {{ .Values.initialDelaySeconds }} - periodSeconds: {{ .Values.periodSeconds }} - failureThreshold: {{ .Values.failureThreshold }} - readinessProbe: - exec: - command: - - sh - - -c - - echo ruok | nc localhost {{ .Values.external.zookeeper.port }} - initialDelaySeconds: {{ .Values.initialDelaySeconds }} - periodSeconds: {{ .Values.periodSeconds }} - failureThreshold: {{ .Values.failureThreshold }} - startupProbe: - exec: - command: - - sh - - -c - - echo ruok | nc localhost {{ .Values.external.zookeeper.port }} - initialDelaySeconds: {{ .Values.initialDelaySeconds }} - periodSeconds: {{ .Values.periodSeconds }} - failureThreshold: {{ .Values.failureThreshold }} -{{- end }} \ No newline at end of file diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml b/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml deleted file mode 100644 index 465963d6e8..0000000000 --- a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml +++ /dev/null @@ -1,44 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{{- if eq .Values.preferredBroker "kafka" }} -apiVersion: v1 -kind: PersistentVolume -metadata: - name: {{ .Values.external.zookeeper.persistence.pvName }} -spec: - storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName }} - capacity: - storage: {{ .Values.external.zookeeper.persistence.storageSize }} - accessModes: - - {{ .Values.persistentVolumeAccessModes }} - persistentVolumeReclaimPolicy: {{ .Values.persistentVolumeReclaimPolicy }} - hostPath: - path: {{ .Values.hostPath }}/zookeeper ---- -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - labels: - app: {{ .Values.external.zookeeper.appName }} - name: {{ .Values.external.zookeeper.persistence.claimName }} -spec: - storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName }} - accessModes: - - {{ .Values.persistentVolumeAccessModes }} - resources: - requests: - storage: {{ .Values.external.zookeeper.persistence.storageSize }} -{{- end }} \ No newline at end of file diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml b/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml deleted file mode 100644 index 492d0558ea..0000000000 --- a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml +++ /dev/null @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{{- if eq .Values.preferredBroker "kafka" }} -apiVersion: v1 -kind: Service -metadata: - name: {{ .Values.external.zookeeper.service.name }} -spec: - selector: - app: {{ .Values.external.zookeeper.appName }} - ports: - - name: main - protocol: TCP - port: {{ .Values.external.zookeeper.port }} - targetPort: {{ .Values.external.zookeeper.port }} -{{- end }} \ No newline at end of file diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml index 68b7e2e95c..e6601bf64d 100644 --- a/installer/k8s/values.yaml +++ b/installer/k8s/values.yaml @@ -152,18 +152,6 @@ external: storageSize: "1Gi" claimName: "kafka-pvc" pvName: "kafka-pv" - zookeeper: - appName: "zookeeper" - version: 3.4.13 - port: 2181 - service: - name: "zookeeper" - port: 2181 - persistence: - storageClassName: "hostpath" - storageSize: "1Gi" - claimName: "zookeeper-pvc" - pvName: "zookeeper-pv" pulsar: appName: "pulsar" version: 3.0.0