-
Notifications
You must be signed in to change notification settings - Fork 2
/
jobs.lua
138 lines (127 loc) · 3.59 KB
/
jobs.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
local ct = require 'box.conntrack'
-- Create the shared `jobs` registry once. rawget/rawset are used so that the
-- global is probed and created without going through any metatable on _G.
if not rawget(_G, 'jobs') then
    local registry = {}
    -- allowed worker/task types: type -> true
    registry.t = {}
    -- workers by id: wid -> { chan, sid, type, alias, peer }
    registry.w = {}
    -- tasks in progress: tid -> { chan, wid }
    registry.inprog = {}
    -- active worker ids per type: type -> { wid, wid, ... }
    registry.wrk = {}
    rawset(_G, 'jobs', registry)
end
--- Session-disconnect hook: tear down every worker owned by the closed session.
--
-- For each worker that was registered from session `sid` this:
--   * pushes `false` into the worker's channel, so a `jobs.work` call waiting
--     on it wakes up and returns nothing;
--   * removes the worker id from the `jobs.wrk[type]` list, so no new task is
--     routed to it;
--   * pushes `false` into the result channel of every in-progress task that
--     was handed to this worker (`jobs.task` reports that as
--     "Connection reset by peer");
--   * drops the worker record from `jobs.w`, so records of dead sessions do
--     not pile up and a stale id is rejected by `jobs.work`.
--
-- @param sid id of the session that has just disconnected
function jobs:on_disconnect(sid)
    print("disconnected ",sid)
    for wid, inf in pairs( jobs.w ) do
        local wch, wsid, wtype = unpack(inf)
        if wsid == sid then
            print("drop worker ",wid, " of type ", wtype)
            wch:put(false,0)
            -- A missing list must not abort the cleanup of the other workers.
            local ids = jobs.wrk[ wtype ] or {}
            for idx, id in ipairs(ids) do
                if id == wid then
                    print("remove idx ",idx)
                    table.remove(ids, idx)
                    break
                end
            end
            for _, tinf in pairs( jobs.inprog ) do
                if tinf[2] == wid then
                    tinf[1]:put(false,0)
                end
            end
            -- Clearing an existing key while traversing with pairs() is allowed.
            jobs.w[wid] = nil
        end
    end
end
-- Register the disconnect hook with box.conntrack, passing `jobs` alongside the
-- handler (presumably so it is invoked as jobs:on_disconnect(sid) -- confirm
-- against the box.conntrack module).
ct.on_disconnect(jobs,jobs.on_disconnect)
--- Allow one or more worker types.
-- Every name given is flagged in `jobs.t` and receives a worker-id list in
-- `jobs.wrk`; a list that already exists is kept as is.
-- @param ... worker type names
function jobs.workers(...)
    local names = {...}
    for _, name in pairs(names) do
        print("Allowed worker '",name,"' for jobs")
        jobs.t[name] = true
        if not jobs.wrk[name] then
            jobs.wrk[name] = {}
        end
    end
end
--- Register the calling session as a worker of the given type.
--
-- Stores `{ channel, session id, type, alias, peer }` under a fresh worker id
-- in `jobs.w` and appends that id to `jobs.wrk[wtype]`.
--
-- @param wtype worker type; must have been allowed through `jobs.workers`
--              (defaults to 'any')
-- @param alias name of the worker used in log output (defaults to 'unk')
-- @return string worker id, to be passed to `jobs.work`
-- Raises box error 51 when `wtype` is not an allowed type.
function jobs.worker(wtype, alias)
    wtype = wtype or 'any'
    alias = alias or 'unk'
    if not jobs.t[wtype] then
        box.raise(51, string.format("Worker type '%s' not allowed", tostring(wtype)))
    end
    local sid = box.session.id()
    local peer = box.session.peer()
    print(string.format("Incoming worker '%s' as '%s' from %s (%s/%s)", wtype, alias, peer, sid, box.fiber.id() ))
    local ch = box.ipc.channel(10)
    -- The id is derived from the clock, so two registrations may observe the
    -- same value. Reusing an id would overwrite the earlier worker's record,
    -- so bump the stamp until the id is free. The id format stays unchanged.
    -- (Assumes box.time64() returns a value that supports `+ 1`.)
    local stamp = box.time64()
    local wid = tostring(stamp)
    while jobs.w[wid] do
        stamp = stamp + 1
        wid = tostring(stamp)
    end
    jobs.w[wid] = { ch, sid, wtype, alias, peer }
    table.insert(jobs.wrk[ wtype ], wid)
    return wid
