fix: implement proactive stale vector cleanup with directory-aware delete #51
Conversation
…tection

Problem: When deleting entire directories (e.g. `rm -rf tmp/`), Qdrant retained orphaned vectors because cleanup was per-file and reactive only.

Solution:
- Add `FindDeletedRoot()` utility (stale.go) that walks up `filepath.Dir()` to find the highest deleted directory for any missing file
- Add `GroupByDeletedRoot()` to batch stale files by deleted directory prefix
- Add `DeleteByPrefix()` on `QdrantClient` (Scroll + client-side filter + batch delete)
- Update `IndexLanguage()` to use directory-aware cleanup: bulk `DeleteByPrefix` for deleted directories, individual `DeleteByMetadata` for isolated file deletions
- Fallback to per-file delete if prefix delete fails

Flow: `CollectStaleFiles` -> `GroupByDeletedRoot` -> `DeleteByPrefix` (dir) or `DeleteByMetadata` (file) -> state cleanup

Tests: 7 new unit tests covering all `FindDeletedRoot` edge cases, `GroupByDeletedRoot`, and `CollectStaleFiles`.

Closes Trello tasks 168, 169, 170, 171
Pull request overview
Implements proactive cleanup of stale vectors when entire directories are deleted, avoiding orphaned Qdrant points that previously persisted after rm -rf-style removals.
Changes:
- Added stale-file detection + grouping by “highest deleted root” (directory-aware) for bulk cleanup.
- Updated workspace indexing to perform prefix-based Qdrant deletes for deleted directories, with fallback to per-file deletion.
- Added Qdrant `DeleteByPrefix` helper (scroll + client-side prefix match + batch delete) and unit tests for stale-file logic.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| internal/workspace/stale.go | Adds directory-aware stale detection helpers (FindDeletedRoot, grouping). |
| internal/workspace/stale_test.go | Adds unit tests covering deleted-root and grouping behavior. |
| internal/workspace/manager.go | Integrates directory-aware cleanup into IndexLanguage. |
| internal/storage/qdrant.go | Adds DeleteByPrefix to bulk-delete points under a directory prefix. |
```go
if delErr != nil {
	log.Printf("⚠️ Prefix delete failed for %s: %v — falling back to per-file delete", prefix, delErr)
	// Fallback: delete individually
	for _, path := range files {
		if err := ltm.DeleteByMetadata(ctx, "file", path); err != nil {
			log.Printf("⚠️ Failed to delete chunks for %s: %v", path, err)
		}
	}
} else if deleted > 0 {
	log.Printf("🧹 Bulk-deleted %d vectors under %s", deleted, prefix)
}

// Also remove any other state entries that match this prefix
// (files we haven't seen in staleFiles because they weren't in the iteration)
state.mu.Lock()
for path := range state.Files {
	if strings.HasPrefix(path, prefix) {
		delete(state.Files, path)
	}
```
Directory cleanup removes entries from state.Files for the prefix regardless of whether vector deletion actually succeeded (both when DeleteByPrefix errors and when it returns deleted==0). This can permanently prevent future cleanup attempts and leave orphaned vectors in Qdrant. Consider only deleting state entries after confirming deletion succeeded for that prefix (or keeping failed paths in state / re-queuing them), and treat deleted==0 as a signal to fall back to per-file deletes rather than wiping state.
Suggested change:

