Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/rpc): correct rpc disconnected event on dp side #14279

Merged
merged 4 commits into from
Feb 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ local WS_OPTS = {
}


local function post_rpc_event(ev, params)
local worker_events = assert(kong.worker_events)

-- notify this worker
local ok, err = worker_events.post_local("clustering:jsonrpc", ev, params)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to post rpc ", ev, " event: ", err)
end
end


-- create a new RPC manager, node_id is own node_id
function _M.new(conf, node_id)
local self = {
Expand Down Expand Up @@ -327,14 +338,7 @@ function _M:_meta_call(c, meta_cap, node_id)
}

-- tell outside that rpc is ready
local worker_events = assert(kong.worker_events)

-- notify this worker
local ok, err = worker_events.post_local("clustering:jsonrpc", "connected",
capabilities_list)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to post rpc connected event: ", err)
end
post_rpc_event("connected", capabilities_list)

return true
end
Expand Down Expand Up @@ -502,17 +506,6 @@ function _M:handle_websocket()
local res, err = s:join()
self:_remove_socket(s)

-- tell outside that rpc disconnected
do
local worker_events = assert(kong.worker_events)

-- notify this worker
local ok, err = worker_events.post_local("clustering:jsonrpc", "disconnected")
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to post rpc disconnected event: ", err)
end
end

if not res then
ngx_log(ngx_ERR, _log_prefix, "RPC connection broken: ", err, " node_id: ", node_id)
return ngx_exit(ngx.ERROR)
Expand Down Expand Up @@ -585,6 +578,9 @@ function _M:connect(premature, node_id, host, path, cert, key)
" to connect control plane")
end

-- a flag to ensure connection is established
local connection_established

local ok, err = c:connect(uri, opts)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to connect to peer: ", err)
Expand Down Expand Up @@ -619,6 +615,8 @@ function _M:connect(premature, node_id, host, path, cert, key)
goto err
end

connection_established = true

local s = socket.new(self, c, node_id)
s:start()
self:_add_socket(s)
Expand All @@ -635,6 +633,9 @@ function _M:connect(premature, node_id, host, path, cert, key)

::err::

-- tell outside that rpc disconnected or failed
post_rpc_event(connection_established and "disconnected" or "connection_failed")

if not exiting() then
c:close()
self:try_connect(reconnection_delay)
Expand Down
Loading