Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 99 additions & 51 deletions src/defib/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1534,19 +1534,20 @@ def install(
device_ip: str = typer.Option("192.168.1.20", "--device-ip", help="IP for camera in U-Boot"),
tftp_port: int = typer.Option(69, "--tftp-port", help="TFTP server port"),
nor_size: int = typer.Option(8, "--nor-size", help="NOR flash size in MB (8 or 16)"),
nand: bool = typer.Option(False, "--nand", help="Use NAND flash instead of NOR"),
output: str = typer.Option("human", "--output", help="Output mode: human, json"),
debug: bool = typer.Option(False, "-d", "--debug", help="Enable debug logging"),
) -> None:
"""Install a full OpenIPC firmware (U-Boot + kernel + rootfs) via UART + TFTP.

Extracts the firmware tarball, burns U-Boot to RAM via boot ROM,
then uses TFTP to transfer kernel and rootfs to U-Boot which
flashes them to NOR.
flashes them to NOR or NAND.
"""
import asyncio
asyncio.run(_install_async(
chip, firmware, port, power_cycle, nic, host_ip, device_ip,
tftp_port, nor_size, output, debug,
tftp_port, nor_size, nand, output, debug,
))


Expand All @@ -1566,6 +1567,14 @@ def install(
"rootfs": (0x350000, 0xA00000), # 10240KB
}

# NAND flash layout: 1M(boot),1M(env),8M(kernel),-(ubi)
_NAND_LAYOUT = {
"boot": (0x000000, 0x100000), # 1MB
"env": (0x100000, 0x100000), # 1MB
"kernel": (0x200000, 0x800000), # 8MB
"rootfs": (0xA00000, 0x7600000), # 118MB (UBI)
}


async def _install_async(
chip: str,
Expand All @@ -1577,6 +1586,7 @@ async def _install_async(
device_ip: str,
tftp_port: int,
nor_size: int,
nand: bool,
output: str,
debug: bool,
) -> None:
Expand Down Expand Up @@ -1605,14 +1615,21 @@ async def _install_async(
else:
logging.basicConfig(level=logging.INFO)

layout = _NOR16M_LAYOUT if nor_size >= 16 else _NOR8M_LAYOUT
if nand:
layout = _NAND_LAYOUT
flash_cmd = "nand"
flash_label = "NAND"
else:
layout = _NOR16M_LAYOUT if nor_size >= 16 else _NOR8M_LAYOUT
flash_cmd = "sf"
flash_label = f"NOR {nor_size}MB"

# --- Step 1: Extract firmware tarball ---
if output == "human":
console.print("[bold]OpenIPC Firmware Install[/bold]")
console.print(f" Chip: [cyan]{chip}[/cyan]")
console.print(f" Port: [cyan]{port}[/cyan]")
console.print(f" NOR: [cyan]{nor_size}MB[/cyan]")
console.print(f" Chip: [cyan]{chip}[/cyan]")
console.print(f" Port: [cyan]{port}[/cyan]")
console.print(f" Flash: [cyan]{flash_label}[/cyan]")

kernel_data: bytes | None = None
rootfs_data: bytes | None = None
Expand All @@ -1629,14 +1646,14 @@ async def _install_async(
f = tf.extractfile(member)
assert f is not None
kernel_data = f.read()
elif name.startswith("rootfs.squashfs"):
elif name.startswith("rootfs.squashfs") or name.startswith("rootfs.ubi"):
rootfs_name = name
f = tf.extractfile(member)
assert f is not None
rootfs_data = f.read()

if not kernel_data or not rootfs_data:
console.print("[red]Tarball missing uImage or rootfs.squashfs[/red]")
console.print("[red]Tarball missing uImage or rootfs (squashfs/ubi)[/red]")
raise typer.Exit(1)

# Verify md5sums if present
Expand Down Expand Up @@ -1761,14 +1778,22 @@ def on_progress(event: ProgressEvent) -> None:

ram_addr = get_ram_staging_addr(chip)

resp = await send_command(transport, "sf probe 0", timeout=5.0, wait_for="# ")
if "error" in resp.lower() or "fail" in resp.lower():
console.print(f"[red]sf probe failed:[/red] {resp.strip()}")
await transport.close()
raise typer.Exit(1)

if output == "human":
console.print(" [green]SPI flash detected[/green]")
if nand:
resp = await send_command(transport, "nand info", timeout=5.0, wait_for="# ")
if "error" in resp.lower() or "no nand" in resp.lower():
console.print(f"[red]NAND detection failed:[/red] {resp.strip()}")
await transport.close()
raise typer.Exit(1)
if output == "human":
console.print(" [green]NAND flash detected[/green]")
else:
resp = await send_command(transport, "sf probe 0", timeout=5.0, wait_for="# ")
if "error" in resp.lower() or "fail" in resp.lower():
console.print(f"[red]sf probe failed:[/red] {resp.strip()}")
await transport.close()
raise typer.Exit(1)
if output == "human":
console.print(" [green]SPI flash detected[/green]")

# --- Step 5: Start TFTP server + configure U-Boot networking ---
if not nic:
Expand Down Expand Up @@ -1848,39 +1873,43 @@ async def tftp_and_flash(
if output == "human":
console.print(f" TFTP CRC verified: {ram_crc:08X}")

erase_timeout = 120.0 if nand else 60.0
await send_command(
transport,
f"sf erase 0x{flash_off:x} 0x{erase_sz:x}",
timeout=60.0, wait_for="# ",
f"{flash_cmd} erase 0x{flash_off:x} 0x{erase_sz:x}",
timeout=erase_timeout, wait_for="# ",
)
await send_command(
transport,
f"sf write 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}",
timeout=60.0, wait_for="# ",
f"{flash_cmd} write 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}",
timeout=120.0 if nand else 60.0, wait_for="# ",
)

# Verify flash write by reading back and checking CRC
await send_command(
transport,
f"sf read 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}",
timeout=30.0, wait_for="# ",
)
resp = await send_command(
transport,
f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}",
timeout=10.0, wait_for="# ",
)
m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp)
if m:
flash_crc = int(m.group(1), 16)
if flash_crc != expected_crc:
console.print(
f"[red]{name} flash verify failed![/red] "
f"expected={expected_crc:08X} got={flash_crc:08X}"
)
raise typer.Exit(1)
if output == "human":
console.print(f" Flash verified: {flash_crc:08X}")
# Verify flash write by reading back and checking CRC.
# Skip for NAND — ECC/OOB makes raw read-back differ from
# the original data; the TFTP-to-RAM CRC above is sufficient.
if not nand:
await send_command(
transport,
f"{flash_cmd} read 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}",
timeout=30.0, wait_for="# ",
)
resp = await send_command(
transport,
f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}",
timeout=10.0, wait_for="# ",
)
m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp)
if m:
flash_crc = int(m.group(1), 16)
if flash_crc != expected_crc:
console.print(
f"[red]{name} flash verify failed![/red] "
f"expected={expected_crc:08X} got={flash_crc:08X}"
)
raise typer.Exit(1)
if output == "human":
console.print(f" Flash verified: {flash_crc:08X}")

