From ea46997e416d5e493968f60c67cda24ac3187c92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=B8=80=E4=B9=8B?= Date: Thu, 23 Apr 2026 14:10:46 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E2=9C=A8=20add=20IOSetDeadline=20and=20Act?= =?UTF-8?q?ionShouldStop=20host=20calls?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + sdk/go/opskat/hostcall.go | 6 ++++++ sdk/go/opskat/hostcall_stub.go | 4 ++++ sdk/go/opskat/hostcall_test.go | 8 ++++++++ sdk/go/opskat/hostcall_wasm.go | 17 +++++++++++++++++ sdk/go/opskat/testing.go | 11 +++++++++++ 6 files changed, 47 insertions(+) diff --git a/.gitignore b/.gitignore index 155e2e8..55fdea9 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ extensions/oss/backend/oss .claude CLAUDE.md superpowers +.omc # Dev configs (contain secrets) devconfig.json diff --git a/sdk/go/opskat/hostcall.go b/sdk/go/opskat/hostcall.go index 6405c6c..eb528f1 100644 --- a/sdk/go/opskat/hostcall.go +++ b/sdk/go/opskat/hostcall.go @@ -12,11 +12,13 @@ type hostCaller interface { IOWrite(handleID uint32, data []byte) (int, error) IOFlush(handleID uint32) (metaJSON []byte, err error) IOClose(handleID uint32) error + IOSetDeadline(handleID uint32, kind string, unixNanos int64) error AssetGetConfig(assetID int64) (json.RawMessage, error) FileDialog(params []byte) (string, error) KVGet(key string) ([]byte, error) KVSet(key string, value []byte) error ActionEvent(eventType string, data []byte) + ActionShouldStop() bool } // Package-level host call functions. All SDK code calls through these. @@ -26,8 +28,12 @@ func hostIORead(handleID uint32, size int) ([]byte, error) { return current func hostIOWrite(handleID uint32, data []byte) (int, error) { return currentHost.IOWrite(handleID, data) } func hostIOFlush(handleID uint32) ([]byte, error) { return currentHost.IOFlush(handleID) } func hostIOClose(handleID uint32) error { return currentHost.IOClose(handleID) } +func hostIOSetDeadline(handleID uint32, kind string, unixNanos int64) error { + return currentHost.IOSetDeadline(handleID, kind, unixNanos) +} func hostAssetGetConfig(assetID int64) (json.RawMessage, error) { return currentHost.AssetGetConfig(assetID) } func hostFileDialog(params []byte) (string, error) { return currentHost.FileDialog(params) } func hostKVGet(key string) ([]byte, error) { return currentHost.KVGet(key) } func hostKVSet(key string, value []byte) error { return currentHost.KVSet(key, value) } func hostActionEvent(eventType string, data []byte) { currentHost.ActionEvent(eventType, data) } +func hostActionShouldStop() bool { return currentHost.ActionShouldStop() } diff --git a/sdk/go/opskat/hostcall_stub.go b/sdk/go/opskat/hostcall_stub.go index c25a475..ac2fb22 100644 --- a/sdk/go/opskat/hostcall_stub.go +++ b/sdk/go/opskat/hostcall_stub.go @@ -29,10 +29,14 @@ func (n *nopHost) IORead(handleID uint32, size int) ([]byte, error) { retur func (n *nopHost) IOWrite(handleID uint32, data []byte) (int, error) { return 0, errNotConfigured } func (n *nopHost) IOFlush(handleID uint32) ([]byte, error) { return nil, errNotConfigured } func (n *nopHost) IOClose(handleID uint32) error { return errNotConfigured } +func (n *nopHost) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error { + return errNotConfigured +} func (n *nopHost) AssetGetConfig(assetID int64) (json.RawMessage, error) { return nil, errNotConfigured } func (n *nopHost) FileDialog(params []byte) (string, error) { return "", errNotConfigured } func (n *nopHost) KVGet(key string) ([]byte, error) { return nil, errNotConfigured } func (n *nopHost) KVSet(key string, value []byte) error { return errNotConfigured } func (n *nopHost) ActionEvent(eventType string, data []byte) {} +func (n *nopHost) ActionShouldStop() bool { return false } var errNotConfigured = fmt.Errorf("host not configured: use TestHost or run inside WASM") diff --git a/sdk/go/opskat/hostcall_test.go b/sdk/go/opskat/hostcall_test.go index 3173b31..6f833cb 100644 --- a/sdk/go/opskat/hostcall_test.go +++ b/sdk/go/opskat/hostcall_test.go @@ -85,3 +85,11 @@ func (m *mockHostCaller) KVSet(key string, value []byte) error { func (m *mockHostCaller) ActionEvent(eventType string, data []byte) { } + +func (m *mockHostCaller) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error { + return fmt.Errorf("not implemented") +} + +func (m *mockHostCaller) ActionShouldStop() bool { + return false +} diff --git a/sdk/go/opskat/hostcall_wasm.go b/sdk/go/opskat/hostcall_wasm.go index 28df8e6..4f8144e 100644 --- a/sdk/go/opskat/hostcall_wasm.go +++ b/sdk/go/opskat/hostcall_wasm.go @@ -46,6 +46,12 @@ func wasmHostKVSet(keyPtr, keyLen, valPtr, valLen uint32) //go:wasmimport opskat host_action_event func wasmHostActionEvent(typePtr, typeLen, dataPtr, dataLen uint32) +//go:wasmimport opskat host_io_set_deadline +func wasmHostIOSetDeadline(handleID, kindPtr, kindLen uint32, unixNanos int64) uint64 + +//go:wasmimport opskat host_action_should_stop +func wasmHostActionShouldStop() uint32 + // Helper: convert Go string to (ptr, len) for WASM. func strToPtr(s string) (uint32, uint32) { if len(s) == 0 { @@ -165,3 +171,14 @@ func (w *wasmHostCaller) ActionEvent(eventType string, data []byte) { dp, dl := bytesToPtr(data) wasmHostActionEvent(tp, tl, dp, dl) } + +func (w *wasmHostCaller) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error { + kp, kl := strToPtr(kind) + packed := wasmHostIOSetDeadline(handleID, kp, kl, unixNanos) + _, err := unpackResult(packed) + return err +} + +func (w *wasmHostCaller) ActionShouldStop() bool { + return wasmHostActionShouldStop() == 1 +} diff --git a/sdk/go/opskat/testing.go b/sdk/go/opskat/testing.go index 8ce6732..1e92dd1 100644 --- a/sdk/go/opskat/testing.go +++ b/sdk/go/opskat/testing.go @@ -164,3 +164,14 @@ func (h *TestHost) ActionEvent(eventType string, data []byte) { h.eventCb(e) } } + +// IOSetDeadline is a stub — TestHost's HTTP mock does not support deadlines. +// TCP mock support (via WithMockTCP) is layered on top in Phase 1.E-1. +func (h *TestHost) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error { + return nil +} + +// ActionShouldStop returns false by default. WithActionCancel flips this in Phase 1.E-1. +func (h *TestHost) ActionShouldStop() bool { + return false +} From 7801ad1209378e89a9b8f5fddbde7fcff33b6443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=B8=80=E4=B9=8B?= Date: Thu, 23 Apr 2026 14:13:24 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E2=9C=A8=20add=20TCP=20Dial=20and=20TestHo?= =?UTF-8?q?st=20mock=20TCP=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdk/go/opskat/tcp.go | 69 ++++++++++++++++++++++++ sdk/go/opskat/tcp_test.go | 41 +++++++++++++++ sdk/go/opskat/testing.go | 61 ++++++++++++++++++---- sdk/go/opskat/testing_http.go | 98 ++++++++++++++++++++++++++--------- 4 files changed, 233 insertions(+), 36 deletions(-) create mode 100644 sdk/go/opskat/tcp.go create mode 100644 sdk/go/opskat/tcp_test.go diff --git a/sdk/go/opskat/tcp.go b/sdk/go/opskat/tcp.go new file mode 100644 index 0000000..078b810 --- /dev/null +++ b/sdk/go/opskat/tcp.go @@ -0,0 +1,69 @@ +package opskat + +import ( + "context" + "net" + "time" +) + +// DialContext opens a TCP connection via the host's IO bridge. +// Its signature matches kafka-go Transport.Dial for drop-in use. +// ctx is consulted only for pre-dial cancellation; per-call deadlines +// should be set on the returned net.Conn via SetDeadline / SetReadDeadline. +func DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + h, err := IOOpen("tcp", map[string]any{"addr": addr}) + if err != nil { + return nil, err + } + return &tcpConn{ + h: h, + remote: &addrStr{network: network, addr: addr}, + local: &addrStr{network: network, addr: "wasm"}, + }, nil +} + +// Dial is a convenience wrapper without ctx. +func Dial(network, addr string) (net.Conn, error) { + return DialContext(context.Background(), network, addr) +} + +type tcpConn struct { + h *IOHandle + remote, local net.Addr +} + +func (c *tcpConn) Read(p []byte) (int, error) { return c.h.Read(p) } +func (c *tcpConn) Write(p []byte) (int, error) { return c.h.Write(p) } +func (c *tcpConn) Close() error { return c.h.Close() } +func (c *tcpConn) LocalAddr() net.Addr { return c.local } +func (c *tcpConn) RemoteAddr() net.Addr { return c.remote } + +func (c *tcpConn) SetDeadline(t time.Time) error { + return hostIOSetDeadline(c.h.ID(), "both", toUnixNanos(t)) +} + +func (c *tcpConn) SetReadDeadline(t time.Time) error { + return hostIOSetDeadline(c.h.ID(), "read", toUnixNanos(t)) +} + +func (c *tcpConn) SetWriteDeadline(t time.Time) error { + return hostIOSetDeadline(c.h.ID(), "write", toUnixNanos(t)) +} + +func toUnixNanos(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.UnixNano() +} + +type addrStr struct { + network string + addr string +} + +func (a *addrStr) Network() string { return a.network } +func (a *addrStr) String() string { return a.addr } diff --git a/sdk/go/opskat/tcp_test.go b/sdk/go/opskat/tcp_test.go new file mode 100644 index 0000000..5a3a2e5 --- /dev/null +++ b/sdk/go/opskat/tcp_test.go @@ -0,0 +1,41 @@ +//go:build !wasip1 + +package opskat + +import ( + "context" + "net" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestTCPDialBasic(t *testing.T) { + Convey("Given a TestHost with a mock TCP echo server", t, func() { + th := NewTestHost(WithMockTCP(func(addr string) (net.Conn, error) { + a, b := net.Pipe() + go func() { + buf := make([]byte, 256) + n, _ := a.Read(buf) + a.Write(buf[:n]) //nolint:errcheck // echo pipe + }() + return b, nil + })) + defer th.Close() + + Convey("DialContext returns a working net.Conn", func() { + conn, err := DialContext(context.Background(), "tcp", "broker:9092") + So(err, ShouldBeNil) + defer conn.Close() + + n, err := conn.Write([]byte("hi")) + So(err, ShouldBeNil) + So(n, ShouldEqual, 2) + + buf := make([]byte, 8) + n, err = conn.Read(buf) + So(err, ShouldBeNil) + So(string(buf[:n]), ShouldEqual, "hi") + }) + }) +} diff --git a/sdk/go/opskat/testing.go b/sdk/go/opskat/testing.go index 1e92dd1..87b6b9e 100644 --- a/sdk/go/opskat/testing.go +++ b/sdk/go/opskat/testing.go @@ -5,8 +5,11 @@ package opskat import ( "encoding/json" "fmt" + "net" "net/http" "sync" + "sync/atomic" + "time" ) // TestEvent represents a captured action event. @@ -31,14 +34,31 @@ func WithMockHTTP(handler http.HandlerFunc) TestOption { return func(h *TestHost) { h.httpHandler = handler } } +// WithMockTCP installs a TCP dialer for IOOpen(type=tcp). The dialer receives +// the requested addr and returns a net.Conn that backs the TestHost's IO +// operations for the resulting handle. +func WithMockTCP(dial func(addr string) (net.Conn, error)) TestOption { + return func(h *TestHost) { h.mockTCP = dial } +} + +// WithActionCancel marks the TestHost as already-cancelled, so ActionShouldStop +// returns true. Used to exercise cooperative-cancel paths in action handlers. +func WithActionCancel() TestOption { + return func(h *TestHost) { h.actionStopped.Store(true) } +} + // TestHost simulates the host environment for extension unit testing. type TestHost struct { - assetConfigs map[int64]json.RawMessage - kv map[string][]byte - httpHandler http.HandlerFunc - events []TestEvent - eventCb func(TestEvent) - mu sync.Mutex + assetConfigs map[int64]json.RawMessage + kv map[string][]byte + httpHandler http.HandlerFunc + mockTCP func(addr string) (net.Conn, error) + mockTCPMu sync.Mutex + mockTCPConns map[uint32]net.Conn + actionStopped atomic.Bool + events []TestEvent + eventCb func(TestEvent) + mu sync.Mutex } // NewTestHost creates a TestHost and installs it as the active host. @@ -165,13 +185,32 @@ func (h *TestHost) ActionEvent(eventType string, data []byte) { } } -// IOSetDeadline is a stub — TestHost's HTTP mock does not support deadlines. -// TCP mock support (via WithMockTCP) is layered on top in Phase 1.E-1. +// IOSetDeadline applies a deadline to a TCP mock handle registered via WithMockTCP. +// For non-TCP handles (HTTP mocks) it is a no-op returning nil. func (h *TestHost) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error { - return nil + h.mockTCPMu.Lock() + conn, ok := h.mockTCPConns[handleID] + h.mockTCPMu.Unlock() + if !ok { + return nil + } + var t time.Time + if unixNanos != 0 { + t = time.Unix(0, unixNanos) + } + switch kind { + case "read": + return conn.SetReadDeadline(t) + case "write": + return conn.SetWriteDeadline(t) + case "both": + return conn.SetDeadline(t) + default: + return fmt.Errorf("unknown deadline kind: %q", kind) + } } -// ActionShouldStop returns false by default. WithActionCancel flips this in Phase 1.E-1. +// ActionShouldStop returns the cancellation flag (default false; set via WithActionCancel). func (h *TestHost) ActionShouldStop() bool { - return false + return h.actionStopped.Load() } diff --git a/sdk/go/opskat/testing_http.go b/sdk/go/opskat/testing_http.go index b7686bc..896eca7 100644 --- a/sdk/go/opskat/testing_http.go +++ b/sdk/go/opskat/testing_http.go @@ -6,6 +6,7 @@ import ( "bytes" "encoding/json" "fmt" + "net" "net/http" "net/http/httptest" "sync" @@ -38,38 +39,70 @@ func (h *TestHost) IOOpen(params []byte) (uint32, []byte, error) { Method string `json:"method"` URL string `json:"url"` Headers map[string]string `json:"headers"` + Addr string `json:"addr"` } json.Unmarshal(params, &p) - if p.Type != "http" { - return 0, nil, fmt.Errorf("only http IO is supported in TestHost") + switch p.Type { + case "http": + if h.httpHandler == nil { + return 0, nil, fmt.Errorf("no HTTP mock configured: use WithMockHTTP") + } + testIOMu.Lock() + id := testIONextID + testIONextID++ + testIOHandles[id] = &testIOEntry{ + httpHandle: &testHTTPHandle{ + handler: h.httpHandler, + method: p.Method, + url: p.URL, + headers: p.Headers, + }, + } + testIOMu.Unlock() + return id, []byte(`{}`), nil + + case "tcp": + if h.mockTCP == nil { + return 0, nil, fmt.Errorf("no TCP mock configured: use WithMockTCP") + } + conn, err := h.mockTCP(p.Addr) + if err != nil { + return 0, nil, err + } + testIOMu.Lock() + id := testIONextID + testIONextID++ + testIOMu.Unlock() + h.mockTCPMu.Lock() + if h.mockTCPConns == nil { + h.mockTCPConns = make(map[uint32]net.Conn) + } + h.mockTCPConns[id] = conn + h.mockTCPMu.Unlock() + return id, []byte(`{}`), nil + + default: + return 0, nil, fmt.Errorf("unsupported IO type in TestHost: %q", p.Type) } - - if h.httpHandler == nil { - return 0, nil, fmt.Errorf("no HTTP mock configured: use WithMockHTTP") - } - - testIOMu.Lock() - id := testIONextID - testIONextID++ - testIOHandles[id] = &testIOEntry{ - httpHandle: &testHTTPHandle{ - handler: h.httpHandler, - method: p.Method, - url: p.URL, - headers: p.Headers, - }, - } - testIOMu.Unlock() - - return id, []byte(`{}`), nil } func (h *TestHost) IORead(handleID uint32, size int) ([]byte, error) { + h.mockTCPMu.Lock() + conn, ok := h.mockTCPConns[handleID] + h.mockTCPMu.Unlock() + if ok { + buf := make([]byte, size) + n, err := conn.Read(buf) + if n > 0 { + return buf[:n], nil + } + return nil, err + } testIOMu.Lock() - entry, ok := testIOHandles[handleID] + entry, httpOK := testIOHandles[handleID] testIOMu.Unlock() - if !ok || entry.httpHandle == nil { + if !httpOK || entry.httpHandle == nil { return nil, fmt.Errorf("handle not found") } hh := entry.httpHandle @@ -85,10 +118,16 @@ func (h *TestHost) IORead(handleID uint32, size int) ([]byte, error) { } func (h *TestHost) IOWrite(handleID uint32, data []byte) (int, error) { + h.mockTCPMu.Lock() + conn, ok := h.mockTCPConns[handleID] + h.mockTCPMu.Unlock() + if ok { + return conn.Write(data) + } testIOMu.Lock() - entry, ok := testIOHandles[handleID] + entry, httpOK := testIOHandles[handleID] testIOMu.Unlock() - if !ok || entry.httpHandle == nil { + if !httpOK || entry.httpHandle == nil { return 0, fmt.Errorf("handle not found") } return entry.httpHandle.body.Write(data) @@ -127,6 +166,15 @@ func (h *TestHost) IOFlush(handleID uint32) ([]byte, error) { } func (h *TestHost) IOClose(handleID uint32) error { + h.mockTCPMu.Lock() + conn, tcpOK := h.mockTCPConns[handleID] + if tcpOK { + delete(h.mockTCPConns, handleID) + } + h.mockTCPMu.Unlock() + if tcpOK { + return conn.Close() + } testIOMu.Lock() entry, ok := testIOHandles[handleID] if ok { From a8e5d6cb337e403ecabd719d25ac9a6303431b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=B8=80=E4=B9=8B?= Date: Thu, 23 Apr 2026 14:13:46 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E2=9C=85=20add=20tcpConn=20SetReadDeadline?= =?UTF-8?q?=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdk/go/opskat/tcp_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sdk/go/opskat/tcp_test.go b/sdk/go/opskat/tcp_test.go index 5a3a2e5..f135ad1 100644 --- a/sdk/go/opskat/tcp_test.go +++ b/sdk/go/opskat/tcp_test.go @@ -4,8 +4,10 @@ package opskat import ( "context" + "errors" "net" "testing" + "time" . "github.com/smartystreets/goconvey/convey" ) @@ -39,3 +41,29 @@ func TestTCPDialBasic(t *testing.T) { }) }) } + +func TestTCPSetDeadline(t *testing.T) { + Convey("SetDeadline on tcpConn", t, func() { + th := NewTestHost(WithMockTCP(func(addr string) (net.Conn, error) { + _, b := net.Pipe() + return b, nil + })) + defer th.Close() + + conn, err := DialContext(context.Background(), "tcp", "x:9092") + So(err, ShouldBeNil) + defer conn.Close() + + Convey("SetReadDeadline propagates to underlying conn", func() { + err := conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) + So(err, ShouldBeNil) + + // Read should block briefly then timeout. + buf := make([]byte, 8) + _, err = conn.Read(buf) + So(err, ShouldNotBeNil) + var netErr net.Error + So(errors.As(err, &netErr), ShouldBeTrue) + }) + }) +} From db898de1ec3bf484b01f8bb18c9de3fa2cddbc1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=B8=80=E4=B9=8B?= Date: Thu, 23 Apr 2026 14:14:33 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E2=9C=A8=20add=20ActionContext.ShouldStop?= =?UTF-8?q?=20for=20cooperative=20cancellation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdk/go/opskat/handler_test.go | 18 ++++++++++++++++++ sdk/go/opskat/opskat.go | 7 +++++++ 2 files changed, 25 insertions(+) diff --git a/sdk/go/opskat/handler_test.go b/sdk/go/opskat/handler_test.go index de8cb7e..fdbd8af 100644 --- a/sdk/go/opskat/handler_test.go +++ b/sdk/go/opskat/handler_test.go @@ -80,3 +80,21 @@ func TestDispatch(t *testing.T) { }) }) } + +func TestActionContextShouldStop(t *testing.T) { + Convey("When action cancel is triggered", t, func() { + th := NewTestHost(WithActionCancel()) + defer th.Close() + + var captured bool + resetRegistries() + RegisterAction("cancel_test", func(ctx *ActionContext) (any, error) { + captured = ctx.ShouldStop() + return nil, nil + }) + + _, err := th.CallAction("cancel_test", json.RawMessage("{}"), func(TestEvent) {}) + So(err, ShouldBeNil) + So(captured, ShouldBeTrue) + }) +} diff --git a/sdk/go/opskat/opskat.go b/sdk/go/opskat/opskat.go index 261755f..9f1ce82 100644 --- a/sdk/go/opskat/opskat.go +++ b/sdk/go/opskat/opskat.go @@ -16,6 +16,13 @@ type ActionContext struct { Events *EventWriter } +// ShouldStop returns true if the caller has requested cancellation. +// Long-running actions should poll this periodically and exit cleanly +// (e.g. send an "ended" event with reason="userStop"). +func (ctx *ActionContext) ShouldStop() bool { + return hostActionShouldStop() +} + // ToolHandler handles a tool invocation. type ToolHandler func(ctx *ToolContext) (any, error)