```diff
-if delErr != nil {
-	log.Printf("⚠️ Prefix delete failed for %s: %v — falling back to per-file delete", prefix, delErr)
-	// Fallback: delete individually
-	for _, path := range files {
-		if err := ltm.DeleteByMetadata(ctx, "file", path); err != nil {
-			log.Printf("⚠️ Failed to delete chunks for %s: %v", path, err)
-		}
-	}
-} else if deleted > 0 {
-	log.Printf("🧹 Bulk-deleted %d vectors under %s", deleted, prefix)
-}
-// Also remove any other state entries that match this prefix
-// (files we haven't seen in staleFiles because they weren't in the iteration)
-state.mu.Lock()
-for path := range state.Files {
-	if strings.HasPrefix(path, prefix) {
-		delete(state.Files, path)
-	}
+if delErr == nil && deleted > 0 {
+	log.Printf("🧹 Bulk-deleted %d vectors under %s", deleted, prefix)
+	// Only remove all matching state entries after confirming prefix deletion succeeded.
+	// This also clears tracked files under the prefix that weren't present in staleFiles.
+	state.mu.Lock()
+	for path := range state.Files {
+		if strings.HasPrefix(path, prefix) {
+			delete(state.Files, path)
+		}
+	}
+	state.mu.Unlock()
+	continue
+}
+if delErr != nil {
+	log.Printf("⚠️ Prefix delete failed for %s: %v — falling back to per-file delete", prefix, delErr)
+} else {
+	log.Printf("⚠️ Prefix delete removed 0 vectors for %s — falling back to per-file delete", prefix)
+}
+// Fallback: delete individually and only remove state for paths confirmed deleted.
+state.mu.Lock()
+for _, path := range files {
+	if err := ltm.DeleteByMetadata(ctx, "file", path); err != nil {
+		log.Printf("⚠️ Failed to delete chunks for %s: %v", path, err)
+		continue
+	}
+	delete(state.Files, path)
```
```go
// Uses Scroll to find matching points (client-side prefix filter) then batch deletes them.
// Returns the number of deleted points.
func (c *QdrantClient) DeleteByPrefix(ctx context.Context, key, prefix string) (int, error) {
	// Single-pass scroll: request all points with only the target payload field.
	// Workspace collections typically have <10k points, so a single pass is fine.
	// If pagination is needed in the future, the Scroll Offset field can be used.
	results, err := c.client.Scroll(ctx, &qdrant.ScrollPoints{
		CollectionName: c.config.Collection,
		Limit:          qdrant.PtrOf(uint32(50000)),
		WithPayload:    qdrant.NewWithPayloadInclude(key),
	})
	if err != nil {
		return 0, fmt.Errorf("scroll for prefix %q failed: %w", prefix, err)
	}

	var allMatchingIDs []*qdrant.PointId
	for _, point := range results {
		if val, ok := point.Payload[key]; ok {
			if strings.HasPrefix(val.GetStringValue(), prefix) {
				allMatchingIDs = append(allMatchingIDs, point.Id)
			}
		}
	}

	if len(allMatchingIDs) == 0 {
		return 0, nil
	}

	// Batch delete all matching point IDs
	_, err = c.client.Delete(ctx, &qdrant.DeletePoints{
		CollectionName: c.config.Collection,
		Points: &qdrant.PointsSelector{
			PointsSelectorOneOf: &qdrant.PointsSelector_Points{
				Points: &qdrant.PointsIdsList{
					Ids: allMatchingIDs,
				},
			},
		},
	})
	if err != nil {
		return 0, fmt.Errorf("batch delete of %d points by prefix %q failed: %w", len(allMatchingIDs), prefix, err)
	}

	return len(allMatchingIDs), nil
```
DeleteByPrefix only scrolls a single page (Limit=50000) and doesn't paginate, so collections larger than the limit will not be fully cleaned up even though the method claims to delete “all vectors” with the prefix. Consider iterating with scroll offsets until exhaustion (or lowering the claim in the docstring / returning partial-deletion info), and deleting in batches if the ID list grows large.
Suggested change:

