#56 - added the processing for private api key #99
jonathanMLDev wants to merge 5 commits into CppDigest:main from
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings, or use the review commands to manage reviews.
📝 Walkthrough
Added a PineconeInstance enum and private API key support; introduced SourceBaseConfig and new embedding controls; refactored many source configs to inherit from it; reworked PineconeIngestion to be instance-aware and support multi-mode processing (UPSERT and UPDATE_*_BY_ID) with namespace schema management; improved text cleaning and WG21 metadata validation.
Sequence Diagram

```mermaid
sequenceDiagram
participant Client as Client
participant Ingest as PineconeIngestion
participant Dense as DenseIndex
participant Sparse as SparseIndex
participant Schema as NamespaceSchema
Client->>Ingest: process_documents(docs, namespace, process_type)
alt process_type == UPSERT
Ingest->>Ingest: _prepare_records_to_process()
Ingest->>Dense: fetch existing IDs (existence check)
Dense-->>Ingest: existing IDs
Ingest->>Ingest: _filter_existing_records()
Ingest->>Schema: _ensure_namespace_schema(api_key, namespace, schema_fields)
Schema-->>Ingest: schema verified/created
Ingest->>Ingest: _upsert_records()
Ingest->>Dense: upsert dense vectors
Ingest->>Sparse: upsert sparse vectors
else process_type == UPDATE_*_BY_ID
Ingest->>Ingest: _update_by_id(records, mode, keys)
Ingest->>Dense: update metadata/document/value by ID
Ingest->>Sparse: update sparse fields by ID
end
Ingest->>Ingest: _build_process_result()
Ingest-->>Client: result (processed, skipped, errors, process_type)
```
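To make the two modes concrete, here is a minimal dispatch sketch in Python. `ProcessType` and the method names mirror the walkthrough and diagram above, but the enum values and all helper bodies are assumptions, not the PR's actual code:

```python
from enum import Enum
from typing import Any, Dict, List


class ProcessType(str, Enum):
    UPSERT = "upsert"
    UPDATE_DOCUMENT_BY_ID = "update_document_by_id"
    UPDATE_METADATA_BY_ID = "update_metadata_by_id"
    UPDATE_VALUE_BY_ID = "update_value_by_id"


class PineconeIngestionSketch:
    def process_documents(
        self, documents: List[Any], namespace: str, process_type: ProcessType
    ) -> Dict[str, Any]:
        # Illustrative dispatch only; the real method also handles batching,
        # schema management, and per-record error accounting.
        if process_type == ProcessType.UPSERT:
            records = self._prepare_records_to_process(documents)
            records = self._filter_existing_records(records, namespace)
            self._ensure_namespace_schema(namespace, records)
            self._upsert_records(records, namespace)
        else:
            self._update_by_id(documents, process_type, namespace)
        return self._build_process_result(process_type)
```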
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning), ✅ 2 passed
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pinecone_rag/preprocessor/utility.py (1)
337-347: ⚠️ Potential issue | 🟠 Major
Whitespace normalization order destroys line breaks before they can be processed.
Line 337's `re.sub(r"\s+", " ", text)` collapses all whitespace (including `\n`, `\r`, `\t`) into single spaces. This means lines 338-347, which attempt to normalize line breaks and limit consecutive newlines, become ineffective; the newlines are already gone. The docstring example `clean_text("Text\\n\\n\\nMore text")` returning `'Text\\n\\nMore text'` won't work as expected; it will return `'Text More text'` instead.
Suggested fix: Move the `\s+` collapse after line break handling, or limit it to horizontal whitespace.

```diff
- # Normalize line breaks
- text = re.sub(r"\s+", " ", text)
  text = re.sub(r"\r\n", "\n", text)  # Windows line breaks
  text = re.sub(r"\r", "\n", text)  # Old Mac line breaks
  if remove_extra_spaces:
      # Remove multiple spaces
      text = re.sub(r" +", " ", text)
+     # Collapse horizontal whitespace (tabs, etc.) but preserve newlines
+     text = re.sub(r"[^\S\n]+", " ", text)
      # Remove multiple newlines (keep max 2)
      text = re.sub(r"\n{3,}", "\n\n", text)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/preprocessor/utility.py` around lines 337 - 347, The whitespace collapse re.sub(r"\s+", " ", text) is running before linebreak normalization, which removes newlines needed by the subsequent normalization steps; update the logic in pinecone_rag/preprocessor/utility.py so that either (a) the collapse uses a horizontal-only pattern like r"[ \t]+" (i.e., only spaces/tabs) instead of r"\s+", or (b) move the existing re.sub(r"\s+", " ", text) to after the \r\n/\r normalizations and after the remove_extra_spaces block; adjust code that references the text variable and the remove_extra_spaces branch accordingly so line breaks are preserved while still collapsing extra horizontal whitespace.
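For illustration, a standalone, runnable sketch of the suggested ordering; this `clean_text` is a reduced stand-in under the comment's assumptions, not the project's actual function:

```python
import re


def clean_text(text: str) -> str:
    # Normalize line endings first, while newlines still exist
    text = re.sub(r"\r\n?", "\n", text)
    # Collapse only horizontal whitespace; [^\S\n] matches whitespace except \n
    text = re.sub(r"[^\S\n]+", " ", text)
    # Cap consecutive newlines at two
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text


# Newlines survive; only tabs/spaces are collapsed
assert clean_text("Text\n\n\nMore\ttext") == "Text\n\nMore text"
```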
🧹 Nitpick comments (2)
pinecone_rag/ingestion.py (1)
931-944: Remove commented-out code.
This commented block appears to be development/debug code for configuring the sparse index. It should be removed before merging to keep the codebase clean.
Suggested fix

```diff
  try:
      self._ensure_indexes_ready()
-     # spares_name=f"{self.config.index_name}-sparse"
-     # self.pc.configure_index(
-     #     name=spares_name,
-     #     embed={
-     #         "write_parameters": {
-     #             "max_tokens_per_sequence": 2048,
-     #             "truncate": "END",
-     #         },
-     #         "read_parameters": {
-     #             "max_tokens_per_sequence": 2048,
-     #             "truncate": "END",
-     #         },
-     #     },
-     # )
      dense_stats = self.dense_index.describe_index_stats()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 931 - 944, Remove the leftover commented-out sparse index configuration block: delete the lines referencing spares_name=f"{self.config.index_name}-sparse" and the self.pc.configure_index(...) call (and its embed/read/write parameters) from the ingestion.py file; this cleans up development/debug code related to sparse index configuration (symbols to locate: spares_name, self.config.index_name, self.pc.configure_index).

pinecone_rag/preprocessor/utility.py (1)
332-332: Consider replacing narrow no-break space with a space instead of removing it.
`\u202f` (NARROW NO-BREAK SPACE) is still a space character, similar to `\xa0` and `\u2002`. Removing it entirely may cause words to merge unexpectedly (e.g., "100\u202fkg" would become "100kg" instead of "100 kg").
Suggested fix

```diff
- .replace("\u202f", "")
+ .replace("\u202f", " ")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/preprocessor/utility.py` at line 332, The current call .replace("\u202f", "") in pinecone_rag/preprocessor/utility.py removes the NARROW NO-BREAK SPACE causing words to merge; change that replacement to preserve a space instead (i.e., replace "\u202f" with a regular space " ") so "100\u202fkg" becomes "100 kg"; locate the literal .replace("\u202f", "") in the file and update it to use a single space replacement (and consider applying the same approach for similar replacements like "\xa0" or "\u2002" if present).
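A quick self-contained check of the behavioral difference (plain Python, hypothetical strings):

```python
text = "100\u202fkg and 5\xa0m"

# Deleting the characters merges adjacent words
merged = text.replace("\u202f", "").replace("\xa0", "")
assert merged == "100kg and 5m"

# Replacing with a regular space preserves word boundaries
spaced = text.replace("\u202f", " ").replace("\xa0", " ")
assert spaced == "100 kg and 5 m"
```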
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pinecone_rag/config.py`:
- Line 37: The Pinecone private API key default of empty string can cause
runtime auth failures when ReflectorConfig.instance defaults to
PineconeInstance.PRIVATE; update PineconeConfig (the private_api_key field) to
validate and raise a clear exception if instance == PineconeInstance.PRIVATE and
private_api_key is empty (or alternatively require the env var and document it),
and ensure any code paths that construct Pinecone clients (e.g., ingestion.py
client creation and HTTP request code) rely on that validation so an explicit
error is raised early rather than passing an empty key to Pinecone.
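A minimal sketch of the validation suggested above for `pinecone_rag/config.py`, assuming a Pydantic v2 config class; the enum values and defaults here are illustrative, not the PR's actual definitions:

```python
from enum import Enum

from pydantic import BaseModel, model_validator


class PineconeInstance(str, Enum):
    PUBLIC = "public"
    PRIVATE = "private"


class PineconeConfig(BaseModel):
    instance: PineconeInstance = PineconeInstance.PRIVATE
    api_key: str = ""
    private_api_key: str = ""

    @model_validator(mode="after")
    def _require_private_key(self) -> "PineconeConfig":
        # Fail fast instead of passing an empty key to the Pinecone client
        if self.instance == PineconeInstance.PRIVATE and not self.private_api_key:
            raise ValueError(
                "private_api_key must be set when instance is PRIVATE "
                "(e.g., via a PINECONE_PRIVATE_API_KEY environment variable)"
            )
        return self
```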
In `@pinecone_rag/ingestion.py`:
- Around line 791-799: The error message appended to failed_documents uses a
stale loop variable key; modify the logic in the block guarded by
update_metadata to compute the actual missing keys from selected_keys vs record
keys (e.g., missing = [k for k in selected_keys if k not in record]) and then
populate the failed_documents entry for doc_id with a reason that references the
computed missing list (or a join of it) instead of the undefined/stale key;
update the code around the variables update_metadata, failed_documents, doc_id,
record, selected_keys to use this computed missing_keys value in the reason.
- Around line 769-778: The code accesses records[0] in the process_type
branching (for ProcessType.UPDATE_DOCUMENT_BY_ID, UPDATE_METADATA_BY_ID,
UPDATE_VALUE_BY_ID) which can raise IndexError when _prepare_records_to_process
yields an empty list; add a guard after preparing records (e.g., right after
calling _prepare_records_to_process inside process_documents) to check if not
records and return or handle as appropriate (return an empty result or raise a
clear ValueError), or alternatively check if records is non-empty before using
records[0] and handle the empty case explicitly; reference symbols: records,
_prepare_records_to_process, process_documents,
ProcessType.UPDATE_DOCUMENT_BY_ID, ProcessType.UPDATE_METADATA_BY_ID,
ProcessType.UPDATE_VALUE_BY_ID.
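The two ingestion.py prompts above boil down to an emptiness guard and a computed missing-keys list. A small self-contained sketch; the function names are hypothetical, not the PR's helpers:

```python
from typing import Any, Dict, List


def guard_nonempty(records: List[Dict[str, Any]]) -> None:
    # Guard before any records[0] access in the UPDATE_*_BY_ID branches
    if not records:
        raise ValueError("No valid records to process for the requested process_type")


def missing_keys(record: Dict[str, Any], selected_keys: List[str]) -> List[str]:
    # Report the keys actually absent from this record, instead of reusing a
    # stale loop variable, when building the failed_documents reason.
    return [k for k in selected_keys if k not in record]


guard_nonempty([{"id": "doc-1"}])
assert missing_keys({"id": "doc-1"}, ["title", "id"]) == ["title"]
```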
In `@pinecone_rag/preprocessor/utility.py`:
- Line 334: The replacement of the literal string "\\u00A0" is
incorrect/redundant because it matches the six-character escape sequence, not
the actual non-breaking space (already handled by "\xa0"); either remove the
"\\u00A0" replacement line, or if you intended to handle escaped Unicode
sequences, replace it with a clear implementation (e.g., decode escaped
sequences or use a regex r"\\\\u00A0" to match a backslash-u sequence) and add a
comment explaining why both "\xa0" and the escaped form are needed; locate and
update the replacement near the existing "\xa0" handling in
pinecone_rag/preprocessor/utility.py.
In `@pinecone_rag/preprocessor/wg21paper_preprocessor.py`:
- Around line 149-155: The current strict check using `if ", " not in author` in
wg21paper_preprocessor.py can drop valid author strings; update the parsing
around the `author` variable so it no longer returns None for single-author or
differently-separated lists: remove the `", "` existence check, split `author`
using a more flexible pattern (e.g., split on commas, semicolons, or the word
"and"), trim whitespace from each name, filter out empty entries to produce
`authors`, and fall back to `["unknown"]` when no valid names remain; also add a
log call (use the module's logger) when you would have returned None so skipped
documents are recorded.
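A sketch of the more tolerant author parsing suggested for `wg21paper_preprocessor.py`; the split pattern is one possible choice, and `parse_authors` is a hypothetical helper name:

```python
import logging
import re

logger = logging.getLogger(__name__)


def parse_authors(author: str) -> list[str]:
    # Split on commas, semicolons, or the word "and" instead of requiring ", "
    parts = re.split(r",|;|\band\b", author or "")
    authors = [name.strip() for name in parts if name.strip()]
    if not authors:
        # Log instead of silently returning None, so skipped docs are recorded
        logger.warning("Could not parse author string %r; using fallback", author)
        return ["unknown"]
    return authors


assert parse_authors("Jane Doe and John Smith") == ["Jane Doe", "John Smith"]
assert parse_authors("") == ["unknown"]
```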
---
Outside diff comments:
In `@pinecone_rag/preprocessor/utility.py`:
- Around line 337-347: The whitespace collapse re.sub(r"\s+", " ", text) is
running before linebreak normalization, which removes newlines needed by the
subsequent normalization steps; update the logic in
pinecone_rag/preprocessor/utility.py so that either (a) the collapse uses a
horizontal-only pattern like r"[ \t]+" (i.e., only spaces/tabs) instead of
r"\s+", or (b) move the existing re.sub(r"\s+", " ", text) to after the \r\n/\r
normalizations and after the remove_extra_spaces block; adjust code that
references the text variable and the remove_extra_spaces branch accordingly so
line breaks are preserved while still collapsing extra horizontal whitespace.
---
Nitpick comments:
In `@pinecone_rag/ingestion.py`:
- Around line 931-944: Remove the leftover commented-out sparse index
configuration block: delete the lines referencing
spares_name=f"{self.config.index_name}-sparse" and the
self.pc.configure_index(...) call (and its embed/read/write parameters) from the
ingestion.py file; this cleans up development/debug code related to sparse index
configuration (symbols to locate: spares_name, self.config.index_name,
self.pc.configure_index).
In `@pinecone_rag/preprocessor/utility.py`:
- Line 332: The current call .replace("\u202f", "") in
pinecone_rag/preprocessor/utility.py removes the NARROW NO-BREAK SPACE causing
words to merge; change that replacement to preserve a space instead (i.e.,
replace "\u202f" with a regular space " ") so "100\u202fkg" becomes "100 kg";
locate the literal .replace("\u202f", "") in the file and update it to use a
single space replacement (and consider applying the same approach for similar
replacements like "\xa0" or "\u2002" if present).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 628a7d07-3343-452c-8e2e-25e2378dc3c4
📒 Files selected for processing (4)
pinecone_rag/config.py, pinecone_rag/ingestion.py, pinecone_rag/preprocessor/utility.py, pinecone_rag/preprocessor/wg21paper_preprocessor.py
@coderabbitai how about it?
✅ Actions performed: Review triggered.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pinecone_rag/ingestion.py`:
- Around line 397-409: The code calls _prepare_records_to_process(documents,
is_chunked) unconditionally which enforces chunk/content validity and filters
out items needed for metadata-only or value-only updates, so move or split
preprocessing: determine process_mode via ProcessType.from_value(process_type)
first, then branch—if process_mode indicates upsert, call
_prepare_records_to_process as before; if process_mode indicates update-by-id,
skip the chunk/content validation and instead prepare records suitable for
_update_by_id (or call a lightweight helper) so metadata-only/value-only updates
with empty/short page_content are not dropped. Ensure _update_by_id receives the
original documents or appropriately transformed records and keep existing
behavior for other modes.
- Around line 789-825: ProcessType.UPDATE_VALUE_BY_ID currently only populates
selected_keys (chunk_text) into update_metadata and always calls
dense_index.update and sparse_index.update with set_metadata, so vector fields
(values and sparse_values) are never forwarded; modify the UPDATE_VALUE_BY_ID
branch in ingestion.py to also extract value payloads (e.g., values and
sparse_values) from record when present, and call dense_index.update with
value=record["values"] (or value=values) and sparse_index.update with
sparse_value=record["sparse_values"] (or appropriate param names) in addition to
set_metadata=update_metadata, ensuring you only pass value/sparse_value when
they exist to avoid changing them unintentionally; update references around
selected_keys, update_metadata, dense_index.update, and sparse_index.update so
the correct parameters are sent for value updates.
- Around line 704-705: The current sequential upsert (dense_index.upsert_records
then sparse_index.upsert_records) can leave IDs in dense only if the sparse
upsert fails; change this to ensure atomicity by wrapping the two upserts in a
try/except: call dense_index.upsert_records and sparse_index.upsert_records, and
if the second (sparse_index.upsert_records) raises, call
dense_index.delete_records (using the same record IDs and namespace) to roll
back the partial dense insert before marking the batch failed; alternatively,
adjust _filter_existing_records to check both dense and sparse indexes when
deciding to skip records so retried batches don't skip partially-upserted
IDs—use the unique symbols dense_index.upsert_records,
sparse_index.upsert_records, dense_index.delete_records, and
_filter_existing_records to locate and implement the fix.
- Around line 412-416: The schema is currently built from only records[0],
missing metadata keys that appear later in the batch; change the logic that
computes schema_fields to collect the union of keys across the entire records
batch (e.g., iterate all records, accumulate keys into a set, exclude "id" and
"chunk_text", convert to list) and pass that deduplicated list to
_ensure_namespace_schema(namespace, schema_fields) so all metadata keys in the
batch are considered filterable; update the code around the existing records /
schema_fields computation in ingestion.py where schema_fields and
_ensure_namespace_schema are used.
- Around line 490-518: The exception handler is marking the original batch
(variable batch) as failed even after _filter_existing_records removed
already-existing records; change the call to _mark_batch_failed to pass the
actual attempted records (filtered_batch) instead of batch so only
attempted/upserted IDs are marked failed, and ensure any index/identifier
argument (currently i) still reflects the correct batch context when calling
_mark_batch_failed from the except block in the same scope as _upsert_batch and
filtered_batch.
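The dense/sparse rollback prompt above amounts to a compensating delete when the second write fails. A minimal sketch, assuming the PR's `dense_index`/`sparse_index` attributes and the Pinecone SDK's `upsert_records(namespace, records)` and `delete(ids=..., namespace=...)` calls:

```python
from typing import Any, Dict, List


class PineconeIngestionSketch:
    """Illustrative only: dense_index / sparse_index are Pinecone Index handles."""

    def _upsert_batch(self, batch: List[Dict[str, Any]], namespace: str) -> None:
        record_ids = [r["id"] for r in batch]
        self.dense_index.upsert_records(namespace, batch)
        try:
            self.sparse_index.upsert_records(namespace, batch)
        except Exception:
            # Sparse write failed: remove the dense half so the two indexes
            # never diverge, then let the caller mark the batch as failed.
            self.dense_index.delete(ids=record_ids, namespace=namespace)
            raise
```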
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 92b89b7d-962d-413c-9aa8-d525077dfef9
📒 Files selected for processing (2)
pinecone_rag/ingestion.py, pinecone_rag/preprocessor/utility.py
@coderabbitai, are you OK?
Actionable comments posted: 1
♻️ Duplicate comments (3)
pinecone_rag/ingestion.py (3)
727-734: ⚠️ Potential issue | 🔴 Critical
Rollback is skipped for the default namespace, allowing dense/sparse divergence.
If `namespace` is `None`, a sparse failure won't trigger dense rollback due to `if record_ids and namespace:`. That can leave permanent partial state.
Suggested fix

```diff
- if record_ids and namespace:
+ if record_ids:
      try:
-         self.dense_index.delete(ids=record_ids, namespace=namespace)
+         delete_kwargs = {"ids": record_ids}
+         if namespace is not None:
+             delete_kwargs["namespace"] = namespace
+         self.dense_index.delete(**delete_kwargs)
          logger.info(
              "Rolled back %s records from dense index after sparse failure",
              len(record_ids),
          )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 727 - 734, The rollback currently skips when namespace is falsy, causing dense/sparse divergence; change the condition from "if record_ids and namespace:" to just "if record_ids:" so rollback runs for the default namespace too, call self.dense_index.delete(ids=record_ids, namespace=namespace) (or omit the namespace argument only if the delete API rejects None) and keep the existing try/except to log rollback success via logger.info and any exception via rollback_e.
816-833: ⚠️ Potential issue | 🔴 Critical
`UPDATE_VALUE_BY_ID` still rejects value-only updates when `chunk_text` is absent.
This mode requires `update_metadata` to be non-empty, but `selected_keys` is only `["chunk_text"]`. Records with only `values`/`sparse_values` are incorrectly marked failed and never updated.
Suggested fix

```diff
- elif process_type == ProcessType.UPDATE_VALUE_BY_ID:
-     selected_keys = ["chunk_text"]
+ elif process_type == ProcessType.UPDATE_VALUE_BY_ID:
+     selected_keys = ["chunk_text"]  # optional metadata payload
@@
-     if not update_metadata:
+     has_value_payload = (
+         record.get("values") is not None
+         or record.get("sparse_values") is not None
+     )
+     if not update_metadata and not has_value_payload:
          missing = [k for k in selected_keys if k not in record]
          failed_documents.append(
@@
-     dense_kwargs: Dict[str, Any] = {
-         "id": doc_id,
-         "namespace": namespace,
-         "set_metadata": update_metadata,
-     }
-     sparse_kwargs: Dict[str, Any] = {
-         "id": doc_id,
-         "namespace": namespace,
-         "set_metadata": update_metadata,
-     }
+     dense_kwargs: Dict[str, Any] = {"id": doc_id, "namespace": namespace}
+     sparse_kwargs: Dict[str, Any] = {"id": doc_id, "namespace": namespace}
+     if update_metadata:
+         dense_kwargs["set_metadata"] = update_metadata
+         sparse_kwargs["set_metadata"] = update_metadata
```

Also applies to: 843-859
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 816 - 833, The UPDATE_VALUE_BY_ID branch currently only looks for "chunk_text" (selected_keys = ["chunk_text"]) so records that supply only "values" or "sparse_values" are treated as missing and skipped; update the logic in the loop that builds update_metadata (and the analogous block around the 843-859 region) to accept value-only updates by including "values" and "sparse_values" in selected_keys when process_type == ProcessType.UPDATE_VALUE_BY_ID, or alternatively, if chunk_text is absent but record contains "values" or "sparse_values", populate update_metadata with those fields so the record is not marked missing and can be updated by id.
397-410: ⚠️ Potential issue | 🟠 Major
Route update-by-id inputs through update-specific preprocessing, not chunk validation.
`process_documents()` still sends all modes through `_prepare_records_to_process()`, which filters by text/chunk validity. This drops metadata-only/value-only updates before `_update_by_id()` runs.
Suggested fix

```diff
- records = self._prepare_records_to_process(documents, is_chunked)
+ if process_mode == ProcessType.UPSERT:
+     records = self._prepare_records_to_process(documents, is_chunked)
+ else:
+     records = []
+     for idx, doc in enumerate(documents):
+         metadata = doc.metadata or {}
+         record = dict(metadata)
+         record["id"] = self._resolve_target_id(metadata, idx)
+         if doc.page_content:
+             record["chunk_text"] = doc.page_content.strip()
+         records.append(record)
```

Also applies to: 433-451
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 397 - 410, process_documents currently sends all modes through _prepare_records_to_process which discards metadata-only or value-only updates; detect when process_mode == ProcessType.UPDATE_BY_ID and route those inputs to an update-specific preprocessing path (either call a new helper like _prepare_update_records_to_process or call _prepare_records_to_process with a skip_chunk_validation flag) so records intended for _update_by_id() are not filtered out; update both occurrences referenced in the diff (around the block that sets process_mode and the similar block at lines 433-451) to short-circuit to the update preprocessing and then call _update_by_id() with the resulting records.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pinecone_rag/ingestion.py`:
- Around line 495-523: The exception handler can reference filtered_batch before
it's assigned if _filter_existing_records raises; to fix, ensure filtered_batch
is defined before the try (e.g., set filtered_batch = [] right before calling
_filter_existing_records) or change the except to use the original batch
variable when calling _mark_batch_failed; update the block around
_filter_existing_records, _upsert_batch, and _mark_batch_failed so that
_mark_batch_failed always receives a defined list (use the original batch or an
empty list) to avoid UnboundLocalError while preserving error logging.
---
Duplicate comments:
In `@pinecone_rag/ingestion.py`:
- Around line 727-734: The rollback currently skips when namespace is falsy,
causing dense/sparse divergence; change the condition from "if record_ids and
namespace:" to just "if record_ids:" so rollback runs for the default namespace
too, call self.dense_index.delete(ids=record_ids, namespace=namespace) (or omit
the namespace argument only if the delete API rejects None) and keep the
existing try/except to log rollback success via logger.info and any exception
via rollback_e.
- Around line 816-833: The UPDATE_VALUE_BY_ID branch currently only looks for
"chunk_text" (selected_keys = ["chunk_text"]) so records that supply only
"values" or "sparse_values" are treated as missing and skipped; update the logic
in the loop that builds update_metadata (and the analogous block around the
843-859 region) to accept value-only updates by including "values" and
"sparse_values" in selected_keys when process_type ==
ProcessType.UPDATE_VALUE_BY_ID, or alternatively, if chunk_text is absent but
record contains "values" or "sparse_values", populate update_metadata with those
fields so the record is not marked missing and can be updated by id.
- Around line 397-410: process_documents currently sends all modes through
_prepare_records_to_process which discards metadata-only or value-only updates;
detect when process_mode == ProcessType.UPDATE_BY_ID and route those inputs to
an update-specific preprocessing path (either call a new helper like
_prepare_update_records_to_process or call _prepare_records_to_process with a
skip_chunk_validation flag) so records intended for _update_by_id() are not
filtered out; update both occurrences referenced in the diff (around the block
that sets process_mode and the similar block at lines 433-451) to short-circuit
to the update preprocessing and then call _update_by_id() with the resulting
records.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9d4b6065-d330-4ca0-a57a-0e47838ae526
📒 Files selected for processing (1)
pinecone_rag/ingestion.py
Actionable comments posted: 2
🧹 Nitpick comments (1)
pinecone_rag/ingestion.py (1)
473-473: Use f-string conversion flag instead of explicit str().
Replace `{str(error)}` with `{error!s}` for cleaner f-string syntax.
♻️ Suggested fix

```diff
- "reason": f"Batch upsert failed: {str(error)}",
+ "reason": f"Batch upsert failed: {error!s}",
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` at line 473, Replace the explicit str() call inside the f-string used for the "reason" dict value with the f-string conversion flag: change f"Batch upsert failed: {str(error)}" to use {error!s} so it reads f"Batch upsert failed: {error!s}"; update the occurrence where the "reason" key is set in the batch upsert error handling to use this form (the dict entry with "reason" and the variable error).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pinecone_rag/ingestion.py`:
- Around line 728-740: The rollback is skipped when namespace is falsy; update
the conditional around the dense-index rollback so it triggers whenever
record_ids exist (regardless of namespace) and still calls
self.dense_index.delete(ids=record_ids, namespace=namespace); keep the
try/except (and logger messages) but remove the "and namespace" check so
default-namespace (namespace=None) operations are rolled back too, referencing
the existing variables record_ids, namespace and the method
self.dense_index.delete.
- Around line 307-309: Replace the private access to index._config.host by
calling the public Pinecone API: call pc.describe_index(index_name) (using the
existing index name variable or index.name) to get the index description and
read the host from its response instead; then strip any "https://" or "http://"
prefix to produce host_clean. Update references to index._config.host to use the
describe_index result and keep the existing host_clean removeprefix logic.
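A sketch of the public-API alternative; `describe_index` and the returned `host` field are part of the Pinecone Python SDK, while the client setup and index name here are placeholders:

```python
from pinecone import Pinecone

pc = Pinecone(api_key="...")  # hypothetical client setup

# Resolve the index host via the public API instead of index._config.host
description = pc.describe_index("my-index")
host_clean = description.host.removeprefix("https://").removeprefix("http://")
```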
---
Nitpick comments:
In `@pinecone_rag/ingestion.py`:
- Line 473: Replace the explicit str() call inside the f-string used for the
"reason" dict value with the f-string conversion flag: change f"Batch upsert
failed: {str(error)}" to use {error!s} so it reads f"Batch upsert failed:
{error!s}"; update the occurrence where the "reason" key is set in the batch
upsert error handling to use this form (the dict entry with "reason" and the
variable error).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 88b8142d-61cb-4a1f-9343-73e205086760
📒 Files selected for processing (1)
pinecone_rag/ingestion.py
Actionable comments posted: 1
♻️ Duplicate comments (2)
pinecone_rag/ingestion.py (2)
740-743: ⚠️ Potential issue | 🔴 Critical
Rollback condition is too strict and can skip cleanup after sparse failure.
Line 740 gates rollback on `namespace` truthiness. Cleanup should run whenever `record_ids` exist, otherwise failed sparse writes can leave partial state in dense.
💡 Suggested fix

```diff
- if record_ids and namespace:
+ if record_ids:
      try:
          self.dense_index.delete(ids=record_ids, namespace=namespace)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 740 - 743, The rollback currently only runs when both record_ids and namespace are truthy, which skips cleanup after sparse failures; change the guard to run whenever record_ids exist and call self.dense_index.delete with or without namespace accordingly. Specifically, update the conditional around self.dense_index.delete to "if record_ids:" then inside the try branch call self.dense_index.delete(ids=record_ids, namespace=namespace) when namespace is set, otherwise call self.dense_index.delete(ids=record_ids) so partial dense records are always cleaned up after failures.
421-463: ⚠️ Potential issue | 🟠 Major
Split preprocessing by process mode to avoid dropping valid update-by-id records.
Line 421 always runs chunk-validity filtering, so metadata/value updates can be filtered out before Line 445+ executes (e.g., empty/short `page_content` but valid IDs/metadata).
💡 Suggested fix

```diff
- records = self._prepare_records_to_process(documents, is_chunked)
+ if process_mode == ProcessType.UPSERT:
+     records = self._prepare_records_to_process(documents, is_chunked)
+ else:
+     records = self._prepare_records_for_update_by_id(documents, is_chunked)
```

```python
# add a lightweight helper (outside this hunk)
def _prepare_records_for_update_by_id(
    self, documents: List[Document], is_chunked: bool
) -> List[Dict[str, Any]]:
    chunked_documents = (
        self.text_splitter.split_documents(documents)
        if not is_chunked
        else documents
    )
    records: List[Dict[str, Any]] = []
    for doc in chunked_documents:
        metadata = dict(doc.metadata or {})
        record = dict(metadata)
        if doc.page_content:
            record["chunk_text"] = doc.page_content.strip()
        if "id" not in record:
            record["id"] = self._resolve_target_id(record, len(records))
        records.append(record)
    return records
```

Also applies to: 648-668
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 421 - 463, The current preprocessing via _prepare_records_to_process applies chunk/length filtering for all process modes and can drop records needed for update-by-id flows; add a lightweight helper _prepare_records_for_update_by_id that mirrors the suggested logic (split only when needed, copy metadata, include chunk_text only if page_content present, and ensure an "id" via _resolve_target_id) and call that helper for ProcessType.UPDATE_METADATA_BY_ID, ProcessType.UPDATE_DOCUMENT_BY_ID, and ProcessType.UPDATE_VALUE_BY_ID instead of using _prepare_records_to_process; keep _upsert_records and the existing _ensure_namespace_schema usage for UPSERT unchanged and ensure _update_by_id receives the new records list.
🧹 Nitpick comments (1)
pinecone_rag/ingestion.py (1)
437-440: Result counters mix different units (documents vs records/chunks).
`processed` is record/chunk based, but `total` is input-document based. This can produce misleading ratios (including `processed > total` when chunking expands documents).
Also applies to: 622-629
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pinecone_rag/ingestion.py` around lines 437 - 440, The result counters mix documents and records/chunks; change the call to _build_process_result so both sides use the same unit: compute a total_records (e.g., total_records = sum(len(chunks) for chunks in per-document chunk lists) or use len(records) if a flat records list exists) and pass that as total, keeping total_processed/total_upserted as record/chunk counts; update the same pattern at the other occurrence (lines ~622-629) so totals and processed are consistently both in records/chunks (or, alternatively, expose both document_count and record_count fields if you need both units).
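A tiny illustration of the unit mismatch and the fix, with made-up numbers; the result shape is assumed, not the PR's `_build_process_result` signature:

```python
documents = ["doc-a", "doc-b"]                      # 2 input documents
records = [{"id": "a#0"}, {"id": "a#1"}, {"id": "b#0"}]  # chunking -> 3 records

total_processed = 3                                  # record/chunk based
# Using len(documents) as total would yield processed > total (3 > 2);
# counting both sides in records keeps the ratio meaningful.
result = {"total": len(records), "processed": total_processed}
assert result["processed"] <= result["total"]
```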
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pinecone_rag/ingestion.py`:
- Around line 823-845: The _update_by_id logic currently infers selected_keys
from records[0] and directly reads record["id"]; change it to compute the union
of keys across all records per process_type (for
ProcessType.UPDATE_DOCUMENT_BY_ID: union of every record's keys minus "id"; for
ProcessType.UPDATE_METADATA_BY_ID: use keys if provided else union minus
("id","chunk_text"); for ProcessType.UPDATE_VALUE_BY_ID: keep ["chunk_text"]) so
no field present only in later records is missed, and inside the per-record loop
use the existing helper self._resolve_target_id(record) to obtain doc_id instead
of hard-indexing record["id"]; keep the existing per-record filtering that skips
missing keys and the update_metadata emptiness check.
---
Duplicate comments:
In `@pinecone_rag/ingestion.py`:
- Around line 740-743: The rollback currently only runs when both record_ids and
namespace are truthy, which skips cleanup after sparse failures; change the
guard to run whenever record_ids exist and call self.dense_index.delete with or
without namespace accordingly. Specifically, update the conditional around
self.dense_index.delete to "if record_ids:" then inside the try branch call
self.dense_index.delete(ids=record_ids, namespace=namespace) when namespace is
set, otherwise call self.dense_index.delete(ids=record_ids) so partial dense
records are always cleaned up after failures.
- Around line 421-463: The current preprocessing via _prepare_records_to_process
applies chunk/length filtering for all process modes and can drop records needed
for update-by-id flows; add a lightweight helper
_prepare_records_for_update_by_id that mirrors the suggested logic (split only
when needed, copy metadata, include chunk_text only if page_content present, and
ensure an "id" via _resolve_target_id) and call that helper for
ProcessType.UPDATE_METADATA_BY_ID, ProcessType.UPDATE_DOCUMENT_BY_ID, and
ProcessType.UPDATE_VALUE_BY_ID instead of using _prepare_records_to_process;
keep _upsert_records and the existing _ensure_namespace_schema usage for UPSERT
unchanged and ensure _update_by_id receives the new records list.
---
Nitpick comments:
In `@pinecone_rag/ingestion.py`:
- Around line 437-440: The result counters mix documents and records/chunks;
change the call to _build_process_result so both sides use the same unit:
compute a total_records (e.g., total_records = sum(len(chunks) for chunks in
per-document chunk lists) or use len(records) if a flat records list exists) and
pass that as total, keeping total_processed/total_upserted as record/chunk
counts; update the same pattern at the other occurrence (lines ~622-629) so
totals and processed are consistently both in records/chunks (or, alternatively,
expose both document_count and record_count fields if you need both units).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4ad628d9-c148-4cd1-9a9d-0a88ed3f5ab5
📒 Files selected for processing (1)
pinecone_rag/ingestion.py
```diff
  if process_type == ProcessType.UPDATE_DOCUMENT_BY_ID:
      selected_keys = [k for k in records[0].keys() if k != "id"]
  elif process_type == ProcessType.UPDATE_METADATA_BY_ID:
      selected_keys = keys or [
          k for k in records[0].keys() if k not in ("id", "chunk_text")
      ]
  elif process_type == ProcessType.UPDATE_VALUE_BY_ID:
      selected_keys = ["chunk_text"]
  else:
      raise ValueError(f"Unsupported process type: {process_type}")

  for record in tqdm(
      records,
      desc=f"Updating by id ({process_type.value})",
      leave=False,
  ):
      doc_id = record["id"]
      update_metadata = {}
      for key in selected_keys:
          if key not in record:
              continue
-         self._update_batch(records, namespace, batch_num)
-         total_updated += len(records)
          update_metadata[key] = record[key]
      if not update_metadata:
```
`_update_by_id` should not infer update shape from only the first record.
Using `records[0]` at Lines 823-828 can miss fields present in later records. Also, Line 839 ignores the existing `_resolve_target_id` helper and hard-requires `record["id"]`.
💡 Suggested fix

```diff
- if process_type == ProcessType.UPDATE_DOCUMENT_BY_ID:
-     selected_keys = [k for k in records[0].keys() if k != "id"]
- elif process_type == ProcessType.UPDATE_METADATA_BY_ID:
-     selected_keys = keys or [
-         k for k in records[0].keys() if k not in ("id", "chunk_text")
-     ]
+ if process_type == ProcessType.UPDATE_METADATA_BY_ID and keys:
+     selected_keys = list(keys)
  ...
- for record in tqdm(
+ for idx, record in enumerate(tqdm(
      records,
      desc=f"Updating by id ({process_type.value})",
      leave=False,
- ):
-     doc_id = record["id"]
-     update_metadata = {}
-     for key in selected_keys:
-         if key not in record:
-             continue
-         update_metadata[key] = record[key]
+ )):
+     doc_id = self._resolve_target_id(record, idx)
+     if process_type == ProcessType.UPDATE_DOCUMENT_BY_ID:
+         update_metadata = {k: v for k, v in record.items() if k != "id"}
+     elif process_type == ProcessType.UPDATE_METADATA_BY_ID:
+         update_metadata = (
+             {k: record[k] for k in selected_keys if k in record}
+             if selected_keys
+             else self._extract_metadata_update(record)
+         )
+     else:
+         update_metadata = (
+             {"chunk_text": record["chunk_text"]}
+             if "chunk_text" in record
+             else {}
+         )
```

Also applies to: 839-839
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pinecone_rag/ingestion.py` around lines 823 - 845, The _update_by_id logic
currently infers selected_keys from records[0] and directly reads record["id"];
change it to compute the union of keys across all records per process_type (for
ProcessType.UPDATE_DOCUMENT_BY_ID: union of every record's keys minus "id"; for
ProcessType.UPDATE_METADATA_BY_ID: use keys if provided else union minus
("id","chunk_text"); for ProcessType.UPDATE_VALUE_BY_ID: keep ["chunk_text"]) so
no field present only in later records is missed, and inside the per-record loop
use the existing helper self._resolve_target_id(record) to obtain doc_id instead
of hard-indexing record["id"]; keep the existing per-record filtering that skips
missing keys and the update_metadata emptiness check.
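The union-of-keys idea in this prompt, as a self-contained sketch; `union_selected_keys` is a hypothetical helper, and the PR's actual branch structure differs:

```python
from typing import Any, Dict, List, Optional, Set


def union_selected_keys(
    records: List[Dict[str, Any]],
    mode: str,
    keys: Optional[List[str]] = None,
) -> List[str]:
    # Union of keys across the whole batch, so fields that only appear in
    # later records are not silently dropped by looking at records[0] alone.
    all_keys: Set[str] = (
        set().union(*(r.keys() for r in records)) if records else set()
    )
    if mode == "update_document_by_id":
        return sorted(all_keys - {"id"})
    if mode == "update_metadata_by_id":
        return keys or sorted(all_keys - {"id", "chunk_text"})
    return ["chunk_text"]  # update_value_by_id


recs = [{"id": "1", "title": "A"}, {"id": "2", "status": "final"}]
assert union_selected_keys(recs, "update_document_by_id") == ["status", "title"]
```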
Summary by CodeRabbit
New Features
Bug Fixes