Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ extensions/oss/backend/oss
.claude
CLAUDE.md
superpowers
.omc

# Dev configs (contain secrets)
devconfig.json
Expand Down
18 changes: 18 additions & 0 deletions sdk/go/opskat/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ func TestDispatch(t *testing.T) {
})
})
}

func TestActionContextShouldStop(t *testing.T) {
Convey("When action cancel is triggered", t, func() {
th := NewTestHost(WithActionCancel())
defer th.Close()

var captured bool
resetRegistries()
RegisterAction("cancel_test", func(ctx *ActionContext) (any, error) {
captured = ctx.ShouldStop()
return nil, nil
})

_, err := th.CallAction("cancel_test", json.RawMessage("{}"), func(TestEvent) {})
So(err, ShouldBeNil)
So(captured, ShouldBeTrue)
})
}
6 changes: 6 additions & 0 deletions sdk/go/opskat/hostcall.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ type hostCaller interface {
IOWrite(handleID uint32, data []byte) (int, error)
IOFlush(handleID uint32) (metaJSON []byte, err error)
IOClose(handleID uint32) error
IOSetDeadline(handleID uint32, kind string, unixNanos int64) error
AssetGetConfig(assetID int64) (json.RawMessage, error)
FileDialog(params []byte) (string, error)
KVGet(key string) ([]byte, error)
KVSet(key string, value []byte) error
ActionEvent(eventType string, data []byte)
ActionShouldStop() bool
}

