diff --git a/changelog/68672.fixed.md b/changelog/68672.fixed.md
new file mode 100644
index 000000000000..dc10001dbe57
--- /dev/null
+++ b/changelog/68672.fixed.md
@@ -0,0 +1 @@
+Fix `salt` batch mode incorrectly treating transport-level error payloads as minion IDs, preventing spurious `Minion 'error' failed to respond` messages and hardening duplicate return handling.
diff --git a/salt/cli/batch.py b/salt/cli/batch.py
index 3a648c02b86f..e8650235671c 100644
--- a/salt/cli/batch.py
+++ b/salt/cli/batch.py
@@ -69,13 +69,15 @@ def gather_minions(self):
         fret = set()
         nret = set()
         for ret in ping_gen:
-            if ("minions" and "jid") in ret:
+            if "minions" in ret and "jid" in ret:
                 for minion in ret["minions"]:
                     nret.add(minion)
                 continue
             else:
                 try:
                     m = next(iter(ret.keys()))
+                    if not isinstance(m, str) or m == "error":
+                        continue
                 except StopIteration:
                     if not self.quiet:
                         salt.utils.stringutils.print_cli(
@@ -215,7 +217,7 @@ def run(self):
                 iters.append(new_iter)
                 minion_tracker[new_iter] = {}
                 # every iterator added is 'active' and has its set of minions
-                minion_tracker[new_iter]["minions"] = next_
+                minion_tracker[new_iter]["minions"] = set(next_)
                 minion_tracker[new_iter]["active"] = True
 
             else:
@@ -227,7 +229,7 @@ def run(self):
                 if ping_ret is None:
                     break
                 m = next(iter(ping_ret.keys()))
-                if m not in self.minions:
+                if m not in self.minions and m != "error":
                     self.minions.append(m)
                     to_run.append(m)
 
@@ -244,26 +246,19 @@ def run(self):
                                 break
                             continue
                         if self.opts.get("raw"):
-                            parts.update({part["data"]["id"]: part})
-                            if part["data"]["id"] in minion_tracker[queue]["minions"]:
-                                minion_tracker[queue]["minions"].remove(
-                                    part["data"]["id"]
-                                )
-                            else:
-                                salt.utils.stringutils.print_cli(
-                                    "minion {} was already deleted from tracker,"
-                                    " probably a duplicate key".format(part["id"])
-                                )
+                            if "data" in part and part["data"]["id"] != "error":
+                                minion_id = part["data"]["id"]
+                                parts[minion_id] = part
+                                minion_tracker[queue]["minions"].discard(minion_id)
+                            elif "error" in part:
+                                log.debug("Error in batch run (raw mode): %s", part)
+                                continue
                         else:
-                            parts.update(part)
-                            for id in part:
-                                if id in minion_tracker[queue]["minions"]:
-                                    minion_tracker[queue]["minions"].remove(id)
-                                else:
-                                    salt.utils.stringutils.print_cli(
-                                        "minion {} was already deleted from tracker,"
-                                        " probably a duplicate key".format(id)
-                                    )
+                            for minion_id, data in part.items():
+                                if minion_id == "error" or minion_id not in self.minions:
+                                    continue
+                                parts[minion_id] = data
+                                minion_tracker[queue]["minions"].discard(minion_id)
                 except StopIteration:
                     # if a iterator is done:
                     # - set it to inactive
@@ -277,8 +272,11 @@ def run(self):
                         # that have not responded to parts{} with an empty response
                         for minion in minion_tracker[queue]["minions"]:
                             if minion not in parts:
-                                parts[minion] = {}
-                                parts[minion]["ret"] = {}
+                                if minion != "error":
+                                    parts[minion] = {}
+                                    parts[minion]["ret"] = {}
+                                else:
+                                    log.debug("Skipping error entry from minions tracker")
 
             for minion, data in parts.items():
                 if minion in active:
@@ -289,7 +287,14 @@ def run(self):
 
                 # need to check if Minion failed to respond to job sent
                 failed_check = data.get("failed", False)
-                if failed_check:
+                # Missing 'ret' is equivalent to a failed response
+                if failed_check or "ret" not in data:
+                    if "ret" not in data:
+                        log.debug(
+                            "Minion '%s' returned data without 'ret': %s",
+                            minion,
+                            data,
+                        )
                     log.debug(
                         "Minion '%s' failed to respond to job sent, data '%s'",
                         minion,
@@ -323,11 +328,14 @@ def run(self):
                     ret[minion] = data
                     yield data, retcode
                 else:
-                    ret[minion] = data["ret"]
-                    yield {minion: data["ret"]}, retcode
+                    ret[minion] = data.get("ret")
+                    yield {minion: data.get("ret")}, retcode
                 if not self.quiet:
-                    ret[minion] = data["ret"]
-                    data[minion] = data.pop("ret")
+                    ret[minion] = data.get("ret")
+                    if "ret" in data:
+                        data[minion] = data.pop("ret")
+                    else:
+                        data[minion] = None
                     if "out" in data:
                         out = data.pop("out")
                     else:
diff --git a/tests/pytests/unit/cli/test_batch_error_handling.py b/tests/pytests/unit/cli/test_batch_error_handling.py
new file mode 100644
index 000000000000..e1b9c473f217
--- /dev/null
+++ b/tests/pytests/unit/cli/test_batch_error_handling.py
@@ -0,0 +1,160 @@
+"""
+Tests for salt.cli.batch handling of error payloads and duplicate returns
+"""
+
+import pytest
+
+from salt.cli.batch import Batch
+
+
+class FakeIter:
+    """
+    Iterator used to simulate cmd_iter_no_block()
+
+    A StopIteration instance in ``parts`` marks the end of the stream.
+    """
+
+    def __init__(self, parts):
+        self._iter = iter(parts)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        item = next(self._iter)
+        if isinstance(item, StopIteration):
+            raise StopIteration
+        return item
+
+
+class FakeLocalClient:
+    """
+    Fake LocalClient to control ping + job return payloads
+    """
+
+    def __init__(self, ping_returns, cmd_returns):
+        self._ping_returns = ping_returns
+        self._cmd_returns = cmd_returns
+
+    def cmd_iter(self, *args, **kwargs):
+        return iter(self._ping_returns)
+
+    def cmd_iter_no_block(self, *args, **kwargs):
+        return FakeIter(self._cmd_returns)
+
+    def destroy(self):
+        pass
+
+
+@pytest.fixture
+def base_opts():
+    """
+    Options dict handed to Batch by every test in this module
+    """
+    return {
+        "tgt": "*",
+        "fun": "test.ping",
+        "arg": [],
+        "timeout": 5,
+        "batch": 1,
+        "conf_file": "/dev/null",
+        "gather_job_timeout": 5,
+    }
+
+
+def test_gather_minions_ignores_error_payload(monkeypatch, base_opts):
+    """
+    An "error" payload from the ping must not be reported as a minion ID
+    """
+    ping_returns = [
+        # Transport-level error payload
+        {"error": "Authentication failure", "jid": "20260101000000"},
+        # Legit discovery payload
+        {"minions": ["minion1"], "jid": "20260101000001"},
+        {"minion1": {"ret": True}},
+    ]
+
+    fake_client = FakeLocalClient(ping_returns, [])
+
+    monkeypatch.setattr(
+        "salt.client.get_local_client",
+        lambda *a, **k: fake_client,
+    )
+
+    batch = Batch(base_opts, quiet=True)
+    minions, _, _ = batch.gather_minions()
+
+    assert "error" not in minions
+    assert minions == ["minion1"]
+
+
+def test_batch_run_ignores_error_return(monkeypatch, base_opts):
+    """
+    An "error" payload in the job returns must not be yielded as a minion
+    """
+    ping_returns = [
+        {"minions": ["minion1"], "jid": "123"},
+        {"minion1": {"ret": True}},
+    ]
+
+    cmd_returns = [
+        # Error payload emitted by transport/publish
+        {"error": "Publish failed", "failed": True},
+        # Legit minion return
+        {"minion1": {"ret": True}},
+    ]
+
+    fake_client = FakeLocalClient(ping_returns, cmd_returns)
+
+    monkeypatch.setattr(
+        "salt.client.get_local_client",
+        lambda *a, **k: fake_client,
+    )
+
+    batch = Batch(base_opts, quiet=True)
+
+    results = list(batch.run())
+
+    returned_minions = []
+    for item, _ in results:
+        if isinstance(item, dict):
+            returned_minions.extend(item.keys())
+
+    assert "error" not in returned_minions
+    assert "minion1" in returned_minions
+
+
+def test_batch_duplicate_returns_are_idempotent(monkeypatch, base_opts):
+    """
+    A minion returning twice must be yielded once and never as "error"
+    """
+    ping_returns = [
+        {"minions": ["minion1"], "jid": "123"},
+        {"minion1": {"ret": True}},
+    ]
+
+    cmd_returns = [
+        {"minion1": {"ret": True}},
+        {"minion1": {"ret": True}},  # duplicate return
+        StopIteration(),
+    ]
+
+    fake_client = FakeLocalClient(ping_returns, cmd_returns)
+
+    monkeypatch.setattr(
+        "salt.client.get_local_client",
+        lambda *a, **k: fake_client,
+    )
+
+    batch = Batch(base_opts, quiet=True)
+
+    results = list(batch.run())
+
+    returned_minions = []
+    for item, _ in results:
+        if isinstance(item, dict):
+            returned_minions.extend(item)
+
+    assert "error" not in returned_minions
+    # Both identical returns must collapse into a single yielded result
+    assert returned_minions == ["minion1"]