Skip to content
Open
3 changes: 3 additions & 0 deletions relay-server/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub const DEFAULT_CHECK_IN_CLIENT: &str = "relay-http";
#[cfg(feature = "processing")]
pub const DEFAULT_ATTACHMENT_RETENTION: Duration = Duration::from_hours(24 * 30);

#[cfg(feature = "processing")]
pub const DEFAULT_PROFILE_RETENTION: Duration = Duration::from_hours(24 * 90);

/// Magic number indicating the dying message file is encoded by sentry-switch SDK.
pub const NNSWITCH_SENTRY_MAGIC: &[u8] = b"sntr";

Expand Down
16 changes: 16 additions & 0 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ impl<'a> StoreHandle<'a> {
}
}

/// Tries to send a message to the [`Objectstore`] service.
///
/// Returns the message back if the service is not configured,
/// allowing the caller to handle the fallback.
pub fn try_send_to_objectstore<M>(&self, message: M) -> Option<M>
where
Objectstore: FromMessage<M>,
{
if let Some(objectstore) = self.objectstore {
objectstore.send(message);
None
} else {
Some(message)
}
}

/// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service.
pub fn send_envelope(&self, envelope: ManagedEnvelope) {
use crate::services::store::StoreEnvelope;
Expand Down
35 changes: 29 additions & 6 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl Forward for ProfileChunkOutput {
s: processing::forward::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::services::objectstore::StoreRawProfile;
use crate::services::store::StoreProfileChunk;

let expanded = match self {
Expand All @@ -243,12 +244,34 @@ impl Forward for ProfileChunkOutput {
let retention_days = ctx.event_retention().standard;

for chunk in expanded.split(|e| e.chunks) {
s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk {
retention_days,
payload: chunk.payload,
quantities: chunk.quantities,
raw_profile: chunk.raw_profile,
}));
if chunk.raw_profile.is_some() {
let msg = chunk.map(|chunk, _| {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to move the if into the map and then use if let Some(raw_profile) = chunk.raw_profile.

let raw_profile = chunk.raw_profile.unwrap();
StoreRawProfile {
payload: raw_profile.payload,
content_type: raw_profile.content_type,
store_message: StoreProfileChunk {
retention_days,
Comment thread
sentry[bot] marked this conversation as resolved.
payload: chunk.payload,
quantities: chunk.quantities,
raw_profile_object_store_key: None,
raw_profile_content_type: None,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Content type omitted after failures

Medium Severity

When a chunk has a raw profile, StoreProfileChunk is built with raw_profile_content_type set to None, and objectstore only fills it after a successful upload. Previously, Kafka always included the content type whenever raw_profile was present. Failed uploads, session errors, objectstore fallback, and load-shed paths now emit messages without that field despite an ingested Perfetto trace.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit dc7c95b. Configure here.

},
retention: retention_days,
}
});
if let Some(unsent) = s.try_send_to_objectstore(msg) {
s.send_to_store(unsent.map(|profile, _| profile.store_message));
}
Comment on lines +263 to +265

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need this branch if we want to support perfetto profiles in self-hosted, which has no objectstore yet by default. In that case, we also need the kafka consumer to handler the raw_profile_object_store_key correctly.

So the simpler approach would be to never send raw profiles via kafka, and accept that perfetto is not enabled in self-hosted.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As relay takes care of parsing the perfetto profile into a sample v2 format I think it's still worth sending the profile chunk kafka message, no matter if object store is available or not. If it's not available, the kafka message would still be sent, just without a raw_profile_object_store_key set.

So the simpler approach would be to never send raw profiles via kafka

Maybe I'm misunderstanding something here, but that's the intention of this PR. Instead of packing the raw profile into the kafka message, we reference using raw_profile_object_store_key instead, which is only set when objectstore is available and successfully stored the raw profile.

} else {
s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk {
retention_days,
payload: chunk.payload,
quantities: chunk.quantities,
raw_profile_object_store_key: None,
raw_profile_content_type: None,
}));
}
}

Ok(())
Expand Down
134 changes: 127 additions & 7 deletions relay-server/src/services/objectstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ use relay_system::{
};
use sentry_protos::snuba::v1::TraceItem;

use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
use crate::envelope::{Item, ItemType};
use crate::constants::{DEFAULT_ATTACHMENT_RETENTION, DEFAULT_PROFILE_RETENTION};
use crate::envelope::{ContentType, Item, ItemType};
use crate::managed::{
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::processing::utils::store::item_id_to_uuid;
use crate::services::outcome::DiscardReason;
use crate::services::store::{Store, StoreAttachment, StoreEnvelope, StoreTraceItem};
use crate::services::store::{
Store, StoreAttachment, StoreEnvelope, StoreProfileChunk, StoreTraceItem,
};
use crate::services::upload::ByteStream;
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::{BoundedStream, MeteredStream, RetryableStream, TakeOnce};
Expand All @@ -38,6 +40,7 @@ pub enum Objectstore {
Envelope(StoreEnvelope),
TraceAttachment(Managed<StoreTraceAttachment>),
EventAttachment(Managed<StoreAttachment>),
RawProfile(Managed<StoreRawProfile>),
Stream(Stream, Sender<Result<ObjectstoreKey, Error>>),
}

Expand All @@ -47,6 +50,7 @@ impl Objectstore {
Self::Envelope(_) => MessageKind::Envelope,
Self::TraceAttachment(_) => MessageKind::TraceAttachment,
Self::EventAttachment(_) => MessageKind::EventAttachment,
Self::RawProfile(_) => MessageKind::RawProfile,
Self::Stream { .. } => MessageKind::Stream,
}
}
Expand All @@ -60,6 +64,7 @@ impl Objectstore {
.count(),
Self::TraceAttachment(_) => 1,
Self::EventAttachment(_) => 1,
Self::RawProfile(_) => 1,
Self::Stream { .. } => 1,
}
}
Expand Down Expand Up @@ -91,12 +96,21 @@ impl FromMessage<Managed<StoreAttachment>> for Objectstore {
}
}

impl FromMessage<Managed<StoreRawProfile>> for Objectstore {
type Response = NoResponse;

fn from_message(message: Managed<StoreRawProfile>, _sender: ()) -> Self {
Self::RawProfile(message)
}
}

/// A type tag used for logging.
#[derive(Debug, Clone, Copy)]
enum MessageKind {
Envelope,
EventAttachment,
TraceAttachment,
RawProfile,
Stream,
}

Expand All @@ -106,6 +120,7 @@ impl MessageKind {
Self::Envelope => "envelope",
Self::EventAttachment => "attachment",
Self::TraceAttachment => "attachment_v2",
Self::RawProfile => "profile_raw",
Self::Stream => "stream",
}
}
Expand Down Expand Up @@ -143,6 +158,28 @@ impl Counted for StoreTraceAttachment {
}
}

/// A raw profile (e.g. Perfetto trace) ready for objectstore upload.
///
/// After upload, the [`StoreProfileChunk`] is forwarded to the Store service
/// with the objectstore key set, so the Kafka message carries a reference
/// instead of the full binary blob.
pub struct StoreRawProfile {
/// The raw binary profile payload to upload.
pub payload: Bytes,
/// Content type of the raw profile.
pub content_type: ContentType,
/// The profile chunk message to forward to Kafka after upload.
pub store_message: StoreProfileChunk,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks unnecessarily nested. The objectstore service should be able to construct the StoreProfileChunk message from scratch when it sends the data to the store service.

/// Data retention in days.
pub retention: u16,
}

impl Counted for StoreRawProfile {
fn quantities(&self) -> Quantities {
self.store_message.quantities()
}
}

#[derive(Debug, thiserror::Error)]
#[error("objectstore upload failed")]
pub struct Error {
Expand Down Expand Up @@ -302,12 +339,15 @@ impl ObjectstoreService {
.with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION));
let trace_attachments = Usecase::new("trace_attachments")
.with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION));
let profiles = Usecase::new("profile_raw")
.with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_PROFILE_RETENTION));

