Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config #4939

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
63 changes: 61 additions & 2 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,55 @@ const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) {
}


rd_kafka_InternalConfigResourceType_t
map_from_resource_type_to_internal_config_resource_type(
rd_kafka_ResourceType_t resourcetype) {

if (resourcetype > RD_KAFKA_RESOURCE__CNT) {
rd_assert("Invalid resource type");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide information about the resource type value in the assert as well.

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this default case in the below switch.


switch (resourcetype) {
case RD_KAFKA_RESOURCE_UNKNOWN:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN;
case RD_KAFKA_RESOURCE_ANY:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY;
case RD_KAFKA_RESOURCE_TOPIC:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC;
case RD_KAFKA_RESOURCE_GROUP:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP;
case RD_KAFKA_RESOURCE_BROKER:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER;
case RD_KAFKA_RESOURCE__CNT:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want cnt conversion. This is incorrect in my opinion.

}
}

rd_kafka_ResourceType_t map_from_internal_config_resource_type_to_resource_type(
rd_kafka_InternalConfigResourceType_t internal_resourcetype) {

if (internal_resourcetype != RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP &&
internal_resourcetype > RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT) {
rd_assert("Recieved invalid resource type");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide information about the resource type value in the assert as well.

}

switch (internal_resourcetype) {
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN:
return RD_KAFKA_RESOURCE_UNKNOWN;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY:
return RD_KAFKA_RESOURCE_ANY;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC:
return RD_KAFKA_RESOURCE_TOPIC;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP:
return RD_KAFKA_RESOURCE_GROUP;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER:
return RD_KAFKA_RESOURCE_BROKER;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT:
return RD_KAFKA_RESOURCE__CNT;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Count conversion is not needed.

}
}


rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype,
const char *resname) {
Expand Down Expand Up @@ -3368,6 +3417,7 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,
int16_t error_code;
rd_kafkap_str_t error_msg;
int8_t res_type;
int8_t internal_res_type;
rd_kafkap_str_t kres_name;
char *res_name;
char *this_errstr = NULL;
Expand All @@ -3377,11 +3427,15 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &error_msg);
rd_kafka_buf_read_i8(reply, &res_type);
rd_kafka_buf_read_i8(reply, &internal_res_type);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config_resource_type

rd_kafka_buf_read_str(reply, &kres_name);
RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
rd_kafka_buf_skip_tags(reply);

res_type =
map_from_internal_config_resource_type_to_resource_type(
internal_res_type);

if (error_code) {
if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
RD_KAFKAP_STR_LEN(&error_msg) == 0)
Expand Down Expand Up @@ -3638,6 +3692,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
for (i = 0; i < (int)res_cnt; i++) {
int16_t error_code;
rd_kafkap_str_t error_msg;
int8_t internal_res_type;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config_resource_type

int8_t res_type;
rd_kafkap_str_t kres_name;
char *res_name;
Expand All @@ -3649,10 +3704,14 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &error_msg);
rd_kafka_buf_read_i8(reply, &res_type);
rd_kafka_buf_read_i8(reply, &internal_res_type);
rd_kafka_buf_read_str(reply, &kres_name);
RD_KAFKAP_STR_DUPA(&res_name, &kres_name);

res_type =
map_from_internal_config_resource_type_to_resource_type(
internal_res_type);

if (error_code) {
if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
RD_KAFKAP_STR_LEN(&error_msg) == 0)
Expand Down
27 changes: 27 additions & 0 deletions src/rdkafka_admin.h
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem that the error message contain 32 instead of 3. We are not exposing 32 to the user. I think we should modify the error code in the response as well though it is difficult to catch all the cases. Can be an improvement though.

ConfigResource result: 3,my-group: error: Unknown resource type 32

Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,33 @@ struct rd_kafka_ConfigResource_result_s {
* but with response error values. */
};

