diff --git a/example-dead-taker/dead-taker-cli.lua b/example-dead-taker/dead-taker-cli.lua new file mode 100644 index 0000000..eb493d1 --- /dev/null +++ b/example-dead-taker/dead-taker-cli.lua @@ -0,0 +1,15 @@ +#!/usr/bin/env tarantool + +local netbox = require 'net.box' +local fiber = require 'fiber' +local log = require 'log' + +local clis = {} +for w = 1, 2 do + clis[w] = netbox.connect('127.0.0.1:3301') + fiber.create(function() + log.info("Received: %s %s", w, clis[w]:eval('return box.space.queue:take(60)', {}, { timeout = 65 }).payload) + end) +end + +clis[1]:close() diff --git a/example-dead-taker/dead-taker.lua b/example-dead-taker/dead-taker.lua new file mode 100644 index 0000000..7f48d72 --- /dev/null +++ b/example-dead-taker/dead-taker.lua @@ -0,0 +1,27 @@ +#!/usr/bin/env tarantool + +box.cfg{ listen = '127.0.0.1:3301', wal_mode = 'none' } +box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true }) +box.schema.space.create('queue', { if_not_exists = true }):create_index('pri', { if_not_exists = true, parts = {1, 'string'} }) +box.space.queue:create_index('xq', { parts = { { 2, 'string' }, { 1, 'string' } }, if_not_exists = true }) +box.space.queue:create_index('runat', { parts = { { 3, 'number' }, { 1, 'string' } }, if_not_exists = true }) + +require 'xqueue'.upgrade(box.space.queue, { + format = { + { name = 'id', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = '*' }, + }, + debug = true, + fields = { + status = 'status', + runat = 'runat', + }, + features = { + id = 'uuid', + delayed = true, + }, +}) + +require 'console'.start() os.exit(0) diff --git a/xqueue.lua b/xqueue.lua index 5911e61..c0564c1 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -74,7 +74,7 @@ Status: requires `runat` `delay` may be set during put, release, kick turned into R after delay - + B - buried - task was temporary discarded from queue by consumer may be revived using kick by administrator use it in unpredicted conditions, when man intervention is required @@ -84,7 +84,7 @@ Status: D - done - task was processed and ack'ed and permanently left in database enabled when keep feature is set - + X - reserved for statistics (TODO: reload/upgrade and feature switch) @@ -269,11 +269,11 @@ function M.upgrade(space,opts,depth) end }) self.debug = not not opts.debug - + if not self._default_truncate then self._default_truncate = space.truncate end - + local format_av = box.space._space.index.name:get(space.name)[ 7 ] local format = {} local have_format = false @@ -365,7 +365,7 @@ function M.upgrade(space,opts,depth) self.key = pkf self.fields = fields self.fieldmap = fieldmap - + if not self._stat then self._stat = { counts = {}; @@ -376,7 +376,7 @@ function M.upgrade(space,opts,depth) self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 end end - + function self:getkey(arg) local _type = type(arg) if _type == 'table' then @@ -768,7 +768,7 @@ function M.upgrade(space,opts,depth) end if #collect >= maxrun then remaining = 0 break end end - + for _,t in ipairs(collect) do -- log.info("Runat: %s, %s", _, t) if t[xq.fields.status] == 'W' then @@ -817,7 +817,7 @@ function M.upgrade(space,opts,depth) end return 1 end) - + table_clear(collect) if r then @@ -900,10 +900,10 @@ function M.upgrade(space,opts,depth) end self.ready = nil end - + local meta = debug.getmetatable(space) for k,v in pairs(methods) do meta[k] = v end - + -- Triggers must set right before updating space -- because raising error earlier leads to trigger inconsistency self._on_repl = space:on_replace(function(old, new) @@ -922,7 +922,7 @@ function M.upgrade(space,opts,depth) else old_st = 'X' end - + if new then new_st = new[self.fields.status] counts[new_st] = (counts[new_st] or 0LL) + 1 @@ -932,15 +932,15 @@ function M.upgrade(space,opts,depth) else new_st = 'X' end - + local field = old_st.."-"..new_st self._stat.transition[field] = (self._stat.transition[field] or 0ULL) + 1 end, self._on_repl) - + self._on_dis = box.session.on_disconnect(function() local sid = box.session.id() local peer = box.session.storage.peer - + log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() ) box.session.storage.destroyed = true if self.bysid[sid] then @@ -968,7 +968,7 @@ function M.upgrade(space,opts,depth) self.bysid[sid] = nil end end, self._on_dis) - + rawset(space,'xq',self) log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status) @@ -1211,7 +1211,7 @@ function methods:take(timeout, opts) local index local start_with - local tube_chan + local take_wait, tube_chan if opts.tube then if not xq.features.tube then error("Feature tube is not enabled", 2) @@ -1223,9 +1223,11 @@ function methods:take(timeout, opts) start_with = {opts.tube, 'R'} tube_chan = xq.take_chans[opts.tube] or fiber.channel() xq.take_chans[opts.tube] = tube_chan + take_wait = tube_chan else index = xq.index start_with = {'R'} + take_wait = xq.take_wait end local now = fiber.time() @@ -1245,8 +1247,14 @@ function methods:take(timeout, opts) local left = (now + timeout) - fiber.time() if left <= 0 then goto finish end - (tube_chan or xq.take_wait):get(left) - if box.session.storage.destroyed then goto finish end + take_wait:get(left) + + if box.session.storage.destroyed then + -- We are the dead taker, we should retransmit or notification + -- to another taker + take_wait:put(true, 0) + goto finish + end end end ::finish:: @@ -1263,7 +1271,7 @@ function methods:take(timeout, opts) local r,e = pcall(function() local sid = box.session.id() local peer = box.session.storage.peer - + -- print("Take ",key," for ",peer," sid=",sid, "; fid=",fiber.id() ) if xq.debug then log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id()) @@ -1358,9 +1366,9 @@ function methods:release(key, attr) log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status], key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() ) end) - + xq:putback(t) - + return t end