Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/sync): flatten and report validation error #14262

Merged
merged 19 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ end
function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- CP
-- Method: kong.sync.v2.notify_validation_error
-- Params: msg: list of current versions of the database
-- example: { version = <lastest version of deltas>, error = <flatten error>, }
chobits marked this conversation as resolved.
Show resolved Hide resolved
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
manager.callbacks:register("kong.sync.v2.notify_validation_error", function(node_id, msg)
ngx_log(ngx_DEBUG, "[kong.sync.v2] received validation error")
chobits marked this conversation as resolved.
Show resolved Hide resolved
return true
end)

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
Expand Down Expand Up @@ -179,6 +188,22 @@ local function is_rpc_ready()
end


-- tell cp that the deltas validation failed
local function notify_error(ver, err_t)
local msg = {
version = ver or "v02_deltas_have_no_latest_version_field",
error = err_t,
}

local ok, err = kong.rpc:notify("control_plane",
"kong.sync.v2.notify_validation_error",
msg)
if not ok then
ngx_log(ngx_ERR, "notifying validation errors failed: ", err)
end
end


-- tell cp we already updated the version by rpc notification
local function update_status(ver)
local msg = { default = { version = ver, }, }
Expand Down Expand Up @@ -300,8 +325,9 @@ local function do_sync()
assert(type(kong.default_workspace) == "string")

-- validate deltas
local ok, err = validate_deltas(deltas, wipe)
local ok, err, err_t = validate_deltas(deltas, wipe)
if not ok then
notify_error(ns_delta.lastest_version, err_t)
chobits marked this conversation as resolved.
Show resolved Hide resolved
chobits marked this conversation as resolved.
Show resolved Hide resolved
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
end

Expand Down
62 changes: 51 additions & 11 deletions kong/clustering/services/sync/validate.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
local declarative = require("kong.db.declarative")
local declarative_config = require("kong.db.schema.others.declarative_config")
local db_errors = require("kong.db.errors")
local ERRORS = require("kong.constants").CLUSTERING_DATA_PLANE_ERROR


local null = ngx.null
local insert = table.insert
local pk_string = declarative_config.pk_string
local validate_references_sync = declarative_config.validate_references_sync
local pretty_print_error = declarative.pretty_print_error


-- It refers to format_error() function in kong/clustering/config_helper.lua.
local function format_error(err_t)
-- Declarative config parse errors will include all the input entities in
-- the error table. Strip these out to keep the error payload size small.
local errors = err_t.flattened_errors
if type(errors) ~= "table" then
return
end

for i = 1, #errors do
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
local err = errors[i]
if type(err) == "table" then
err.entity = nil
end
end
end


local function validate_deltas(deltas, is_full_sync)

local errs = {}
local errs_n = 0
local errs_entities = {}

-- generate deltas table mapping primary key string to entity item
local deltas_map = {}
Expand Down Expand Up @@ -46,27 +67,46 @@ local function validate_deltas(deltas, is_full_sync)

local ok, err_t = dao.schema:validate(copy)
if not ok then
errs_n = errs_n + 1
errs[errs_n] = { [delta_type] = err_t }
if not errs[delta_type] then
errs[delta_type] = {}
end
insert(errs[delta_type], err_t)

if not errs_entities[delta_type] then
errs_entities[delta_type] = {}
end
insert(errs_entities[delta_type], delta_entity)
end
end
end
end

if next(errs) then
return nil, pretty_print_error(errs, "deltas")
end

-- validate references
local ok, err_t = validate_references_sync(deltas, deltas_map, is_full_sync)
if not ok then
return nil, pretty_print_error(err_t)

if not next(errs) then
local ok
ok, errs = validate_references_sync(deltas, deltas_map, is_full_sync)
if ok then
return true
end
end

return true
-- error handling

local err = pretty_print_error(errs)

local err_t = db_errors:sync_deltas_flattened(errs, errs_entities)

err_t.name = ERRORS.DELTAS_PARSE
err_t.source = "kong.clustering.services.sync.validate.validate_deltas"

format_error(err_t)

return nil, err, err_t
end


return {
validate_deltas = validate_deltas,
format_error = format_error,
}
1 change: 1 addition & 0 deletions kong/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ local constants = {
CLUSTERING_OCSP_TIMEOUT = 5000, -- 5 seconds
CLUSTERING_DATA_PLANE_ERROR = {
CONFIG_PARSE = "declarative configuration parse failure",
DELTAS_PARSE = "sync deltas parse failure",
RELOAD = "configuration reload failed",
GENERIC = "generic or unknown error",
},
Expand Down
37 changes: 37 additions & 0 deletions kong/db/errors.lua
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,17 @@ function _M:declarative_config(err_t)
end


function _M:sync_deltas(err_t)
if type(err_t) ~= "table" then
error("err_t must be a table", 2)
end

local message = fmt("sync deltas is invalid: %s", pl_pretty(err_t, ""))

return new_err_t(self, ERRORS.SYNC_DELTAS, message, err_t)
end


function _M:invalid_workspace(ws_id)
if type(ws_id) ~= "string" then
error("ws_id must be a string", 2)
Expand Down Expand Up @@ -1171,4 +1182,30 @@ function _M:declarative_config_flattened(err_t, input)
end


-- traverse schema validation errors and correlate them with objects/entities
-- which does not pass delta validation for sync.v2
--
---@param err_t table
---@param err_entities table
---@return table
function _M:sync_deltas_flattened(err_t, err_entities)
if type(err_t) ~= "table" then
error("err_t must be a table", 2)
end

if type(err_entities) ~= "table" then
error("err_entities is nil or not a table", 2)
end

local flattened = flatten_errors(err_entities, err_t)

err_t = self:sync_deltas(err_t)

err_t.flattened_errors = flattened

return err_t

end


return _M
Loading
Loading