diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index f4d23cd6ebe..694a00e41b6 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -35,6 +35,7 @@ local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5 local PING_TYPE = "PING" local PONG_TYPE = "PONG" +local ngx_INFO = ngx.INFO local ngx_WARN = ngx.WARN local ngx_DEBUG = ngx.DEBUG @@ -152,7 +153,14 @@ function _M:process_rpc_msg(payload, collection) ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload_method, " (id: ", payload_id, ")") local dispatch_cb = self.manager.callbacks.callbacks[payload_method] - if not dispatch_cb and payload_id then + if not dispatch_cb then + -- for RPC notify + if not payload_id then + ngx_log(ngx_INFO, "[rpc] unable to find RPC notify call: ", payload_method) + return true + end + + -- for RPC call local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), "unable to send \"METHOD_NOT_FOUND\" error back to client: ", collection) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 3dbacf68f2b..ab01009f4d4 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -66,6 +66,17 @@ end function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay + -- CP + -- Method: kong.sync.v2.notify_validation_error + -- Params: msg: error message reported by DP + -- example: { version = , error = , } + manager.callbacks:register("kong.sync.v2.notify_validation_error", function(node_id, msg) + ngx_log(ngx_DEBUG, "[kong.sync.v2] received validation error") + -- TODO: We need a better error handling method, it might report this error + -- to Konnect or or log it locally. + return true + end) + -- CP -- Method: kong.sync.v2.get_delta -- Params: versions: list of current versions of the database @@ -179,6 +190,22 @@ local function is_rpc_ready() end +-- tell cp that the deltas validation failed +local function notify_error(ver, err_t) + local msg = { + version = ver or "v02_deltas_have_no_latest_version_field", + error = err_t, + } + + local ok, err = kong.rpc:notify("control_plane", + "kong.sync.v2.notify_validation_error", + msg) + if not ok then + ngx_log(ngx_ERR, "notifying validation errors failed: ", err) + end +end + + -- tell cp we already updated the version by rpc notification local function update_status(ver) local msg = { default = { version = ver, }, } @@ -300,8 +327,9 @@ local function do_sync() assert(type(kong.default_workspace) == "string") -- validate deltas - local ok, err = validate_deltas(deltas, wipe) + local ok, err, err_t = validate_deltas(deltas, wipe) if not ok then + notify_error(ns_delta.latest_version, err_t) return nil, err end diff --git a/kong/clustering/services/sync/validate.lua b/kong/clustering/services/sync/validate.lua index e3969785b21..18b42e66d2d 100644 --- a/kong/clustering/services/sync/validate.lua +++ b/kong/clustering/services/sync/validate.lua @@ -1,17 +1,38 @@ local declarative = require("kong.db.declarative") local declarative_config = require("kong.db.schema.others.declarative_config") +local db_errors = require("kong.db.errors") +local ERRORS = require("kong.constants").CLUSTERING_DATA_PLANE_ERROR local null = ngx.null +local insert = table.insert local pk_string = declarative_config.pk_string local validate_references_sync = declarative_config.validate_references_sync local pretty_print_error = declarative.pretty_print_error +-- It refers to format_error() function in kong/clustering/config_helper.lua. +local function format_error(err_t) + -- Declarative config parse errors will include all the input entities in + -- the error table. Strip these out to keep the error payload size small. + local errors = err_t.flattened_errors + if type(errors) ~= "table" then + return + end + + for i = 1, #errors do + local err = errors[i] + if type(err) == "table" then + err.entity = nil + end + end +end + + local function validate_deltas(deltas, is_full_sync) local errs = {} - local errs_n = 0 + local errs_entities = {} -- generate deltas table mapping primary key string to entity item local deltas_map = {} @@ -46,27 +67,46 @@ local function validate_deltas(deltas, is_full_sync) local ok, err_t = dao.schema:validate(copy) if not ok then - errs_n = errs_n + 1 - errs[errs_n] = { [delta_type] = err_t } + if not errs[delta_type] then + errs[delta_type] = {} + end + insert(errs[delta_type], err_t) + + if not errs_entities[delta_type] then + errs_entities[delta_type] = {} + end + insert(errs_entities[delta_type], delta_entity) end end end end - if next(errs) then - return nil, pretty_print_error(errs, "deltas") - end - -- validate references - local ok, err_t = validate_references_sync(deltas, deltas_map, is_full_sync) - if not ok then - return nil, pretty_print_error(err_t) + + if not next(errs) then + local ok + ok, errs = validate_references_sync(deltas, deltas_map, is_full_sync) + if ok then + return true + end end - return true + -- error handling + + local err = pretty_print_error(errs) + + local err_t = db_errors:sync_deltas_flattened(errs, errs_entities) + + err_t.name = ERRORS.DELTAS_PARSE + err_t.source = "kong.clustering.services.sync.validate.validate_deltas" + + format_error(err_t) + + return nil, err, err_t end return { validate_deltas = validate_deltas, + format_error = format_error, } diff --git a/kong/constants.lua b/kong/constants.lua index 4b04b01ce3f..4781ca23770 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -231,6 +231,7 @@ local constants = { CLUSTERING_OCSP_TIMEOUT = 5000, -- 5 seconds CLUSTERING_DATA_PLANE_ERROR = { CONFIG_PARSE = "declarative configuration parse failure", + DELTAS_PARSE = "sync deltas parse failure", RELOAD = "configuration reload failed", GENERIC = "generic or unknown error", }, diff --git a/kong/db/errors.lua b/kong/db/errors.lua index 0ac400f824a..10b83b8312c 100644 --- a/kong/db/errors.lua +++ b/kong/db/errors.lua @@ -55,6 +55,8 @@ local ERRORS = { INVALID_WORKSPACE = 17, -- strategy reports a workspace error INVALID_UNIQUE_GLOBAL = 18, -- unique field value is invalid for global query REFERENCED_BY_OTHERS = 19, -- still referenced by other entities + INVALID_SEARCH_QUERY = 20, -- ex. searched_field[unknown] = something -> 'unknown' is invalid (HTTP 400) + SYNC_DELTAS = 21, -- error parsing sync deltas for sync.v2 } @@ -81,6 +83,8 @@ local ERRORS_NAMES = { [ERRORS.INVALID_WORKSPACE] = "invalid workspace", [ERRORS.INVALID_UNIQUE_GLOBAL] = "invalid global query", [ERRORS.REFERENCED_BY_OTHERS] = "referenced by others", + [ERRORS.INVALID_SEARCH_QUERY] = "invalid search query", + [ERRORS.SYNC_DELTAS] = "invalid sync deltas", } @@ -500,6 +504,17 @@ function _M:declarative_config(err_t) end +function _M:sync_deltas(err_t) + if type(err_t) ~= "table" then + error("err_t must be a table", 2) + end + + local message = fmt("sync deltas is invalid: %s", pl_pretty(err_t, "")) + + return new_err_t(self, ERRORS.SYNC_DELTAS, message, err_t) +end + + function _M:invalid_workspace(ws_id) if type(ws_id) ~= "string" then error("ws_id must be a string", 2) @@ -1171,4 +1186,30 @@ function _M:declarative_config_flattened(err_t, input) end +-- traverse schema validation errors and correlate them with objects/entities +-- which does not pass delta validation for sync.v2 +-- +---@param err_t table +---@param err_entities table +---@return table +function _M:sync_deltas_flattened(err_t, err_entities) + if type(err_t) ~= "table" then + error("err_t must be a table", 2) + end + + if type(err_entities) ~= "table" then + error("err_entities is nil or not a table", 2) + end + + local flattened = flatten_errors(err_entities, err_t) + + err_t = self:sync_deltas(err_t) + + err_t.flattened_errors = flattened + + return err_t + +end + + return _M diff --git a/spec/01-unit/19-hybrid/04-validate_deltas_spec.lua b/spec/01-unit/19-hybrid/04-validate_deltas_spec.lua index ecde1143529..49b1e2c07a3 100644 --- a/spec/01-unit/19-hybrid/04-validate_deltas_spec.lua +++ b/spec/01-unit/19-hybrid/04-validate_deltas_spec.lua @@ -1,10 +1,14 @@ local helpers = require "spec.helpers" local txn = require "resty.lmdb.transaction" local declarative = require "kong.db.declarative" +local validate = require("kong.clustering.services.sync.validate") +local db_errors = require("kong.db.errors") +local declarative_config = require "kong.db.schema.others.declarative_config" local insert_entity_for_txn = declarative.insert_entity_for_txn -local validate_deltas = require("kong.clustering.services.sync.validate").validate_deltas +local validate_deltas = validate.validate_deltas +local format_error = validate.format_error local function lmdb_drop() @@ -73,6 +77,46 @@ end describe("[delta validations]",function() + local DeclarativeConfig = assert(declarative_config.load( + helpers.test_conf.loaded_plugins, + helpers.test_conf.loaded_vaults + )) + + -- Assert that deltas validation error is same with sync.v1 validation error. + -- sync.v1 config deltas: @deltas + -- sync.v2 config table: @config + local function assert_same_validation_error(deltas, config, expected_errs) + local _, _, err_t = validate_deltas(deltas) + + assert.equal(err_t.code, 21) + assert.same(err_t.source, "kong.clustering.services.sync.validate.validate_deltas") + assert.same(err_t.message, "sync deltas is invalid: {}") + assert.same(err_t.name, "sync deltas parse failure") + + err_t.code = nil + err_t.source = nil + err_t.message = nil + err_t.name = nil + + local _, dc_err = DeclarativeConfig:validate(config) + assert(dc_err, "validate config should has error:" .. require("inspect")(config)) + + local dc_err_t = db_errors:declarative_config_flattened(dc_err, config) + + dc_err_t.code = nil + dc_err_t.source = nil + dc_err_t.message = nil + dc_err_t.name = nil + + format_error(dc_err_t) + + assert.same(err_t, dc_err_t) + + if expected_errs then + assert.same(err_t, expected_errs) + end + end + it("workspace id", function() local bp = setup_bp() @@ -93,7 +137,7 @@ describe("[delta validations]",function() end end) - it("route has no required field", function() + it("route has no required field, but uses default value", function() local bp = setup_bp() -- add entities @@ -124,7 +168,7 @@ describe("[delta validations]",function() -- add entities db_insert(bp, "workspaces", { name = "ws-001" }) local service = db_insert(bp, "services", { name = "service-001", }) - db_insert(bp, "routes", { + local route = db_insert(bp, "routes", { name = "route-001", paths = { "/mock" }, service = { id = service.id }, @@ -139,11 +183,26 @@ describe("[delta validations]",function() end end - local _, err = validate_deltas(deltas) - assert.matches([[- in entry 1 of 'deltas': - in 'routes': - in 'foo': unknown field]], - err) + local config = declarative.export_config() + config["routes"][1].foo = "invalid_field_value" + + local errs = { + fields = {}, + flattened_errors = {{ + entity_id = route.id, + entity_name = "route-001", + entity_type = "route", + errors = { + { + field = "foo", + message = "unknown field", + type = "field", + }, + }, + }} + } + + assert_same_validation_error(deltas, config, errs) end) it("route has foreign service", function() @@ -177,10 +236,26 @@ describe("[delta validations]",function() }) local deltas = declarative.export_config_sync() - local _, err = validate_deltas(deltas, false) + local _, err, err_t = validate_deltas(deltas, false) + assert.matches( "entry 1 of 'services': could not find routes's foreign refrences services", err) + + assert.same(err_t, { + code = 21, + fields = { + routes = { + services = { + "could not find routes's foreign refrences services ({\"id\":\"00000000-0000-0000-0000-000000000000\"})", + }, + }, + }, + message = [[sync deltas is invalid: {routes={services={"could not find routes's foreign refrences services ({\"id\":\"00000000-0000-0000-0000-000000000000\"})"}}}]], + flattened_errors = {}, + name = "sync deltas parse failure", + source = "kong.clustering.services.sync.validate.validate_deltas", + }) end) it("100 routes -> 1 services: matched foreign keys", function() @@ -250,4 +325,101 @@ describe("[delta validations]",function() err) end end) + + -- The following test cases refer to + -- spec/01-unit/01-db/01-schema/11-declarative_config/01-validate_spec.lua. + + it("verifies required fields", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + local service = db_insert(bp, "services", { name = "service-001", }) + + local deltas = declarative.export_config_sync() + + -- delete host field + for _, delta in ipairs(deltas) do + if delta.type == "services" then + delta.entity.host = nil + break + end + end + + local config = declarative.export_config() + config["services"][1].host = nil + + local errs = { + fields = {}, + flattened_errors = {{ + entity_id = service.id, + entity_name = "service-001", + entity_type = "service", + errors = {{ + field = "host", + message = "required field missing", + type = "field", + }}, + }}, + } + + assert_same_validation_error(deltas, config, errs) + end) + + it("performs regular validations", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + local _ = db_insert(bp, "services", { + name = "service-001", + retries = -1, + protocol = "foo", + host = 1234, + port = 99999, + path = "/foo//bar/", + }) + + local deltas = declarative.export_config_sync() + + local config = declarative.export_config() + + local errs = { + fields = {}, + flattened_errors = { + { + entity_id = config.services[1].id, + entity_name = "service-001", + entity_type = "service", + errors = { + { + field = "retries", + message = "value should be between 0 and 32767", + type = "field" + }, { + field = "protocol", + message = "expected one of: grpc, grpcs, http, https, tcp, tls, tls_passthrough, udp", + type = "field" + }, { + field = "port", + message = "value should be between 0 and 65535", + type = "field" + }, { + field = "path", + message = "must not have empty segments", + type = "field" + }, { + field = "host", + message = "expected a string", + type = "field" + }, + }, + }, + }, + } + + assert_same_validation_error(deltas, config, errs) + end) + + -- TODO: add more test cases end) diff --git a/spec/02-integration/18-hybrid_rpc/10-validate_deltas_spec.lua b/spec/02-integration/18-hybrid_rpc/10-validate_deltas_spec.lua index 269c492dda8..016924bdad7 100644 --- a/spec/02-integration/18-hybrid_rpc/10-validate_deltas_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/10-validate_deltas_spec.lua @@ -73,9 +73,11 @@ for _, strategy in helpers.each_strategy() do -- cp logs assert.logfile(name).has.line( "kong.sync.v2.get_delta ok", true, 10) + assert.logfile(name).has.line( + "[rpc] unable to find RPC notify call: kong.sync.v2.notify_validation_error", + true, 10) assert.logfile(name).has.no.line( "[error]", true, 0) - end) end) end)