```diff
-// Uses Scroll to find matching points (client-side prefix filter) then batch deletes them.
-// Returns the number of deleted points.
-func (c *QdrantClient) DeleteByPrefix(ctx context.Context, key, prefix string) (int, error) {
-	// Single-pass scroll: request all points with only the target payload field.
-	// Workspace collections typically have <10k points, so a single pass is fine.
-	// If pagination is needed in the future, the Scroll Offset field can be used.
-	results, err := c.client.Scroll(ctx, &qdrant.ScrollPoints{
-		CollectionName: c.config.Collection,
-		Limit:          qdrant.PtrOf(uint32(50000)),
-		WithPayload:    qdrant.NewWithPayloadInclude(key),
-	})
-	if err != nil {
-		return 0, fmt.Errorf("scroll for prefix %q failed: %w", prefix, err)
-	}
-	var allMatchingIDs []*qdrant.PointId
-	for _, point := range results {
-		if val, ok := point.Payload[key]; ok {
-			if strings.HasPrefix(val.GetStringValue(), prefix) {
-				allMatchingIDs = append(allMatchingIDs, point.Id)
-			}
-		}
-	}
-	if len(allMatchingIDs) == 0 {
-		return 0, nil
-	}
-	// Batch delete all matching point IDs
-	_, err = c.client.Delete(ctx, &qdrant.DeletePoints{
-		CollectionName: c.config.Collection,
-		Points: &qdrant.PointsSelector{
-			PointsSelectorOneOf: &qdrant.PointsSelector_Points{
-				Points: &qdrant.PointsIdsList{
-					Ids: allMatchingIDs,
-				},
-			},
-		},
-	})
-	if err != nil {
-		return 0, fmt.Errorf("batch delete of %d points by prefix %q failed: %w", len(allMatchingIDs), prefix, err)
-	}
-	return len(allMatchingIDs), nil
+// Uses paginated Scroll requests to find matching points (client-side prefix filter)
+// and deletes matches in batches. Returns the number of deleted points.
+func (c *QdrantClient) DeleteByPrefix(ctx context.Context, key, prefix string) (int, error) {
+	const scrollPageSize uint32 = 1000
+	const deleteBatchSize = 1000
+	var totalDeleted int
+	var offset *qdrant.PointId
+	for {
+		results, err := c.client.Scroll(ctx, &qdrant.ScrollPoints{
+			CollectionName: c.config.Collection,
+			Limit:          qdrant.PtrOf(scrollPageSize),
+			Offset:         offset,
+			WithPayload:    qdrant.NewWithPayloadInclude(key),
+		})
+		if err != nil {
+			return totalDeleted, fmt.Errorf("scroll for prefix %q failed: %w", prefix, err)
+		}
+		if len(results) == 0 {
+			break
+		}
+		var matchingIDs []*qdrant.PointId
+		for _, point := range results {
+			if point == nil || point.Id == nil {
+				continue
+			}
+			if val, ok := point.Payload[key]; ok && strings.HasPrefix(val.GetStringValue(), prefix) {
+				matchingIDs = append(matchingIDs, point.Id)
+			}
+		}
+		for start := 0; start < len(matchingIDs); start += deleteBatchSize {
+			end := start + deleteBatchSize
+			if end > len(matchingIDs) {
+				end = len(matchingIDs)
+			}
+			_, err = c.client.Delete(ctx, &qdrant.DeletePoints{
+				CollectionName: c.config.Collection,
+				Points: &qdrant.PointsSelector{
+					PointsSelectorOneOf: &qdrant.PointsSelector_Points{
+						Points: &qdrant.PointsIdsList{
+							Ids: matchingIDs[start:end],
+						},
+					},
+				},
+			})
+			if err != nil {
+				return totalDeleted, fmt.Errorf("batch delete of %d points by prefix %q failed: %w", end-start, prefix, err)
+			}
+			totalDeleted += end - start
+		}
+		if len(results) < int(scrollPageSize) {
+			break
+		}
+		lastPoint := results[len(results)-1]
+		if lastPoint == nil || lastPoint.Id == nil {
+			break
+		}
+		offset = lastPoint.Id
+	}
+	return totalDeleted, nil
```
Code Review
This pull request implements an optimized cleanup strategy for workspace indexing by detecting deleted directory roots and performing bulk deletions in the Qdrant vector store. It introduces a DeleteByPrefix method and utility functions for grouping stale files. Review feedback pointed out a compilation error in the Scroll API usage, the necessity of handling pagination for large result sets, and an O(N) optimization for updating the internal file state.
```go
results, err := c.client.Scroll(ctx, &qdrant.ScrollPoints{
	CollectionName: c.config.Collection,
	Limit:          qdrant.PtrOf(uint32(50000)),
	WithPayload:    qdrant.NewWithPayloadInclude(key),
})
```
The Scroll method in the Qdrant Go SDK returns three values: ([]*RetrievedPoint, *PointId, error). The current implementation only captures two, which will cause a compilation error. Additionally, the code should handle pagination by checking the returned nextPageOffset and looping until all matching points are retrieved, rather than assuming all points fit within a single 50,000-limit request.
```go
var allMatchingIDs []*qdrant.PointId
var offset *qdrant.PointId
for {
	results, nextOffset, err := c.client.Scroll(ctx, &qdrant.ScrollPoints{
		CollectionName: c.config.Collection,
		Limit:          qdrant.PtrOf(uint32(10000)),
		WithPayload:    qdrant.NewWithPayloadInclude(key),
		Offset:         offset,
	})
	if err != nil {
		return 0, fmt.Errorf("scroll for prefix %q failed: %w", prefix, err)
	}
	for _, point := range results {
		if val, ok := point.Payload[key]; ok {
			if strings.HasPrefix(val.GetStringValue(), prefix) {
				allMatchingIDs = append(allMatchingIDs, point.Id)
			}
		}
	}
	if nextOffset == nil || len(results) == 0 {
		break
	}
	offset = nextOffset
}
```

```go
state.mu.Lock()
for path := range state.Files {
	if strings.HasPrefix(path, prefix) {
		delete(state.Files, path)
	}
}
state.mu.Unlock()
```
Iterating over the entire state.Files map inside a loop for each prefix is inefficient ($O(N_{prefixes} * N_{files})$). Since dirPrefixes[prefix] already contains the list of stale files under that directory, you can directly remove those specific paths from the state. This reduces the work to one map delete per stale file:
```go
state.mu.Lock()
for _, path := range files {
	delete(state.Files, path)
}
state.mu.Unlock()
```
Description

Fixes orphaned vector retention when entire directories are deleted (e.g. `rm -rf tmp/`). Previously, cleanup was per-file and reactive only: Qdrant retained stale vectors for all files under a deleted directory because no bulk cleanup existed.

Solution implemented:
- `FindDeletedRoot()` (stale.go): walks up `filepath.Dir()` to find the highest deleted ancestor directory for any missing file
- `GroupByDeletedRoot()`: batches stale files by deleted directory prefix for efficient bulk operations
- `DeleteByPrefix()` on `QdrantClient`: Scroll + client-side filter + batch delete for directory-level purges
- `IndexLanguage()` updated with directory-aware cleanup: `DeleteByPrefix` for deleted directories, `DeleteByMetadata` for isolated file deletions; falls back to per-file delete if prefix delete fails

Flow: `CollectStaleFiles` → `GroupByDeletedRoot` → `DeleteByPrefix` (dir) or `DeleteByMetadata` (file) → state cleanup

Closes Trello tasks #168, #169, #170, #171

Type of change

Checklist:
- Ran `go fmt ./...`
- Ran `go test ./...` and they pass (7 new unit tests covering `FindDeletedRoot` edge cases, `GroupByDeletedRoot`, and `CollectStaleFiles`)