feat(taskbroker-client): Add pass_headers option to task registration#623
feat(taskbroker-client): Add pass_headers option to task registration#623
Conversation
Allow tasks to opt into receiving activation headers by setting pass_headers=True in the @register() decorator. When enabled, the task function receives headers as a keyword argument (dict[str, str]). This provides a simpler alternative to implementing a full ContextHook for tasks that just need direct access to header values. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
77075a8 to
15615c2
Compare
Add `process_profile_from_kafka` task that accepts raw Kafka message bytes for use with taskbroker's passthrough mode. This allows taskbroker to read directly from the ingest-profiles Kafka topic. Changes: - New task `process_profile_from_kafka` in `ingest.profiling.passthrough` namespace - Refactored `_process_profile_message` to share logic between consumer and task - Removed base64 encoding - tasks now accept bytes directly - Use Kafka headers for killswitch/sampled when available, fall back to message body Depends on getsentry/taskbroker#623 for exposing Kafka headers to tasks. STREAM-882
Add `process_profile_from_kafka` task that accepts raw Kafka message bytes for use with taskbroker's passthrough mode. This allows taskbroker to read directly from the ingest-profiles Kafka topic. Changes: - New task `process_profile_from_kafka` in `ingest.profiling.passthrough` namespace - Refactored `_process_profile_message` to share logic between consumer and task - Removed base64 encoding - tasks now accept bytes directly - Use Kafka headers for killswitch/sampled when available, fall back to message body Depends on getsentry/taskbroker#623 for exposing Kafka headers to tasks. STREAM-882
markstory
left a comment
There was a problem hiding this comment.
I think the general design is reasonable. Another potential solution would be to pass the entire activation into the task, and let userland code pick the headers out. I prefer passing just the headers over the the entire activation though.
| compression_type=compression_type, | ||
| report_timeout_errors=report_timeout_errors, | ||
| silenced_exceptions=silenced_exceptions, | ||
| pass_headers=pass_headers, |
There was a problem hiding this comment.
Since we're going to inject parameters into the task, should we validate that the task doesn't already have a headers parameter with an incompatible type?
There was a problem hiding this comment.
Another potential solution would be to pass the entire activation into the task,
as in, you have a pass_activation option?
Since we're going to inject parameters into the task, should we validate that the task doesn't already have a headers parameter with an incompatible type?
this feels magical but i can do it
There was a problem hiding this comment.
I think we can enforce that the headers kwarg is there explicitly. def mytask(**kwargs): is then just invalid.
There was a problem hiding this comment.
I think we can enforce that the headers kwarg is there explicitly. def mytask(**kwargs): is then just invalid.
I was more thinking of the scenario where a task has an incompatible headers parameter like:
def some_task(headers: set[str]):There was a problem hiding this comment.
as in, you have a pass_activation option?
Yeah, but as I think about it more I like this idea less, and pass_headers is the better solution.
There was a problem hiding this comment.
i think the current version covers this. it requires the header arg to be there, to have the right type, and to be explicit
…=True Use inspect module to check at task registration time that: - The function has a 'headers' parameter when pass_headers=True - The 'headers' parameter is not positional-only Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| activation = multi_task.create_activation([], {}) | ||
| assert activation.headers["x-test-context"] == "dispatched" | ||
| assert activation.headers["x-another"] == "also-here" | ||
|
|
There was a problem hiding this comment.
Can you add a test for the case when the user passes headers as one of the kwargs to their function?
There was a problem hiding this comment.
ok. testing for a specific error doesn't make a lot of sense if there's no explicit errorhandling for it, so I added more checks to the code too. i think it's a bit bloated now, but probably more user-friendly.
markstory
left a comment
There was a problem hiding this comment.
Looks good to me. The test addition that Evan mentioned would be a good one to have.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 6e002f9. Configure here.
| if origin not in expected_types: | ||
| raise TypeError( | ||
| f"{context}: {param_name!r} parameter has type {param.annotation!r}. " | ||
| f"Expected one of: {', '.join(t.__name__ for t in expected_types)}." |
There was a problem hiding this comment.
Any.__name__ crashes error formatting on Python 3.10
Low Severity
The error message in assert_typed_kwarg uses t.__name__ to format all types in expected_types, but typing.Any does not have a __name__ attribute on Python 3.10 (where it's a _SpecialForm instance, not a class). This causes an AttributeError to be raised instead of the intended helpful TypeError when a user provides an incompatible type annotation for headers. The expected_types tuple at the call site includes Any, so the generator expression t.__name__ for t in expected_types will crash when it reaches Any.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 6e002f9. Configure here.
There was a problem hiding this comment.
I don't care. Pick better types.


Summary
pass_headersparameter to@namespace.register()decoratorpass_headers=True, the task function receives activation headers as aheaderskeyword argument (dict[str, str])ContextHookfor tasks that just need direct access to header valuesWe need this kind of functionality when using raw mode together with the profiles consumer. Mainly for killswitches.
ref STREAM-882
Usage
Test plan
pass_headersattribute on Task🤖 Generated with Claude Code