dddddd #1221

Open
wants to merge 2 commits into master

8 changes: 4 additions & 4 deletions pom.xml
@@ -59,7 +59,7 @@
<main.basedir>${project.basedir}</main.basedir>
<spark.2_2.version>2.2.0</spark.2_2.version>
<spark.2_3.version>2.3.3</spark.2_3.version>
<spark.2_4.version>2.4.3</spark.2_4.version>
<spark.2_4.version>2.4.0</spark.2_4.version>
<ums.version>1.3</ums.version>
<project.version>0.6.3</project.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -92,9 +92,9 @@
<fastjson.version>1.2.58</fastjson.version>
<hikariCP.version>3.3.1</hikariCP.version>
<hash.version>0.9</hash.version>
<spark.extension.version>2.2</spark.extension.version>
<!--<json4s.version>3.5.3</json4s.version>-->
<json4s.version>3.2.11</json4s.version>
<spark.extension.version>2.4</spark.extension.version>
<json4s.version>3.5.3</json4s.version>
<!-- <json4s.version>3.2.11</json4s.version>-->

<guava.version>19.0</guava.version>
<sonar.core.codeCoveragePlugin>jacoco</sonar.core.codeCoveragePlugin>
20 changes: 10 additions & 10 deletions rider/conf/application.conf
@@ -3,7 +3,7 @@ akka.http.server.request-timeout = 120s

wormholeServer {
cluster.id = "" #optional global uuid
host = "localhost"
host = "master"
port = 8989
ui.default.language = "Chinese"
token.timeout = 1
@@ -18,8 +18,8 @@ mysql = {
db = {
driver = "com.mysql.jdbc.Driver"
user = "root"
password = "root"
url = "jdbc:mysql://localhost:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
password = "Your@pwd123"
url = "jdbc:mysql://master:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
numThreads = 4
minConnections = 4
maxConnections = 10
@@ -42,11 +42,11 @@ ldap = {
}

spark = {
spark.home = "/usr/local/spark"
spark.home = "/usr"
yarn.queue.name = "default" #WormholeServer submit spark streaming/job queue
wormhole.hdfs.root.path = "hdfs://nn1/wormhole" #WormholeServer hdfslog data default hdfs root path
yarn.rm1.http.url = "localhost:8088" #Yarn ActiveResourceManager address
yarn.rm2.http.url = "localhost:8088" #Yarn StandbyResourceManager address
wormhole.hdfs.root.path = "hdfs://master:8020/wormhole" #WormholeServer hdfslog data default hdfs root path
yarn.rm1.http.url = "slave01:8088" #Yarn ActiveResourceManager address
#yarn.rm2.http.url = "slave01:8088" #Yarn StandbyResourceManager address
#yarn.web-proxy.port = 8888 #Yarn web proxy port, just set if yarn service set yarn.web-proxy.address config
}

@@ -64,13 +64,13 @@ flink = {
}

zookeeper = {
connection.url = "localhost:2181" #WormholeServer stream and flow interaction channel
connection.url = "slave01:2181,master:2181,slave02:2181" #WormholeServer stream and flow interaction channel
wormhole.root.path = "/wormhole" #zookeeper
}

kafka = {
brokers.url = "localhost:6667" #WormholeServer feedback data store
zookeeper.url = "localhost:2181"
brokers.url = "master:9092,slave01:9092,slave02:9092" #WormholeServer feedback data store
zookeeper.url = "slave01:2181,master:2181,slave02:2181"
topic.refactor = 3
using.cluster.suffix = false #if true, _${cluster.id} will be concatenated to consumer.feedback.topic
consumer = {
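
For reference, WormholeServer loads rider/conf/application.conf as HOCON. A minimal sketch for checking the values changed above — using plain Typesafe Config rather than the project's own RiderConfig helpers, and assuming the file is on the classpath — could look like this:

```scala
import com.typesafe.config.ConfigFactory

// Sketch only: read the application.conf keys touched by this PR.
object ConfCheck extends App {
  val conf = ConfigFactory.load() // resolves application.conf from the classpath
  println(conf.getString("kafka.brokers.url"))             // master:9092,slave01:9092,slave02:9092
  println(conf.getString("zookeeper.connection.url"))      // slave01:2181,master:2181,slave02:2181
  println(conf.getString("spark.wormhole.hdfs.root.path")) // hdfs://master:8020/wormhole
  println(conf.getString("mysql.db.url"))                  // jdbc:mysql://master:3306/wormhole?...
}
```
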
2 changes: 1 addition & 1 deletion rider/conf/log4j.properties
@@ -1,4 +1,4 @@
log4j.rootLogger=INFO, FILE
log4j.rootLogger=INFO, FILE, CONSOLE
## for console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
@@ -159,6 +159,7 @@ case class Monitor(databaseType: String)
object RiderConfig {

lazy val riderRootPath = s"${System.getProperty("WORMHOLE_HOME")}"
//lazy val riderRootPath ="D:\\workspaces\\gaoji\\wormhole\\rider"

lazy val riderServer = RiderServer(
getStringConfig("wormholeServer.cluster.id", ""),
3 changes: 3 additions & 0 deletions sparkextension/spark_extension_2_4/pom.xml
@@ -20,11 +20,13 @@
</properties>

<dependencies>

<dependency>
<groupId>edp.wormhole</groupId>
<artifactId>wormhole-hadoop</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
@@ -48,6 +50,7 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
4 changes: 4 additions & 0 deletions sparkx/pom.xml
@@ -28,6 +28,10 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -40,6 +40,7 @@ import scala.collection.mutable
object BatchflowDirective extends Directive {

private def registerFlowStartDirective(flowDirectiveConfig: FlowDirectiveConfig): String = {

val consumptionDataMap = mutable.HashMap.empty[String, Boolean]
val consumption = JSON.parseObject(flowDirectiveConfig.consumptionDataStr)
val initial = consumption.getString(InputDataProtocolBaseType.INITIAL.toString).trim.toLowerCase.toBoolean
@@ -176,16 +177,82 @@ object BatchflowDirective extends Directive {
}

override def flowStartProcess(ums: Ums): String = {
"""
{
"protocol": {
"type": "directive_flow_start"
},
"schema": {
"namespace": "kafka.hdp-kafka.test_source.test_table.*.*.*",
"fields": [{
"name": "directive_id",
"type": "long",
"nullable": false
}, {
"name": "stream_id",
"type": "long",
"nullable": false
}, {
"name": "flow_id",
"type": "long",
"nullable": false
}, {
"name": "source_increment_topic",
"type": "string",
"nullable": false
}, {
"name": "ums_ts_",
"type": "datetime",
"nullable": false
}, {
"name": "data_type",
"type": "string",
"nullable": false
}, {
"name": "data_parse",
"type": "string",
"nullable": true
}, {
"name": "sink_namespace",
"type": "string",
"nullable": false
}, {
"name": "consumption_protocol",
"type": "string",
"nullable": false
}, {
"name": "sinks",
"type": "string",
"nullable": false
}, {
"name": "swifts",
"type": "string",
"nullable": true
}, {
"name": "kerberos",
"type": "boolean",
"nullable": true
}, {
"name": "priority_id",
"type": "long",
"nullable": true
}]
},
"payload": [{
"tuple": [35, 1, 2, "test_source", "2020-05-14 11:53:31.000000", "ums_extension", "eyJmaWVsZHMiOlt7Im5hbWUiOiJpZCIsInR5cGUiOiJsb25nIiwibnVsbGFibGUiOnRydWV9LHsibmFtZSI6Im5hbWUiLCJ0eXBlIjoic3RyaW5nIiwibnVsbGFibGUiOnRydWV9LHsibmFtZSI6InBob25lIiwidHlwZSI6InN0cmluZyIsIm51bGxhYmxlIjp0cnVlfSx7Im5hbWUiOiJjaXR5IiwidHlwZSI6InN0cmluZyIsIm51bGxhYmxlIjp0cnVlfSx7Im5hbWUiOiJ0aW1lIiwidHlwZSI6ImRhdGV0aW1lIiwibnVsbGFibGUiOnRydWV9LHsibmFtZSI6InRpbWUiLCJ0eXBlIjoiZGF0ZXRpbWUiLCJudWxsYWJsZSI6dHJ1ZSwicmVuYW1lIjoidW1zX3RzXyJ9XX0=", "mysql.hdp-mysql.testdb.user.*.*.*", "eyJpbml0aWFsIjogdHJ1ZSwgImluY3JlbWVudCI6IHRydWUsICJiYXRjaCI6IGZhbHNlfQ==", "ew0ic2lua19jb25uZWN0aW9uX3VybCI6ICJqZGJjOm15c3FsOi8vbWFzdGVyOjMzMDYvdGVzdGRiIiwNInNpbmtfY29ubmVjdGlvbl91c2VybmFtZSI6ICJyb290IiwNInNpbmtfY29ubmVjdGlvbl9wYXNzd29yZCI6ICJZb3VyQHB3ZDEyMyIsDSJzaW5rX3RhYmxlX2tleXMiOiAiaWQiLA0ic2lua19vdXRwdXQiOiAiIiwNInNpbmtfY29ubmVjdGlvbl9jb25maWciOiAiIiwNInNpbmtfcHJvY2Vzc19jbGFzc19mdWxsbmFtZSI6ICJlZHAud29ybWhvbGUuc2lua3MuZGJzaW5rLkRhdGEyRGJTaW5rIiwNInNpbmtfc3BlY2lmaWNfY29uZmlnIjogeyJtdXRhdGlvbl90eXBlIjoiaSJ9LA0ic2lua19yZXRyeV90aW1lcyI6ICIzIiwNInNpbmtfcmV0cnlfc2Vjb25kcyI6ICIzMDAiDX0=", "eyJwdXNoZG93bl9jb25uZWN0aW9uIjpbeyJwYXNzd29yZCI6IllvdXJAcHdkMTIzIiwibmFtZV9zcGFjZSI6Im15c3FsLmhkcC1teXNxbC5sb29rdXAiLCJjb25uZWN0aW9uX2NvbmZpZyI6W10sImpkYmNfdXJsIjoiamRiYzpteXNxbDovL21hc3RlcjozMzA2L2xvb2t1cD91c2VVbmljb2RlPXRydWUmY2hhcmFjdGVyRW5jb2Rpbmc9dXRmOCZhdXRvUmVjb25uZWN0PXRydWUmZmFpbE92ZXJSZWFkT25seT1mYWxzZSZub0FjY2Vzc1RvUHJvY2VkdXJlQm9kaWVzPXRydWUmemVyb0RhdGVUaW1lQmVoYXZpb3I9Y29udmVydFRvTnVsbCZ0aW55SW50MWlzQml0PWZhbHNlIiwidXNlcm5hbWUiOiJyb290In1dLCJkYXRhZnJhbWVfc2hvdyI6InRydWUiLCJhY3Rpb24iOiJjSFZ6YUdSdmQyNWZjM0ZzSUd4bFpuUWdhbTlwYmlCM2FYUm9JRzE1YzNGc0xtaGtjQzF0ZVhOeGJDNXNiMjlyZFhBZ1BTQnpaV3hsXG5ZM1FnYVdRZ1lYTWdhV1F4TEdOaGNtUkNZVzVySUdaeWIyMGdkWE5sY2tOaGNtUWdkMmhsY21VZ0tHbGtLU0JwYmlBb2EyRm1hMkV1XG5hR1J3TFd0aFptdGhMblJsYzNSZmMyOTFjbU5sTG5SbGMzUmZkR0ZpYkdVdWFXUXBPM053WVhKclgzTnhiQ0E5SUhObGJHVmpkQ0JwXG5aQ3h1WVcxbExHTmhjbVJDWVc1ckxIQm9iMjVsTEdOcGRIa2dabkp2YlNCMFpYTjBYM1JoWW14bE93PT0iLCJkYXRhZnJhbWVfc2hvd19udW0iOjEwfQ==", "false", "1"]
}]
}
"""
val payloads = ums.payload_get
val schemas = ums.schema.fields_get
val sourceNamespace = ums.schema.namespace.toLowerCase
val tuple = payloads.head
val tuple = payloads.head // take the first record

val streamId = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "stream_id").toString.toLong
val directiveId = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "directive_id").toString.toLong
val flowId = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "flow_id").toString.toLong
try {
val swiftsEncoded = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "swifts")
val swiftsEncoded = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "swifts") // base64-encoded

val swiftsStr = if (swiftsEncoded != null && !swiftsEncoded.toString.isEmpty) new String(new sun.misc.BASE64Decoder().decodeBuffer(swiftsEncoded.toString)) else null
logInfo("swiftsStr:" + swiftsStr)
@@ -206,8 +273,9 @@
if(null != sourceIncrementTopic) sourceIncrementTopic.toString.split(",").toList
else null

// build the flow directive config object
val flowDirectiveConfig = FlowDirectiveConfig(sourceNamespace, fullSinkNamespace, streamId, flowId, directiveId, swiftsStr, sinksStr, consumptionDataStr, dataType, dataParseStr, kerberos, priorityId, sourceIncrementTopicList)

// initialize the various configurations
registerFlowStartDirective(flowDirectiveConfig)
} catch {
case e: Throwable =>
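
The string block added to flowStartProcess is a sample directive_flow_start UMS message; the data_parse, consumption_protocol, sinks and swifts fields of its tuple carry base64-encoded JSON. A minimal sketch for inspecting one of them, using java.util.Base64 purely for illustration (the production code decodes with sun.misc.BASE64Decoder):

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Sketch only: decode the consumption_protocol field from the sample payload above.
object DecodeDirectiveField extends App {
  val consumptionProtocol = "eyJpbml0aWFsIjogdHJ1ZSwgImluY3JlbWVudCI6IHRydWUsICJiYXRjaCI6IGZhbHNlfQ=="
  val json = new String(Base64.getDecoder.decode(consumptionProtocol), StandardCharsets.UTF_8)
  println(json) // {"initial": true, "increment": true, "batch": false}
}
```
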
@@ -95,8 +95,9 @@ object BatchflowMainProcess extends EdpLogging {
UdfDirective.registerUdfProcess(config.kafka_output.feedback_topic_name, config.kafka_output.brokers, session)

logInfo("start create classifyRdd")
// classify the rows in the RDD into mainNamespace, lookupNamespace and otherNamespace
val classifyRdd: RDD[(ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[String], Array[((UmsProtocolType, String), Seq[UmsField])])] = getClassifyRdd(dataRepartitionRdd).cache()
val distinctSchema: mutable.Map[(UmsProtocolType, String), (Seq[UmsField], Long)] = getDistinctSchema(classifyRdd)
val distinctSchema: mutable.Map[(UmsProtocolType, String), (Seq[UmsField], Long)] = getDistinctSchema(classifyRdd) // deduplicate all namespaces
logInfo("start doStreamLookupData")

doStreamLookupData(session, classifyRdd, config, distinctSchema)
@@ -156,27 +157,27 @@
)
}


// split the RDD data of this batch by namespace into separate RDDs
private def getClassifyRdd(dataRepartitionRdd: RDD[(String, String)]): RDD[(ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[String], Array[((UmsProtocolType, String), Seq[UmsField])])] = {
val streamLookupNamespaceSet = ConfMemoryStorage.getAllLookupNamespaceSet
val mainNamespaceSet = ConfMemoryStorage.getAllMainNamespaceSet
val jsonSourceParseMap: Map[(UmsProtocolType, String), (Seq[UmsField], Seq[FieldInfo], ArrayBuffer[(String, String)])] = ConfMemoryStorage.getAllSourceParseMap
val mainNamespaceSet = ConfMemoryStorage.getAllMainNamespaceSet // get the source namespaces
val jsonSourceParseMap: Map[(UmsProtocolType, String), (Seq[UmsField], Seq[FieldInfo], ArrayBuffer[(String, String)])] = ConfMemoryStorage.getAllSourceParseMap // the flow source's two consumption protocols, increment and initial, configured on the UI
//log.info(s"streamLookupNamespaceSet: $streamLookupNamespaceSet, mainNamespaceSet $mainNamespaceSet, jsonSourceParseMap $jsonSourceParseMap")
dataRepartitionRdd.mapPartitions(partition => {
dataRepartitionRdd.mapPartitions(partition => { // iterate over the RDD partitions
val mainDataList = ListBuffer.empty[((UmsProtocolType, String), Seq[UmsTuple])]
val lookupDataList = ListBuffer.empty[((UmsProtocolType, String), Seq[UmsTuple])]
val otherList = ListBuffer.empty[String]
val nsSchemaMap = mutable.HashMap.empty[(UmsProtocolType, String), Seq[UmsField]]
partition.foreach(row => {
val nsSchemaMap = mutable.HashMap.empty[(UmsProtocolType, String), Seq[UmsField]] // record the mapping from every (protocolType, namespace) to its field types
partition.foreach(row => { // iterate over every row in the partition
try {
val (protocolType, namespace) = UmsCommonUtils.getTypeNamespaceFromKafkaKey(row._1)
val (protocolType, namespace) = UmsCommonUtils.getTypeNamespaceFromKafkaKey(row._1) // row is a tuple; _1 is a key like data_increment_data.kafka.hdp-kafka.test_source.test_table.*.*.*
if (protocolType == UmsProtocolType.DATA_INCREMENT_DATA || protocolType == UmsProtocolType.DATA_BATCH_DATA || protocolType == UmsProtocolType.DATA_INITIAL_DATA) {
if (ConfMemoryStorage.existNamespace(mainNamespaceSet, namespace)) {
if (ConfMemoryStorage.existNamespace(mainNamespaceSet, namespace)) { // if the current namespace belongs to the main namespaces
val schemaValueTuple: (Seq[UmsField], Seq[UmsTuple]) = SparkxUtils.jsonGetValue(namespace, protocolType, row._2, jsonSourceParseMap)
if (!nsSchemaMap.contains((protocolType, namespace))) nsSchemaMap((protocolType, namespace)) = schemaValueTuple._1.map(f => UmsField(f.name.toLowerCase, f.`type`, f.nullable))
mainDataList += (((protocolType, namespace), schemaValueTuple._2))
}
if (ConfMemoryStorage.existNamespace(streamLookupNamespaceSet, namespace)) {
if (ConfMemoryStorage.existNamespace(streamLookupNamespaceSet, namespace)) { // if the current namespace belongs to the lookup namespaces
//todo change if back to if, efficiency
val schemaValueTuple: (Seq[UmsField], Seq[UmsTuple]) = SparkxUtils.jsonGetValue(namespace, protocolType, row._2, jsonSourceParseMap)
if (!nsSchemaMap.contains((protocolType, namespace))) nsSchemaMap((protocolType, namespace)) = schemaValueTuple._1.map(f => UmsField(f.name.toLowerCase, f.`type`, f.nullable))
@@ -188,7 +189,7 @@
case e1: Throwable => logAlert("do classifyRdd,one data has error,row:" + row, e1)
}
})
List((mainDataList, lookupDataList, otherList, nsSchemaMap.toArray)).toIterator
List((mainDataList, lookupDataList, otherList, nsSchemaMap.toArray)).toIterator // wrap the lists in one tuple and return it as the partition iterator
})
}

@@ -238,8 +239,8 @@
val umsRdd: RDD[(UmsProtocolType, String, ArrayBuffer[Seq[String]])] = formatRdd(allDataRdd, "lookup")
distinctSchema.foreach(schema => {
val namespace = schema._1._2
val matchLookupNamespace = ConfMemoryStorage.getMatchLookupNamespaceRule(namespace)
if (matchLookupNamespace != null) {
val matchLookupNamespace = ConfMemoryStorage.getMatchLookupNamespaceRule(namespace) // check whether this is a lookup namespace
if (matchLookupNamespace != null) { // non-null means it is a lookup namespace
val protocolType: UmsProtocolType = schema._1._1
val lookupDf = createSourceDf(session, namespace, schema._2._1, umsRdd.filter(row => {
row._1 == protocolType && row._2 == namespace
@@ -321,7 +322,7 @@
val flowConfig: FlowConfig = flowConfigMap(sinkNamespace)

if (swiftsProcessConfig.nonEmpty && swiftsProcessConfig.get.swiftsSql.nonEmpty) {

// enter only when a swifts transformation is configured
val (returnUmsFields, tuplesRDD, unionDf) = swiftsProcess(protocolType, flowConfig, swiftsProcessConfig, uuid, session, sourceTupleRDD, config, sourceNamespace, sinkNamespace, minTs, maxTs, count, sinkFields, batchId, topicPartitionOffset)
sinkFields = returnUmsFields
sinkRDD = tuplesRDD
@@ -429,7 +430,7 @@
if (dataSetShow.get) {
sourceDf.show(swiftsProcessConfig.get.datasetShowNum.get)
}

// whether the parquet RDD on HDFS needs to be unioned in
val afterUnionDf = unionParquetNonTimeoutDf(swiftsProcessConfig, uuid, session, sourceDf, config, sourceNamespace, sinkNamespace).cache

try {
@@ -470,7 +471,7 @@

private def getDistinctSchema(umsRdd: RDD[(ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[String], Array[((UmsProtocolType, String), Seq[UmsField])])]): mutable.Map[(UmsProtocolType.UmsProtocolType, String), (Seq[UmsField], Long)] = {
val schemaMap = mutable.HashMap.empty[(UmsProtocolType, String), (Seq[UmsField], Long)]
umsRdd.map(_._4).collect().foreach(_.foreach {
umsRdd.map(_._4).collect().foreach(_.foreach { // _4 is the map from every (protocol, ns) to its field types
case ((protocol, ns), schema) =>
if (!schemaMap.contains((protocol, ns))) {
val matchSourceNs = ConfMemoryStorage.getMatchSourceNamespaceRule(ns)
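
The new comments describe the classification step in terms of the Kafka record key, e.g. data_increment_data.kafka.hdp-kafka.test_source.test_table.*.*.*, whose first segment is the protocol type and whose remainder is the namespace. A small illustration of that key layout (an assumption drawn from the comment, not the actual UmsCommonUtils.getTypeNamespaceFromKafkaKey implementation):

```scala
// Illustration only: split a Kafka key of the shape described in the new comments.
object KafkaKeySketch extends App {
  val key = "data_increment_data.kafka.hdp-kafka.test_source.test_table.*.*.*"
  // assume the protocol segment itself contains no '.', so split at the first dot
  val Array(protocolType, namespace) = key.split("\\.", 2)
  println(protocolType) // data_increment_data
  println(namespace)    // kafka.hdp-kafka.test_source.test_table.*.*.*
}
```
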
@@ -39,7 +39,11 @@ object BatchflowStarter extends App with EdpLogging {
SparkContextUtils.setLoggerLevel()

logInfo("swiftsConfig:" + args(0))
val config: WormholeConfig = JsonUtils.json2caseClass[WormholeConfig](args(0))
//val config: WormholeConfig = JsonUtils.json2caseClass[WormholeConfig](args(0))

System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master")
var s="""{"kafka_input":{"group_id":"wormhole_demo_test_stream","batch_duration_seconds":30,"brokers":"master:9092,slave01:9092,slave02:9091","kerberos":false,"max.partition.fetch.bytes":10485760,"session.timeout.ms":30000,"group.max.session.timeout.ms":60000,"auto.offset.reset":"earliest","key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer":"org.apache.kafka.common.serialization.StringDeserializer","enable.auto.commit":false},"kafka_output":{"feedback_topic_name":"wormhole_feedback","brokers":"master:9092,slave01:9092,slave02:9092","kerberos":false},"spark_config":{"stream_id":1,"stream_name":"wormhole_demo_test_stream","master":"local[1]","spark.sql.shuffle.partitions":3},"rdd_partition_number":3,"zookeeper_address":"slave01:2181,master:2181,slave02:2181","zookeeper_path":"/wormhole","kafka_persistence_config_isvalid":false,"stream_hdfs_address":"hdfs://master:8020/wormhole","kerberos":false}"""
val config: WormholeConfig = JsonUtils.json2caseClass[WormholeConfig](s)
val appId = SparkUtils.getAppId
WormholeKafkaProducer.initWithoutAcksAll(config.kafka_output.brokers, config.kafka_output.config,config.kafka_output.kerberos)
val sparkConf = new SparkConf()
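
The change above replaces the args(0) submission path with a hard-coded WormholeConfig JSON and hadoop.home.dir for local debugging. A hedged sketch of keeping both paths (an assumption about intent, not part of this PR): fall back to the local-debug string only when the stream is launched without arguments.

```scala
// Sketch only, assuming the surrounding BatchflowStarter scope (args, JsonUtils,
// WormholeConfig and the local-debug JSON in `s`).
val config: WormholeConfig =
  if (args.nonEmpty) JsonUtils.json2caseClass[WormholeConfig](args(0)) // normal cluster submit
  else {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master") // Windows winutils, IDE runs only
    JsonUtils.json2caseClass[WormholeConfig](s) // hard-coded local config
  }
```
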