Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 30 additions & 92 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{anyhow, bail};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
Expand Down Expand Up @@ -291,114 +291,52 @@ impl EntityCache {
causality_region: eref.causality_region,
};

let mut entity_map = self.store.get_derived(&query).await?;
// Entities that satisfied the query at the start of this block.
let stored = self.store.get_derived(&query).await?;

for (key, entity) in entity_map.iter() {
// Only insert to the cache if it's not already there
for (key, entity) in &stored {
if !self.current.contains_key(key) {
self.current
.insert(key.clone(), Some(Arc::new(entity.clone())));
}
}

let mut keys_to_remove = Vec::new();

// Apply updates from `updates` and `handler_updates` directly to entities in `entity_map` that match the query
for (key, entity) in entity_map.iter_mut() {
let op = match (
self.updates.get(key).cloned(),
self.handler_updates.get(key).cloned(),
) {
(Some(op), None) | (None, Some(op)) => op,
(Some(mut op), Some(op2)) => {
op.accumulate(op2);
op
}
(None, None) => continue,
};

let updated_entity = op
.apply_to(&Some(&*entity))
.map_err(|e| key.unknown_attribute(e))?;

if let Some(updated_entity) = updated_entity {
*entity = updated_entity;
} else {
// if entity_arc is None, it means that the entity was removed by an update
// mark the key for removal from the map
keys_to_remove.push(key.clone());
// Candidate set: keys that were matching at baseline, plus keys
// any in-block write has touched whose entity_type and causality
// region are compatible with the query. The latter catches
// entities an in-block write has moved into the matching set or
// created fresh in this block.
let mut candidates: BTreeSet<EntityKey> = stored.keys().cloned().collect();
for key in self.updates.keys().chain(self.handler_updates.keys()) {
if key.entity_type == query.entity_type
&& key.causality_region == query.causality_region
{
candidates.insert(key.clone());
}
}

// A helper function that checks if an update matches the query and returns the updated entity if it does
fn matches_query(
op: &EntityOp,
query: &DerivedEntityQuery,
key: &EntityKey,
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
{
Ok(Some(entity.clone()))
}
EntityOp::Remove => Ok(None),
_ => Ok(None),
let mut result = Vec::new();
for key in candidates {
// Resolve the entity's final in-block state by layering
// store baseline, then self.updates, then self.handler_updates.
// Each layer's op may mutate, replace, or remove the entity.
let mut entity: Option<Entity> = stored.get(&key).cloned();
if let Some(op) = self.updates.get(&key).cloned() {
entity = op.apply_to(&entity).map_err(|e| key.unknown_attribute(e))?;
}
}

// Iterate over self.updates to find entities that:
// - Aren't already present in the entity_map
// - Match the query
// If these conditions are met:
// - Check if there's an update for the same entity in handler_updates and apply it.
// - Add the entity to entity_map.
for (key, op) in self.updates.iter() {
if !entity_map.contains_key(key)
&& let Some(entity) = matches_query(op, &query, key)?
{
if let Some(handler_op) = self.handler_updates.get(key).cloned() {
// If there's a corresponding update in handler_updates, apply it to the entity
// and insert the updated entity into entity_map
let mut entity = Some(entity);
entity = handler_op
.apply_to(&entity)
.map_err(|e| key.unknown_attribute(e))?;

if let Some(updated_entity) = entity {
entity_map.insert(key.clone(), updated_entity);
}
} else {
// If there isn't a corresponding update in handler_updates or the update doesn't match the query, just insert the entity from self.updates
entity_map.insert(key.clone(), entity);
}
if let Some(op) = self.handler_updates.get(&key).cloned() {
entity = op.apply_to(&entity).map_err(|e| key.unknown_attribute(e))?;
}
}

// Iterate over handler_updates to find entities that:
// - Aren't already present in the entity_map.
// - Aren't present in self.updates.
// - Match the query.
// If these conditions are met, add the entity to entity_map.
for (key, handler_op) in self.handler_updates.iter() {
if !entity_map.contains_key(key)
&& !self.updates.contains_key(key)
&& let Some(entity) = matches_query(handler_op, &query, key)?
// Include the entity only if its final state still matches the query.
if let Some(entity) = entity
&& query.matches(&key, &entity)
{
entity_map.insert(key.clone(), entity);
result.push(entity);
}
}

// Remove entities that are in the store but have been removed by an update.
// We do this last since the loops over updates and handler_updates are only
// concerned with entities that are not in the store yet and by leaving removed
// keys in entity_map we avoid processing these updates a second time when we
// already looked at them when we went through entity_map
for key in keys_to_remove {
entity_map.remove(&key);
}

Ok(entity_map.into_values().collect())
Ok(result)
}

pub fn remove(&mut self, key: EntityKey) {
Expand Down
Loading
Loading