From b839fd9136957d75e2d22f88f7056a2eec5c6275 Mon Sep 17 00:00:00 2001 From: Ankit Kotnala Date: Sat, 9 May 2026 00:14:48 +0530 Subject: [PATCH 1/2] Add Claude Code and Gemini transcript parsing --- server.py | 118 +-------- src/api/routes/memory.py | 173 +------------ src/utils/transcripts.py | 509 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 525 insertions(+), 275 deletions(-) create mode 100644 src/utils/transcripts.py diff --git a/server.py b/server.py index 4bd1859..cec143a 100644 --- a/server.py +++ b/server.py @@ -50,6 +50,7 @@ from src.pipelines.ingest import IngestPipeline from src.pipelines.retrieval import RetrievalPipeline +from src.utils.transcripts import parse_transcript_text as _shared_parse_transcript_text # ═══════════════════════════════════════════════════════════════════ @@ -693,108 +694,6 @@ def _extract_chat_pairs(url: str, html: str) -> tuple[str, str, list[dict[str, s return provider, extraction_method, pairs -def _parse_cursor_transcript(text: str) -> list[dict[str, str]]: - """Parse a Cursor-exported markdown transcript into message pairs.""" - pairs: list[dict[str, str]] = [] - - sections = text.split("---") - - start_idx = 0 - if sections and "Exported on" in sections[0]: - start_idx = 1 - - current_user_query = None - - for section in sections[start_idx:]: - section = section.strip() - if not section: - continue - - if section.startswith("**User**"): - content = section.replace("**User**", "", 1).strip() - current_user_query = content - - elif section.startswith("**Cursor**") or section.startswith("**Assistant**"): - content = section.replace("**Cursor**", "", 1).replace("**Assistant**", "", 1).strip() - - if current_user_query: - pairs.append({ - "user_query": current_user_query, - "agent_response": content, - }) - current_user_query = None - - return pairs - - -def _parse_antigravity_transcript(text: str) -> list[dict[str, str]]: - """Parse an Antigravity-exported markdown transcript into message pairs. - - Antigravity transcripts follow this structure:: - - # Chat Conversation - Note: _This is purely the output..._ - ### User Input - - ### Planner Response - - ... - - Multiple consecutive ``### Planner Response`` blocks are concatenated into - a single agent response (they occur when the agent used tools mid-turn). 
- """ - pairs: list[dict[str, str]] = [] - - text = text.replace("\r\n", "\n") - - # Split on H3 headings, keeping the headings in the token list - blocks = re.split(r"(?m)^(###\s+.+)$", text) - - current_user_query: str | None = None - planner_chunks: list[str] = [] - - for i, block in enumerate(blocks): - block = block.strip() - if not block: - continue - - if re.match(r"###\s+User Input", block, re.IGNORECASE): - if current_user_query and planner_chunks: - pairs.append({ - "user_query": current_user_query, - "agent_response": "\n\n".join(planner_chunks).strip(), - }) - planner_chunks = [] - current_user_query = None - - elif re.match(r"###\s+Planner Response", block, re.IGNORECASE): - pass # content handled below - - else: - if i > 0: - prev_heading = blocks[i - 1].strip() if i >= 1 else "" - if re.match(r"###\s+User Input", prev_heading, re.IGNORECASE): - if current_user_query and planner_chunks: - pairs.append({ - "user_query": current_user_query, - "agent_response": "\n\n".join(planner_chunks).strip(), - }) - planner_chunks = [] - current_user_query = block - - elif re.match(r"###\s+Planner Response", prev_heading, re.IGNORECASE): - if block: - planner_chunks.append(block) - - if current_user_query and planner_chunks: - pairs.append({ - "user_query": current_user_query, - "agent_response": "\n\n".join(planner_chunks).strip(), - }) - - return pairs - - async def _parse_transcript_with_llm(text: str) -> list[dict[str, str]]: """Use an LLM to parse transcript text when format detection fails.""" from src.models import get_model @@ -842,19 +741,8 @@ async def _parse_transcript_with_llm(text: str) -> list[dict[str, str]]: def _parse_transcript_text(text: str) -> tuple[str, list[dict[str, str]]]: - """Parse transcript text and return (format, pairs).""" - if "_Exported on" in text and "from Cursor" in text: - pairs = _parse_cursor_transcript(text) - if pairs: - return "cursor", pairs - - # Detect Antigravity format - if "# Chat Conversation" in text and ("### User Input" in text or "### Planner Response" in text): - pairs = _parse_antigravity_transcript(text) - if pairs: - return "antigravity", pairs - - return "unknown", [] + """Compatibility wrapper around the shared transcript parser.""" + return _shared_parse_transcript_text(text) async def _scrape_chat_share(url: str) -> dict[str, Any]: diff --git a/src/api/routes/memory.py b/src/api/routes/memory.py index 3397a69..3783269 100644 --- a/src/api/routes/memory.py +++ b/src/api/routes/memory.py @@ -41,6 +41,7 @@ WeaverSummary, ) from src.pipelines.retrieval import RetrievalPipeline +from src.utils.transcripts import parse_transcript_text from bs4 import BeautifulSoup import json @@ -307,146 +308,6 @@ def _extract_chat_pairs(url: str, html: str) -> tuple[str, str, List[MessagePair return provider, extraction_method, pairs -def _parse_cursor_transcript(text: str) -> List[MessagePair]: - """Parse a Cursor-exported markdown transcript into message pairs. - - Cursor transcripts have the format: - _Exported on ... from Cursor_ - --- - **User** - - --- - **Cursor** - - --- - ... 
- """ - pairs: List[MessagePair] = [] - - # Split by --- separator - sections = text.split("---") - - # Skip the first section if it's the header (contains "Exported on") - start_idx = 0 - if sections and "Exported on" in sections[0]: - start_idx = 1 - - current_user_query = None - - for section in sections[start_idx:]: - section = section.strip() - if not section: - continue - - # Check if this is a User message - if section.startswith("**User**"): - # Extract the user message (remove the **User** header) - content = section.replace("**User**", "", 1).strip() - current_user_query = content - - # Check if this is a Cursor/Agent message - elif section.startswith("**Cursor**") or section.startswith("**Assistant**"): - # Extract the agent response - content = section.replace("**Cursor**", "", 1).replace("**Assistant**", "", 1).strip() - - # If we have a user query, create a pair - if current_user_query: - pairs.append(MessagePair( - user_query=current_user_query, - agent_response=content, - )) - current_user_query = None - - return pairs - - -def _parse_antigravity_transcript(text: str) -> List[MessagePair]: - """Parse an Antigravity-exported markdown transcript into message pairs. - - Antigravity transcripts exported from the Antigravity coding assistant - follow this format:: - - # Chat Conversation - - Note: _This is purely the output of the chat conversation..._ - - ### User Input - - - - ### Planner Response - - - - ### User Input - - ... - - Multiple consecutive ``### Planner Response`` blocks (e.g. when the agent - used tools between messages) are concatenated into a single agent response. - """ - pairs: List[MessagePair] = [] - - # Normalise line endings - text = text.replace("\r\n", "\n") - - # Split into blocks by H3 headings (### ...) - # We keep the heading so we know which role each block belongs to. - blocks = re.split(r"(?m)^(###\s+.+)$", text) - - current_user_query: str | None = None - planner_chunks: List[str] = [] - - for i, block in enumerate(blocks): - block = block.strip() - if not block: - continue - - if re.match(r"###\s+User Input", block, re.IGNORECASE): - # Flush any pending planner chunks as a completed pair - if current_user_query and planner_chunks: - pairs.append(MessagePair( - user_query=current_user_query, - agent_response="\n\n".join(planner_chunks).strip(), - )) - planner_chunks = [] - # The next block (index i+1) is the content of this user turn - current_user_query = None # will be filled by the content block below - - elif re.match(r"###\s+Planner Response", block, re.IGNORECASE): - # The next content block belongs to the agent - pass # content handled in the else branch below - - else: - # This is a content block — figure out which role it belongs to by - # looking at the previous heading token. 
- if i > 0: - prev_heading = blocks[i - 1].strip() if i >= 1 else "" - if re.match(r"###\s+User Input", prev_heading, re.IGNORECASE): - # New user turn — flush previous pair first - if current_user_query and planner_chunks: - pairs.append(MessagePair( - user_query=current_user_query, - agent_response="\n\n".join(planner_chunks).strip(), - )) - planner_chunks = [] - current_user_query = block - - elif re.match(r"###\s+Planner Response", prev_heading, re.IGNORECASE): - # Accumulate (multiple tool-use steps = multiple planner blocks) - if block: - planner_chunks.append(block) - - # Flush last pair - if current_user_query and planner_chunks: - pairs.append(MessagePair( - user_query=current_user_query, - agent_response="\n\n".join(planner_chunks).strip(), - )) - - return pairs - - async def _parse_transcript_with_llm(text: str) -> List[MessagePair]: """Use an LLM to parse transcript text when format detection fails.""" from src.models import get_model @@ -492,24 +353,6 @@ async def _parse_transcript_with_llm(text: str) -> List[MessagePair]: return [] -def _parse_transcript_text(text: str) -> tuple[str, List[MessagePair]]: - """Parse transcript text and return (format, pairs).""" - - # Detect Cursor format - if "_Exported on" in text and "from Cursor" in text: - pairs = _parse_cursor_transcript(text) - if pairs: - return "cursor", pairs - - # Detect Antigravity format - if "# Chat Conversation" in text and ("### User Input" in text or "### Planner Response" in text): - pairs = _parse_antigravity_transcript(text) - if pairs: - return "antigravity", pairs - - return "unknown", [] - - async def _scrape_chat_share(url: str) -> Dict[str, Any]: html, final_url = await _render_chat_share(url) provider, extraction_method, pairs = _extract_chat_pairs(final_url or url, html) @@ -778,7 +621,10 @@ async def scrape_chat_link(req: ScrapeRequest, request: Request): ) async def parse_transcript( request: Request, - file: UploadFile = File(..., description="Chat transcript file (.txt, .md, .json)") + file: UploadFile = File( + ..., + description="Chat transcript file (.txt, .md, .json, .jsonl)", + ) ): start = time.perf_counter() @@ -791,7 +637,14 @@ async def parse_transcript( return _error(request, "Uploaded file is empty.", 400) # Try to parse the transcript - format_detected, pairs = _parse_transcript_text(text) + format_detected, parsed_pairs = parse_transcript_text(text) + pairs = [MessagePair(**pair) for pair in parsed_pairs] + if pairs: + logger.info( + "Parsed transcript format=%s pairs=%d", + format_detected, + len(pairs), + ) # If no pairs found, try LLM fallback if not pairs: diff --git a/src/utils/transcripts.py b/src/utils/transcripts.py new file mode 100644 index 0000000..a147fac --- /dev/null +++ b/src/utils/transcripts.py @@ -0,0 +1,509 @@ +""" +Deterministic parsers for transcript uploads used by the context importer. + +The public API only needs user/assistant message pairs. Tool calls, tool +results, thinking blocks, and CLI bootstrap messages are intentionally ignored. 
+""" + +from __future__ import annotations + +import json +import re +from typing import Any, TypedDict + + +class ParsedMessagePair(TypedDict): + user_query: str + agent_response: str + + +_ASSISTANT_ROLES = {"assistant", "model", "claude", "gemini", "cursor"} +_USER_ROLES = {"user", "human"} +_SKIPPED_BLOCK_TYPES = { + "tool_result", + "tool_use", + "thinking", + "redacted_thinking", + "server_tool_use", + "web_search_tool_result", +} +_TOOL_MARKDOWN_RE = re.compile( + r"(?ms)^\*\*Tool (?:Command|Response)\*\*:\s*\n```(?:json)?\n.*?\n```\s*" +) + + +def parse_transcript_text(text: str) -> tuple[str, list[ParsedMessagePair]]: + """Parse transcript text and return ``(format, pairs)``. + + Supported deterministic formats: + - Cursor markdown exports + - Antigravity markdown exports + - Claude Code JSONL session transcripts + - Claude-style role-heading markdown/plain text exports + - Gemini CLI ``/chat share`` JSON and markdown exports + """ + + normalized = text.replace("\r\n", "\n") + + if "_Exported on" in normalized and "from Cursor" in normalized: + pairs = _parse_cursor_transcript(normalized) + if pairs: + return "cursor", pairs + + if "# Chat Conversation" in normalized and ( + "### User Input" in normalized or "### Planner Response" in normalized + ): + pairs = _parse_antigravity_transcript(normalized) + if pairs: + return "antigravity", pairs + + json_format, json_pairs = _parse_json_or_jsonl_transcript(normalized) + if json_pairs: + return json_format, json_pairs + + gemini_pairs = _parse_role_heading_transcript( + normalized, + assistant_roles={"model", "gemini"}, + skip_gemini_bootstrap=True, + ) + if gemini_pairs and _looks_like_gemini_markdown(normalized): + return "gemini", gemini_pairs + + claude_pairs = _parse_role_heading_transcript( + normalized, + assistant_roles={"assistant", "claude"}, + skip_gemini_bootstrap=False, + ) + if claude_pairs and _looks_like_claude_export(normalized): + return "claude_code", claude_pairs + + return "unknown", [] + + +def _parse_cursor_transcript(text: str) -> list[ParsedMessagePair]: + """Parse a Cursor-exported markdown transcript into message pairs.""" + pairs: list[ParsedMessagePair] = [] + sections = text.split("---") + + start_idx = 0 + if sections and "Exported on" in sections[0]: + start_idx = 1 + + current_user_query: str | None = None + + for section in sections[start_idx:]: + section = section.strip() + if not section: + continue + + if section.startswith("**User**"): + current_user_query = section.replace("**User**", "", 1).strip() + elif section.startswith("**Cursor**") or section.startswith("**Assistant**"): + content = ( + section.replace("**Cursor**", "", 1) + .replace("**Assistant**", "", 1) + .strip() + ) + if current_user_query: + pairs.append( + { + "user_query": current_user_query, + "agent_response": content, + } + ) + current_user_query = None + + return pairs + + +def _parse_antigravity_transcript(text: str) -> list[ParsedMessagePair]: + """Parse an Antigravity-exported markdown transcript into message pairs.""" + pairs: list[ParsedMessagePair] = [] + blocks = re.split(r"(?m)^(###\s+.+)$", text) + + current_user_query: str | None = None + planner_chunks: list[str] = [] + + for i, block in enumerate(blocks): + block = block.strip() + if not block: + continue + + if re.match(r"###\s+User Input", block, re.IGNORECASE): + if current_user_query and planner_chunks: + pairs.append( + { + "user_query": current_user_query, + "agent_response": "\n\n".join(planner_chunks).strip(), + } + ) + planner_chunks = [] + 
+                current_user_query = None
+
+        elif re.match(r"###\s+Planner Response", block, re.IGNORECASE):
+            continue
+
+        elif i > 0:
+            prev_heading = blocks[i - 1].strip() if i >= 1 else ""
+            if re.match(r"###\s+User Input", prev_heading, re.IGNORECASE):
+                if current_user_query and planner_chunks:
+                    pairs.append(
+                        {
+                            "user_query": current_user_query,
+                            "agent_response": "\n\n".join(planner_chunks).strip(),
+                        }
+                    )
+                    planner_chunks = []
+                current_user_query = block
+
+            elif re.match(r"###\s+Planner Response", prev_heading, re.IGNORECASE):
+                if block:
+                    planner_chunks.append(block)
+
+    if current_user_query and planner_chunks:
+        pairs.append(
+            {
+                "user_query": current_user_query,
+                "agent_response": "\n\n".join(planner_chunks).strip(),
+            }
+        )
+
+    return pairs
+
+
+def _parse_json_or_jsonl_transcript(
+    text: str,
+) -> tuple[str, list[ParsedMessagePair]]:
+    records = _load_jsonl_records(text)
+    if records:
+        return _detect_record_format(records), _pair_role_records(records)
+
+    payload = _load_json_payload(text)
+    records = _records_from_json_payload(payload)
+    if records:
+        return _detect_record_format(records), _pair_role_records(records)
+
+    return "unknown", []
+
+
+def _load_jsonl_records(text: str) -> list[dict[str, Any]]:
+    records: list[dict[str, Any]] = []
+    non_empty_lines = [line.strip() for line in text.splitlines() if line.strip()]
+    if len(non_empty_lines) < 2:
+        return []
+
+    for line in non_empty_lines:
+        try:
+            value = json.loads(line)
+        except json.JSONDecodeError:
+            return []
+        if not isinstance(value, dict):
+            return []
+        records.append(value)
+
+    return records
+
+
+def _load_json_payload(text: str) -> Any:
+    try:
+        return json.loads(text)
+    except json.JSONDecodeError:
+        return None
+
+
+def _records_from_json_payload(payload: Any) -> list[dict[str, Any]]:
+    if isinstance(payload, list):
+        return [item for item in payload if isinstance(item, dict)]
+
+    if not isinstance(payload, dict):
+        return []
+
+    for key in ("history", "messages", "conversation", "items", "records"):
+        value = payload.get(key)
+        if isinstance(value, list):
+            return [item for item in value if isinstance(item, dict)]
+
+    return []
+
+
+def _detect_record_format(records: list[dict[str, Any]]) -> str:
+    if any(
+        "sessionId" in record
+        or "parentUuid" in record
+        or ("message" in record and record.get("type") in {"user", "assistant"})
+        for record in records
+    ):
+        return "claude_code"
+
+    if any(
+        record.get("role") == "model"
+        or "parts" in record
+        or (
+            isinstance(record.get("message"), dict)
+            and record["message"].get("role") == "model"
+        )
+        for record in records
+    ):
+        return "gemini"
+
+    return "json"
+
+
+def _pair_role_records(records: list[dict[str, Any]]) -> list[ParsedMessagePair]:
+    pairs: list[ParsedMessagePair] = []
+    current_user_query: str | None = None
+    assistant_chunks: list[str] = []
+
+    def flush_pair() -> None:
+        nonlocal current_user_query, assistant_chunks
+        if current_user_query and assistant_chunks:
+            pairs.append(
+                {
+                    "user_query": current_user_query,
+                    "agent_response": "\n\n".join(assistant_chunks).strip(),
+                }
+            )
+        assistant_chunks = []
+
+    for record in records:
+        role = _record_role(record)
+        if role not in _USER_ROLES and role not in _ASSISTANT_ROLES:
+            continue
+
+        if role in _USER_ROLES and _record_has_tool_result(record):
+            continue
+
+        text = _record_text(record)
+        if not text or _is_gemini_cli_setup_text(text):
+            continue
+
+        if role in _USER_ROLES:
+            flush_pair()
+            current_user_query = text
+        elif current_user_query:
+            assistant_chunks.append(text)
+
+    flush_pair()
+    return pairs
+
+
+def _record_role(record: dict[str, Any]) -> str:
+    message = record.get("message")
+    raw_role = ""
+
+    if isinstance(message, dict):
+        raw_role = str(message.get("role") or "")
+
+    raw_role = raw_role or str(record.get("role") or record.get("type") or "")
+    role = raw_role.lower()
+
+    if role == "human":
+        return "user"
+    if role in {"assistant", "model", "user"}:
+        return role
+    return role
+
+
+def _record_text(record: dict[str, Any]) -> str:
+    message = record.get("message")
+    source = message if isinstance(message, dict) else record
+
+    if "content" in source:
+        return _extract_text(source.get("content"))
+    if "parts" in source:
+        return _extract_text(source.get("parts"))
+    if "text" in source:
+        return _extract_text(source.get("text"))
+
+    return ""
+
+
+def _record_has_tool_result(record: dict[str, Any]) -> bool:
+    message = record.get("message")
+    source = message if isinstance(message, dict) else record
+    return _has_tool_result(source.get("content")) or _has_tool_result(source.get("parts"))
+
+
+def _extract_text(value: Any) -> str:
+    if value is None:
+        return ""
+
+    if isinstance(value, str):
+        return _clean_text(value)
+
+    if isinstance(value, list):
+        chunks: list[str] = []
+        for item in value:
+            if isinstance(item, str):
+                chunks.append(item)
+                continue
+
+            if not isinstance(item, dict):
+                continue
+
+            block_type = str(item.get("type") or "").lower()
+            if block_type in _SKIPPED_BLOCK_TYPES:
+                continue
+            if item.get("functionCall") or item.get("functionResponse"):
+                continue
+
+            if "text" in item:
+                chunks.append(str(item["text"]))
+            elif "content" in item:
+                nested = _extract_text(item["content"])
+                if nested:
+                    chunks.append(nested)
+            elif "parts" in item:
+                nested = _extract_text(item["parts"])
+                if nested:
+                    chunks.append(nested)
+
+        return _clean_text("\n\n".join(chunk for chunk in chunks if chunk))
+
+    if isinstance(value, dict):
+        block_type = str(value.get("type") or "").lower()
+        if block_type in _SKIPPED_BLOCK_TYPES:
+            return ""
+        if value.get("functionCall") or value.get("functionResponse"):
+            return ""
+        if "text" in value:
+            return _clean_text(str(value["text"]))
+        if "content" in value:
+            return _extract_text(value["content"])
+        if "parts" in value:
+            return _extract_text(value["parts"])
+
+    return ""
+
+
+def _has_tool_result(value: Any) -> bool:
+    if isinstance(value, list):
+        return any(_has_tool_result(item) for item in value)
+    if isinstance(value, dict):
+        block_type = str(value.get("type") or "").lower()
+        if block_type == "tool_result" or value.get("functionResponse"):
+            return True
+        return _has_tool_result(value.get("content")) or _has_tool_result(
+            value.get("parts")
+        )
+    return False
+
+
+def _parse_role_heading_transcript(
+    text: str,
+    *,
+    assistant_roles: set[str],
+    skip_gemini_bootstrap: bool,
+) -> list[ParsedMessagePair]:
+    pattern = re.compile(
+        r"(?im)^(?:#{1,6}\s*)?(USER|HUMAN|ASSISTANT|CLAUDE|MODEL|GEMINI)"
+        r"(?:[^\S\n]+[^\n]*)?[^\S\n]*$"
+    )
+    matches = list(pattern.finditer(text))
+    if not matches:
+        pattern = re.compile(
+            r"(?im)^(USER|HUMAN|ASSISTANT|CLAUDE|MODEL|GEMINI)\s*:\s*"
+        )
+        matches = list(pattern.finditer(text))
+        return _parse_inline_role_labels(text, matches, assistant_roles)
+
+    pairs: list[ParsedMessagePair] = []
+    current_user_query: str | None = None
+    assistant_chunks: list[str] = []
+
+    def flush_pair() -> None:
+        nonlocal current_user_query, assistant_chunks
+        if current_user_query and assistant_chunks:
+            pairs.append(
+                {
+                    "user_query": current_user_query,
"agent_response": "\n\n".join(assistant_chunks).strip(), + } + ) + assistant_chunks = [] + + for index, match in enumerate(matches): + role = match.group(1).lower() + start = match.end() + end = matches[index + 1].start() if index + 1 < len(matches) else len(text) + content = _clean_text(text[start:end]) + content = _strip_tool_markdown(content) + if not content: + continue + if skip_gemini_bootstrap and _is_gemini_cli_setup_text(content): + continue + + if role in _USER_ROLES: + flush_pair() + current_user_query = content + elif role in assistant_roles and current_user_query: + assistant_chunks.append(content) + + flush_pair() + return pairs + + +def _parse_inline_role_labels( + text: str, + matches: list[re.Match[str]], + assistant_roles: set[str], +) -> list[ParsedMessagePair]: + pairs: list[ParsedMessagePair] = [] + current_user_query: str | None = None + assistant_chunks: list[str] = [] + + def flush_pair() -> None: + nonlocal current_user_query, assistant_chunks + if current_user_query and assistant_chunks: + pairs.append( + { + "user_query": current_user_query, + "agent_response": "\n\n".join(assistant_chunks).strip(), + } + ) + assistant_chunks = [] + + for index, match in enumerate(matches): + role = match.group(1).lower() + start = match.end() + end = matches[index + 1].start() if index + 1 < len(matches) else len(text) + content = _strip_tool_markdown(_clean_text(text[start:end])) + if not content: + continue + + if role in _USER_ROLES: + flush_pair() + current_user_query = content + elif role in assistant_roles and current_user_query: + assistant_chunks.append(content) + + flush_pair() + return pairs + + +def _strip_tool_markdown(text: str) -> str: + return _clean_text(_TOOL_MARKDOWN_RE.sub("", text)) + + +def _clean_text(text: str) -> str: + return text.strip().strip("-").strip() + + +def _is_gemini_cli_setup_text(text: str) -> bool: + return text.startswith("This is the Gemini CLI. 
We are setting up the context") + + +def _looks_like_gemini_markdown(text: str) -> bool: + return bool( + re.search(r"(?im)^#{1,6}\s*USER\b", text) + and re.search(r"(?im)^#{1,6}\s*(MODEL|GEMINI)\b", text) + ) + + +def _looks_like_claude_export(text: str) -> bool: + if "Claude Code" in text or ".claude/projects" in text: + return True + return bool( + re.search(r"(?im)^(?:#{1,6}\s*)?(USER|HUMAN)\b", text) + and re.search(r"(?im)^(?:#{1,6}\s*)?(ASSISTANT|CLAUDE)\b", text) + ) From ee7fc294f613df26eee97e98252db0b605741a14 Mon Sep 17 00:00:00 2001 From: Ankit Kotnala Date: Sat, 9 May 2026 00:28:25 +0530 Subject: [PATCH 2/2] Add Claude Code and Gemini transcript parsing --- src/utils/transcripts.py | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/src/utils/transcripts.py b/src/utils/transcripts.py index a147fac..4281f71 100644 --- a/src/utils/transcripts.py +++ b/src/utils/transcripts.py @@ -97,7 +97,8 @@ def _parse_cursor_transcript(text: str) -> list[ParsedMessagePair]: continue if section.startswith("**User**"): - current_user_query = section.replace("**User**", "", 1).strip() + content = section.replace("**User**", "", 1).strip() + current_user_query = _append_user_text(current_user_query, content) elif section.startswith("**Cursor**") or section.startswith("**Assistant**"): content = ( section.replace("**Cursor**", "", 1) @@ -138,7 +139,7 @@ def _parse_antigravity_transcript(text: str) -> list[ParsedMessagePair]: } ) planner_chunks = [] - current_user_query = None + current_user_query = None elif re.match(r"###\s+Planner Response", block, re.IGNORECASE): continue @@ -154,7 +155,9 @@ def _parse_antigravity_transcript(text: str) -> list[ParsedMessagePair]: } ) planner_chunks = [] - current_user_query = block + current_user_query = block + else: + current_user_query = _append_user_text(current_user_query, block) elif re.match(r"###\s+Planner Response", prev_heading, re.IGNORECASE): if block: @@ -278,8 +281,11 @@ def flush_pair() -> None: continue if role in _USER_ROLES: - flush_pair() - current_user_query = text + if assistant_chunks: + flush_pair() + current_user_query = text + else: + current_user_query = _append_user_text(current_user_query, text) elif current_user_query: assistant_chunks.append(text) @@ -434,8 +440,11 @@ def flush_pair() -> None: continue if role in _USER_ROLES: - flush_pair() - current_user_query = content + if assistant_chunks: + flush_pair() + current_user_query = content + else: + current_user_query = _append_user_text(current_user_query, content) elif role in assistant_roles and current_user_query: assistant_chunks.append(content) @@ -472,8 +481,11 @@ def flush_pair() -> None: continue if role in _USER_ROLES: - flush_pair() - current_user_query = content + if assistant_chunks: + flush_pair() + current_user_query = content + else: + current_user_query = _append_user_text(current_user_query, content) elif role in assistant_roles and current_user_query: assistant_chunks.append(content) @@ -486,7 +498,11 @@ def _strip_tool_markdown(text: str) -> str: def _clean_text(text: str) -> str: - return text.strip().strip("-").strip() + return text.strip() + + +def _append_user_text(current: str | None, text: str) -> str: + return f"{current}\n\n{text}" if current else text def _is_gemini_cli_setup_text(text: str) -> bool: