Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

- `ERR_READONLY` errors on previous leader after leader change in cluster
while processing `on_disconnect` triggers (#248).

## [1.4.4] - 2025-05-26

The patch release fixes incorrect behavior of the utubettl driver with enabled
Expand Down
50 changes: 36 additions & 14 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ end

--- Release all session tasks.
local function release_session_tasks(session_uuid)
if box.info.ro then
return
end
Comment thread
oleg-jukovec marked this conversation as resolved.

local taken_tasks = box.space._queue_taken_2.index.uuid:select{session_uuid}

for _, task in pairs(taken_tasks) do
Expand All @@ -501,26 +505,25 @@ end

function method._on_consumer_disconnect()
Comment thread
oleg-jukovec marked this conversation as resolved.
local conn_id = connection.id()
local consumers = box.space._queue_consumers

-- wakeup all waiters
while true do
local waiter = box.space._queue_consumers.index.pk:min{conn_id}
if waiter == nil then
break
end
-- Don't touch the other consumers
if waiter[1] ~= conn_id then
break
end
box.space._queue_consumers:delete{waiter[1], waiter[2]}
local cond = conds[waiter[2]]
for _, waiter in consumers.index.pk:pairs(conn_id, { iterator = 'EQ' }) do
local fid = waiter[2]
local cond = conds[fid]
if cond then
releasing_connections[waiter[2]] = true
cond:signal(waiter[2])
releasing_connections[fid] = true
cond:signal(fid)
end

if not box.info.ro then
consumers:delete{waiter[1], waiter[2]}
end
end

session.disconnect(conn_id)
if not box.info.ro then
session.disconnect(conn_id)
end
end

-- function takes tuples and recreates tube
Expand All @@ -539,10 +542,29 @@ local function recreate_tube(tube_tuple)
return make_self(driver, space, name, tube_type, id, opts)
end

-- function cleans local temporary spaces on startup to avoid
-- storing old data
local function cleanup_temp_spaces()
if box.info.ro then
return
end

local s = box.space._queue_consumers
if s ~= nil then
s:truncate()
end

s = box.space._queue_session_ids
if s ~= nil then
s:truncate()
end
end

-- Function takes new queue state.
-- The "RUNNING" and "WAITING" states do not require additional actions.
local function on_state_change(state)
if state == queue_state.states.STARTUP then
cleanup_temp_spaces()
local replicaset_mode = queue.cfg['in_replicaset'] or false
-- gh-202: In replicaset mode, tubes can be created and deleted on different nodes.
-- Accordingly, it is necessary to rebuild the queue.tube index.
Expand Down
136 changes: 136 additions & 0 deletions t/240-ro-on-disconnect.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#!/usr/bin/env tarantool

local log = require('log')
local tnt = require('t.tnt')
local test = require('tap').test('')
local fiber = require('fiber')
local queue = require('queue')

local qc = require('queue.compat')
if not qc.check_version({2, 4, 1}) then
log.info('Tests skipped, tarantool version < 2.4.1')
return
end

rawset(_G, 'queue', require('queue'))

local session = require('queue.abstract.queue_session')
local queue_state = require('queue.abstract.queue_state')

test:plan(3)

test:test('on_disconnect handler must be RO-safe', function(test)
test:plan(6)

tnt.cluster.cfg{}
test:ok(tnt.cluster.wait_replica(), 'wait for replica to connect')

queue.cfg{ttr = 0.5, in_replicaset = true}
local tube = queue.create_tube('test_ro_disc', 'fifo', {if_not_exists = true})
test:ok(tube, 'tube created')

local f = fiber.new(function()
queue.tube.test_ro_disc:take(3600)
end)
f:name('queue_waiter_fiber')

local ok = false
for _ = 1, 300 do
if box.space._queue_consumers:count() > 0 then
ok = true
break
end
fiber.sleep(0.01)
end
test:ok(ok, 'waiter registered in _queue_consumers')

box.cfg{read_only = true}
test:ok(box.info.ro, 'instance is RO')

local ok_call, err = pcall(queue._on_consumer_disconnect)
test:ok(ok_call, ('_on_consumer_disconnect() must not fail on RO, err = %s'):format(tostring(err)))

box.cfg{read_only = false}
test:ok(not box.info.ro, 'instance back to RW')
end)

test:test('release_session_tasks: RO-safe', function(test)
test:plan(10)

local tube = queue.create_tube('test_rel', 'fifo', {if_not_exists = true})
test:ok(tube, 'tube created')

-- Create a session + take a task so _queue_taken_2 has a record.
local client = tnt.cluster.connect_master()
test:ok(client.error == nil, 'client connected')

local session_uuid = client:call('queue.identify')
test:ok(session_uuid ~= nil, 'got session_uuid')

test:ok(queue.tube.test_rel:put('data'), 'put task')
local task = client:call('queue.tube.test_rel:take')
test:ok(task ~= nil, 'task taken')

local taken_before = box.space._queue_taken_2.index.uuid:select{session_uuid}
test:is(#taken_before, 1, '_queue_taken_2 has 1 record before')

box.cfg{read_only = true}
test:ok(queue_state.poll(queue_state.states.WAITING, 10), 'state WAITING')
test:ok(box.info.ro, 'instance is RO')

local ok_call, err = pcall(session._on_session_remove, session_uuid)
test:ok(ok_call, ('on_session_remove does not fail on RO, err=%s'):format(tostring(err)))

local taken_after = box.space._queue_taken_2.index.uuid:select{session_uuid}
test:is(#taken_after, 1, '_queue_taken_2 unchanged on RO')

-- Cleanup: back to RW.
box.cfg{read_only = false}
queue_state.poll(queue_state.states.RUNNING, 10)
client:close()
end)

test:test('release_session_tasks: works on RW', function(test)
test:plan(11)

box.cfg{read_only = false}
queue_state.poll(queue_state.states.RUNNING, 10)
test:ok(not box.info.ro, 'instance is RW')

queue.cfg{ttr = 0.5, in_replicaset = true}
local tube = queue.create_tube('test_rel2', 'fifo', {if_not_exists = true})
test:ok(tube, 'tube created')

local client = tnt.cluster.connect_master()
test:ok(client.error == nil, 'client connected')

local session_uuid = client:call('queue.identify')
test:ok(session_uuid ~= nil, 'got session_uuid')

test:ok(queue.tube.test_rel2:put('data2'), 'put task')
local task = client:call('queue.tube.test_rel2:take')
test:ok(task ~= nil, 'task taken')

local taken_before = box.space._queue_taken_2.index.uuid:select{session_uuid}
test:is(#taken_before, 1, '_queue_taken_2 has 1 record before')

-- Call on_session_remove callback directly on RW: must release task.
local ok_call, err = pcall(session._on_session_remove, session_uuid)
test:ok(ok_call, ('on_session_remove ok on RW, err=%s'):format(tostring(err)))

-- Taken record must be removed.
local taken_after = box.space._queue_taken_2.index.uuid:select{session_uuid}
test:is(#taken_after, 0, '_queue_taken_2 record removed')

-- Task must become READY again and be takeable.
local task2 = client:call('queue.tube.test_rel2:take', {0})
test:ok(task2 ~= nil, 'task is takeable again after release')
test:is(task2[3], 'data2', 'task data preserved')

client:close()
end)

rawset(_G, 'queue', nil)
tnt.finish()
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
Comment thread
oleg-jukovec marked this conversation as resolved.
Loading