if output == "human":
console.print(f" [green]{name} OK[/green]")
Expand All @@ -1889,15 +1918,34 @@ async def tftp_and_flash(
await tftp_and_flash("kernel", kernel_name, kernel_data, k_off, k_sz)
await tftp_and_flash("rootfs", rootfs_name, rootfs_data, r_off, r_sz)

# Set up proper boot environment for this NOR size
nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m"
if output == "human":
console.print(f"\n [bold]Setting boot environment[/bold] (run {nor_cmd})")
# setnor8m does: set mtdparts, set bootcmd, saveenv, reset
# We do it manually to avoid the auto-reset
mtdparts_var = f"mtdpartsnor{nor_size}m"
await send_command(transport, f"run {mtdparts_var}", timeout=3.0, wait_for="# ")
await send_command(transport, "setenv bootcmd ${bootcmdnor}", timeout=3.0, wait_for="# ")
# Set up proper boot environment
if nand:
if output == "human":
console.print("\n [bold]Setting boot environment[/bold] (NAND)")
# Set mtdparts and bootcmd directly — don't rely on env macros
# which may be wrong or missing on the target device.
# Layout: 1M(boot),1M(env),8M(kernel),-(ubi)
await send_command(
transport,
"setenv mtdparts hinand:1024k(boot),1024k(env),8192k(kernel),-(ubi)",
timeout=3.0, wait_for="# ",
)
await send_command(
transport,
r"setenv bootcmd nand read ${baseaddr} 0x200000 0x800000\; bootm ${baseaddr}",
timeout=3.0, wait_for="# ",
)
else:
nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m"
if output == "human":
console.print(f"\n [bold]Setting boot environment[/bold] (run {nor_cmd})")
# setnor8m does: set mtdparts, set bootcmd, saveenv, reset
# We do it manually to avoid the auto-reset
mtdparts_var = f"mtdpartsnor{nor_size}m"
await send_command(transport, f"run {mtdparts_var}", timeout=3.0, wait_for="# ")
await send_command(
transport, "setenv bootcmd ${bootcmdnor}", timeout=3.0, wait_for="# ",
)
resp = await send_command(transport, "saveenv", timeout=10.0, wait_for="# ")
if output == "human":
console.print(" [green]Environment saved[/green]")
Expand Down
49 changes: 47 additions & 2 deletions src/defib/protocol/hisilicon_standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,42 @@ async def _send_frame_with_retry(
continue
return False

@staticmethod
async def _rehandshake(transport: Transport) -> bool:
"""Re-enter boot mode after SPL runs.

On some SoCs the SPL re-sends 0x20 bootmode markers after DDR init,
requiring a fresh 0xAA acknowledgment before it will accept HEAD
frames. On SoCs that don't do this, the line stays quiet and we
return True immediately.
"""
import time
marker_count = 0
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
try:
data = await transport.read(1, timeout=0.2)
if data == BOOTMODE_MARKER:
marker_count += 1
if marker_count >= BOOTMODE_COUNT:
await transport.write(BOOTMODE_ACK)
logger.debug("rehandshake: sent 0xAA after %d markers", marker_count)
return True
elif data in (b"\x0a", b"\x0d"):
continue # ignore newlines mixed into marker stream
else:
# unexpected byte — not in marker mode
logger.debug("rehandshake: got 0x%02X, no re-handshake needed", data[0])
return True
except TransportTimeout:
if marker_count > 0:
# saw some markers but not enough — send ACK anyway
await transport.write(BOOTMODE_ACK)
logger.debug("rehandshake: sent 0xAA after partial %d markers", marker_count)
return True
return True # silence — device is ready without re-handshake
return False # deadline reached

async def _send_head(
self, transport: Transport, length: int, address: int
) -> bool:
Expand Down Expand Up @@ -272,7 +308,13 @@ async def _send_uboot(
message=f"Sending {label}",
))

if not await self._send_head(transport, total, profile.uboot_address):
# After SPL runs DDR init, some SoCs (e.g. hi3516av200) re-send
# 0x20 bootmode markers requiring a fresh 0xAA handshake.
await self._rehandshake(transport)
head = HeadFrame(length=total, address=profile.uboot_address).encode()
if not await self._send_frame_with_retry(
transport, head, retries=64, timeout=0.15,
):
return False

chunks = chunk_data(firmware, MAX_DATA_LEN)
Expand All @@ -285,8 +327,11 @@ async def _send_uboot(
bytes_total=total,
))

