Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 3 additions & 115 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

from src.pipelines.ingest import IngestPipeline
from src.pipelines.retrieval import RetrievalPipeline
from src.utils.transcripts import parse_transcript_text as _shared_parse_transcript_text


# ═══════════════════════════════════════════════════════════════════
Expand Down Expand Up @@ -693,108 +694,6 @@ def _extract_chat_pairs(url: str, html: str) -> tuple[str, str, list[dict[str, s
return provider, extraction_method, pairs


def _parse_cursor_transcript(text: str) -> list[dict[str, str]]:
"""Parse a Cursor-exported markdown transcript into message pairs."""
pairs: list[dict[str, str]] = []

sections = text.split("---")

start_idx = 0
if sections and "Exported on" in sections[0]:
start_idx = 1

current_user_query = None

for section in sections[start_idx:]:
section = section.strip()
if not section:
continue

if section.startswith("**User**"):
content = section.replace("**User**", "", 1).strip()
current_user_query = content

elif section.startswith("**Cursor**") or section.startswith("**Assistant**"):
content = section.replace("**Cursor**", "", 1).replace("**Assistant**", "", 1).strip()

if current_user_query:
pairs.append({
"user_query": current_user_query,
"agent_response": content,
})
current_user_query = None

return pairs


def _parse_antigravity_transcript(text: str) -> list[dict[str, str]]:
"""Parse an Antigravity-exported markdown transcript into message pairs.

Antigravity transcripts follow this structure::

# Chat Conversation
Note: _This is purely the output..._
### User Input
<user message>
### Planner Response
<agent response>
...

Multiple consecutive ``### Planner Response`` blocks are concatenated into
a single agent response (they occur when the agent used tools mid-turn).
"""
pairs: list[dict[str, str]] = []

text = text.replace("\r\n", "\n")

# Split on H3 headings, keeping the headings in the token list
blocks = re.split(r"(?m)^(###\s+.+)$", text)

current_user_query: str | None = None
planner_chunks: list[str] = []

for i, block in enumerate(blocks):
block = block.strip()
if not block:
continue

if re.match(r"###\s+User Input", block, re.IGNORECASE):
if current_user_query and planner_chunks:
pairs.append({
"user_query": current_user_query,
"agent_response": "\n\n".join(planner_chunks).strip(),
})
planner_chunks = []
current_user_query = None

elif re.match(r"###\s+Planner Response", block, re.IGNORECASE):
pass # content handled below

else:
if i > 0:
prev_heading = blocks[i - 1].strip() if i >= 1 else ""
if re.match(r"###\s+User Input", prev_heading, re.IGNORECASE):
if current_user_query and planner_chunks:
pairs.append({
"user_query": current_user_query,
"agent_response": "\n\n".join(planner_chunks).strip(),
})
planner_chunks = []
current_user_query = block

elif re.match(r"###\s+Planner Response", prev_heading, re.IGNORECASE):
if block:
planner_chunks.append(block)

if current_user_query and planner_chunks:
pairs.append({
"user_query": current_user_query,
"agent_response": "\n\n".join(planner_chunks).strip(),
})

return pairs


async def _parse_transcript_with_llm(text: str) -> list[dict[str, str]]:
"""Use an LLM to parse transcript text when format detection fails."""
from src.models import get_model
Expand Down Expand Up @@ -842,19 +741,8 @@ async def _parse_transcript_with_llm(text: str) -> list[dict[str, str]]:


def _parse_transcript_text(text: str) -> tuple[str, list[dict[str, str]]]:
    """Compatibility wrapper around the shared transcript parser.

    Format detection (cursor / antigravity / unknown) now lives in
    ``src.utils.transcripts.parse_transcript_text``; this wrapper keeps the
    old module-local name working for existing callers.

    Returns:
        ``(format_name, pairs)`` where ``pairs`` is a list of
        ``{"user_query", "agent_response"}`` dicts (empty when undetected).
    """
    return _shared_parse_transcript_text(text)


async def _scrape_chat_share(url: str) -> dict[str, Any]:
Expand Down
173 changes: 13 additions & 160 deletions src/api/routes/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
WeaverSummary,
)
from src.pipelines.retrieval import RetrievalPipeline
from src.utils.transcripts import parse_transcript_text

from bs4 import BeautifulSoup
import json
Expand Down Expand Up @@ -307,146 +308,6 @@ def _extract_chat_pairs(url: str, html: str) -> tuple[str, str, List[MessagePair
return provider, extraction_method, pairs


def _parse_cursor_transcript(text: str) -> List[MessagePair]:
    """Parse a Cursor-exported markdown transcript into message pairs.

    Cursor transcripts have the format:
        _Exported on ... from Cursor_
        ---
        **User**
        <user message>
        ---
        **Cursor**
        <agent response>
        ---
        ...

    Args:
        text: Raw transcript text as exported by Cursor.

    Returns:
        List of ``MessagePair`` objects (possibly empty); an agent turn with
        no pending user turn is dropped.
    """
    pairs: List[MessagePair] = []

    # Split by --- separator
    sections = text.split("---")

    # Skip the first section if it's the header (contains "Exported on")
    start_idx = 0
    if sections and "Exported on" in sections[0]:
        start_idx = 1

    current_user_query = None

    for section in sections[start_idx:]:
        section = section.strip()
        if not section:
            continue

        # Check if this is a User message
        if section.startswith("**User**"):
            current_user_query = section.removeprefix("**User**").strip()

        # Check if this is a Cursor/Agent message
        elif section.startswith(("**Cursor**", "**Assistant**")):
            # Strip only the *leading* role marker.  The previous chained
            # ``.replace(marker, "", 1)`` calls also removed a literal
            # "**Assistant**" (or "**Cursor**") occurring inside the message
            # body, silently corrupting the agent response.
            if section.startswith("**Cursor**"):
                content = section.removeprefix("**Cursor**").strip()
            else:
                content = section.removeprefix("**Assistant**").strip()

            # If we have a user query, create a pair
            if current_user_query:
                pairs.append(MessagePair(
                    user_query=current_user_query,
                    agent_response=content,
                ))
                current_user_query = None

    return pairs


def _parse_antigravity_transcript(text: str) -> List[MessagePair]:
    """Parse an Antigravity-exported markdown transcript into message pairs.

    The Antigravity coding assistant exports a markdown document whose turns
    are introduced by H3 headings::

        # Chat Conversation

        Note: _This is purely the output of the chat conversation..._

        ### User Input

        <user message>

        ### Planner Response

        <agent response>

        ### User Input

        ...

    Consecutive ``### Planner Response`` blocks (emitted when the agent used
    tools between messages) are merged into a single agent response.
    """
    user_re = re.compile(r"###\s+User Input", re.IGNORECASE)
    planner_re = re.compile(r"###\s+Planner Response", re.IGNORECASE)

    # Split on H3 headings; the capture group keeps each heading token in the
    # token stream, so every content token can be attributed to the heading
    # immediately before it.
    tokens = re.split(r"(?m)^(###\s+.+)$", text.replace("\r\n", "\n"))

    pairs: List[MessagePair] = []
    pending_query: str | None = None
    response_parts: List[str] = []

    def emit_pending() -> None:
        # Flush the in-progress (query, response) pair and reset state.
        nonlocal pending_query
        if pending_query and response_parts:
            pairs.append(MessagePair(
                user_query=pending_query,
                agent_response="\n\n".join(response_parts).strip(),
            ))
        pending_query = None
        response_parts.clear()

    for idx, raw in enumerate(tokens):
        chunk = raw.strip()
        if not chunk:
            continue

        if user_re.match(chunk):
            # A new user turn begins: close out any completed pair.
            emit_pending()
        elif planner_re.match(chunk):
            continue  # heading only; its content arrives as the next token
        elif idx:
            # Content token: attribute it via the heading right before it.
            heading = tokens[idx - 1].strip()
            if user_re.match(heading):
                pending_query = chunk
            elif planner_re.match(heading):
                response_parts.append(chunk)

    emit_pending()
    return pairs


async def _parse_transcript_with_llm(text: str) -> List[MessagePair]:
"""Use an LLM to parse transcript text when format detection fails."""
from src.models import get_model
Expand Down Expand Up @@ -492,24 +353,6 @@ async def _parse_transcript_with_llm(text: str) -> List[MessagePair]:
return []


def _parse_transcript_text(text: str) -> tuple[str, List[MessagePair]]:
    """Detect the transcript format and parse it into message pairs.

    Tries each known format detector in order (Cursor first, then
    Antigravity); returns ``("unknown", [])`` when no detector both matches
    and yields at least one pair.
    """
    # (format name, detector predicate, parser) — checked in order.
    detectors = (
        (
            "cursor",
            lambda t: "_Exported on" in t and "from Cursor" in t,
            _parse_cursor_transcript,
        ),
        (
            "antigravity",
            lambda t: "# Chat Conversation" in t
            and ("### User Input" in t or "### Planner Response" in t),
            _parse_antigravity_transcript,
        ),
    )

    for fmt, matches, parser in detectors:
        if matches(text):
            pairs = parser(text)
            if pairs:
                return fmt, pairs

    return "unknown", []


async def _scrape_chat_share(url: str) -> Dict[str, Any]:
html, final_url = await _render_chat_share(url)
provider, extraction_method, pairs = _extract_chat_pairs(final_url or url, html)
Expand Down Expand Up @@ -778,7 +621,10 @@ async def scrape_chat_link(req: ScrapeRequest, request: Request):
)
async def parse_transcript(
request: Request,
file: UploadFile = File(..., description="Chat transcript file (.txt, .md, .json)")
file: UploadFile = File(
...,
description="Chat transcript file (.txt, .md, .json, .jsonl)",
)
):
start = time.perf_counter()

Expand All @@ -791,7 +637,14 @@ async def parse_transcript(
return _error(request, "Uploaded file is empty.", 400)

# Try to parse the transcript
format_detected, pairs = _parse_transcript_text(text)
format_detected, parsed_pairs = parse_transcript_text(text)
pairs = [MessagePair(**pair) for pair in parsed_pairs]
if pairs:
logger.info(
"Parsed transcript format=%s pairs=%d",
format_detected,
len(pairs),
)

# If no pairs found, try LLM fallback
if not pairs:
Expand Down
Loading
Loading