Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions crates/wasm-rquickjs/skeleton/src/builtin/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class Channel {
}

get hasSubscribers() {
return this._subscribers.some(fn => !fn._internal) || this._stores.size > 0;
}

get _hasAnySubscribers() {
return this._subscribers.length > 0 || this._stores.size > 0;
}

Expand Down Expand Up @@ -103,15 +107,20 @@ function hasSubscribers(name) {

const TRACE_EVENTS = ['start', 'end', 'asyncStart', 'asyncEnd', 'error'];

const TRACING_CHANNEL_CREATED = Symbol.for('wasm-rquickjs.internal.tracing_channel.created');
const tracingChannelCreatedCh = channel(TRACING_CHANNEL_CREATED);

class TracingChannel {
constructor(nameOrChannels) {
if (typeof nameOrChannels === 'string' || typeof nameOrChannels === 'symbol') {
const name = nameOrChannels;
this.start = channel(`tracing:${String(name)}:start`);
this.end = channel(`tracing:${String(name)}:end`);
this.asyncStart = channel(`tracing:${String(name)}:asyncStart`);
this.asyncEnd = channel(`tracing:${String(name)}:asyncEnd`);
this.error = channel(`tracing:${String(name)}:error`);
const key = String(name);
this.start = channel(`tracing:${key}:start`);
this.end = channel(`tracing:${key}:end`);
this.asyncStart = channel(`tracing:${key}:asyncStart`);
this.asyncEnd = channel(`tracing:${key}:asyncEnd`);
this.error = channel(`tracing:${key}:error`);
tracingChannelCreatedCh.publish({ name, key });
} else if (nameOrChannels && typeof nameOrChannels === 'object') {
for (const event of TRACE_EVENTS) {
if (!(nameOrChannels[event] instanceof Channel)) {
Expand Down Expand Up @@ -140,6 +149,10 @@ class TracingChannel {
return TRACE_EVENTS.some(e => this[e].hasSubscribers);
}

get _hasAnySubscribers() {
return TRACE_EVENTS.some(e => this[e]._hasAnySubscribers);
}

unsubscribe(subscribers) {
let allRemoved = true;
for (const event of TRACE_EVENTS) {
Expand All @@ -153,7 +166,7 @@ class TracingChannel {
}

traceSync(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
if (!this._hasAnySubscribers) {
return fn.apply(thisArg, args);
}

Expand All @@ -175,7 +188,7 @@ class TracingChannel {
}

tracePromise(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
if (!this._hasAnySubscribers) {
return fn.apply(thisArg, args);
}

Expand All @@ -184,6 +197,7 @@ class TracingChannel {
return start.runStores(context, () => {
try {
const promise = fn.apply(thisArg, args);
context.__dc_async = true;
end.publish(context);
return Promise.resolve(promise).then(
(result) => {
Expand Down Expand Up @@ -212,7 +226,7 @@ class TracingChannel {
}

traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
if (!this._hasAnySubscribers) {
return fn.apply(thisArg, args);
}

Expand Down Expand Up @@ -260,7 +274,9 @@ class TracingChannel {

return start.runStores(context, () => {
try {
return fn.apply(thisArg, args);
const result = fn.apply(thisArg, args);
context.__dc_async = true;
return result;
} catch (err) {
context.error = err;
error.publish(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ pub const DIAGNOSTICS_CHANNEL_GOLEM_JS: &str = include_str!("diagnostics_channel
#[cfg(feature = "golem")]
pub const GOLEM_WIRE_JS: &str = r#"
{
const { subscribe } = await import('node:diagnostics_channel');
const { _installGolemTracing } = await import('__wasm_rquickjs_builtin/diagnostics_channel_golem');

const TRACING_CHANNEL_CREATED = Symbol.for('wasm-rquickjs.internal.tracing_channel.created');
subscribe(TRACING_CHANNEL_CREATED, ({ key }) => {
_installGolemTracing(key);
});

_installGolemTracing('http.client');
}
"#;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,37 @@ import { channel } from 'node:diagnostics_channel';
// Internal registry to correlate context objects with span handles
const _contextSpans = new WeakMap();

// Deduplication set to prevent multiple subscriber installs for the same channel
const _wired = new Set();

// Helper to create a Golem-integrated subscriber set for any TracingChannel
export function _installGolemTracing(channelName) {
const startCh = channel(`tracing:${channelName}:start`);
const endCh = channel(`tracing:${channelName}:end`);
const errorCh = channel(`tracing:${channelName}:error`);
const asyncEndCh = channel(`tracing:${channelName}:asyncEnd`);
const key = String(channelName);
if (_wired.has(key)) return;
_wired.add(key);

const startCh = channel(`tracing:${key}:start`);
const endCh = channel(`tracing:${key}:end`);
const errorCh = channel(`tracing:${key}:error`);
const asyncEndCh = channel(`tracing:${key}:asyncEnd`);

startCh.subscribe((context) => {
function onStart(context) {
try {
const handle = start_span(channelName);
_contextSpans.set(context, { handle, hasAsync: false });
for (const [key, value] of Object.entries(context)) {
if (key !== 'result' && key !== 'error' && value !== undefined && value !== null) {
const handle = start_span(key);
_contextSpans.set(context, { handle });
for (const [k, value] of Object.entries(context)) {
if (k !== 'result' && k !== 'error' && value !== undefined && value !== null) {
try {
set_span_attribute(handle, key, String(value));
set_span_attribute(handle, k, String(value));
} catch (_) {}
}
}
} catch (_) {}
});
}
onStart._internal = true;
startCh.subscribe(onStart);

errorCh.subscribe((context) => {
function onError(context) {
try {
const entry = _contextSpans.get(context);
if (entry) {
Expand All @@ -37,24 +46,26 @@ export function _installGolemTracing(channelName) {
}
}
} catch (_) {}
});
}
onError._internal = true;
errorCh.subscribe(onError);

endCh.subscribe((context) => {
function onEnd(context) {
try {
const entry = _contextSpans.get(context);
if (entry && !entry.hasAsync) {
// For sync-only traces, finish span on end
// For async traces, asyncEnd will finish it
if (entry && !context.__dc_async) {
if (context.result !== undefined) {
set_span_attribute(entry.handle, 'result', String(context.result));
}
finish_span(entry.handle);
_contextSpans.delete(context);
}
} catch (_) {}
});
}
onEnd._internal = true;
endCh.subscribe(onEnd);

asyncEndCh.subscribe((context) => {
function onAsyncEnd(context) {
try {
const entry = _contextSpans.get(context);
if (entry) {
Expand All @@ -65,16 +76,8 @@ export function _installGolemTracing(channelName) {
_contextSpans.delete(context);
}
} catch (_) {}
});
}
onAsyncEnd._internal = true;
asyncEndCh.subscribe(onAsyncEnd);

// Mark contexts as async when asyncStart fires
const asyncStartCh = channel(`tracing:${channelName}:asyncStart`);
asyncStartCh.subscribe((context) => {
try {
const entry = _contextSpans.get(context);
if (entry) {
entry.hasAsync = true;
}
} catch (_) {}
});
}
6 changes: 2 additions & 4 deletions crates/wasm-rquickjs/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,8 @@ fn collect_resource_types_from_item(
collect_resource_types_from_item(&export_item, engine, known);
}
}
ComponentItem::Resource(res_ty) => {
if !known.contains(res_ty) {
known.push(*res_ty);
}
ComponentItem::Resource(res_ty) if !known.contains(res_ty) => {
known.push(*res_ty);
}
_ => {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ export function test() {
// tracing:http.client:start/end/error/asyncStart/asyncEnd channels.
//
// Test 1: Verify the Golem tracing subscribers are installed on the http.client channels
// Internal subscribers are hidden from hasSubscribers, so check _subscribers directly
try {
const startCh = dc.channel('tracing:http.client:start');
const endCh = dc.channel('tracing:http.client:end');
results.golemTracingInstalled = startCh.hasSubscribers && endCh.hasSubscribers;
results.golemTracingInstalled = startCh._subscribers.length > 0 && endCh._subscribers.length > 0;
} catch (e) {
errors.push('golemTracingInstalled: ' + e.message);
}
Expand Down Expand Up @@ -44,18 +45,16 @@ export function test() {
errors.push('traceSyncError: ' + e.message);
}

// Test 4: Verify custom tracing channels also work with _installGolemTracing pattern
// We manually install golem tracing for a custom channel
// Test 4: Verify custom tracing channels auto-wire Golem tracing
try {
const customCh = dc.tracingChannel('test.custom');
// Before installing, no extra subscribers beyond what we add
// After creation, Golem tracing should be auto-installed (internal subscribers)
const startCh = dc.channel('tracing:test.custom:start');
const initialSubs = startCh.hasSubscribers;
results.customGolemSubs = startCh._subscribers.length > 0;

// The traceSync should work even without Golem tracing
const val = customCh.traceSync(() => 42, {});
// traceSync on custom channel should create a Golem span
const val = customCh.traceSync(() => 42, { operation: 'custom-op' });
results.customTraceSync = val === 42;
results.customNoGolemSubs = !initialSubs;
} catch (e) {
errors.push('customTrace: ' + e.message);
}
Expand Down
24 changes: 10 additions & 14 deletions tests/common/js_subtest_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ fn extract_preceding_comment(source: &str, span_start: u32) -> Option<String> {
fn uses_node_test(program: &Program) -> bool {
for stmt in &program.body {
match stmt {
Statement::ImportDeclaration(import) => {
if import.source.value == "node:test" {
return true;
}
Statement::ImportDeclaration(import) if import.source.value == "node:test" => {
return true;
}
Statement::VariableDeclaration(var_decl) => {
for decl in &var_decl.declarations {
Expand Down Expand Up @@ -163,16 +161,14 @@ fn extract_test_name(call: &CallExpression) -> Option<String> {
if let Some(arg) = call.arguments.first() {
match arg {
Argument::StringLiteral(s) => return Some(s.value.to_string()),
Argument::TemplateLiteral(t) => {
if t.expressions.is_empty() && !t.quasis.is_empty() {
let value = t
.quasis
.iter()
.map(|q| q.value.raw.as_str())
.collect::<String>();
if !value.is_empty() {
return Some(value);
}
Argument::TemplateLiteral(t) if t.expressions.is_empty() && !t.quasis.is_empty() => {
let value = t
.quasis
.iter()
.map(|q| q.value.raw.as_str())
.collect::<String>();
if !value.is_empty() {
return Some(value);
}
}
_ => {}
Expand Down
16 changes: 8 additions & 8 deletions tests/runtime/diagnostics_channel_golem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,21 @@ async fn golem_context_tracing(
"traceSyncError should be true"
);

// Custom tracing channel works
// Custom tracing channel auto-wires Golem tracing
assert!(
r["customTraceSync"].as_bool().unwrap(),
"customTraceSync should be true"
r["customGolemSubs"].as_bool().unwrap(),
"customGolemSubs should be true - custom channels should auto-wire Golem tracing"
);
assert!(
r["customNoGolemSubs"].as_bool().unwrap(),
"customNoGolemSubs should be true"
r["customTraceSync"].as_bool().unwrap(),
"customTraceSync should be true"
);

// Verify spans were recorded by the mock host
// Verify spans were recorded by the mock host (2 http.client + 1 custom)
let spans = prepared.spans.lock().unwrap();
assert!(
spans.len() >= 2,
"Expected at least 2 spans from traceSync calls, got {}. Output: {}",
spans.len() >= 3,
"Expected at least 3 spans (2 http.client + 1 custom), got {}. Output: {}",
spans.len(),
output
);
Expand Down
Loading