From 46744c7f4e6fd54d8de63d0494e62e67d5262c22 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Wed, 16 Dec 2020 21:45:35 +0300 Subject: [PATCH] dead takers now notify other takers inside :take We call a client a "dead taker" if it closed its connection to Tarantool during xq:take(). Tarantool closes the socket and calls the box.session.on_disconnect callback, and xqueue registers that the taker inside this session can't consume tasks anymore. But if the client sleeps for too long in xq:take() (e.g. 60 seconds), then the next taker will receive the task only when a new task is produced. That is too long to wait, so this fix makes the "dead taker" notify the other takers (if there are any). --- example-dead-taker/dead-taker-cli.lua | 15 ++++++++ example-dead-taker/dead-taker.lua | 27 +++++++++++++++ xqueue.lua | 50 ++++++++++++++++----------- 3 files changed, 71 insertions(+), 21 deletions(-) create mode 100644 example-dead-taker/dead-taker-cli.lua create mode 100644 example-dead-taker/dead-taker.lua diff --git a/example-dead-taker/dead-taker-cli.lua b/example-dead-taker/dead-taker-cli.lua new file mode 100644 index 0000000..eb493d1 --- /dev/null +++ b/example-dead-taker/dead-taker-cli.lua @@ -0,0 +1,15 @@ +#!/usr/bin/env tarantool + +local netbox = require 'net.box' +local fiber = require 'fiber' +local log = require 'log' + +local clis = {} +for w = 1, 2 do + clis[w] = netbox.connect('127.0.0.1:3301') + fiber.create(function() + log.info("Received: %s %s", w, clis[w]:eval('return box.space.queue:take(60)', {}, { timeout = 65 }).payload) + end) +end + +clis[1]:close() diff --git a/example-dead-taker/dead-taker.lua b/example-dead-taker/dead-taker.lua new file mode 100644 index 0000000..7f48d72 --- /dev/null +++ b/example-dead-taker/dead-taker.lua @@ -0,0 +1,27 @@ +#!/usr/bin/env tarantool + +box.cfg{ listen = '127.0.0.1:3301', wal_mode = 'none' } +box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true }) +box.schema.space.create('queue', { if_not_exists = 
true }):create_index('pri', { if_not_exists = true, parts = {1, 'string'} }) +box.space.queue:create_index('xq', { parts = { { 2, 'string' }, { 1, 'string' } }, if_not_exists = true }) +box.space.queue:create_index('runat', { parts = { { 3, 'number' }, { 1, 'string' } }, if_not_exists = true }) + +require 'xqueue'.upgrade(box.space.queue, { + format = { + { name = 'id', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = '*' }, + }, + debug = true, + fields = { + status = 'status', + runat = 'runat', + }, + features = { + id = 'uuid', + delayed = true, + }, +}) + +require 'console'.start() os.exit(0) diff --git a/xqueue.lua b/xqueue.lua index 5911e61..c0564c1 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -74,7 +74,7 @@ Status: requires `runat` `delay` may be set during put, release, kick turned into R after delay - + B - buried - task was temporary discarded from queue by consumer may be revived using kick by administrator use it in unpredicted conditions, when man intervention is required @@ -84,7 +84,7 @@ Status: D - done - task was processed and ack'ed and permanently left in database enabled when keep feature is set - + X - reserved for statistics (TODO: reload/upgrade and feature switch) @@ -269,11 +269,11 @@ function M.upgrade(space,opts,depth) end }) self.debug = not not opts.debug - + if not self._default_truncate then self._default_truncate = space.truncate end - + local format_av = box.space._space.index.name:get(space.name)[ 7 ] local format = {} local have_format = false @@ -365,7 +365,7 @@ function M.upgrade(space,opts,depth) self.key = pkf self.fields = fields self.fieldmap = fieldmap - + if not self._stat then self._stat = { counts = {}; @@ -376,7 +376,7 @@ function M.upgrade(space,opts,depth) self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 end end - + function self:getkey(arg) local _type = type(arg) if _type == 'table' then @@ -768,7 +768,7 @@ function 
M.upgrade(space,opts,depth) end if #collect >= maxrun then remaining = 0 break end end - + for _,t in ipairs(collect) do -- log.info("Runat: %s, %s", _, t) if t[xq.fields.status] == 'W' then @@ -817,7 +817,7 @@ function M.upgrade(space,opts,depth) end return 1 end) - + table_clear(collect) if r then @@ -900,10 +900,10 @@ function M.upgrade(space,opts,depth) end self.ready = nil end - + local meta = debug.getmetatable(space) for k,v in pairs(methods) do meta[k] = v end - + -- Triggers must set right before updating space -- because raising error earlier leads to trigger inconsistency self._on_repl = space:on_replace(function(old, new) @@ -922,7 +922,7 @@ function M.upgrade(space,opts,depth) else old_st = 'X' end - + if new then new_st = new[self.fields.status] counts[new_st] = (counts[new_st] or 0LL) + 1 @@ -932,15 +932,15 @@ function M.upgrade(space,opts,depth) else new_st = 'X' end - + local field = old_st.."-"..new_st self._stat.transition[field] = (self._stat.transition[field] or 0ULL) + 1 end, self._on_repl) - + self._on_dis = box.session.on_disconnect(function() local sid = box.session.id() local peer = box.session.storage.peer - + log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() ) box.session.storage.destroyed = true if self.bysid[sid] then @@ -968,7 +968,7 @@ function M.upgrade(space,opts,depth) self.bysid[sid] = nil end end, self._on_dis) - + rawset(space,'xq',self) log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status) @@ -1211,7 +1211,7 @@ function methods:take(timeout, opts) local index local start_with - local tube_chan + local take_wait, tube_chan if opts.tube then if not xq.features.tube then error("Feature tube is not enabled", 2) @@ -1223,9 +1223,11 @@ function methods:take(timeout, opts) start_with = {opts.tube, 'R'} tube_chan = xq.take_chans[opts.tube] or fiber.channel() xq.take_chans[opts.tube] = tube_chan + take_wait = tube_chan else index = xq.index start_with = {'R'} + take_wait = 
xq.take_wait end local now = fiber.time() @@ -1245,8 +1247,14 @@ function methods:take(timeout, opts) local left = (now + timeout) - fiber.time() if left <= 0 then goto finish end - (tube_chan or xq.take_wait):get(left) - if box.session.storage.destroyed then goto finish end + take_wait:get(left) + + if box.session.storage.destroyed then + -- We are the dead taker, we should retransmit our notification + -- to another taker + take_wait:put(true, 0) + goto finish + end end end ::finish:: @@ -1263,7 +1271,7 @@ function methods:take(timeout, opts) local r,e = pcall(function() local sid = box.session.id() local peer = box.session.storage.peer - + -- print("Take ",key," for ",peer," sid=",sid, "; fid=",fiber.id() ) if xq.debug then log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id()) @@ -1358,9 +1366,9 @@ function methods:release(key, attr) log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status], key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() ) end) - + xq:putback(t) - + return t end