// Package-level host call functions. All SDK code calls through these.
Expand All @@ -26,8 +28,12 @@ func hostIORead(handleID uint32, size int) ([]byte, error) { return current
func hostIOWrite(handleID uint32, data []byte) (int, error) { return currentHost.IOWrite(handleID, data) }
func hostIOFlush(handleID uint32) ([]byte, error) { return currentHost.IOFlush(handleID) }
func hostIOClose(handleID uint32) error { return currentHost.IOClose(handleID) }
func hostIOSetDeadline(handleID uint32, kind string, unixNanos int64) error {
return currentHost.IOSetDeadline(handleID, kind, unixNanos)
}
func hostAssetGetConfig(assetID int64) (json.RawMessage, error) { return currentHost.AssetGetConfig(assetID) }
func hostFileDialog(params []byte) (string, error) { return currentHost.FileDialog(params) }
func hostKVGet(key string) ([]byte, error) { return currentHost.KVGet(key) }
func hostKVSet(key string, value []byte) error { return currentHost.KVSet(key, value) }
func hostActionEvent(eventType string, data []byte) { currentHost.ActionEvent(eventType, data) }
func hostActionShouldStop() bool { return currentHost.ActionShouldStop() }
4 changes: 4 additions & 0 deletions sdk/go/opskat/hostcall_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ func (n *nopHost) IORead(handleID uint32, size int) ([]byte, error) { retur
func (n *nopHost) IOWrite(handleID uint32, data []byte) (int, error) { return 0, errNotConfigured }
func (n *nopHost) IOFlush(handleID uint32) ([]byte, error) { return nil, errNotConfigured }
func (n *nopHost) IOClose(handleID uint32) error { return errNotConfigured }
func (n *nopHost) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error {
return errNotConfigured
}
func (n *nopHost) AssetGetConfig(assetID int64) (json.RawMessage, error) { return nil, errNotConfigured }
func (n *nopHost) FileDialog(params []byte) (string, error) { return "", errNotConfigured }
func (n *nopHost) KVGet(key string) ([]byte, error) { return nil, errNotConfigured }
func (n *nopHost) KVSet(key string, value []byte) error { return errNotConfigured }
func (n *nopHost) ActionEvent(eventType string, data []byte) {}
func (n *nopHost) ActionShouldStop() bool { return false }

var errNotConfigured = fmt.Errorf("host not configured: use TestHost or run inside WASM")
8 changes: 8 additions & 0 deletions sdk/go/opskat/hostcall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,11 @@ func (m *mockHostCaller) KVSet(key string, value []byte) error {

func (m *mockHostCaller) ActionEvent(eventType string, data []byte) {
}

func (m *mockHostCaller) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error {
return fmt.Errorf("not implemented")
}

func (m *mockHostCaller) ActionShouldStop() bool {
return false
}
17 changes: 17 additions & 0 deletions sdk/go/opskat/hostcall_wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func wasmHostKVSet(keyPtr, keyLen, valPtr, valLen uint32)
//go:wasmimport opskat host_action_event
func wasmHostActionEvent(typePtr, typeLen, dataPtr, dataLen uint32)

//go:wasmimport opskat host_io_set_deadline
func wasmHostIOSetDeadline(handleID, kindPtr, kindLen uint32, unixNanos int64) uint64

//go:wasmimport opskat host_action_should_stop
func wasmHostActionShouldStop() uint32

// Helper: convert Go string to (ptr, len) for WASM.
func strToPtr(s string) (uint32, uint32) {
if len(s) == 0 {
Expand Down Expand Up @@ -165,3 +171,14 @@ func (w *wasmHostCaller) ActionEvent(eventType string, data []byte) {
dp, dl := bytesToPtr(data)
wasmHostActionEvent(tp, tl, dp, dl)
}

func (w *wasmHostCaller) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error {
kp, kl := strToPtr(kind)
packed := wasmHostIOSetDeadline(handleID, kp, kl, unixNanos)
_, err := unpackResult(packed)
return err
}

func (w *wasmHostCaller) ActionShouldStop() bool {
return wasmHostActionShouldStop() == 1
}
7 changes: 7 additions & 0 deletions sdk/go/opskat/opskat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ type ActionContext struct {
Events *EventWriter
}

// ShouldStop returns true if the caller has requested cancellation.
// Long-running actions should poll this periodically and exit cleanly
// (e.g. send an "ended" event with reason="userStop").
func (ctx *ActionContext) ShouldStop() bool {
return hostActionShouldStop()
}

// ToolHandler handles a tool invocation.
type ToolHandler func(ctx *ToolContext) (any, error)

Expand Down
69 changes: 69 additions & 0 deletions sdk/go/opskat/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package opskat

import (
"context"
"net"
"time"
)

// DialContext opens a TCP connection via the host's IO bridge.
// Its signature matches kafka-go Transport.Dial for drop-in use.
// ctx is consulted only for pre-dial cancellation; per-call deadlines
// should be set on the returned net.Conn via SetDeadline / SetReadDeadline.
func DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
h, err := IOOpen("tcp", map[string]any{"addr": addr})
if err != nil {
return nil, err
}
return &tcpConn{
h: h,
remote: &addrStr{network: network, addr: addr},
local: &addrStr{network: network, addr: "wasm"},
}, nil
}

// Dial is a convenience wrapper without ctx.
func Dial(network, addr string) (net.Conn, error) {
return DialContext(context.Background(), network, addr)
}

type tcpConn struct {
h *IOHandle
remote, local net.Addr
}

func (c *tcpConn) Read(p []byte) (int, error) { return c.h.Read(p) }
func (c *tcpConn) Write(p []byte) (int, error) { return c.h.Write(p) }
func (c *tcpConn) Close() error { return c.h.Close() }
func (c *tcpConn) LocalAddr() net.Addr { return c.local }
func (c *tcpConn) RemoteAddr() net.Addr { return c.remote }

func (c *tcpConn) SetDeadline(t time.Time) error {
return hostIOSetDeadline(c.h.ID(), "both", toUnixNanos(t))
}

func (c *tcpConn) SetReadDeadline(t time.Time) error {
return hostIOSetDeadline(c.h.ID(), "read", toUnixNanos(t))
}

func (c *tcpConn) SetWriteDeadline(t time.Time) error {
return hostIOSetDeadline(c.h.ID(), "write", toUnixNanos(t))
}

func toUnixNanos(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.UnixNano()
}

type addrStr struct {
network string
addr string
}

func (a *addrStr) Network() string { return a.network }
func (a *addrStr) String() string { return a.addr }
69 changes: 69 additions & 0 deletions sdk/go/opskat/tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//go:build !wasip1

package opskat

import (
"context"
"errors"
"net"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func TestTCPDialBasic(t *testing.T) {
Convey("Given a TestHost with a mock TCP echo server", t, func() {
th := NewTestHost(WithMockTCP(func(addr string) (net.Conn, error) {
a, b := net.Pipe()
go func() {
buf := make([]byte, 256)
n, _ := a.Read(buf)
a.Write(buf[:n]) //nolint:errcheck // echo pipe
}()
return b, nil
}))
defer th.Close()

Convey("DialContext returns a working net.Conn", func() {
conn, err := DialContext(context.Background(), "tcp", "broker:9092")
So(err, ShouldBeNil)
defer conn.Close()

n, err := conn.Write([]byte("hi"))
So(err, ShouldBeNil)
So(n, ShouldEqual, 2)

buf := make([]byte, 8)
n, err = conn.Read(buf)
So(err, ShouldBeNil)
So(string(buf[:n]), ShouldEqual, "hi")
})
})
}

func TestTCPSetDeadline(t *testing.T) {
Convey("SetDeadline on tcpConn", t, func() {
th := NewTestHost(WithMockTCP(func(addr string) (net.Conn, error) {
_, b := net.Pipe()
return b, nil
}))
defer th.Close()

conn, err := DialContext(context.Background(), "tcp", "x:9092")
So(err, ShouldBeNil)
defer conn.Close()

Convey("SetReadDeadline propagates to underlying conn", func() {
err := conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
So(err, ShouldBeNil)

// Read should block briefly then timeout.
buf := make([]byte, 8)
_, err = conn.Read(buf)
So(err, ShouldNotBeNil)
var netErr net.Error
So(errors.As(err, &netErr), ShouldBeTrue)
})
})
}
62 changes: 56 additions & 6 deletions sdk/go/opskat/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package opskat
import (
"encoding/json"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
)

// TestEvent represents a captured action event.
Expand All @@ -31,14 +34,31 @@ func WithMockHTTP(handler http.HandlerFunc) TestOption {
return func(h *TestHost) { h.httpHandler = handler }
}

// WithMockTCP installs a TCP dialer for IOOpen(type=tcp). The dialer receives
// the requested addr and returns a net.Conn that backs the TestHost's IO
// operations for the resulting handle.
func WithMockTCP(dial func(addr string) (net.Conn, error)) TestOption {
return func(h *TestHost) { h.mockTCP = dial }
}

// WithActionCancel marks the TestHost as already-cancelled, so ActionShouldStop
// returns true. Used to exercise cooperative-cancel paths in action handlers.
func WithActionCancel() TestOption {
return func(h *TestHost) { h.actionStopped.Store(true) }
}

// TestHost simulates the host environment for extension unit testing.
type TestHost struct {
assetConfigs map[int64]json.RawMessage
kv map[string][]byte
httpHandler http.HandlerFunc
events []TestEvent
eventCb func(TestEvent)
mu sync.Mutex
assetConfigs map[int64]json.RawMessage
kv map[string][]byte
httpHandler http.HandlerFunc
mockTCP func(addr string) (net.Conn, error)
mockTCPMu sync.Mutex
mockTCPConns map[uint32]net.Conn
actionStopped atomic.Bool
events []TestEvent
eventCb func(TestEvent)
mu sync.Mutex
}

// NewTestHost creates a TestHost and installs it as the active host.
Expand Down Expand Up @@ -164,3 +184,33 @@ func (h *TestHost) ActionEvent(eventType string, data []byte) {
h.eventCb(e)
}
}

// IOSetDeadline applies a deadline to a TCP mock handle registered via WithMockTCP.
// For non-TCP handles (HTTP mocks) it is a no-op returning nil.
func (h *TestHost) IOSetDeadline(handleID uint32, kind string, unixNanos int64) error {
h.mockTCPMu.Lock()
conn, ok := h.mockTCPConns[handleID]
h.mockTCPMu.Unlock()
if !ok {
return nil
}
var t time.Time
if unixNanos != 0 {
t = time.Unix(0, unixNanos)
}
switch kind {
case "read":
return conn.SetReadDeadline(t)
case "write":
return conn.SetWriteDeadline(t)
case "both":
return conn.SetDeadline(t)
default:
return fmt.Errorf("unknown deadline kind: %q", kind)
}
}

// ActionShouldStop returns the cancellation flag (default false; set via WithActionCancel).
func (h *TestHost) ActionShouldStop() bool {
return h.actionStopped.Load()
}
Loading