From a7a3d219bc766fbd877dbdc92806aa55e564c18d Mon Sep 17 00:00:00 2001 From: Dmitry Ilyin Date: Thu, 16 Apr 2026 22:22:31 +0300 Subject: [PATCH] Add NAND flash install support and boot protocol robustness fixes Install command: - Add --nand flag to `defib install` for NAND flash devices - Add _NAND_LAYOUT: 1M(boot), 1M(env), 8M(kernel), -(ubi) - Use `nand erase/write/read` commands instead of `sf` when --nand - Accept rootfs.ubi tarballs (NAND uses UBI, not squashfs) - Set mtdparts and bootcmd directly (don't rely on device env macros) - Skip flash read-back CRC verify for NAND (ECC/OOB makes it unreliable) - Longer timeouts for NAND erase/write operations Boot protocol (hisilicon_standard): - Add _rehandshake() after SPL: some SoCs (e.g. hi3516av200) re-send 0x20 bootmode markers after DDR init, requiring a fresh 0xAA before accepting U-Boot HEAD frames - Increase U-Boot HEAD retries to 64 at 150ms timeout (was 16 at 30ms) to handle SoCs that are slow to become ready after DDR init - Make U-Boot TAIL failure non-fatal: some SoCs consider the transfer complete once all bytes declared in HEAD are received and don't ACK the TAIL frame Co-Authored-By: Claude Opus 4.6 (1M context) --- src/defib/cli/app.py | 150 ++++++++++----- src/defib/protocol/hisilicon_standard.py | 49 ++++- tests/test_nand_install.py | 50 +++++ tests/test_protocol_robustness.py | 232 +++++++++++++++++++++++ 4 files changed, 428 insertions(+), 53 deletions(-) create mode 100644 tests/test_nand_install.py create mode 100644 tests/test_protocol_robustness.py diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index ede475f..bb5db76 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -1534,6 +1534,7 @@ def install( device_ip: str = typer.Option("192.168.1.20", "--device-ip", help="IP for camera in U-Boot"), tftp_port: int = typer.Option(69, "--tftp-port", help="TFTP server port"), nor_size: int = typer.Option(8, "--nor-size", help="NOR flash size in MB (8 or 16)"), + nand: bool = typer.Option(False, "--nand", help="Use NAND flash instead of NOR"), output: str = typer.Option("human", "--output", help="Output mode: human, json"), debug: bool = typer.Option(False, "-d", "--debug", help="Enable debug logging"), ) -> None: @@ -1541,12 +1542,12 @@ def install( Extracts the firmware tarball, burns U-Boot to RAM via boot ROM, then uses TFTP to transfer kernel and rootfs to U-Boot which - flashes them to NOR. + flashes them to NOR or NAND. """ import asyncio asyncio.run(_install_async( chip, firmware, port, power_cycle, nic, host_ip, device_ip, - tftp_port, nor_size, output, debug, + tftp_port, nor_size, nand, output, debug, )) @@ -1566,6 +1567,14 @@ def install( "rootfs": (0x350000, 0xA00000), # 10240KB } +# NAND flash layout: 1M(boot),1M(env),8M(kernel),-(ubi) +_NAND_LAYOUT = { + "boot": (0x000000, 0x100000), # 1MB + "env": (0x100000, 0x100000), # 1MB + "kernel": (0x200000, 0x800000), # 8MB + "rootfs": (0xA00000, 0x7600000), # 118MB (UBI) +} + async def _install_async( chip: str, @@ -1577,6 +1586,7 @@ async def _install_async( device_ip: str, tftp_port: int, nor_size: int, + nand: bool, output: str, debug: bool, ) -> None: @@ -1605,14 +1615,21 @@ async def _install_async( else: logging.basicConfig(level=logging.INFO) - layout = _NOR16M_LAYOUT if nor_size >= 16 else _NOR8M_LAYOUT + if nand: + layout = _NAND_LAYOUT + flash_cmd = "nand" + flash_label = "NAND" + else: + layout = _NOR16M_LAYOUT if nor_size >= 16 else _NOR8M_LAYOUT + flash_cmd = "sf" + flash_label = f"NOR {nor_size}MB" # --- Step 1: Extract firmware tarball --- if output == "human": console.print("[bold]OpenIPC Firmware Install[/bold]") - console.print(f" Chip: [cyan]{chip}[/cyan]") - console.print(f" Port: [cyan]{port}[/cyan]") - console.print(f" NOR: [cyan]{nor_size}MB[/cyan]") + console.print(f" Chip: [cyan]{chip}[/cyan]") + console.print(f" Port: [cyan]{port}[/cyan]") + console.print(f" Flash: [cyan]{flash_label}[/cyan]") kernel_data: bytes | None = None rootfs_data: bytes | None = None @@ -1629,14 +1646,14 @@ async def _install_async( f = tf.extractfile(member) assert f is not None kernel_data = f.read() - elif name.startswith("rootfs.squashfs"): + elif name.startswith("rootfs.squashfs") or name.startswith("rootfs.ubi"): rootfs_name = name f = tf.extractfile(member) assert f is not None rootfs_data = f.read() if not kernel_data or not rootfs_data: - console.print("[red]Tarball missing uImage or rootfs.squashfs[/red]") + console.print("[red]Tarball missing uImage or rootfs (squashfs/ubi)[/red]") raise typer.Exit(1) # Verify md5sums if present @@ -1761,14 +1778,22 @@ def on_progress(event: ProgressEvent) -> None: ram_addr = get_ram_staging_addr(chip) - resp = await send_command(transport, "sf probe 0", timeout=5.0, wait_for="# ") - if "error" in resp.lower() or "fail" in resp.lower(): - console.print(f"[red]sf probe failed:[/red] {resp.strip()}") - await transport.close() - raise typer.Exit(1) - - if output == "human": - console.print(" [green]SPI flash detected[/green]") + if nand: + resp = await send_command(transport, "nand info", timeout=5.0, wait_for="# ") + if "error" in resp.lower() or "no nand" in resp.lower(): + console.print(f"[red]NAND detection failed:[/red] {resp.strip()}") + await transport.close() + raise typer.Exit(1) + if output == "human": + console.print(" [green]NAND flash detected[/green]") + else: + resp = await send_command(transport, "sf probe 0", timeout=5.0, wait_for="# ") + if "error" in resp.lower() or "fail" in resp.lower(): + console.print(f"[red]sf probe failed:[/red] {resp.strip()}") + await transport.close() + raise typer.Exit(1) + if output == "human": + console.print(" [green]SPI flash detected[/green]") # --- Step 5: Start TFTP server + configure U-Boot networking --- if not nic: @@ -1848,39 +1873,43 @@ async def tftp_and_flash( if output == "human": console.print(f" TFTP CRC verified: {ram_crc:08X}") + erase_timeout = 120.0 if nand else 60.0 await send_command( transport, - f"sf erase 0x{flash_off:x} 0x{erase_sz:x}", - timeout=60.0, wait_for="# ", + f"{flash_cmd} erase 0x{flash_off:x} 0x{erase_sz:x}", + timeout=erase_timeout, wait_for="# ", ) await send_command( transport, - f"sf write 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}", - timeout=60.0, wait_for="# ", + f"{flash_cmd} write 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}", + timeout=120.0 if nand else 60.0, wait_for="# ", ) - # Verify flash write by reading back and checking CRC - await send_command( - transport, - f"sf read 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}", - timeout=30.0, wait_for="# ", - ) - resp = await send_command( - transport, - f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}", - timeout=10.0, wait_for="# ", - ) - m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp) - if m: - flash_crc = int(m.group(1), 16) - if flash_crc != expected_crc: - console.print( - f"[red]{name} flash verify failed![/red] " - f"expected={expected_crc:08X} got={flash_crc:08X}" - ) - raise typer.Exit(1) - if output == "human": - console.print(f" Flash verified: {flash_crc:08X}") + # Verify flash write by reading back and checking CRC. + # Skip for NAND — ECC/OOB makes raw read-back differ from + # the original data; the TFTP-to-RAM CRC above is sufficient. + if not nand: + await send_command( + transport, + f"{flash_cmd} read 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}", + timeout=30.0, wait_for="# ", + ) + resp = await send_command( + transport, + f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}", + timeout=10.0, wait_for="# ", + ) + m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp) + if m: + flash_crc = int(m.group(1), 16) + if flash_crc != expected_crc: + console.print( + f"[red]{name} flash verify failed![/red] " + f"expected={expected_crc:08X} got={flash_crc:08X}" + ) + raise typer.Exit(1) + if output == "human": + console.print(f" Flash verified: {flash_crc:08X}") if output == "human": console.print(f" [green]{name} OK[/green]") @@ -1889,15 +1918,34 @@ async def tftp_and_flash( await tftp_and_flash("kernel", kernel_name, kernel_data, k_off, k_sz) await tftp_and_flash("rootfs", rootfs_name, rootfs_data, r_off, r_sz) - # Set up proper boot environment for this NOR size - nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m" - if output == "human": - console.print(f"\n [bold]Setting boot environment[/bold] (run {nor_cmd})") - # setnor8m does: set mtdparts, set bootcmd, saveenv, reset - # We do it manually to avoid the auto-reset - mtdparts_var = f"mtdpartsnor{nor_size}m" - await send_command(transport, f"run {mtdparts_var}", timeout=3.0, wait_for="# ") - await send_command(transport, "setenv bootcmd ${bootcmdnor}", timeout=3.0, wait_for="# ") + # Set up proper boot environment + if nand: + if output == "human": + console.print("\n [bold]Setting boot environment[/bold] (NAND)") + # Set mtdparts and bootcmd directly — don't rely on env macros + # which may be wrong or missing on the target device. + # Layout: 1M(boot),1M(env),8M(kernel),-(ubi) + await send_command( + transport, + "setenv mtdparts hinand:1024k(boot),1024k(env),8192k(kernel),-(ubi)", + timeout=3.0, wait_for="# ", + ) + await send_command( + transport, + r"setenv bootcmd nand read ${baseaddr} 0x200000 0x800000\; bootm ${baseaddr}", + timeout=3.0, wait_for="# ", + ) + else: + nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m" + if output == "human": + console.print(f"\n [bold]Setting boot environment[/bold] (run {nor_cmd})") + # setnor8m does: set mtdparts, set bootcmd, saveenv, reset + # We do it manually to avoid the auto-reset + mtdparts_var = f"mtdpartsnor{nor_size}m" + await send_command(transport, f"run {mtdparts_var}", timeout=3.0, wait_for="# ") + await send_command( + transport, "setenv bootcmd ${bootcmdnor}", timeout=3.0, wait_for="# ", + ) resp = await send_command(transport, "saveenv", timeout=10.0, wait_for="# ") if output == "human": console.print(" [green]Environment saved[/green]") diff --git a/src/defib/protocol/hisilicon_standard.py b/src/defib/protocol/hisilicon_standard.py index 6582bb5..9ceb897 100644 --- a/src/defib/protocol/hisilicon_standard.py +++ b/src/defib/protocol/hisilicon_standard.py @@ -164,6 +164,42 @@ async def _send_frame_with_retry( continue return False + @staticmethod + async def _rehandshake(transport: Transport) -> bool: + """Re-enter boot mode after SPL runs. + + On some SoCs the SPL re-sends 0x20 bootmode markers after DDR init, + requiring a fresh 0xAA acknowledgment before it will accept HEAD + frames. On SoCs that don't do this, the line stays quiet and we + return True immediately. + """ + import time + marker_count = 0 + deadline = time.monotonic() + 5.0 + while time.monotonic() < deadline: + try: + data = await transport.read(1, timeout=0.2) + if data == BOOTMODE_MARKER: + marker_count += 1 + if marker_count >= BOOTMODE_COUNT: + await transport.write(BOOTMODE_ACK) + logger.debug("rehandshake: sent 0xAA after %d markers", marker_count) + return True + elif data in (b"\x0a", b"\x0d"): + continue # ignore newlines mixed into marker stream + else: + # unexpected byte — not in marker mode + logger.debug("rehandshake: got 0x%02X, no re-handshake needed", data[0]) + return True + except TransportTimeout: + if marker_count > 0: + # saw some markers but not enough — send ACK anyway + await transport.write(BOOTMODE_ACK) + logger.debug("rehandshake: sent 0xAA after partial %d markers", marker_count) + return True + return True # silence — device is ready without re-handshake + return False # deadline reached + async def _send_head( self, transport: Transport, length: int, address: int ) -> bool: @@ -272,7 +308,13 @@ async def _send_uboot( message=f"Sending {label}", )) - if not await self._send_head(transport, total, profile.uboot_address): + # After SPL runs DDR init, some SoCs (e.g. hi3516av200) re-send + # 0x20 bootmode markers requiring a fresh 0xAA handshake. + await self._rehandshake(transport) + head = HeadFrame(length=total, address=profile.uboot_address).encode() + if not await self._send_frame_with_retry( + transport, head, retries=64, timeout=0.15, + ): return False chunks = chunk_data(firmware, MAX_DATA_LEN) @@ -285,8 +327,11 @@ async def _send_uboot( bytes_total=total, )) + # Tail frame — best-effort for U-Boot stage. Some SoCs (e.g. + # hi3516av200) consider the transfer complete once they've received + # all bytes declared in HEAD and don't ACK the TAIL. if not await self._send_tail(transport, len(chunks) + 1): - return False + logger.debug("U-Boot TAIL not ACKed (non-fatal, all data sent)") _emit(on_progress, ProgressEvent( stage=Stage.UBOOT, bytes_sent=total, bytes_total=total, diff --git a/tests/test_nand_install.py b/tests/test_nand_install.py new file mode 100644 index 0000000..dba5d2d --- /dev/null +++ b/tests/test_nand_install.py @@ -0,0 +1,50 @@ +"""Tests for NAND flash install support and protocol robustness fixes.""" + +from defib.cli.app import _NAND_LAYOUT, _NOR8M_LAYOUT, _NOR16M_LAYOUT + + +class TestNandLayout: + """Verify NAND partition layout constants.""" + + def test_nand_layout_partitions_exist(self): + for key in ("boot", "env", "kernel", "rootfs"): + assert key in _NAND_LAYOUT + + def test_nand_layout_offsets_contiguous(self): + """Partitions must not overlap and boot+env+kernel must be contiguous.""" + b_off, b_sz = _NAND_LAYOUT["boot"] + e_off, e_sz = _NAND_LAYOUT["env"] + k_off, k_sz = _NAND_LAYOUT["kernel"] + r_off, _r_sz = _NAND_LAYOUT["rootfs"] + + assert b_off == 0 + assert e_off == b_off + b_sz + assert k_off == e_off + e_sz + assert r_off == k_off + k_sz + + def test_nand_boot_env_sizes(self): + """Boot and env are 1MB each (NAND erase-block aligned).""" + assert _NAND_LAYOUT["boot"] == (0x000000, 0x100000) + assert _NAND_LAYOUT["env"] == (0x100000, 0x100000) + + def test_nand_kernel_8mb(self): + assert _NAND_LAYOUT["kernel"] == (0x200000, 0x800000) + + def test_nand_rootfs_starts_at_10mb(self): + r_off, r_sz = _NAND_LAYOUT["rootfs"] + assert r_off == 0xA00000 # 10MB + assert r_sz > 0 + + def test_nand_layout_larger_than_nor(self): + """NAND partitions must be larger than NOR equivalents.""" + for key in ("boot", "env", "kernel", "rootfs"): + _, nand_sz = _NAND_LAYOUT[key] + _, nor_sz = _NOR8M_LAYOUT[key] + assert nand_sz >= nor_sz, f"NAND {key} smaller than NOR 8M" + + def test_nor_layouts_unchanged(self): + """Regression: NOR layouts must not be modified.""" + assert _NOR8M_LAYOUT["boot"] == (0x000000, 0x40000) + assert _NOR8M_LAYOUT["kernel"] == (0x050000, 0x200000) + assert _NOR16M_LAYOUT["boot"] == (0x000000, 0x40000) + assert _NOR16M_LAYOUT["kernel"] == (0x050000, 0x300000) diff --git a/tests/test_protocol_robustness.py b/tests/test_protocol_robustness.py new file mode 100644 index 0000000..7a63a62 --- /dev/null +++ b/tests/test_protocol_robustness.py @@ -0,0 +1,232 @@ +"""Tests for protocol robustness: rehandshake after SPL and non-fatal U-Boot TAIL. + +These are regression tests for fixes discovered on hi3516av200 where: +1. The SPL re-sends 0x20 bootmode markers after DDR init, requiring a second + 0xAA handshake before U-Boot HEAD frames are accepted. +2. The U-Boot TAIL frame may not be ACKed (device considers transfer complete + based on byte count from HEAD), so TAIL failure is non-fatal. +""" + +import pathlib +import time + +import pytest + +from defib.protocol.crc import ACK_BYTE +from defib.protocol.hisilicon_standard import ( + BOOTMODE_ACK, + BOOTMODE_MARKER, + HiSiliconStandard, +) +from defib.profiles.loader import load_profile +from defib.recovery.events import Stage +from defib.transport.mock import MockTransport + +PROFILES_DIR = pathlib.Path(__file__).parent.parent / "src" / "defib" / "profiles" / "data" + + +class TestRehandshake: + """Tests for _rehandshake() — the post-SPL bootmode re-entry.""" + + @pytest.mark.asyncio + async def test_rehandshake_with_markers(self): + """When SPL sends 0x20 markers, rehandshake sends 0xAA.""" + transport = MockTransport(flush_clears_buffer=False) + transport.enqueue_rx(BOOTMODE_MARKER * 10) # 10x 0x20 + + result = await HiSiliconStandard._rehandshake(transport) + + assert result is True + assert BOOTMODE_ACK in transport.all_tx_data + + @pytest.mark.asyncio + async def test_rehandshake_quiet_line(self): + """When line is quiet (no markers), rehandshake returns True immediately.""" + transport = MockTransport(flush_clears_buffer=False) + # No data enqueued — read will timeout + + t0 = time.monotonic() + result = await HiSiliconStandard._rehandshake(transport) + elapsed = time.monotonic() - t0 + + assert result is True + # Should return quickly (within the 0.2s timeout), not wait 5s + assert elapsed < 1.0 + # No ACK sent — device was already ready + assert BOOTMODE_ACK not in transport.all_tx_data + + @pytest.mark.asyncio + async def test_rehandshake_ignores_newlines(self): + """Newlines (0x0A, 0x0D) mixed into marker stream are ignored.""" + transport = MockTransport(flush_clears_buffer=False) + transport.enqueue_rx(b"\x20\x20\x0a\x20\x0d\x20\x20") + + result = await HiSiliconStandard._rehandshake(transport) + + assert result is True + assert BOOTMODE_ACK in transport.all_tx_data + + @pytest.mark.asyncio + async def test_rehandshake_partial_markers(self): + """If fewer than 5 markers arrive then silence, ACK is still sent.""" + transport = MockTransport(flush_clears_buffer=False) + transport.enqueue_rx(BOOTMODE_MARKER * 3) # only 3 markers + + result = await HiSiliconStandard._rehandshake(transport) + + assert result is True + assert BOOTMODE_ACK in transport.all_tx_data + + @pytest.mark.asyncio + async def test_rehandshake_unexpected_byte(self): + """Non-marker, non-newline byte means device is ready (no re-handshake).""" + transport = MockTransport(flush_clears_buffer=False) + transport.enqueue_rx(b"\x42") # some random byte + + result = await HiSiliconStandard._rehandshake(transport) + + assert result is True + assert BOOTMODE_ACK not in transport.all_tx_data + + +class TestNonFatalUbootTail: + """Verify U-Boot transfer succeeds even when TAIL is not ACKed.""" + + @pytest.mark.asyncio + async def test_uboot_succeeds_without_tail_ack(self): + """U-Boot transfer reports success even when TAIL frame is not ACKed. + + Regression test: hi3516av200 SPL responds with 0xFE to TAIL instead + of 0xAA. All data frames ARE ACKed, so the transfer is complete. + """ + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("hi3516cv300", PROFILES_DIR) + + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + # ACKs for: DDR(head+data+tail=3) + SPL(head+chunks+tail) + U-Boot(head+chunks) + # Enough for everything EXCEPT the U-Boot TAIL + firmware = bytes(range(256)) * 100 # 25600 bytes + # Over-provision ACKs — the U-Boot tail will consume a non-ACK byte + transport.enqueue_rx(ACK_BYTE * 200 + b"\xfe") + + result = await protocol.send_firmware(transport, firmware) + + assert result.success + assert Stage.DDR_INIT in result.stages_completed + assert Stage.SPL in result.stages_completed + assert Stage.UBOOT in result.stages_completed + assert Stage.COMPLETE in result.stages_completed + + @pytest.mark.asyncio + async def test_uboot_succeeds_with_tail_ack(self): + """Normal path: TAIL is ACKed (e.g. hi3516ev300). Must still work.""" + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("hi3516ev300", PROFILES_DIR) + + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + transport.enqueue_rx(ACK_BYTE * 200) + firmware = bytes(range(256)) * 100 + + result = await protocol.send_firmware(transport, firmware) + + assert result.success + assert Stage.UBOOT in result.stages_completed + + +class TestRehandshakeIntegration: + """End-to-end tests: full firmware transfer with rehandshake.""" + + @pytest.mark.asyncio + async def test_full_transfer_with_rehandshake(self): + """Simulate hi3516av200-style transfer: SPL re-sends markers before U-Boot. + + Flow: handshake → DDR → SPL → markers → rehandshake → U-Boot + """ + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("hi3516cv300", PROFILES_DIR) + + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + # Phase 1: DDR step ACKs (head + data + tail = 3 ACKs) + # Phase 2: SPL ACKs (head + spl_chunks + tail) + spl_size = profile.spl_max_size + spl_chunks = (spl_size + 1023) // 1024 + spl_acks = 1 + spl_chunks + 1 # head + data + tail + + # Phase 3: Rehandshake markers (0x20 * 5) + # Phase 4: U-Boot ACKs (head + uboot_chunks + tail-or-not) + firmware = bytes(range(256)) * 100 # 25600 bytes + uboot_chunks = (len(firmware) + 1023) // 1024 + uboot_acks = 1 + uboot_chunks + 1 # head + data + tail + + transport.enqueue_rx( + ACK_BYTE * (3 + spl_acks) # DDR + SPL + + BOOTMODE_MARKER * 5 # rehandshake markers + + ACK_BYTE * uboot_acks # U-Boot + ) + + result = await protocol.send_firmware(transport, firmware) + + assert result.success + assert Stage.DDR_INIT in result.stages_completed + assert Stage.SPL in result.stages_completed + assert Stage.UBOOT in result.stages_completed + assert Stage.COMPLETE in result.stages_completed + + # Verify rehandshake ACK was sent (0xAA appears after SPL phase) + tx = transport.all_tx_data + assert tx.count(BOOTMODE_ACK) >= 2 # initial handshake would be separate + + @pytest.mark.asyncio + async def test_full_transfer_no_rehandshake_needed(self): + """Simulate hi3516ev300-style transfer: no markers between SPL and U-Boot. + + The rehandshake should return quickly (timeout) and not break the flow. + """ + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("hi3516ev300", PROFILES_DIR) + + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + # Just ACKs — no markers between SPL and U-Boot + transport.enqueue_rx(ACK_BYTE * 200) + firmware = bytes(range(256)) * 100 + + t0 = time.monotonic() + result = await protocol.send_firmware(transport, firmware) + elapsed = time.monotonic() - t0 + + assert result.success + assert Stage.UBOOT in result.stages_completed + # Should complete in reasonable time (rehandshake timeout + transfer) + # The rehandshake adds at most 0.2s timeout + assert elapsed < 5.0 + + @pytest.mark.asyncio + async def test_spl_tail_still_required(self): + """SPL TAIL failure must still be fatal (only U-Boot TAIL is non-fatal).""" + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("hi3516cv300", PROFILES_DIR) + + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + # Enough ACKs for DDR (3) + SPL head + SPL data, but NOT SPL tail + spl_size = profile.spl_max_size + spl_chunks = (spl_size + 1023) // 1024 + # DDR: head(1) + data(1) + tail(1) = 3 + # SPL: head(1) + data(spl_chunks) = 1 + spl_chunks + # Then NO MORE ACKs — SPL tail will fail + transport.enqueue_rx(ACK_BYTE * (3 + 1 + spl_chunks)) + + firmware = bytes(range(256)) * 100 + result = await protocol.send_firmware(transport, firmware) + + assert not result.success + assert "SPL" in (result.error or "")