Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/68672.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `salt` batch mode incorrectly treating transport-level error payloads as minion IDs, preventing spurious `Minion 'error' failed to respond` messages and hardening duplicate return handling.
66 changes: 37 additions & 29 deletions salt/cli/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ def gather_minions(self):
fret = set()
nret = set()
for ret in ping_gen:
if ("minions" and "jid") in ret:
if "minions" in ret and "jid" in ret:
for minion in ret["minions"]:
nret.add(minion)
continue
else:
try:
m = next(iter(ret.keys()))
if not isinstance(m, str) or m == "error":
continue
except StopIteration:
if not self.quiet:
salt.utils.stringutils.print_cli(
Expand Down Expand Up @@ -215,7 +217,7 @@ def run(self):
iters.append(new_iter)
minion_tracker[new_iter] = {}
# every iterator added is 'active' and has its set of minions
minion_tracker[new_iter]["minions"] = next_
minion_tracker[new_iter]["minions"] = set(next_)
minion_tracker[new_iter]["active"] = True

else:
Expand All @@ -227,7 +229,7 @@ def run(self):
if ping_ret is None:
break
m = next(iter(ping_ret.keys()))
if m not in self.minions:
if m not in self.minions and m != "error":
self.minions.append(m)
to_run.append(m)

Expand All @@ -244,26 +246,19 @@ def run(self):
break
continue
if self.opts.get("raw"):
parts.update({part["data"]["id"]: part})
if part["data"]["id"] in minion_tracker[queue]["minions"]:
minion_tracker[queue]["minions"].remove(
part["data"]["id"]
)
else:
salt.utils.stringutils.print_cli(
"minion {} was already deleted from tracker,"
" probably a duplicate key".format(part["id"])
)
if ("data" in part and part["data"]["id"] != "error"):
minion_id = part["data"]["id"]
parts[minion_id] = part
minion_tracker[queue]["minions"].discard(minion_id)
elif "error" in part:
log.debug("Error in batch run (raw mode): %s", part)
continue
else:
parts.update(part)
for id in part:
if id in minion_tracker[queue]["minions"]:
minion_tracker[queue]["minions"].remove(id)
else:
salt.utils.stringutils.print_cli(
"minion {} was already deleted from tracker,"
" probably a duplicate key".format(id)
)
for minion_id, data in part.items():
if minion_id == "error" or minion_id not in self.minions:
continue
parts[minion_id] = data
minion_tracker[queue]["minions"].discard(minion_id)
except StopIteration:
# if an iterator is done:
# - set it to inactive
Expand All @@ -277,8 +272,11 @@ def run(self):
# that have not responded to parts{} with an empty response
for minion in minion_tracker[queue]["minions"]:
if minion not in parts:
parts[minion] = {}
parts[minion]["ret"] = {}
if minion != "error":
parts[minion] = {}
parts[minion]["ret"] = {}
else:
log.debug("Skipping error entry from minions tracker")

for minion, data in parts.items():
if minion in active:
Expand All @@ -289,7 +287,14 @@ def run(self):

# need to check if Minion failed to respond to job sent
failed_check = data.get("failed", False)
if failed_check:
# Missing 'ret' is equivalent to a failed response
if failed_check or "ret" not in data:
if "ret" not in data:
log.debug(
"Minion '%s' returned data without 'ret': %s",
minion,
data,
)
log.debug(
"Minion '%s' failed to respond to job sent, data '%s'",
minion,
Expand Down Expand Up @@ -323,11 +328,14 @@ def run(self):
ret[minion] = data
yield data, retcode
else:
ret[minion] = data["ret"]
yield {minion: data["ret"]}, retcode
ret[minion] = data.get("ret")
yield {minion: data.get("ret")}, retcode
if not self.quiet:
ret[minion] = data["ret"]
data[minion] = data.pop("ret")
ret[minion] = data.get("ret")
if "ret" in data:
data[minion] = data.pop("ret")
else:
data[minion] = None
if "out" in data:
out = data.pop("out")
else:
Expand Down
137 changes: 137 additions & 0 deletions tests/pytests/unit/cli/test_batch_error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import pytest

from salt.cli.batch import Batch


class FakeIter:
    """
    Iterator used to simulate cmd_iter_no_block()

    Items are handed out one at a time. A ``StopIteration`` *instance*
    placed in ``parts`` acts as an end-of-stream sentinel: reaching it
    raises ``StopIteration`` instead of returning the object.

    Once the iterator has signalled exhaustion (via the sentinel or by
    running out of items) every later ``next()`` call keeps raising
    ``StopIteration``, as the iterator protocol requires.
    """

    def __init__(self, parts):
        self._iter = iter(parts)
        # Set once StopIteration has been raised so that items placed after
        # a sentinel are never handed out by a later next() call.
        self._done = False

    def __iter__(self):
        return self

    def __next__(self):
        if self._done:
            raise StopIteration
        try:
            item = next(self._iter)
        except StopIteration:
            self._done = True
            raise
        if isinstance(item, StopIteration):
            self._done = True
            raise StopIteration
        return item


class FakeLocalClient:
    """
    Stand-in for the local client that replays canned payloads.

    ``ping_returns`` is what ``cmd_iter`` hands back and ``cmd_returns`` is
    what ``cmd_iter_no_block`` hands back; call arguments are ignored.
    """

    def __init__(self, ping_returns, cmd_returns):
        self._ping_returns, self._cmd_returns = ping_returns, cmd_returns

    def cmd_iter(self, *_args, **_kwargs):
        # A fresh iterator over the canned ping payloads on every call.
        ping_payloads = self._ping_returns
        return iter(ping_payloads)

    def cmd_iter_no_block(self, *_args, **_kwargs):
        # Wrapped in FakeIter so a StopIteration sentinel ends the stream.
        job_payloads = self._cmd_returns
        return FakeIter(job_payloads)

    def destroy(self):
        # Nothing to release for the fake client.
        return None


@pytest.fixture
def base_opts():
    """Return the opts dict shared by the batch tests in this module."""
    return dict(
        tgt="*",
        fun="test.ping",
        arg=[],
        timeout=5,
        batch=1,
        conf_file="/dev/null",
        gather_job_timeout=5,
    )


def test_gather_minions_ignores_error_payload(monkeypatch, base_opts):
    """An ``error`` payload in the ping stream must not become a minion id."""
    # Transport-level error payload
    transport_error = {"error": "Authentication failure", "jid": "20260101000000"}
    # Legit discovery payload followed by the minion's own ping reply
    discovery = {"minions": ["minion1"], "jid": "20260101000001"}
    ping_reply = {"minion1": {"ret": True}}

    client = FakeLocalClient([transport_error, discovery, ping_reply], [])

    def _fake_get_local_client(*_args, **_kwargs):
        return client

    monkeypatch.setattr("salt.client.get_local_client", _fake_get_local_client)

    minions, _ping_gen, _down = Batch(base_opts, quiet=True).gather_minions()

    assert "error" not in minions
    assert minions == ["minion1"]

def test_batch_run_ignores_error_return(monkeypatch, base_opts):
    """An ``error`` job return must not be yielded by ``run()`` as a minion."""
    discovery = {"minions": ["minion1"], "jid": "123"}
    minion_reply = {"minion1": {"ret": True}}

    # Error payload emitted by transport/publish, then a legit minion return
    publish_error = {"error": "Publish failed", "failed": True}
    client = FakeLocalClient(
        [discovery, minion_reply],
        [publish_error, {"minion1": {"ret": True}}],
    )

    def _fake_get_local_client(*_args, **_kwargs):
        return client

    monkeypatch.setattr("salt.client.get_local_client", _fake_get_local_client)

    results = list(Batch(base_opts, quiet=True).run())

    # Collect every key of every dict yielded by run().
    returned_minions = [
        key for item, _ in results if isinstance(item, dict) for key in item.keys()
    ]

    assert "error" not in returned_minions
    assert "minion1" in returned_minions


def test_batch_duplicate_returns_are_idempotent(monkeypatch, base_opts):
    """A duplicated minion return must still give results with no ``error`` key."""
    discovery = {"minions": ["minion1"], "jid": "123"}
    first_reply = {"minion1": {"ret": True}}

    job_stream = [
        {"minion1": {"ret": True}},
        {"minion1": {"ret": True}},  # duplicate return
        StopIteration(),
    ]
    client = FakeLocalClient([discovery, first_reply], job_stream)

    def _fake_get_local_client(*_args, **_kwargs):
        return client

    monkeypatch.setattr("salt.client.get_local_client", _fake_get_local_client)

    results = list(Batch(base_opts, quiet=True).run())

    assert results
    dict_items = (item for item, _ in results if isinstance(item, dict))
    for yielded in dict_items:
        assert "error" not in yielded
Loading