5 changes: 5 additions & 0 deletions posthog/client.py
@@ -269,6 +269,10 @@ def __init__(

self.project_root = project_root

self.use_ai_ingestion_pipeline = os.environ.get(
"LLMA_INGESTION_PIPELINE", "false"
).lower() in ("true", "1", "yes")

# personal_api_key: This should be a generated Personal API Key, private
self.personal_api_key = personal_api_key
if debug:
@@ -316,6 +320,7 @@ def __init__(
retries=max_retries,
timeout=timeout,
historical_migration=historical_migration,
use_ai_ingestion_pipeline=self.use_ai_ingestion_pipeline,
)
self.consumers.append(consumer)

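The flag is read once at construction time, so the environment variable must be set before the client is created. A minimal opt-in sketch, assuming the SDK's public Posthog client class; the key and host are placeholders:

import os

# Any of "true", "1", or "yes" (case-insensitive) enables the pipeline,
# per the parsing above; anything else, or an unset variable, disables it.
os.environ["LLMA_INGESTION_PIPELINE"] = "1"

from posthog import Posthog  # assumed public client class

client = Posthog("phc_example_key", host="https://us.i.posthog.com")
assert client.use_ai_ingestion_pipeline is True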
77 changes: 72 additions & 5 deletions posthog/consumer.py
@@ -37,6 +37,7 @@ def __init__(
retries=10,
timeout=15,
historical_migration=False,
use_ai_ingestion_pipeline=False,
):
"""Create a consumer thread."""
Thread.__init__(self)
@@ -57,6 +58,7 @@ def __init__(
self.retries = retries
self.timeout = timeout
self.historical_migration = historical_migration
self.use_ai_ingestion_pipeline = use_ai_ingestion_pipeline

def run(self):
"""Runs the consumer."""
@@ -136,17 +138,82 @@ def fatal_exception(exc):
# retry on all other errors (eg. network)
return False

if self.use_ai_ingestion_pipeline:
ai_events = []
non_ai_events = []

for item in batch:
event_name = item.get("event", "")
if event_name.startswith("$ai_"):
ai_events.append(item)
else:
non_ai_events.append(item)

for ai_event in ai_events:
self._send_ai_event(ai_event, fatal_exception)

if non_ai_events:

@backoff.on_exception(
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception,
)
def send_batch_request():
batch_post(
self.api_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
batch=non_ai_events,
historical_migration=self.historical_migration,
)

send_batch_request()
else:
@backoff.on_exception(
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception,
)
def send_request():
batch_post(
self.api_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
batch=batch,
historical_migration=self.historical_migration,
)

send_request()

def _send_ai_event(self, event, fatal_exception):
"""Send a single AI event to the /i/v0/ai endpoint"""
from posthog.request import ai_post
from posthog.utils import extract_ai_blob_properties

# Extract blob properties from the event
properties = event.get("properties", {})
cleaned_properties, blobs = extract_ai_blob_properties(properties)

@backoff.on_exception(
backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception
)
-        def send_request():
-            batch_post(
+        def send_ai_request():
+            ai_post(
self.api_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
-                batch=batch,
-                historical_migration=self.historical_migration,
+                event_name=event.get("event"),
+                distinct_id=event.get("distinct_id"),
+                properties=cleaned_properties,
+                blobs=blobs,
+                timestamp=event.get("timestamp"),
+                uuid=event.get("uuid"),
)

-        send_request()
+        send_ai_request()
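The routing above partitions each flushed batch purely on the event-name prefix. A standalone sketch of the same predicate, using made-up events:

# Events whose name starts with "$ai_" are sent individually to the AI
# endpoint; everything else still flows through the regular batch API.
batch = [
    {"event": "$ai_generation", "distinct_id": "u1", "properties": {}},
    {"event": "$ai_trace", "distinct_id": "u1", "properties": {}},
    {"event": "page_view", "distinct_id": "u1", "properties": {}},
]

ai_events = [e for e in batch if e.get("event", "").startswith("$ai_")]
non_ai_events = [e for e in batch if not e.get("event", "").startswith("$ai_")]

assert [e["event"] for e in ai_events] == ["$ai_generation", "$ai_trace"]
assert [e["event"] for e in non_ai_events] == ["page_view"]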
166 changes: 165 additions & 1 deletion posthog/request.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import json
import logging
import re
import secrets
import socket
from dataclasses import dataclass
from datetime import date, datetime
from gzip import GzipFile
from io import BytesIO
from typing import Any, List, Optional, Tuple, Union

from uuid import uuid4

import requests
from dateutil.tz import tzutc
@@ -318,3 +319,166 @@ def default(self, obj: Any):
return obj.isoformat()

return json.JSONEncoder.default(self, obj)


def build_ai_multipart_request(
event_name: str,
distinct_id: str,
properties: dict[str, Any],
blobs: dict[str, Any],
timestamp: Optional[str] = None,
event_uuid: Optional[str] = None,
) -> tuple[bytes, str]:
"""
Build a multipart/form-data request body for AI events.

Args:
event_name: The event name (e.g., "$ai_generation")
distinct_id: The distinct ID for the event
properties: Event properties (without blob properties)
blobs: Dictionary of blob properties to include as separate parts
timestamp: Optional timestamp for the event
event_uuid: Optional UUID for the event

Returns:
Tuple of (body_bytes, boundary) for the multipart request

Format follows the /i/v0/ai endpoint spec:
Part 1: "event" - JSON with {uuid, event, distinct_id, timestamp}
Part 2: "event.properties" - JSON with non-blob properties
Part 3+: "event.properties.$ai_input" etc. - Blob data as JSON
"""
# Generate a random boundary that's unlikely to appear in the data
boundary = "----WebKitFormBoundary" + secrets.token_hex(16)

# Ensure we have a UUID
if event_uuid is None:
event_uuid = str(uuid4())

# Build the event part
event_data = {
"uuid": event_uuid,
"event": event_name,
"distinct_id": distinct_id,
}
if timestamp is not None:
event_data["timestamp"] = timestamp

# Build multipart body
parts = []

# Part 1: event
parts.append(f"--{boundary}\r\n".encode())
parts.append(b'Content-Disposition: form-data; name="event"\r\n')
parts.append(b"Content-Type: application/json\r\n\r\n")
parts.append(json.dumps(event_data, cls=DatetimeSerializer).encode("utf-8"))
parts.append(b"\r\n")

# Part 2: event.properties
parts.append(f"--{boundary}\r\n".encode())
parts.append(b'Content-Disposition: form-data; name="event.properties"\r\n')
parts.append(b"Content-Type: application/json\r\n\r\n")
parts.append(json.dumps(properties, cls=DatetimeSerializer).encode("utf-8"))
parts.append(b"\r\n")

# Part 3+: blob parts
for blob_name, blob_value in blobs.items():
parts.append(f"--{boundary}\r\n".encode())
parts.append(
f'Content-Disposition: form-data; name="event.properties.{blob_name}"\r\n'.encode()
)
parts.append(b"Content-Type: application/json\r\n\r\n")
parts.append(json.dumps(blob_value, cls=DatetimeSerializer).encode("utf-8"))
parts.append(b"\r\n")

# Final boundary
parts.append(f"--{boundary}--\r\n".encode())

# Combine all parts
body = b"".join(parts)

return body, boundary
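To make the wire format concrete, a hedged sketch of calling the builder with a small event; all values below are illustrative placeholders:

body, boundary = build_ai_multipart_request(
    event_name="$ai_generation",
    distinct_id="user-123",
    properties={"$ai_model": "gpt-4"},
    blobs={"$ai_input": [{"role": "user", "content": "hi"}]},
)

# One part per section of the spec, all delimited by the same boundary:
#   name="event"                       -> uuid, event name, distinct_id
#   name="event.properties"           -> the non-blob properties as JSON
#   name="event.properties.$ai_input" -> each blob as its own JSON part
assert boundary.encode() in body
assert b'name="event.properties.$ai_input"' in body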


def ai_post(
api_key: str,
host: Optional[str] = None,
gzip: bool = False,
timeout: int = 15,
**kwargs,
) -> requests.Response:
"""
Post an AI event to the /i/v0/ai endpoint using multipart/form-data.

Args:
api_key: The PostHog API key
host: The host to post to
gzip: Whether to gzip compress the request
timeout: Request timeout in seconds
**kwargs: Event parameters including event_name, distinct_id, properties, blobs, etc.

Returns:
The response from the server

Raises:
APIError: If the request fails
"""
log = logging.getLogger("posthog")

# Extract event parameters
event_name = kwargs.get("event_name")
distinct_id = kwargs.get("distinct_id")
properties = kwargs.get("properties", {})
blobs = kwargs.get("blobs", {})
timestamp = kwargs.get("timestamp")
event_uuid = kwargs.get("uuid")

# Build multipart request
body, boundary = build_ai_multipart_request(
event_name=event_name,
distinct_id=distinct_id,
properties=properties,
blobs=blobs,
timestamp=timestamp,
event_uuid=event_uuid,
)

    # Gzip-compress the body if compression is requested or the body is
    # large enough; the spec recommends compression for requests > 10KB
data = body
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Authorization": f"Bearer {api_key}",
"User-Agent": USER_AGENT,
}

if gzip or len(body) > 10 * 1024: # Compress if gzip enabled or body > 10KB
headers["Content-Encoding"] = "gzip"
buf = BytesIO()
with GzipFile(fileobj=buf, mode="w") as gz:
gz.write(body)
data = buf.getvalue()
log.debug("Compressed AI event from %d bytes to %d bytes", len(body), len(data))

url = remove_trailing_slash(host or DEFAULT_HOST) + "/i/v0/ai"
log.debug("Posting AI event to %s", url)
log.debug(
"Event: %s, Distinct ID: %s, Blobs: %s",
event_name,
distinct_id,
list(blobs.keys()),
)

res = _session.post(url, data=data, headers=headers, timeout=timeout)

if res.status_code == 200:
log.debug("AI event uploaded successfully")
return res

# Handle errors
try:
payload = res.json()
log.debug("Received error response: %s", payload)
raise APIError(res.status_code, payload.get("detail", "Unknown error"))
except (KeyError, ValueError):
raise APIError(res.status_code, res.text)
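Putting it together, a hedged sketch of invoking ai_post directly (in the SDK this call is driven by the consumer's _send_ai_event); the key, host, and payload are placeholders:

try:
    ai_post(
        "phc_example_key",
        host="https://us.i.posthog.com",
        timeout=15,
        event_name="$ai_generation",
        distinct_id="user-123",
        properties={"$ai_model": "gpt-4"},
        blobs={"$ai_input": [{"role": "user", "content": "hi"}]},
    )
except APIError as e:
    # Non-200 responses are surfaced as APIError(status, detail-or-body).
    print(f"AI event rejected: {e}")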