Skip to content

feat(taskbroker-client): Add pass_headers option to task registration#623

Merged
untitaker merged 5 commits intomainfrom
feat/pass-headers-to-task
May 8, 2026
Merged

feat(taskbroker-client): Add pass_headers option to task registration#623
untitaker merged 5 commits intomainfrom
feat/pass-headers-to-task

Conversation

@untitaker
Copy link
Copy Markdown
Member

@untitaker untitaker commented May 7, 2026

Summary

  • Adds pass_headers parameter to @namespace.register() decorator
  • When pass_headers=True, the task function receives activation headers as a headers keyword argument (dict[str, str])
  • Provides a simpler alternative to implementing a full ContextHook for tasks that just need direct access to header values

We need this kind of functionality when using raw mode together with the profiles consumer. Mainly for killswitches.

ref STREAM-882

Usage

from taskbroker_client.types import TaskHeaders

@namespace.register(name="my.task", pass_headers=True)
def my_task(org_id: int, headers: TaskHeaders) -> None:
    tenant_id = headers.get("x-tenant-id", "")
    trace_id = headers.get("sentry-trace", "")

Test plan

  • Added unit tests for pass_headers attribute on Task
  • Added tests for ALWAYS_EAGER mode (empty headers dict passed)
  • Added worker integration test verifying headers are passed during execution
  • All 158 tests pass

🤖 Generated with Claude Code

@untitaker untitaker requested a review from a team as a code owner May 7, 2026 13:49
Allow tasks to opt into receiving activation headers by setting
pass_headers=True in the @register() decorator. When enabled, the
task function receives headers as a keyword argument (dict[str, str]).

This provides a simpler alternative to implementing a full ContextHook
for tasks that just need direct access to header values.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@untitaker untitaker force-pushed the feat/pass-headers-to-task branch from 77075a8 to 15615c2 Compare May 7, 2026 13:52
@getsentry getsentry deleted a comment from linear-code Bot May 7, 2026
untitaker added a commit to getsentry/sentry that referenced this pull request May 7, 2026
Add `process_profile_from_kafka` task that accepts raw Kafka message bytes
for use with taskbroker's passthrough mode. This allows taskbroker to read
directly from the ingest-profiles Kafka topic.

Changes:
- New task `process_profile_from_kafka` in `ingest.profiling.passthrough` namespace
- Refactored `_process_profile_message` to share logic between consumer and task
- Removed base64 encoding - tasks now accept bytes directly
- Use Kafka headers for killswitch/sampled when available, fall back to message body

Depends on getsentry/taskbroker#623 for exposing Kafka headers to tasks.

STREAM-882
untitaker added a commit to getsentry/sentry that referenced this pull request May 7, 2026
Add `process_profile_from_kafka` task that accepts raw Kafka message bytes
for use with taskbroker's passthrough mode. This allows taskbroker to read
directly from the ingest-profiles Kafka topic.

Changes:
- New task `process_profile_from_kafka` in `ingest.profiling.passthrough` namespace
- Refactored `_process_profile_message` to share logic between consumer and task
- Removed base64 encoding - tasks now accept bytes directly
- Use Kafka headers for killswitch/sampled when available, fall back to message body

Depends on getsentry/taskbroker#623 for exposing Kafka headers to tasks.

STREAM-882
Comment thread clients/python/src/taskbroker_client/worker/workerchild.py
Copy link
Copy Markdown
Member

@markstory markstory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the general design is reasonable. Another potential solution would be to pass the entire activation into the task, and let userland code pick the headers out. I prefer passing just the headers over the the entire activation though.

compression_type=compression_type,
report_timeout_errors=report_timeout_errors,
silenced_exceptions=silenced_exceptions,
pass_headers=pass_headers,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're going to inject parameters into the task, should we validate that the task doesn't already have a headers parameter with an incompatible type?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential solution would be to pass the entire activation into the task,

as in, you have a pass_activation option?

Since we're going to inject parameters into the task, should we validate that the task doesn't already have a headers parameter with an incompatible type?

this feels magical but i can do it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can enforce that the headers kwarg is there explicitly. def mytask(**kwargs): is then just invalid.

Copy link
Copy Markdown
Member

@markstory markstory May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can enforce that the headers kwarg is there explicitly. def mytask(**kwargs): is then just invalid.

I was more thinking of the scenario where a task has an incompatible headers parameter like:

def some_task(headers: set[str]):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as in, you have a pass_activation option?

Yeah, but as I think about it more I like this idea less, and pass_headers is the better solution.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the current version covers this. it requires the header arg to be there, to have the right type, and to be explicit

untitaker and others added 2 commits May 7, 2026 18:16
…=True

Use inspect module to check at task registration time that:
- The function has a 'headers' parameter when pass_headers=True
- The 'headers' parameter is not positional-only

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Comment thread clients/python/src/taskbroker_client/task.py
Comment thread clients/python/src/taskbroker_client/task.py
Comment thread clients/python/src/taskbroker_client/task.py
@untitaker untitaker requested review from evanh and markstory May 7, 2026 18:23
activation = multi_task.create_activation([], {})
assert activation.headers["x-test-context"] == "dispatched"
assert activation.headers["x-another"] == "also-here"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for the case when the user passes headers as one of the kwargs to their function?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. testing for a specific error doesn't make a lot of sense if there's no explicit errorhandling for it, so I added more checks to the code too. i think it's a bit bloated now, but probably more user-friendly.

Copy link
Copy Markdown
Member

@markstory markstory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. The test addition that Evan mentioned would be a good one to have.

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 6e002f9. Configure here.

if origin not in expected_types:
raise TypeError(
f"{context}: {param_name!r} parameter has type {param.annotation!r}. "
f"Expected one of: {', '.join(t.__name__ for t in expected_types)}."
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any.__name__ crashes error formatting on Python 3.10

Low Severity

The error message in assert_typed_kwarg uses t.__name__ to format all types in expected_types, but typing.Any does not have a __name__ attribute on Python 3.10 (where it's a _SpecialForm instance, not a class). This causes an AttributeError to be raised instead of the intended helpful TypeError when a user provides an incompatible type annotation for headers. The expected_types tuple at the call site includes Any, so the generator expression t.__name__ for t in expected_types will crash when it reaches Any.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 6e002f9. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't care. Pick better types.

@untitaker untitaker merged commit f94fcdc into main May 8, 2026
23 checks passed
@untitaker untitaker deleted the feat/pass-headers-to-task branch May 8, 2026 17:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants