Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/sync): flatten and report validation error #14262

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
8 changes: 7 additions & 1 deletion kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,13 @@ function _M:process_rpc_msg(payload, collection)
ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload_method, " (id: ", payload_id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload_method]
if not dispatch_cb and payload_id then
if not dispatch_cb then
-- for RPC notify
if not payload_id then
return nil, "unable to find RPC notify callback for method: " .. payload_method
end

-- for RPC call
local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND),
"unable to send \"METHOD_NOT_FOUND\" error back to client: ",
collection)
Expand Down
30 changes: 29 additions & 1 deletion kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ end
function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- CP
-- Method: kong.sync.v2.notify_validation_error
-- Params: msg: error message reported by DP
-- example: { version = <lastest version of deltas>, error = <flatten error>, }
manager.callbacks:register("kong.sync.v2.notify_validation_error", function(node_id, msg)
ngx_log(ngx_DEBUG, "[kong.sync.v2] received validation error")
chobits marked this conversation as resolved.
Show resolved Hide resolved
-- TODO: We need a better error handling method, it might report this error
-- to Konnect or or log it locally.
return true
end)

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
Expand Down Expand Up @@ -179,6 +190,22 @@ local function is_rpc_ready()
end


-- tell cp that the deltas validation failed
local function notify_error(ver, err_t)
local msg = {
version = ver or "v02_deltas_have_no_latest_version_field",
error = err_t,
}

local ok, err = kong.rpc:notify("control_plane",
"kong.sync.v2.notify_validation_error",
msg)
if not ok then
ngx_log(ngx_ERR, "notifying validation errors failed: ", err)
end
end


-- tell cp we already updated the version by rpc notification
local function update_status(ver)
local msg = { default = { version = ver, }, }
Expand Down Expand Up @@ -300,8 +327,9 @@ local function do_sync()
assert(type(kong.default_workspace) == "string")

-- validate deltas
local ok, err = validate_deltas(deltas, wipe)
local ok, err, err_t = validate_deltas(deltas, wipe)
if not ok then
notify_error(ns_delta.lastest_version, err_t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which version should we use? dp's current version or delta's version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this lastest_version will be introduced by this rpc spec: https://github.com/Kong/openrpc_specfiles/pull/18

Copy link
Contributor Author

@chobits chobits Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/Kong/kong-ee/pull/11238 introduces this new field

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if no this field, could we use deltas[#deltas].version?

return nil, err
end

Expand Down
62 changes: 51 additions & 11 deletions kong/clustering/services/sync/validate.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
local declarative = require("kong.db.declarative")
local declarative_config = require("kong.db.schema.others.declarative_config")
local db_errors = require("kong.db.errors")
local ERRORS = require("kong.constants").CLUSTERING_DATA_PLANE_ERROR


local null = ngx.null
local insert = table.insert
local pk_string = declarative_config.pk_string
local validate_references_sync = declarative_config.validate_references_sync
local pretty_print_error = declarative.pretty_print_error


-- It refers to format_error() function in kong/clustering/config_helper.lua.
local function format_error(err_t)
-- Declarative config parse errors will include all the input entities in
-- the error table. Strip these out to keep the error payload size small.
local errors = err_t.flattened_errors
if type(errors) ~= "table" then
return
end

for i = 1, #errors do
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
local err = errors[i]
if type(err) == "table" then
err.entity = nil
end
end
end


local function validate_deltas(deltas, is_full_sync)

local errs = {}
local errs_n = 0
local errs_entities = {}

-- generate deltas table mapping primary key string to entity item
local deltas_map = {}
Expand Down Expand Up @@ -46,27 +67,46 @@ local function validate_deltas(deltas, is_full_sync)

local ok, err_t = dao.schema:validate(copy)
if not ok then
errs_n = errs_n + 1
errs[errs_n] = { [delta_type] = err_t }
if not errs[delta_type] then
errs[delta_type] = {}
end
insert(errs[delta_type], err_t)

if not errs_entities[delta_type] then
errs_entities[delta_type] = {}
end
insert(errs_entities[delta_type], delta_entity)
end
end
end
end

if next(errs) then
return nil, pretty_print_error(errs, "deltas")
end

-- validate references
local ok, err_t = validate_references_sync(deltas, deltas_map, is_full_sync)
if not ok then
return nil, pretty_print_error(err_t)

if not next(errs) then
local ok
ok, errs = validate_references_sync(deltas, deltas_map, is_full_sync)
if ok then
return true
end
end

return true
-- error handling

local err = pretty_print_error(errs)

local err_t = db_errors:sync_deltas_flattened(errs, errs_entities)

err_t.name = ERRORS.DELTAS_PARSE
err_t.source = "kong.clustering.services.sync.validate.validate_deltas"

format_error(err_t)

return nil, err, err_t
end


return {
validate_deltas = validate_deltas,
format_error = format_error,
}
1 change: 1 addition & 0 deletions kong/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ local constants = {
CLUSTERING_OCSP_TIMEOUT = 5000, -- 5 seconds
CLUSTERING_DATA_PLANE_ERROR = {
CONFIG_PARSE = "declarative configuration parse failure",
DELTAS_PARSE = "sync deltas parse failure",
RELOAD = "configuration reload failed",
GENERIC = "generic or unknown error",
},
Expand Down
41 changes: 41 additions & 0 deletions kong/db/errors.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ local ERRORS = {
INVALID_WORKSPACE = 17, -- strategy reports a workspace error
INVALID_UNIQUE_GLOBAL = 18, -- unique field value is invalid for global query
REFERENCED_BY_OTHERS = 19, -- still referenced by other entities
INVALID_SEARCH_QUERY = 20, -- ex. searched_field[unknown] = something -> 'unknown' is invalid (HTTP 400)
SYNC_DELTAS = 21, -- error parsing sync deltas for sync.v2
}


Expand All @@ -81,6 +83,8 @@ local ERRORS_NAMES = {
[ERRORS.INVALID_WORKSPACE] = "invalid workspace",
[ERRORS.INVALID_UNIQUE_GLOBAL] = "invalid global query",
[ERRORS.REFERENCED_BY_OTHERS] = "referenced by others",
[ERRORS.INVALID_SEARCH_QUERY] = "invalid search query",
[ERRORS.SYNC_DELTAS] = "invalid sync deltas",
}


Expand Down Expand Up @@ -500,6 +504,17 @@ function _M:declarative_config(err_t)
end


function _M:sync_deltas(err_t)
if type(err_t) ~= "table" then
error("err_t must be a table", 2)
end

local message = fmt("sync deltas is invalid: %s", pl_pretty(err_t, ""))

return new_err_t(self, ERRORS.SYNC_DELTAS, message, err_t)
end


function _M:invalid_workspace(ws_id)
if type(ws_id) ~= "string" then
error("ws_id must be a string", 2)
Expand Down Expand Up @@ -1171,4 +1186,30 @@ function _M:declarative_config_flattened(err_t, input)
end


-- traverse schema validation errors and correlate them with objects/entities
-- which does not pass delta validation for sync.v2
--
---@param err_t table
---@param err_entities table
---@return table
function _M:sync_deltas_flattened(err_t, err_entities)
if type(err_t) ~= "table" then
error("err_t must be a table", 2)
end

if type(err_entities) ~= "table" then
error("err_entities is nil or not a table", 2)
end

local flattened = flatten_errors(err_entities, err_t)

err_t = self:sync_deltas(err_t)

err_t.flattened_errors = flattened

return err_t

end


return _M
Loading
Loading