From de95257eeda04b017be5d0324a7096c8a922d6be Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 3 Apr 2025 08:33:14 -0700 Subject: [PATCH] store: Revert entity versions during copy instead of after Reverting entities after copying can be very slow; the step is also unnecessary since we already know during copying which entity versions need to be unclamped and we never copy versions that would have to be deleted by the revert. We move the revert logic into CopyEntityBatchQuery so that entity versions are reverted as they are copied rather than in a separate revert_block pass after copying completes. The post-copy revert_block call in start_subgraph is kept as a no-op safety net for copies that were started with older code and resumed after upgrading. It can be removed once a release with this logic has been out for long enough. --- store/postgres/src/copy.rs | 34 +++++++++++++++++------- store/postgres/src/deployment_store.rs | 20 ++++++++++---- store/postgres/src/relational_queries.rs | 24 ++++++++++++++++- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 22c37bf2498..05939362352 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -178,7 +178,14 @@ impl CopyState { dst: Arc, target_block: BlockPtr, ) -> Result { - let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref()).await?; + let tables = TableState::load( + conn, + primary, + src.as_ref(), + dst.as_ref(), + target_block.number, + ) + .await?; let (finished, mut unfinished): (Vec<_>, Vec<_>) = tables.into_iter().partition(|table| table.finished()); unfinished.sort_by_key(|table| table.dst.object.to_string()); @@ -329,6 +336,7 @@ struct TableState { dst_site: Arc, batcher: VidBatcher, duration_ms: i64, + target_block: BlockNumber, } impl TableState { @@ -351,6 +359,7 @@ impl TableState { dst_site, batcher, duration_ms: 0, + target_block: target_block.number, }) } @@ -363,6 +372,7 @@ impl TableState { primary: Primary, src_layout: &Layout, dst_layout: &Layout, + target_block: BlockNumber, ) -> Result, StoreError> { use copy_table_state as cts; @@ -429,6 +439,7 @@ impl TableState { dst_site: dst_layout.site.clone(), batcher, duration_ms, + target_block, }; states.push(state); } @@ -503,15 +514,20 @@ impl TableState { } async fn copy_batch(&mut self, conn: &mut AsyncPgConnection) -> Result { - let (duration, count) = self + let (duration, count): (_, Option) = self .batcher - .step(async |start, end| { - let count = - rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? - .count_current() - .get_result::(conn) - .await - .optional()?; + .step(async |start: i64, end: i64| { + let count = rq::CopyEntityBatchQuery::new( + self.dst.as_ref(), + &self.src, + start, + end, + self.target_block, + )? + .count_current() + .get_result::(conn) + .await + .optional()?; Ok(count.unwrap_or(0) as i32) }) .await?; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index df4e34c448a..a6caec5637c 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1651,11 +1651,21 @@ impl DeploymentStore { .await?; } - // Rewind the subgraph so that entity versions that are - // clamped in the future (beyond `block`) become valid for - // all blocks after `block`. `revert_block` gets rid of - // everything including the block passed to it. We want to - // preserve `block` and therefore revert `block+1` + // CopyEntityBatchQuery now reverts entity versions + // during copying, making this rewind redundant for new + // copies. We keep it for backward compatibility: a copy + // that was started before this change and is resumed + // after upgrading will have already-copied rows that + // weren't reverted during copy. For data that was + // already reverted during copy, this is a no-op. This + // code can be removed once a release with this change + // has been out for a while and we are sure that there + // are no more copies in progress that started before + // the change + // + // `revert_block` gets rid of everything including the + // block passed to it. We want to preserve `block` and + // therefore revert `block+1` let start = Instant::now(); let block_to_revert: BlockNumber = block .number diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 46655c21c71..a538bdaddf8 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -5091,6 +5091,7 @@ pub struct CopyEntityBatchQuery<'a> { columns: Vec<&'a Column>, first_vid: i64, last_vid: i64, + target_block: BlockNumber, } impl<'a> CopyEntityBatchQuery<'a> { @@ -5099,6 +5100,7 @@ impl<'a> CopyEntityBatchQuery<'a> { src: &'a Table, first_vid: i64, last_vid: i64, + target_block: BlockNumber, ) -> Result { let mut columns = Vec::new(); for dcol in &dst.columns { @@ -5125,6 +5127,7 @@ impl<'a> CopyEntityBatchQuery<'a> { columns, first_vid, last_vid, + target_block, }) } @@ -5209,7 +5212,16 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { ); out.push_sql(&checked_conversion); } - (false, false) => out.push_sql(BLOCK_RANGE_COLUMN), + (false, false) => { + let range_conv = format!( + r#" + case when upper({BLOCK_RANGE_COLUMN}) > {} + then int4range(lower({BLOCK_RANGE_COLUMN}), null) + else {BLOCK_RANGE_COLUMN} end"#, + self.target_block + ); + out.push_sql(&range_conv) + } } match (self.src.has_causality_region, self.dst.has_causality_region) { @@ -5239,6 +5251,16 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_bind_param::(&self.first_vid)?; out.push_sql(" and vid <= "); out.push_bind_param::(&self.last_vid)?; + out.push_sql(" and "); + if self.src.immutable { + out.push_sql(BLOCK_COLUMN); + } else { + out.push_sql("lower("); + out.push_sql(BLOCK_RANGE_COLUMN); + out.push_sql(")"); + } + out.push_sql(" <= "); + out.push_bind_param::(&self.target_block)?; out.push_sql("\n returning "); if self.dst.immutable { out.push_sql("true");