typedef enum rd_kafka_InternalConfigResourceType_t {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use Internal with ConfigResource. Use Java name which is rd_kafka_ConfigResource_t

RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN = 0,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY = 1,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC = 2,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP =
32, // Changed value for config APIs
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER = 4,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT,
} rd_kafka_InternalConfigResourceType_t;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this conversion only for GROUP right now. We can think about others later.


/**
* @brief Map from rd_kafka_ResourceType_t to
* rd_kafka_InternalConfigResourceType_t
*/
rd_kafka_InternalConfigResourceType_t
map_from_resource_type_to_internal_config_resource_type(
rd_kafka_ResourceType_t resourcetype);

/**
* @brief Map from rd_kafka_InternalConfigResourceType_t to
* rd_kafka_ResourceType_t
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra line.

rd_kafka_ResourceType_t map_from_internal_config_resource_type_to_resource_type(
rd_kafka_InternalConfigResourceType_t internal_resourcetype);


/**@}*/


Expand Down
10 changes: 8 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -5371,7 +5371,10 @@ rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest(
int ei;

/* ResourceType */
rd_kafka_buf_write_i8(rkbuf, config->restype);
rd_kafka_buf_write_i8(
rkbuf,
map_from_resource_type_to_internal_config_resource_type(
config->restype));

/* ResourceName */
rd_kafka_buf_write_str(rkbuf, config->name, -1);
Expand Down Expand Up @@ -5465,7 +5468,10 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
int ei;

/* resource_type */
rd_kafka_buf_write_i8(rkbuf, config->restype);
rd_kafka_buf_write_i8(
rkbuf,
map_from_resource_type_to_internal_config_resource_type(
config->restype));

/* resource_name */
rd_kafka_buf_write_str(rkbuf, config->name, -1);
Expand Down
167 changes: 166 additions & 1 deletion tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
*/
static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 3
#define MY_CONFRES_CNT 4
char *topics[MY_CONFRES_CNT];
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
rd_kafka_AdminOptions_t *options;
Expand Down Expand Up @@ -935,6 +935,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
/** Test the test helper, for use in other tests. */
do {
const char *broker_id = tsprintf("%d", avail_brokers[0]);
const char *group_id = "my-group";
const char *confs_set_append[] = {
"compression.type", "SET", "lz4",
"cleanup.policy", "APPEND", "compact"};
Expand All @@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
const char *confs_delete_subtract_broker[] = {
"background.threads", "DELETE", "",
"log.cleanup.policy", "SUBTRACT", "compact"};
const char *confs_set_append_group[] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const char *confs_set_append_group[] = {
const char *confs_set_group[] = {

"consumer.session.timeout.ms", "SET", "50000"};
const char *confs_delete_group[] = {
"consumer.session.timeout.ms", "DELETE", ""};

TEST_SAY("Testing test helper with SET and APPEND\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC,
Expand All @@ -969,6 +974,17 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
test_IncrementalAlterConfigs_simple(
rk, RD_KAFKA_RESOURCE_BROKER, broker_id,
confs_delete_subtract_broker, 2);
TEST_SAY(
"Testing test helper with SET with GROUP resource type\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP,
group_id,
confs_set_append_group, 1);
TEST_SAY(
"Testing test helper with DELETE with GROUP resource "
"type\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP,
group_id,
confs_delete_group, 1);
TEST_SAY("End testing test helper\n");
} while (0);

Expand Down Expand Up @@ -1035,6 +1051,23 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN;
ci++;

/**
* ConfigResource #3: valid group config
*/
configs[ci] =
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, "my-group");

error = rd_kafka_ConfigResource_add_incremental_config(
configs[ci], "consumer.session.timeout.ms",
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "50000");
TEST_ASSERT(!error, "%s", rd_kafka_error_string(error));
if (!test_consumer_group_protocol_classic()) {
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
} else {
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check broker version as well here.

ci++;

/*
* Timeout options
*/
Expand Down Expand Up @@ -1334,6 +1367,137 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
SUB_TEST_PASS();
}

/**
* @brief Test DescribeConfigs for groups
*/
static void do_test_DescribeConfigs_groups(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 1
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
rd_kafka_AdminOptions_t *options;
rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
rd_kafka_event_t *rkev;
rd_kafka_resp_err_t err;
const rd_kafka_DescribeConfigs_result_t *res;
const rd_kafka_ConfigResource_t **rconfigs;
char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
size_t rconfig_cnt;
char errstr[128];
const char *errstr2;
int ci = 0;
int i;
int fails = 0;

SUB_TEST_QUICK();

/*
* ConfigResource #0: group config, for a non-existent group.
*/
/*
* ConfigResource #3: group config, for a non-existent group.
*/
configs[ci] =
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, group);
if (!test_consumer_group_protocol_classic()) {
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
} else {
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

ci++;

/*
* Timeout options
*/
options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
sizeof(errstr));
TEST_ASSERT(!err, "%s", errstr);

/*
* Fire off request
*/
rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu);

/*
* Wait for result
*/
rkev = test_wait_admin_result(
rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 10000 + 1000);

/*
* Extract result
*/
res = rd_kafka_event_DescribeConfigs_result(rkev);
TEST_ASSERT(res, "Expected DescribeConfigs result, not %s",
rd_kafka_event_name(rkev));

err = rd_kafka_event_error(rkev);
errstr2 = rd_kafka_event_error_string(rkev);
TEST_ASSERT(!err, "Expected success, not %s: %s",
rd_kafka_err2name(err), errstr2);

rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
TEST_ASSERT((int)rconfig_cnt == ci,
"Expected %d result resources, got %" PRIusz "\n", ci,
rconfig_cnt);

/*
* Verify status per resource
*/
for (i = 0; i < (int)rconfig_cnt; i++) {
const rd_kafka_ConfigEntry_t **entries;
size_t entry_cnt;

err = rd_kafka_ConfigResource_error(rconfigs[i]);
errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);

entries =
rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt);

