Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dead taker #16

Open
wants to merge 1 commit into
base: v5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions example-dead-taker/dead-taker-cli.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env tarantool

local netbox = require 'net.box'
local fiber = require 'fiber'
local log = require 'log'

local clis = {}
for w = 1, 2 do
clis[w] = netbox.connect('127.0.0.1:3301')
fiber.create(function()
log.info("Received: %s %s", w, clis[w]:eval('return box.space.queue:take(60)', {}, { timeout = 65 }).payload)
end)
end

clis[1]:close()
27 changes: 27 additions & 0 deletions example-dead-taker/dead-taker.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env tarantool

box.cfg{ listen = '127.0.0.1:3301', wal_mode = 'none' }
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
box.schema.space.create('queue', { if_not_exists = true }):create_index('pri', { if_not_exists = true, parts = {1, 'string'} })
box.space.queue:create_index('xq', { parts = { { 2, 'string' }, { 1, 'string' } }, if_not_exists = true })
box.space.queue:create_index('runat', { parts = { { 3, 'number' }, { 1, 'string' } }, if_not_exists = true })

require 'xqueue'.upgrade(box.space.queue, {
format = {
{ name = 'id', type = 'string' },
{ name = 'status', type = 'string' },
{ name = 'runat', type = 'number' },
{ name = 'payload', type = '*' },
},
debug = true,
fields = {
status = 'status',
runat = 'runat',
},
features = {
id = 'uuid',
delayed = true,
},
})

require 'console'.start() os.exit(0)
50 changes: 29 additions & 21 deletions xqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Status:
requires `runat`
`delay` may be set during put, release, kick
turned into R after delay

B - buried - task was temporary discarded from queue by consumer
may be revived using kick by administrator
use it in unpredicted conditions, when man intervention is required
Expand All @@ -84,7 +84,7 @@ Status:

D - done - task was processed and ack'ed and permanently left in database
enabled when keep feature is set

X - reserved for statistics

(TODO: reload/upgrade and feature switch)
Expand Down Expand Up @@ -269,11 +269,11 @@ function M.upgrade(space,opts,depth)
end
})
self.debug = not not opts.debug

if not self._default_truncate then
self._default_truncate = space.truncate
end

local format_av = box.space._space.index.name:get(space.name)[ 7 ]
local format = {}
local have_format = false
Expand Down Expand Up @@ -365,7 +365,7 @@ function M.upgrade(space,opts,depth)
self.key = pkf
self.fields = fields
self.fieldmap = fieldmap

if not self._stat then
self._stat = {
counts = {};
Expand All @@ -376,7 +376,7 @@ function M.upgrade(space,opts,depth)
self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
end
end

function self:getkey(arg)
local _type = type(arg)
if _type == 'table' then
Expand Down Expand Up @@ -768,7 +768,7 @@ function M.upgrade(space,opts,depth)
end
if #collect >= maxrun then remaining = 0 break end
end

for _,t in ipairs(collect) do
-- log.info("Runat: %s, %s", _, t)
if t[xq.fields.status] == 'W' then
Expand Down Expand Up @@ -817,7 +817,7 @@ function M.upgrade(space,opts,depth)
end
return 1
end)

table_clear(collect)

if r then
Expand Down Expand Up @@ -900,10 +900,10 @@ function M.upgrade(space,opts,depth)
end
self.ready = nil
end

local meta = debug.getmetatable(space)
for k,v in pairs(methods) do meta[k] = v end

-- Triggers must set right before updating space
-- because raising error earlier leads to trigger inconsistency
self._on_repl = space:on_replace(function(old, new)
Expand All @@ -922,7 +922,7 @@ function M.upgrade(space,opts,depth)
else
old_st = 'X'
end

if new then
new_st = new[self.fields.status]
counts[new_st] = (counts[new_st] or 0LL) + 1
Expand All @@ -932,15 +932,15 @@ function M.upgrade(space,opts,depth)
else
new_st = 'X'
end

local field = old_st.."-"..new_st
self._stat.transition[field] = (self._stat.transition[field] or 0ULL) + 1
end, self._on_repl)

self._on_dis = box.session.on_disconnect(function()
local sid = box.session.id()
local peer = box.session.storage.peer

log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
box.session.storage.destroyed = true
if self.bysid[sid] then
Expand Down Expand Up @@ -968,7 +968,7 @@ function M.upgrade(space,opts,depth)
self.bysid[sid] = nil
end
end, self._on_dis)

rawset(space,'xq',self)

log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
Expand Down Expand Up @@ -1211,7 +1211,7 @@ function methods:take(timeout, opts)
local index
local start_with

local tube_chan
local take_wait, tube_chan
if opts.tube then
if not xq.features.tube then
error("Feature tube is not enabled", 2)
Expand All @@ -1223,9 +1223,11 @@ function methods:take(timeout, opts)
start_with = {opts.tube, 'R'}
tube_chan = xq.take_chans[opts.tube] or fiber.channel()
xq.take_chans[opts.tube] = tube_chan
take_wait = tube_chan
else
index = xq.index
start_with = {'R'}
take_wait = xq.take_wait
end

local now = fiber.time()
Expand All @@ -1245,8 +1247,14 @@ function methods:take(timeout, opts)
local left = (now + timeout) - fiber.time()
if left <= 0 then goto finish end

(tube_chan or xq.take_wait):get(left)
if box.session.storage.destroyed then goto finish end
take_wait:get(left)

if box.session.storage.destroyed then
-- We are the dead taker, we should retransmit or notification
-- to another taker
take_wait:put(true, 0)
goto finish
end
end
end
::finish::
Expand All @@ -1263,7 +1271,7 @@ function methods:take(timeout, opts)
local r,e = pcall(function()
local sid = box.session.id()
local peer = box.session.storage.peer

-- print("Take ",key," for ",peer," sid=",sid, "; fid=",fiber.id() )
if xq.debug then
log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
Expand Down Expand Up @@ -1358,9 +1366,9 @@ function methods:release(key, attr)
log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
end)

xq:putback(t)

return t
end

Expand Down