-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config #4939
base: master
Are you sure you want to change the base?
Changes from 9 commits
e1d0df0
8caa707
8bc4cd1
f37bf36
a11e9bf
2be7858
1f5d439
d05781e
a4dd049
527c4a3
d42ab68
48b0c75
4f4a1e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -283,6 +283,24 @@ struct rd_kafka_ConfigResource_result_s { | |
* but with response error values. */ | ||
}; | ||
|
||
typedef enum rd_kafka_ConfigResourceType_t { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add doc. |
||
RD_KAFKA_CONFIG_RESOURCE_GROUP = 32, | ||
} rd_kafka_ConfigResourceType_t; | ||
|
||
/** | ||
* @brief Map from rd_kafka_ResourceType_t to int8_t | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better doc explaining what and why are we doing this. |
||
*/ | ||
int8_t | ||
map_from_resource_type_to_config_resource_type(rd_kafka_ResourceType_t restype); | ||
|
||
/** | ||
* @brief Map from int8_t to rd_kafka_ResourceType_t | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better doc explaining what and why are we doing this. |
||
*/ | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove extra line. |
||
rd_kafka_ResourceType_t | ||
map_from_config_resource_type_to_resource_type(int8_t config_resource_type); | ||
|
||
|
||
/**@}*/ | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -902,7 +902,7 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { | |||||
*/ | ||||||
static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, | ||||||
rd_kafka_queue_t *rkqu) { | ||||||
#define MY_CONFRES_CNT 3 | ||||||
#define MY_CONFRES_CNT 4 | ||||||
char *topics[MY_CONFRES_CNT]; | ||||||
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; | ||||||
rd_kafka_AdminOptions_t *options; | ||||||
|
@@ -935,6 +935,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, | |||||
/** Test the test helper, for use in other tests. */ | ||||||
do { | ||||||
const char *broker_id = tsprintf("%d", avail_brokers[0]); | ||||||
const char *group_id = "my-group"; | ||||||
const char *confs_set_append[] = { | ||||||
"compression.type", "SET", "lz4", | ||||||
"cleanup.policy", "APPEND", "compact"}; | ||||||
|
@@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, | |||||
const char *confs_delete_subtract_broker[] = { | ||||||
"background.threads", "DELETE", "", | ||||||
"log.cleanup.policy", "SUBTRACT", "compact"}; | ||||||
const char *confs_set_append_group[] = { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
"consumer.session.timeout.ms", "SET", "50000"}; | ||||||
const char *confs_delete_group[] = { | ||||||
"consumer.session.timeout.ms", "DELETE", ""}; | ||||||
|
||||||
TEST_SAY("Testing test helper with SET and APPEND\n"); | ||||||
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, | ||||||
|
@@ -969,6 +974,17 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, | |||||
test_IncrementalAlterConfigs_simple( | ||||||
rk, RD_KAFKA_RESOURCE_BROKER, broker_id, | ||||||
confs_delete_subtract_broker, 2); | ||||||
TEST_SAY( | ||||||
"Testing test helper with SET with GROUP resource type\n"); | ||||||
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP, | ||||||
group_id, | ||||||
confs_set_append_group, 1); | ||||||
TEST_SAY( | ||||||
"Testing test helper with DELETE with GROUP resource " | ||||||
"type\n"); | ||||||
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP, | ||||||
group_id, | ||||||
confs_delete_group, 1); | ||||||
TEST_SAY("End testing test helper\n"); | ||||||
} while (0); | ||||||
|
||||||
|
@@ -1035,6 +1051,23 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, | |||||
exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN; | ||||||
ci++; | ||||||
|
||||||
/** | ||||||
* ConfigResource #3: valid group config | ||||||
*/ | ||||||
configs[ci] = | ||||||
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, "my-group"); | ||||||
|
||||||
error = rd_kafka_ConfigResource_add_incremental_config( | ||||||
configs[ci], "consumer.session.timeout.ms", | ||||||
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "50000"); | ||||||
TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); | ||||||
if (!test_consumer_group_protocol_classic()) { | ||||||
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; | ||||||
} else { | ||||||
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should check broker version as well here. |
||||||
ci++; | ||||||
|
||||||
/* | ||||||
* Timeout options | ||||||
*/ | ||||||
|
@@ -1334,6 +1367,137 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { | |||||
SUB_TEST_PASS(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* @brief Test DescribeConfigs for groups | ||||||
*/ | ||||||
static void do_test_DescribeConfigs_groups(rd_kafka_t *rk, | ||||||
rd_kafka_queue_t *rkqu) { | ||||||
#define MY_CONFRES_CNT 1 | ||||||
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; | ||||||
rd_kafka_AdminOptions_t *options; | ||||||
rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; | ||||||
rd_kafka_event_t *rkev; | ||||||
rd_kafka_resp_err_t err; | ||||||
const rd_kafka_DescribeConfigs_result_t *res; | ||||||
const rd_kafka_ConfigResource_t **rconfigs; | ||||||
char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); | ||||||
size_t rconfig_cnt; | ||||||
char errstr[128]; | ||||||
const char *errstr2; | ||||||
int ci = 0; | ||||||
int i; | ||||||
int fails = 0; | ||||||
|
||||||
SUB_TEST_QUICK(); | ||||||
|
||||||
/* | ||||||
* ConfigResource #0: group config, for a non-existent group. | ||||||
*/ | ||||||
/* | ||||||
* ConfigResource #3: group config, for a non-existent group. | ||||||
*/ | ||||||
configs[ci] = | ||||||
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, group); | ||||||
if (!test_consumer_group_protocol_classic()) { | ||||||
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; | ||||||
} else { | ||||||
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same |
||||||
ci++; | ||||||
|
||||||
/* | ||||||
* Timeout options | ||||||
*/ | ||||||
options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); | ||||||
err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr, | ||||||
sizeof(errstr)); | ||||||
TEST_ASSERT(!err, "%s", errstr); | ||||||
|
||||||
/* | ||||||
* Fire off request | ||||||
*/ | ||||||
rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu); | ||||||
|
||||||
/* | ||||||
* Wait for result | ||||||
*/ | ||||||
rkev = test_wait_admin_result( | ||||||
rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 10000 + 1000); | ||||||
|
||||||
/* | ||||||
* Extract result | ||||||
*/ | ||||||
res = rd_kafka_event_DescribeConfigs_result(rkev); | ||||||
TEST_ASSERT(res, "Expected DescribeConfigs result, not %s", | ||||||
rd_kafka_event_name(rkev)); | ||||||
|
||||||
err = rd_kafka_event_error(rkev); | ||||||
errstr2 = rd_kafka_event_error_string(rkev); | ||||||
TEST_ASSERT(!err, "Expected success, not %s: %s", | ||||||
rd_kafka_err2name(err), errstr2); | ||||||
|
||||||
rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt); | ||||||
TEST_ASSERT((int)rconfig_cnt == ci, | ||||||
"Expected %d result resources, got %" PRIusz "\n", ci, | ||||||
rconfig_cnt); | ||||||
|
||||||
/* | ||||||
* Verify status per resource | ||||||
*/ | ||||||
for (i = 0; i < (int)rconfig_cnt; i++) { | ||||||
const rd_kafka_ConfigEntry_t **entries; | ||||||
size_t entry_cnt; | ||||||
|
||||||
err = rd_kafka_ConfigResource_error(rconfigs[i]); | ||||||
errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]); | ||||||
|
||||||
entries = | ||||||
rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt); | ||||||
|
||||||
TEST_SAY( | ||||||
"ConfigResource #%d: type %s (%d), \"%s\": " | ||||||
"%" PRIusz " ConfigEntries, error %s (%s)\n", | ||||||
i, | ||||||
rd_kafka_ResourceType_name( | ||||||
rd_kafka_ConfigResource_type(rconfigs[i])), | ||||||
rd_kafka_ConfigResource_type(rconfigs[i]), | ||||||
rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt, | ||||||
rd_kafka_err2name(err), errstr2 ? errstr2 : ""); | ||||||
|
||||||
test_print_ConfigEntry_array(entries, entry_cnt, 1); | ||||||
|
||||||
if (rd_kafka_ConfigResource_type(rconfigs[i]) != | ||||||
rd_kafka_ConfigResource_type(configs[i]) || | ||||||
strcmp(rd_kafka_ConfigResource_name(rconfigs[i]), | ||||||
rd_kafka_ConfigResource_name(configs[i]))) { | ||||||
TEST_FAIL_LATER( | ||||||
"ConfigResource #%d: " | ||||||
"expected type %s name %s, " | ||||||
"got type %s name %s", | ||||||
i, | ||||||
rd_kafka_ResourceType_name( | ||||||
rd_kafka_ConfigResource_type(configs[i])), | ||||||
rd_kafka_ConfigResource_name(configs[i]), | ||||||
rd_kafka_ResourceType_name( | ||||||
rd_kafka_ConfigResource_type(rconfigs[i])), | ||||||
rd_kafka_ConfigResource_name(rconfigs[i])); | ||||||
fails++; | ||||||
continue; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
} | ||||||
} | ||||||
|
||||||
TEST_ASSERT(!fails, "See %d previous failure(s)", fails); | ||||||
|
||||||
rd_kafka_event_destroy(rkev); | ||||||
|
||||||
rd_kafka_ConfigResource_destroy_array(configs, ci); | ||||||
|
||||||
TEST_LATER_CHECK(); | ||||||
#undef MY_CONFRES_CNT | ||||||
|
||||||
SUB_TEST_PASS(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* @brief Test CreateAcls | ||||||
*/ | ||||||
|
@@ -5258,6 +5422,7 @@ static void do_test_apis(rd_kafka_type_t cltype) { | |||||
|
||||||
/* DescribeConfigs */ | ||||||
do_test_DescribeConfigs(rk, mainq); | ||||||
do_test_DescribeConfigs_groups(rk, mainq); | ||||||
Comment on lines
5424
to
+5425
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not add new condition in |
||||||
|
||||||
/* Delete records */ | ||||||
do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0); | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a problem that the error message contain 32 instead of 3. We are not exposing 32 to the user. I think we should modify the error code in the response as well though it is difficult to catch all the cases. Can be an improvement though.
ConfigResource result: 3,my-group: error: Unknown resource type 32