let inner = ObjectstoreServiceInner {
store,
objectstore_client,
event_attachments,
trace_attachments,
profiles,
timeout: Duration::from_secs(*timeout),
stream_timeout: Duration::from_secs(*stream_timeout),
retry_interval: Duration::from_secs_f64(*retry_delay),
Expand Down Expand Up @@ -344,6 +384,11 @@ impl LoadShed<Objectstore> for ObjectstoreService {
Objectstore::TraceAttachment(managed) => {
let _ = managed.reject_err(error);
}
Objectstore::RawProfile(managed) => {
self.inner
.store
.send(managed.map(|profile, _| profile.store_message));
}
Objectstore::Stream(_, sender) => {
sender.send(Err(error));
}
Expand All @@ -357,6 +402,7 @@ struct ObjectstoreServiceInner {
objectstore_client: Client,
event_attachments: Usecase,
trace_attachments: Usecase,
profiles: Usecase,
timeout: Duration,
stream_timeout: Duration,
retry_interval: Duration,
Expand All @@ -381,6 +427,9 @@ impl ObjectstoreServiceInner {
Objectstore::EventAttachment(attachment) => {
self.handle_event_attachment(attachment).await;
}
Objectstore::RawProfile(profile) => {
self.handle_raw_profile(profile).await;
}
Objectstore::Stream(stream, sender) => {
let result = self.handle_stream(stream).await;
if let Err(error) = &result {
Expand Down Expand Up @@ -424,6 +473,7 @@ impl ObjectstoreServiceInner {
attachment.payload(),
retention,
None,
None,
)
.await;

Expand Down Expand Up @@ -469,6 +519,7 @@ impl ObjectstoreServiceInner {
attachment.attachment.payload(),
attachment.retention,
None,
None,
)
.await;

Expand Down Expand Up @@ -532,6 +583,7 @@ impl ObjectstoreServiceInner {
body,
retention,
Some(key),
None,
)
.await
.reject(&trace_item)?;
Expand All @@ -546,6 +598,53 @@ impl ObjectstoreServiceInner {
Ok(())
}

async fn handle_raw_profile(&self, managed: Managed<StoreRawProfile>) {
Comment thread
markushi marked this conversation as resolved.
let scoping = managed.scoping();
let session = self
.profiles
.for_project(scoping.organization_id.value(), scoping.project_id.value())
.session(&self.objectstore_client);

let payload = managed.payload.clone();
let content_type = managed.content_type;
let retention = managed.retention;

let mut store_message = managed.map(|profile, _| profile.store_message);

match session {
Err(error) => Error::from(error).log(MessageKind::RawProfile),
Ok(session) if !payload.is_empty() => {
let result = self
.upload_bytes(
MessageKind::RawProfile,
&session,
payload,
retention,
None,
Some(content_type),
)
.await;

match result {
Ok(stored_key) => {
store_message.modify(|msg, _| {
msg.raw_profile_object_store_key = Some(stored_key.into_inner());
msg.raw_profile_content_type = Some(content_type);
});
}
Err(error) => {
error.log(MessageKind::RawProfile);
}
}
}
Ok(_) => {}
}

// Always forward to store even if the raw profile upload failed,
// to ensure the kafka message is produced.
self.store.send(store_message);
}

async fn handle_stream(&self, stream: Stream) -> Result<ObjectstoreKey, Error> {
let Stream {
organization_id,
Expand All @@ -564,6 +663,7 @@ impl ObjectstoreServiceInner {
Some(key),
Body::Stream(TakeOnce::new(stream)),
None,
None,
)
.await
}
Expand All @@ -575,10 +675,18 @@ impl ObjectstoreServiceInner {
payload: Bytes,
retention: u16,
key: Option<String>,
content_type: Option<ContentType>,
) -> Result<ObjectstoreKey, Error> {
let retention_hours = retention.checked_mul(24);
self.upload(kind, session, key, Body::Bytes(payload), retention_hours)
.await
self.upload(
kind,
session,
key,
Body::Bytes(payload),
retention_hours,
content_type,
)
.await
}

async fn upload(
Expand All @@ -588,6 +696,7 @@ impl ObjectstoreServiceInner {
key: Option<String>,
body: Body,
retention_hours: Option<u16>,
content_type: Option<ContentType>,
) -> Result<ObjectstoreKey, Error> {
let mut attempts = 0;
let timeout = match &body {
Expand All @@ -602,8 +711,15 @@ impl ObjectstoreServiceInner {
};
attempts += 1;
result.replace(
self.attempt_upload(kind, session, key.clone(), body, retention_hours)
.await,
self.attempt_upload(
kind,
session,
key.clone(),
body,
retention_hours,
content_type,
)
.await,
);

if attempts < self.max_attempts.get()
Expand Down Expand Up @@ -642,12 +758,16 @@ impl ObjectstoreServiceInner {
key: Option<String>,
body: BodyAttempt,
retention_hours: Option<u16>,
content_type: Option<ContentType>,
) -> Result<ObjectstoreKey, objectstore_client::Error> {
let mut request = match body {
BodyAttempt::Bytes(bytes) => session.put(bytes),
BodyAttempt::Stream(stream) => session.put_stream(stream.boxed()),
};

if let Some(content_type) = content_type {
request = request.content_type(content_type.as_str());
}
if let Some(retention_hours) = retention_hours {
request = request.expiration_policy(ExpirationPolicy::TimeToLive(
Duration::from_hours(retention_hours.into()),
Expand Down
Loading
Loading