forked from Altinity/clickhouse-sink-connector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsink-connector-setup-schema-registry.sh
executable file
·128 lines (103 loc) · 4.29 KB
/
sink-connector-setup-schema-registry.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/bin/bash
#
# sink-connector-setup-schema-registry.sh
#
# Registers the ClickHouse sink connector with Kafka Connect by POSTing a JSON
# configuration that uses Avro converters backed by a schema registry.
#
# Usage: sink-connector-setup-schema-registry.sh [SOURCE] [REGISTRY]
#   SOURCE    "postgres" subscribes to SERVER5432.public.Employee; any other
#             value (or none) subscribes to the SERVER5432.test.* topics and
#             sets a topic-to-table map.
#   REGISTRY  "apicurio" uses the Apicurio Avro converter; any other value
#             (or none) uses the Confluent Avro converter.
#
# Requires sink-connector-config.sh next to this script; CONNECTORS_MANAGEMENT_URL
# and CONNECTOR_NAME must be defined once it has been sourced.

# Source configuration
CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
config_file="${CUR_DIR}/sink-connector-config.sh"

# Stop early when the configuration is missing: without it the POST below
# would be sent to an empty URL with an empty connector name.
if [[ ! -r "${config_file}" ]]; then
  echo "ERROR: cannot read configuration file '${config_file}'" >&2
  exit 1
fi
# shellcheck source=/dev/null
source "${config_file}"

# Both values are interpolated into the registration request further down.
: "${CONNECTORS_MANAGEMENT_URL:?must be set (normally by sink-connector-config.sh)}"
: "${CONNECTOR_NAME:?must be set (normally by sink-connector-config.sh)}"

# clickhouse-sink-connector params
CLICKHOUSE_HOST="clickhouse"
CLICKHOUSE_PORT=8123
CLICKHOUSE_USER="root"
CLICKHOUSE_PASSWORD="root"
CLICKHOUSE_TABLE="employees"
CLICKHOUSE_DATABASE="test"
# NOTE(review): BUFFER_COUNT is not referenced anywhere in the visible script.
BUFFER_COUNT=10000

# Topic selection, driven by the first argument.
#SERVER5432.transaction
if [[ "$1" == "postgres" ]]; then
  # TOPICS_TABLE_MAP is not assigned on this path; the request uses whatever
  # value it already has (empty unless the sourced configuration or the
  # environment defines it).
  TOPICS="SERVER5432.public.Employee"
else
  TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.customers"
  #TOPICS="SERVER5432.test.ontime"
  TOPICS_TABLE_MAP="SERVER5432.test.employees_predated:employees, SERVER5432.test.products:products"
fi

# Alternative subscriptions kept for reference:
#TOPICS="SERVER5432"
#"topics.regex": "SERVER5432.sbtest.(.*), SERVER5432.test.(.*)",
#"topics": "${TOPICS}",
# Select the schema-registry flavour from the second argument. Only the literal
# value "apicurio" picks the Apicurio converter; every other value (or no value)
# falls back to the Confluent converter. The two flavours also use different
# "tasks.max" and "auto.create.tables" values, so those are chosen here as well.
case "$2" in
apicurio)
  echo "APICURIO SCHEMA REGISTRY"
  max_tasks=10
  auto_create_tables=false
  converter_settings='"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://schemaregistry:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://schemaregistry:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",'
  ;;
*)
  echo "Using confluent schema registry"
  max_tasks=20
  auto_create_tables=true
  converter_settings='"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schemaregistry:8081",
"value.converter.schema.registry.url":"http://schemaregistry:8081",'
  ;;
esac

# Send the connector definition to the Kafka Connect management endpoint.
# The JSON document is fed to curl on stdin (--data @-); the registry-specific
# lines chosen above are spliced in between the ClickHouse and sink settings.
curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @- <<EOF
{
"name": "${CONNECTOR_NAME}",
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "${max_tasks}",
"topics": "${TOPICS}",
"clickhouse.topic2table.map": "${TOPICS_TABLE_MAP}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
"clickhouse.server.user": "${CLICKHOUSE_USER}",
"clickhouse.server.pass": "${CLICKHOUSE_PASSWORD}",
"clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
"clickhouse.server.port": ${CLICKHOUSE_PORT},
"clickhouse.table.name": "${CLICKHOUSE_TABLE}",
${converter_settings}
"store.kafka.metadata": true,
"topic.creation.default.partitions": 6,
"store.raw.data": false,
"store.raw.data.column": "raw_data",
"metrics.enable": true,
"metrics.port": 8084,
"buffer.flush.time.ms": 500,
"thread.pool.size": 1,
"fetch.min.bytes": 52428800,
"enable.kafka.offset": false,
"replacingmergetree.delete.column": "sign",
"auto.create.tables": ${auto_create_tables},
"schema.evolution": false,
"deduplication.policy": "off"
}
}
EOF
# Alternative delete-column setting kept for reference:
# "replacingmergetree.delete.column": "sign_delete"