Skip to content
174 changes: 174 additions & 0 deletions cadence/signal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""
Signal definition for Cadence workflows.

This module provides the SignalDefinition class used internally by WorkflowDefinition
to track signal handler metadata.
"""

import inspect
from dataclasses import dataclass
from functools import update_wrapper
from inspect import Parameter, signature
from typing import (
Callable,
Generic,
ParamSpec,
Type,
TypeVar,
TypedDict,
get_type_hints,
Any,
)

P = ParamSpec("P")
T = TypeVar("T")


@dataclass(frozen=True)
class SignalParameter:
"""Parameter metadata for a signal handler."""

name: str
type_hint: Type | None
has_default: bool
default_value: Any


class SignalDefinitionOptions(TypedDict, total=False):
"""Options for defining a signal."""

name: str


class SignalDefinition(Generic[P, T]):
"""
Definition of a signal handler with metadata.

Similar to ActivityDefinition but for signal handlers.
Provides type safety and metadata for signal handlers.
"""

def __init__(
self,
wrapped: Callable[P, T],
name: str,
params: list[SignalParameter],
is_async: bool,
):
self._wrapped = wrapped
self._name = name
self._params = params
self._is_async = is_async
update_wrapper(self, wrapped)

def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
"""Call the wrapped signal handler function."""
return self._wrapped(*args, **kwargs)

@property
def name(self) -> str:
"""Get the signal name."""
return self._name

@property
def params(self) -> list[SignalParameter]:
"""Get the signal parameters."""
return self._params

@property
def is_async(self) -> bool:
"""Check if the signal handler is async."""
return self._is_async

@property
def wrapped(self) -> Callable[P, T]:
"""Get the wrapped signal handler function."""
return self._wrapped

@staticmethod
def wrap(
fn: Callable[P, T], opts: SignalDefinitionOptions
) -> "SignalDefinition[P, T]":
"""
Wrap a function as a SignalDefinition.

This is an internal method used by WorkflowDefinition to create signal definitions
from methods decorated with @workflow.signal.

Args:
fn: The signal handler function to wrap
opts: Options for the signal definition

Returns:
A SignalDefinition instance

Raises:
ValueError: If return type is not None
"""
name = opts.get("name") or fn.__qualname__
is_async = inspect.iscoroutinefunction(fn)
params = _get_signal_signature(fn)
_validate_signal_return_type(fn)

return SignalDefinition[P, T](fn, name, params, is_async)


def _validate_signal_return_type(fn: Callable) -> None:
"""
Validate that signal handler returns None.

Args:
fn: The signal handler function

Raises:
ValueError: If return type is not None
"""
try:
hints = get_type_hints(fn)
ret_type = hints.get("return", inspect.Signature.empty)

if ret_type is not None and ret_type is not inspect.Signature.empty:
raise ValueError(
f"Signal handler '{fn.__qualname__}' must return None "
f"(signals cannot return values), got {ret_type}"
)
except NameError:
pass


def _get_signal_signature(fn: Callable[P, T]) -> list[SignalParameter]:
"""
Extract parameter information from a signal handler function.

Args:
fn: The signal handler function

Returns:
List of SignalParameter objects

Raises:
ValueError: If parameters are not positional
"""
sig = signature(fn)
args = sig.parameters
hints = get_type_hints(fn)
params = []

for name, param in args.items():
# Filter out the self parameter for instance methods
if param.name == "self":
continue

has_default = param.default != Parameter.empty
default = param.default if has_default else None

if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD):
type_hint = hints.get(name, None)
params.append(SignalParameter(name, type_hint, has_default, default))
else:
raise ValueError(
f"Signal handler '{fn.__qualname__}' parameter '{name}' must be positional, "
f"got {param.kind.name}"
)

return params
66 changes: 64 additions & 2 deletions cadence/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import inspect

from cadence.data_converter import DataConverter
from cadence.signal import SignalDefinition, SignalDefinitionOptions

ResultType = TypeVar("ResultType")

Expand Down Expand Up @@ -60,10 +61,22 @@ class WorkflowDefinition(Generic[C]):
Provides type safety and metadata for workflow classes.
"""

def __init__(self, cls: Type[C], name: str, run_method_name: str):
def __init__(
self,
cls: Type[C],
name: str,
run_method_name: str,
signals: dict[str, SignalDefinition[..., Any]],
):
self._cls: Type[C] = cls
self._name = name
self._run_method_name = run_method_name
self._signals = signals

@property
def signals(self) -> dict[str, SignalDefinition[..., Any]]:
"""Get the signal definitions."""
return self._signals

@property
def name(self) -> str:
Expand Down Expand Up @@ -99,6 +112,11 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition":
name = opts["name"]

# Validate that the class has exactly one run method and find it
# Also validate that class does not have multiple signal methods with the same name
signals: dict[str, SignalDefinition[..., Any]] = {}
signal_names: dict[
str, str
] = {} # Map signal name to method name for duplicate detection
run_method_name = None
for attr_name in dir(cls):
if attr_name.startswith("_"):
Expand All @@ -116,10 +134,24 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition":
)
run_method_name = attr_name