# Tail frame — best-effort for U-Boot stage. Some SoCs (e.g.
# hi3516av200) consider the transfer complete once they've received
# all bytes declared in HEAD and don't ACK the TAIL.
if not await self._send_tail(transport, len(chunks) + 1):
return False
logger.debug("U-Boot TAIL not ACKed (non-fatal, all data sent)")

_emit(on_progress, ProgressEvent(
stage=Stage.UBOOT, bytes_sent=total, bytes_total=total,
Expand Down
50 changes: 50 additions & 0 deletions tests/test_nand_install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Tests for NAND flash install support and protocol robustness fixes."""

from defib.cli.app import _NAND_LAYOUT, _NOR8M_LAYOUT, _NOR16M_LAYOUT


class TestNandLayout:
"""Verify NAND partition layout constants."""

def test_nand_layout_partitions_exist(self):
for key in ("boot", "env", "kernel", "rootfs"):
assert key in _NAND_LAYOUT

def test_nand_layout_offsets_contiguous(self):
"""Partitions must not overlap and boot+env+kernel must be contiguous."""
b_off, b_sz = _NAND_LAYOUT["boot"]
e_off, e_sz = _NAND_LAYOUT["env"]
k_off, k_sz = _NAND_LAYOUT["kernel"]
r_off, _r_sz = _NAND_LAYOUT["rootfs"]

assert b_off == 0
assert e_off == b_off + b_sz
assert k_off == e_off + e_sz
assert r_off == k_off + k_sz

def test_nand_boot_env_sizes(self):
"""Boot and env are 1MB each (NAND erase-block aligned)."""
assert _NAND_LAYOUT["boot"] == (0x000000, 0x100000)
assert _NAND_LAYOUT["env"] == (0x100000, 0x100000)

def test_nand_kernel_8mb(self):
assert _NAND_LAYOUT["kernel"] == (0x200000, 0x800000)

def test_nand_rootfs_starts_at_10mb(self):
r_off, r_sz = _NAND_LAYOUT["rootfs"]
assert r_off == 0xA00000 # 10MB
assert r_sz > 0

def test_nand_layout_larger_than_nor(self):
"""NAND partitions must be larger than NOR equivalents."""
for key in ("boot", "env", "kernel", "rootfs"):
_, nand_sz = _NAND_LAYOUT[key]
_, nor_sz = _NOR8M_LAYOUT[key]
assert nand_sz >= nor_sz, f"NAND {key} smaller than NOR 8M"

def test_nor_layouts_unchanged(self):
"""Regression: NOR layouts must not be modified."""
assert _NOR8M_LAYOUT["boot"] == (0x000000, 0x40000)
assert _NOR8M_LAYOUT["kernel"] == (0x050000, 0x200000)
assert _NOR16M_LAYOUT["boot"] == (0x000000, 0x40000)
assert _NOR16M_LAYOUT["kernel"] == (0x050000, 0x300000)
Loading
Loading