[Req/Res] add tostring on request.lua and response.lua for simplying … #42

Open: wants to merge 2 commits into base: master
28 changes: 27 additions & 1 deletion lib/resty/kafka/producer.lua
@@ -23,6 +23,19 @@ local crc32 = ngx.crc32_short
local pcall = pcall
local pairs = pairs

local lrucache = require "resty.lrucache"
local cache, err = lrucache.new(1)  -- one slot is enough: only the timer counter is stored
if not cache then
return ngx_log(ERR, "failed to create the cache: " .. (err or "unknown"))
else
cache:set("max_timers_count", 0)
end

-- according to the current implementation, each "running timer" takes one (fake) connection record
-- from the global connection record list configured by the standard worker_connections directive in nginx.conf,
-- so we limit the timers; this is the global max timer count for all workers
Owner:
Actually there can be at most two running timers.
One is flushing data; the other is acquiring the flush lock, and it will fail soon.
So the number of running timers is already limited :)

Owner:
And we should not limit the pending timers, because a pending timer may fail to become a running one :(

Owner:
And it will be better once we have ngx.timer.every (openresty/lua-nginx-module#856).

Owner:
We can limit the pending timers once we have ngx.timer.every.

Author:
A pending timer that fails to become a running one causes an annoying error log :). What is limited here is actually the total number of timers, both pending and running.
If ngx.timer.every is better, let's look forward to it.

Owner:
> What is limited here is actually the total number of timers, both pending and running.

I mean we should not limit the pending timers, since that may cause data loss.

> A pending timer that fails to become a running one causes an annoying error log

This usually happens when the number of running timers reaches lua_max_running_timers, but this lib has at most two running timers.

-- todo: make it configurable
local max_timers_count = 128

local ok, new_tab = pcall(require, "table.new")
if not ok then
@@ -238,7 +251,7 @@ local function _flush(premature, self)
end

local overflow = sendbuffer:add(topic, partition_id, key, msg)
- if overflow then -- reached batch_size in one topic-partition
+ if overflow then -- reached batch_size/batch_num in one topic-partition
break
end
end
@@ -272,16 +285,29 @@ local function _flush(premature, self)
elseif is_exiting() and ringbuffer:left_num() > 0 then
-- still can create 0 timer even exiting
_flush_buffer(self)
else
-- timer ends, decrease the timer count
local timer_count = tonumber(cache:get("max_timers_count"))
cache:set("max_timers_count", timer_count - 1)
end

return true
end


_flush_buffer = function (self)
local timer_count = tonumber(cache:get("max_timers_count"))
if timer_count >= max_timers_count then
return
end
-- the coroutine may not have yielded yet, so the timer may not start immediately
local ok, err = timer_at(0, _flush, self)
if not ok then
ngx_log(ERR, "failed to create timer at _flush_buffer, err: ", err)
else
-- whether the timer is running or pending, increase the existing timer count
local count = tonumber(cache:get("max_timers_count"))
cache:set("max_timers_count", count + 1)
end
end

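The thread above bounds the number of concurrent timers with a shared counter kept in a one-slot lrucache. Here is a minimal sketch of that accounting pattern; a plain Lua table stands in for resty.lrucache so it runs outside nginx, and the two function names are illustrative, not from the patch:

```lua
-- Sketch of the timer accounting discussed above. A plain table stands in
-- for the one-slot resty.lrucache so this runs outside nginx; inside
-- OpenResty the patch uses cache:get()/cache:set() on the
-- "max_timers_count" key instead.
local cache = { count = 0 }
local max_timers_count = 128  -- the patch's todo: make it configurable

-- call before ngx.timer.at(): refuse to spawn past the cap
local function try_acquire_timer()
  if cache.count >= max_timers_count then
    return false
  end
  cache.count = cache.count + 1
  return true
end

-- call when a timer handler finishes without rescheduling itself
local function release_timer()
  cache.count = cache.count - 1
end
```

Note that this counts both pending and running timers, which is exactly the point the Owner and Author debate above.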
9 changes: 9 additions & 0 deletions lib/resty/kafka/request.lua
@@ -60,6 +60,15 @@ local function str_int64(int)
tonumber(band(int, 0xff)))
end

-- for easier debugging
function mt.__tostring(self)
local str = ""
for _,v in ipairs(self._req) do
str = str .. tostring(v) .. " "
Owner:
First of all, I'm not very happy to add this code just for debugging; I think we'd better have it at a higher level.
But I'm fine with it if you optimize the code.

Owner:
We should push the segments into a table first and then use table.concat.
I know this is just for debugging, but it's really not a good example.

Author:
What do you mean by a higher level? I wrote this code not only to debug Kafka itself but also this Kafka client; I want to make sure the client sends requests and receives responses correctly.
Thanks for the advice that ".." is not best practice here and table.concat is.

Owner:
I mean you can do this in your own code, on top of this lib.

end
str = str .. tostring(self.offset) .. " " .. tostring(self.len)
return str
end

function _M.new(self, apikey, correlation_id, client_id)
local c_len = #client_id
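The reviewer's table.concat suggestion could be sketched as follows; this is an illustrative rewrite of the patch's __tostring, not code from the PR, and the request-like table at the bottom is made up just to exercise the metamethod:

```lua
-- Illustrative rewrite of the patch's mt.__tostring: collect the segments
-- in a table and join them once with table.concat, instead of repeated
-- ".." concatenation inside the loop.
local mt = {}

function mt.__tostring(self)
  local out = {}
  for i, v in ipairs(self._req) do
    out[i] = tostring(v)
  end
  out[#out + 1] = tostring(self.offset)
  out[#out + 1] = tostring(self.len)
  return table.concat(out, " ")
end

-- hypothetical request-like table, just to exercise the metamethod
local req = setmetatable({ _req = { "a", "b" }, offset = 4, len = 10 }, mt)
print(tostring(req))  -- "a b 4 10"
```

The single table.concat avoids creating an intermediate string per loop iteration, which is why reviewers flag repeated ".." in loops even in debug-only code.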
4 changes: 4 additions & 0 deletions lib/resty/kafka/response.lua
@@ -15,6 +15,10 @@ local strbyte = string.byte
local _M = {}
local mt = { __index = _M }

-- for easier debugging
function mt.__tostring(self)
return tostring(self.str) .. " " .. tostring(self.offset) .. " " .. tostring(self.correlation_id)
end

function _M.new(self, str)
local resp = setmetatable({
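One nuance worth noting about the __tostring approach in both files: Lua dispatches tostring() and print() through the __tostring metamethod, but the ".." operator dispatches through __concat, so an explicit tostring() call is still needed when building log lines by concatenation. A small self-contained sketch, with made-up field values:

```lua
-- Self-contained sketch of the response __tostring; field values are made up.
local mt = {}

function mt.__tostring(self)
  return tostring(self.str) .. " " .. tostring(self.offset)
         .. " " .. tostring(self.correlation_id)
end

local resp = setmetatable({ str = "abc", offset = 1, correlation_id = 7 }, mt)

-- tostring() and print() go through __tostring:
assert(tostring(resp) == "abc 1 7")

-- but ".." goes through __concat (not defined here), so
-- "resp: " .. resp would raise an error; call tostring() first:
local line = "resp: " .. tostring(resp)
```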