if hasattr(attr, "_workflow_signal"):
signal_name = getattr(attr, "_workflow_signal")
if signal_name in signal_names:
raise ValueError(
f"Multiple @workflow.signal methods found in class {cls.__name__} "
f"with signal name '{signal_name}': '{attr_name}' and '{signal_names[signal_name]}'"
)
# Create SignalDefinition from the decorated method
signal_def = SignalDefinition.wrap(
attr, SignalDefinitionOptions(name=signal_name)
)
signals[signal_name] = signal_def
signal_names[signal_name] = attr_name

if run_method_name is None:
raise ValueError(f"No @workflow.run method found in class {cls.__name__}")

return WorkflowDefinition(cls, name, run_method_name)
return WorkflowDefinition(cls, name, run_method_name, signals)


def run(func: Optional[T] = None) -> Union[T, Callable[[T], T]]:
Expand Down Expand Up @@ -163,6 +195,36 @@ def decorator(f: T) -> T:
return decorator(func)


def signal(name: str | None = None) -> Callable[[T], T]:
"""
Decorator to mark a method as a workflow signal handler.

Example:
@workflow.signal(name="approval_channel")
async def approve(self, approved: bool):
self.approved = approved

Args:
name: The name of the signal

Returns:
The decorated method with workflow signal metadata

Raises:
ValueError: If name is not provided

"""
if name is None:
raise ValueError("name is required")

def decorator(f: T) -> T:
f._workflow_signal = name # type: ignore
return f

# Only allow @workflow.signal(name), require name to be explicitly provided
return decorator


@dataclass(frozen=True)
class WorkflowInfo:
workflow_type: str
Expand Down
97 changes: 97 additions & 0 deletions tests/cadence/worker/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cadence import workflow
from cadence.worker import Registry
from cadence.workflow import WorkflowDefinition
from cadence.signal import SignalDefinition
from tests.cadence import common_activities


Expand Down Expand Up @@ -212,3 +213,99 @@ async def run(self, input: str) -> str:
workflow_def = reg.get_workflow("custom_workflow_name")
assert workflow_def.name == "custom_workflow_name"
assert workflow_def.cls == CustomWorkflow

def test_workflow_with_signal(self):
"""Test workflow with signal handler."""
reg = Registry()

@reg.workflow
class WorkflowWithSignal:
@workflow.run
async def run(self):
return "done"

@workflow.signal(name="approval")
async def handle_approval(self, approved: bool):
self.approved = approved

workflow_def = reg.get_workflow("WorkflowWithSignal")
assert isinstance(workflow_def, WorkflowDefinition)
assert len(workflow_def.signals) == 1
assert "approval" in workflow_def.signals
signal_def = workflow_def.signals["approval"]
assert isinstance(signal_def, SignalDefinition)
assert signal_def.name == "approval"
assert signal_def.is_async is True
assert len(signal_def.params) == 1
assert signal_def.params[0].name == "approved"

def test_workflow_with_multiple_signals(self):
"""Test workflow with multiple signal handlers."""
reg = Registry()

@reg.workflow
class WorkflowWithMultipleSignals:
@workflow.run
async def run(self):
return "done"

@workflow.signal(name="approval")
async def handle_approval(self, approved: bool):
self.approved = approved

@workflow.signal(name="cancel")
async def handle_cancel(self):
self.cancelled = True

workflow_def = reg.get_workflow("WorkflowWithMultipleSignals")
assert len(workflow_def.signals) == 2
assert "approval" in workflow_def.signals
assert "cancel" in workflow_def.signals
assert isinstance(workflow_def.signals["approval"], SignalDefinition)
assert isinstance(workflow_def.signals["cancel"], SignalDefinition)
assert workflow_def.signals["approval"].name == "approval"
assert workflow_def.signals["cancel"].name == "cancel"

def test_signal_decorator_requires_name(self):
"""Test that signal decorator requires name parameter."""
with pytest.raises(ValueError, match="name is required"):

@workflow.signal()
async def test_signal(self):
pass

def test_workflow_without_signals(self):
"""Test that workflow without signals has empty signals dict."""
reg = Registry()

@reg.workflow
class WorkflowWithoutSignals:
@workflow.run
async def run(self):
return "done"

workflow_def = reg.get_workflow("WorkflowWithoutSignals")
assert isinstance(workflow_def.signals, dict)
assert len(workflow_def.signals) == 0

def test_duplicate_signal_names_error(self):
"""Test that duplicate signal names raise ValueError."""
reg = Registry()

with pytest.raises(
ValueError, match="Multiple.*signal.*found.*with signal name 'approval'"
):

@reg.workflow
class WorkflowWithDuplicateSignalNames:
@workflow.run
async def run(self):
return "done"

@workflow.signal(name="approval")
async def handle_approval(self, approved: bool):
self.approved = approved

@workflow.signal(name="approval")
async def handle_approval_different(self):
self.also_approved = True