TEST_SAY(
"ConfigResource #%d: type %s (%d), \"%s\": "
"%" PRIusz " ConfigEntries, error %s (%s)\n",
i,
rd_kafka_ResourceType_name(
rd_kafka_ConfigResource_type(rconfigs[i])),
rd_kafka_ConfigResource_type(rconfigs[i]),
rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt,
rd_kafka_err2name(err), errstr2 ? errstr2 : "");

test_print_ConfigEntry_array(entries, entry_cnt, 1);

if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
rd_kafka_ConfigResource_type(configs[i]) ||
strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
rd_kafka_ConfigResource_name(configs[i]))) {
TEST_FAIL_LATER(
"ConfigResource #%d: "
"expected type %s name %s, "
"got type %s name %s",
i,
rd_kafka_ResourceType_name(
rd_kafka_ConfigResource_type(configs[i])),
rd_kafka_ConfigResource_name(configs[i]),
rd_kafka_ResourceType_name(
rd_kafka_ConfigResource_type(rconfigs[i])),
rd_kafka_ConfigResource_name(rconfigs[i]));
fails++;
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue; not needed.

}
}

TEST_ASSERT(!fails, "See %d previous failure(s)", fails);

rd_kafka_event_destroy(rkev);

rd_kafka_ConfigResource_destroy_array(configs, ci);

TEST_LATER_CHECK();
#undef MY_CONFRES_CNT

SUB_TEST_PASS();
}

/**
* @brief Test CreateAcls
*/
Expand Down Expand Up @@ -5258,6 +5422,7 @@ static void do_test_apis(rd_kafka_type_t cltype) {

/* DescribeConfigs */
do_test_DescribeConfigs(rk, mainq);
do_test_DescribeConfigs_groups(rk, mainq);
Comment on lines 5424 to +5425
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add new condition in do_test_DescribeConfigs?


/* Delete records */
do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0);
Expand Down