Skip to content

Commit 99ae719

Browse files
committed
[ISSUE apache#420]remove openmessage-runtime dependency
1 parent e5fa6c3 commit 99ae719

File tree

8 files changed

+104
-87
lines changed

8 files changed

+104
-87
lines changed

rocketmq-connect-kafka/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
**启动Connector**
1717

18-
http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms: rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"io.openmessaging.connect.runtime.converter.JsonConverter"}
18+
http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
1919

2020
**查看Connector运行状态**
2121

rocketmq-connect-kafka/pom.xml

-5
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,6 @@
156156
<scope>test</scope>
157157
</dependency>
158158
-->
159-
<dependency>
160-
<groupId>io.openmessaging</groupId>
161-
<artifactId>openmessaging-connect-runtime</artifactId>
162-
<version>0.0.1-SNAPSHOT</version>
163-
</dependency>
164159
<dependency>
165160
<groupId>io.openmessaging</groupId>
166161
<artifactId>openmessaging-connector</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.connect.kafka.config;
19+
20+
import java.util.*;
21+
22+
public class ConfigDefine {
23+
24+
public static String TASK_NUM = "tasks.num";
25+
public static String TOPICS = "kafka.topics";
26+
public static String GROUP_ID = "kafka.group.id";
27+
public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
28+
public static String CONNECTOR_CLASS = "connector-class";
29+
public static String SOURCE_RECORD_CONVERTER = "source-record-converter";
30+
public static String ROCKETMQ_TOPIC = "rocketmq.topic";
31+
32+
private String bootstrapServers;
33+
private String topics;
34+
private String groupId;
35+
36+
public String getTopics() {
37+
return topics;
38+
}
39+
40+
public void setTopics(String topics) {
41+
this.topics = topics;
42+
}
43+
44+
public String getBootstrapServers() {
45+
return bootstrapServers;
46+
}
47+
48+
public void setBootstrapServers(String bootstrapServers) {
49+
this.bootstrapServers = bootstrapServers;
50+
}
51+
52+
public String getGroupId() {
53+
return groupId;
54+
}
55+
56+
public void setGroupId(String groupId) {
57+
this.groupId = groupId;
58+
}
59+
60+
public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
61+
{
62+
add(TOPICS);
63+
add(GROUP_ID);
64+
add(BOOTSTRAP_SERVER);
65+
}
66+
};
67+
}

rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/Config.java rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java

+6-52
Original file line numberDiff line numberDiff line change
@@ -14,62 +14,20 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
18-
package org.apache.rocketmq.connect.kafka;
17+
package org.apache.rocketmq.connect.kafka.config;
1918