end
--- Wait for the next task addressed to worker `wid`.
-- @param wid     worker id as returned by `jobs.worker`; converted to a string
--                when it is not one already
-- @param timeout wait limit handed to the channel's get(); falls back to 1
--                when absent or not convertible to a number
-- @return a box tuple built from the received task, or nothing when get()
--         produced no truthy value
-- Raises box error 51 when the worker id is not registered.
function jobs.work(wid, timeout)
    local wait = 1
    if timeout then
        wait = tonumber(timeout) or 1
    end

    local key = wid
    if type(key) ~= 'string' then
        key = tostring(key)
    end

    local worker = jobs.w[key]
    if not worker then
        box.raise(51, string.format("Worker id '%s' not registered", key))
    end

    local task = worker[1]:get(wait)
    if not task then
        return
    end
    return box.tuple.new(task)
end
--- Deliver the result of task `tid` to the caller waiting in `jobs.task`.
-- The result is pushed, without waiting, into the channel stored for the task
-- in `jobs.inprog`. An unknown task id or a failed push is only logged.
-- @param tid  task id, as received by the worker in the task
-- @param data result payload to hand back
function jobs.done(tid, data)
    local entry = jobs.inprog[tid]
    if not entry then
        print(string.format("No task '%s' (timeout or error)", tostring(tid)))
        return
    end
    if not entry[1]:put(data,0) then
        print(string.format("Task '%s' result timeout", tostring(tid)))
    end
end
--- Hand a task to one worker of type `wtype` and wait for its result.
--
-- Workers of the type are tried in list order. The first one that has a reader
-- waiting on its channel and accepts the task gets it, and is then moved to
-- the end of the list so that subsequent tasks go to other workers first.
--
-- @param wtype   task/worker type; must have been allowed through
--                `jobs.workers` (defaults to 'any')
-- @param data    payload; the worker receives `{ tid, data }`
-- @param timeout wait limit handed to the result channel's get(); falls back
--                to 1 when absent or not convertible to a number
-- @return the value the worker passed to `jobs.done`
-- Raises box error 51 when the type is not allowed, when no worker is
-- registered or ready for the type, when the result does not arrive in time,
-- or when the worker disconnects before answering.
function jobs.task(wtype,data,timeout)
    timeout = timeout and tonumber(timeout) or 1
    wtype = wtype or 'any'
    if not jobs.t[wtype] then
        box.raise(51, string.format("Task type '%s' not allowed", tostring(wtype)))
    end
    local queue = jobs.wrk[wtype]
    if #queue == 0 then
        box.raise(51, string.format("No workers available for type '%s'", tostring(wtype)))
    end

    local ch = box.ipc.channel(1)
    -- The id is derived from the clock, so two concurrent tasks may observe
    -- the same value. Reusing an id would replace the other task's entry in
    -- jobs.inprog and misroute its result, so bump the stamp until the id is
    -- free. The id format stays unchanged.
    -- (Assumes box.time64() returns a value that supports `+ 1`.)
    local stamp = box.time64()
    local tid = tostring(stamp)
    while jobs.inprog[tid] do
        stamp = stamp + 1
        tid = tostring(stamp)
    end
    local task = {tid,data}
    -- -1 marks "not assigned to any worker yet".
    jobs.inprog[ tid ] = { ch, -1 }

    for idx, wid in ipairs(queue) do
        local worker = jobs.w[wid]
        local wch, alias, peer = worker[1], worker[4], worker[5]
        -- Only offer the task to a worker that is currently waiting for one.
        if wch:has_readers() and wch:put(task,0) then
            print("Task passed to worker ",wid," known as ",alias," at ",peer)
            jobs.inprog[ tid ][2] = wid
            if #queue > 1 then
                -- Rotate the chosen worker to the back of the list.
                table.remove(queue, idx)
                table.insert(queue, wid)
            end
            local res = ch:get(timeout)
            jobs.inprog[ tid ] = nil
            if res then
                return res
            elseif res == nil then
                -- get() gave nothing: nobody answered within the timeout.
                box.raise(51, "Request timed out")
            else
                -- `false` is pushed by the disconnect hook.
                box.raise(51, "Connection reset by peer")
            end
        end
    end

    jobs.inprog[ tid ] = nil
    box.raise(51, string.format("No workers active for type '%s'", tostring(wtype)))
end