From a68206e792e1a839b863909d3447285e9461e87f Mon Sep 17 00:00:00 2001
From: Bruce Zhang
Date: Fri, 25 Nov 2016 11:11:18 +0800
Subject: [PATCH 1/2] [Req/Res] add __tostring to request.lua and response.lua
 to simplify debugging; clarify that overflow means the batch size or batch
 num has been reached

---
 lib/resty/kafka/producer.lua | 2 +-
 lib/resty/kafka/request.lua  | 9 +++++++++
 lib/resty/kafka/response.lua | 4 ++++
 3 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/lib/resty/kafka/producer.lua b/lib/resty/kafka/producer.lua
index f66588de..e4590d6e 100644
--- a/lib/resty/kafka/producer.lua
+++ b/lib/resty/kafka/producer.lua
@@ -238,7 +238,7 @@ local function _flush(premature, self)
             end
 
             local overflow = sendbuffer:add(topic, partition_id, key, msg)
-            if overflow then   -- reached batch_size in one topic-partition
+            if overflow then   -- reached batch_size/batch_num in one topic-partition
                 break
             end
         end
diff --git a/lib/resty/kafka/request.lua b/lib/resty/kafka/request.lua
index 2ec9ca9c..87599fe4 100644
--- a/lib/resty/kafka/request.lua
+++ b/lib/resty/kafka/request.lua
@@ -60,6 +60,15 @@ local function str_int64(int)
                    tonumber(band(int, 0xff)))
 end
 
+-- make request objects printable for easier debugging
+function mt.__tostring(self)
+    local str = ""
+    for _, v in ipairs(self._req) do
+        str = str .. tostring(v) .. " "
+    end
+    str = str .. tostring(self.offset) .. " " .. tostring(self.len)
+    return str
+end
 
 function _M.new(self, apikey, correlation_id, client_id)
     local c_len = #client_id
diff --git a/lib/resty/kafka/response.lua b/lib/resty/kafka/response.lua
index 5c40e348..c14f6e79 100644
--- a/lib/resty/kafka/response.lua
+++ b/lib/resty/kafka/response.lua
@@ -15,6 +15,10 @@ local strbyte = string.byte
 local _M = {}
 local mt = { __index = _M }
 
+-- make response objects printable for easier debugging
+function mt.__tostring(self)
+    return tostring(self.str) .. " " .. tostring(self.offset) .. " " .. tostring(self.correlation_id)
+end
 
 function _M.new(self, str)
     local resp = setmetatable({
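As a rough usage sketch (not part of the patch series), the new __tostring metamethod lets a request object be dumped straight to the nginx error log while debugging; the response object gains the same printable behaviour via its own __tostring. The snippet assumes an OpenResty context with lua-resty-kafka on the package path, and the apikey (0), correlation id and client id below are arbitrary example values:

    -- dump the internal state of a request object via the new __tostring
    local request = require "resty.kafka.request"

    -- arguments follow _M.new(self, apikey, correlation_id, client_id)
    local req = request:new(0, 1, "debug-client")

    -- __tostring concatenates the internal _req chunks plus offset and len,
    -- so the whole request state can go straight into the error log
    ngx.log(ngx.DEBUG, "kafka request state: ", tostring(req))
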
From b44f9242bce49984e071b841b1a8bcdd32850e35 Mon Sep 17 00:00:00 2001
From: Bruce Zhang
Date: Fri, 25 Nov 2016 13:53:14 +0800
Subject: [PATCH 2/2] [Bug Fix] Fix the "too many pending timers" issue

---
 lib/resty/kafka/producer.lua | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/lib/resty/kafka/producer.lua b/lib/resty/kafka/producer.lua
index e4590d6e..e788afe5 100644
--- a/lib/resty/kafka/producer.lua
+++ b/lib/resty/kafka/producer.lua
@@ -23,6 +23,19 @@ local crc32 = ngx.crc32_short
 local pcall = pcall
 local pairs = pairs
 
+local lrucache = require "resty.lrucache"
+local cache, err = lrucache.new(1)
+if not cache then
+    return ngx_log(ERR, "failed to create the cache: " .. (err or "unknown"))
+else
+    cache:set("max_timers_count", 0)
+end
+
+-- each running timer takes one (fake) connection record from the global
+-- connection record list sized by the worker_connections directive in nginx.conf,
+-- so cap the number of flush timers per worker (resty.lrucache is per worker).
+-- todo: make the limit configurable
+local max_timers_count = 128
 
 local ok, new_tab = pcall(require, "table.new")
 if not ok then
@@ -272,6 +285,10 @@ local function _flush(premature, self)
     elseif is_exiting() and ringbuffer:left_num() > 0 then
         -- still can create 0 timer even exiting
         _flush_buffer(self)
+    else
+        -- the timer handler is finishing, so decrease the running timer count
+        local timer_count = tonumber(cache:get("max_timers_count"))
+        cache:set("max_timers_count", timer_count - 1)
     end
 
     return true
@@ -279,9 +296,18 @@ end
 
 
 _flush_buffer = function (self)
+    local timer_count = tonumber(cache:get("max_timers_count"))
+    if timer_count >= max_timers_count then
+        return
+    end
+    -- the calling coroutine may not yield right away, so the timer does not start immediately
     local ok, err = timer_at(0, _flush, self)
     if not ok then
         ngx_log(ERR, "failed to create timer at _flush_buffer, err: ", err)
+    else
+        -- whether the timer is running or still pending, count it as an existing timer
+        local count = tonumber(cache:get("max_timers_count"))
+        cache:set("max_timers_count", count + 1)
     end
 end
 
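For reference, here is a self-contained sketch (not the library's code) of the bounded-timer pattern the patch implements, written against ngx.timer.at with a plain per-worker counter; handler, schedule_flush and MAX_TIMERS are hypothetical names:

    local timer_at = ngx.timer.at
    local ngx_log, ERR = ngx.log, ngx.ERR

    local MAX_TIMERS = 128       -- same default cap as the patch
    local running_timers = 0     -- plain per-worker upvalue instead of resty.lrucache

    local function handler(premature)
        -- ... the actual flush work would go here ...

        -- always release the slot when the handler finishes
        running_timers = running_timers - 1
    end

    local function schedule_flush()
        if running_timers >= MAX_TIMERS then
            return nil, "too many running timers"
        end

        local ok, err = timer_at(0, handler)
        if not ok then
            ngx_log(ERR, "failed to create timer: ", err)
            return nil, err
        end

        running_timers = running_timers + 1
        return true
    end

Two design differences from the patch are worth noting: the sketch decrements unconditionally at the end of the handler, while the patch only decrements in the final else branch of _flush, so its counter can drift upward whenever the exiting branch re-schedules a flush; and since both a module-level upvalue and a resty.lrucache instance live inside a single worker's Lua VM, the cap is per worker, so a truly global limit across workers would need an ngx.shared dict instead.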