2019
import io.openmessaging.KeyValue;
21-
import java.lang.reflect.Method;
22-
import java.util.*;
23-
24-
public class Config {
25-
26-
public static String TASK_NUM = "tasks.num";
27-
public static String TOPICS = "kafka.topics";
28-
public static String GROUP_ID = "kafka.group.id";
29-
public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
30-
public static String ROCKETMQ_TOPIC = "rocketmq.topic";
31-
32-
private String bootstrapServers;
33-
private String topics;
34-
private String groupId;
3520

36-
public String getTopics() {
37-
return topics;
38-
}
39-
40-
public void setTopics(String topics) {
41-
this.topics = topics;
42-
}
43-
44-
public String getBootstrapServers() {
45-
return bootstrapServers;
46-
}
21+
import java.lang.reflect.Method;
4722

48-
public void setBootstrapServers(String bootstrapServers) {
49-
this.bootstrapServers = bootstrapServers;
50-
}
23+
public class ConfigUtil {
5124

52-
public String getGroupId() {
53-
return groupId;
54-
}
25+
public static <T> void load(KeyValue props, Object object) {
5526

56-
public void setGroupId(String groupId) {
57-
this.groupId = groupId;
27+
properties2Object(props, object);
5828
}
5929

60-
public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
61-
{
62-
add(TOPICS);
63-
add(GROUP_ID);
64-
add(BOOTSTRAP_SERVER);
65-
}
66-
};
67-
68-
public void load(KeyValue props) {
69-
properties2Object(props, this);
70-
}
71-
72-
private void properties2Object(final KeyValue p, final Object object) {
30+
private static <T> void properties2Object(final KeyValue p, final Object object) {
7331

7432
Method[] methods = object.getClass().getMethods();
7533
for (Method method : methods) {
@@ -109,8 +67,4 @@ private void properties2Object(final KeyValue p, final Object object) {
10967
}
11068
}
11169
}
112-
113-
public static Set<String> getRequestConfig() {
114-
return REQUEST_CONFIG;
115-
}
11670
}

rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java

+18-17
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,17 @@
1818
package org.apache.rocketmq.connect.kafka.connector;
1919

2020
import io.openmessaging.KeyValue;
21-
import io.openmessaging.connect.runtime.common.ConnectKeyValue;
22-
import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
2321
import io.openmessaging.connector.api.Task;
2422
import io.openmessaging.connector.api.source.SourceConnector;
25-
import org.apache.rocketmq.connect.kafka.Config;
23+
import io.openmessaging.internal.DefaultKeyValue;
24+
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
2625
import org.slf4j.Logger;
2726
import org.slf4j.LoggerFactory;
2827

2928
import java.util.ArrayList;
3029
import java.util.List;
3130

32-
public class KafkaSourceConnector extends SourceConnector{
31+
public class KafkaSourceConnector extends SourceConnector {
3332
private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
3433

3534
private KeyValue connectConfig;
@@ -42,12 +41,12 @@ public KafkaSourceConnector() {
4241
public String verifyAndSetConfig(KeyValue config) {
4342

4443
log.info("KafkaSourceConnector verifyAndSetConfig enter");
45-
for ( String key : config.keySet()) {
44+
for (String key : config.keySet()) {
4645
log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
4746
}
4847

49-
for(String requestKey : Config.REQUEST_CONFIG){
50-
if(!config.containsKey(requestKey)){
48+
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
49+
if (!config.containsKey(requestKey)) {
5150
return "Request Config key: " + requestKey;
5251
}
5352
}
@@ -82,20 +81,22 @@ public Class<? extends Task> taskClass() {
8281

8382
@Override
8483
public List<KeyValue> taskConfigs() {
84+
if (connectConfig == null) {
85+
return new ArrayList<KeyValue>();
86+
}
8587

8688
log.info("Source Connector taskConfigs enter");
8789
List<KeyValue> configs = new ArrayList<>();
88-
int task_num = connectConfig.getInt(Config.TASK_NUM);
90+
int task_num = connectConfig.getInt(ConfigDefine.TASK_NUM);
8991
log.info("Source Connector taskConfigs: task_num:" + task_num);
90-
for (int i=0; i < task_num; ++i) {
91-
KeyValue config = new ConnectKeyValue();
92-
config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER));
93-
config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS));
94-
config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID));
95-
96-
config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
97-
config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
98-
config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
92+
for (int i = 0; i < task_num; ++i) {
93+
KeyValue config = new DefaultKeyValue();
94+
config.put(ConfigDefine.BOOTSTRAP_SERVER, connectConfig.getString(ConfigDefine.BOOTSTRAP_SERVER));
95+
config.put(ConfigDefine.TOPICS, connectConfig.getString(ConfigDefine.TOPICS));
96+
config.put(ConfigDefine.GROUP_ID, connectConfig.getString(ConfigDefine.GROUP_ID));
97+
98+
config.put(ConfigDefine.CONNECTOR_CLASS, connectConfig.getString(ConfigDefine.CONNECTOR_CLASS));
99+
config.put(ConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(ConfigDefine.SOURCE_RECORD_CONVERTER));
99100
configs.add(config);
100101
}
101102
return configs;

rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.openmessaging.connector.api.source.SourceTask;
2424
import org.apache.kafka.clients.consumer.*;
2525
import org.apache.kafka.common.TopicPartition;
26-
import org.apache.rocketmq.connect.kafka.Config;
26+
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

@@ -97,16 +97,16 @@ public void start(KeyValue taskConfig) {
9797
this.currentTPList = new ArrayList<>();
9898
this.config = taskConfig;
9999
Properties props = new Properties();
100-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
101-
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
100+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(ConfigDefine.BOOTSTRAP_SERVER));
101+
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(ConfigDefine.GROUP_ID));
102102
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
103103
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
104104
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
105105
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
106106

107107
this.consumer = new KafkaConsumer<>(props);
108108

109-
String topics = this.config.getString(Config.TOPICS);
109+
String topics = this.config.getString(ConfigDefine.TOPICS);
110110
for (String topic : topics.split(",")) {
111111
if (!topic.isEmpty()) {
112112
topicList.add(topic);

rocketmq-connect-kafka/src/main/resources/connect-kafka-source.properties

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
name=rocketmq-connect-kafka
1717
connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
18-
oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
1918
source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
2019
task.num=2
2120
kafka.bootstrap.server=47.112.213.204:9092;47.112.213.204:9092

rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import io.openmessaging.KeyValue;
2121
import io.openmessaging.internal.DefaultKeyValue;
22-
import org.apache.rocketmq.connect.kafka.Config;
22+
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
2323
import org.junit.Test;
2424

2525
import static org.junit.Assert.assertEquals;
@@ -31,26 +31,27 @@ public class KafkaSourceConnectorTest {
3131
public void verifyAndSetConfigTest() {
3232
KeyValue keyValue = new DefaultKeyValue();
3333

34-
for (String requestKey : Config.REQUEST_CONFIG) {
35-
assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey);
34+
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
35+
assertEquals(connector.verifyAndSetConfig(keyValue), "Request Config key: " + requestKey);
3636
keyValue.put(requestKey, requestKey);
3737
}
3838
assertEquals(connector.verifyAndSetConfig(keyValue), "");
3939
}
4040

4141
@Test
4242
public void taskClassTest() {
43-
assertEquals(connector.taskClass(), KafkaSourceConnector.class);
43+
assertEquals(connector.taskClass(), KafkaSourceTask.class);
4444
}
4545

4646
@Test
4747
public void taskConfigsTest() {
48-
assertEquals(connector.taskConfigs().get(0), null);
48+
assertEquals(connector.taskConfigs().size(), 0);
4949
KeyValue keyValue = new DefaultKeyValue();
50-
for (String requestKey : Config.REQUEST_CONFIG) {
50+
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
5151
keyValue.put(requestKey, requestKey);
5252
}
53+
keyValue.put(ConfigDefine.TASK_NUM,1);
5354
connector.verifyAndSetConfig(keyValue);
54-
assertEquals(connector.taskConfigs().get(0), keyValue);
55+
assertEquals(connector.taskConfigs().get(0).getString(ConfigDefine.TOPICS), keyValue.getString(ConfigDefine.TOPICS));
5556
}
5657
}

0 commit comments

Comments
 (0)