Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- iOS: rebuilt the data layer around streaming so opening large tables and running ad-hoc queries no longer materialises the entire result set in memory. New shared types in `TableProModels` (`Cell`, `Row`, `CellRef`, `StreamElement`, `StreamOptions`) and a new `executeStreaming(query:options:)` method on `DatabaseDriver` (with a backwards-compatible default implementation, so the macOS app and all 14 plugins keep working unchanged). MySQL switched from `mysql_store_result` (buffered, libmariadb mallocs the full result before Swift sees the first row) to `mysql_use_result` (server-side cursor, one row per fetch). Postgres switched from `PQexec` to `PQsendQuery` + `PQsetSingleRowMode` + `PQgetResult` per-row. SQLite stops eagerly base64-encoding BLOBs; cells now arrive as `Cell.binary(byteCount:ref:)` carrying only the byte count and a re-fetch reference. Cell-text > 4 KB arrives as `Cell.truncatedText` carrying a 4 KB prefix plus total length and ref. New `MemoryPressureMonitor` actor wraps `DispatchSourceMemoryPressure` + `os_proc_available_memory` and is started at app launch in `TableProMobileApp`; `DataBrowserViewModel` and `QueryEditorViewModel` (new, `@Observable`) listen and call `RowWindow.shrink(to:)` on warning (100 rows) or critical (50 rows + cancel in-flight stream). `RowWindow` is a bounded sliding buffer (default 200 rows) replacing the unbounded `[[String?]]` `@State` arrays in views. Cancellation propagates through `AsyncThrowingStream.onTermination` to the C layer (`PQcancel`, `sqlite3_interrupt`). New `StreamingExporter` writes CSV / JSON / SQL to a `FileHandle` in O(1) memory regardless of result size. The TabView in `RowDetailView` (which pre-rendered 2-3 full rows on iPad split view) is replaced with a single `rowContent` plus horizontal `DragGesture` for prev/next, so memory cost is O(1) per row.

### Added

- Linked SQL Folders. Point TablePro at a folder of `.sql` files (e.g. a Git repo of shared queries) and they appear in the Favorites sidebar live. Recursive subfolder watching via `FSEventStreamCreate`; nested directory hierarchy preserved. Add a folder via the `+` button at the bottom of the Favorites sidebar (Mail.app menu pattern), or right-click an existing linked folder for `Add Another SQL Folder...`. Files keep their on-disk identity: clicking a linked file opens it as a regular editor tab, `⌘S` writes back to disk. External modifications surface as a yellow live banner above the editor tab with one-click `Reload`. Save-time conflict prompt shows a side-by-side diff sheet (line-level, via `Swift.CollectionDifference`) with `Keep My Changes` / `Reload from Disk` / `Cancel`. Right-click a linked file for `Edit Metadata...` to update `@name` / `@keyword` / `@description` frontmatter without opening the editor. Drag a favorite row (linked or DB-stored) onto the SQL editor to insert its content. Non-UTF-8 files are detected via `String(contentsOf: usedEncoding:)`, surfaced with a yellow warning icon in the sidebar, and saved back in their original encoding (UTF-16, ISO Latin-1, etc.). UTF-8 BOM bytes at the top of a frontmatter file no longer make metadata silently disappear. Per-connection scope or global. Frontmatter feeds autocomplete keyword expansion alongside DB-stored favorites. No Pro license required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public protocol DatabaseDriver: AnyObject, Sendable {
func ping() async throws -> Bool

func execute(query: String) async throws -> QueryResult
func executeStreaming(query: String, options: StreamOptions) -> AsyncThrowingStream<StreamElement, Error>
func cancelCurrentQuery() async throws

func fetchTables(schema: String?) async throws -> [TableInfo]
Expand All @@ -28,3 +29,51 @@ public protocol DatabaseDriver: AnyObject, Sendable {

var serverVersion: String? { get }
}

public extension DatabaseDriver {
func executeStreaming(query: String, options: StreamOptions = .default) -> AsyncThrowingStream<StreamElement, Error> {
AsyncThrowingStream { continuation in
let task = Task {
do {
let result = try await self.execute(query: query)
continuation.yield(.columns(result.columns))

var emitted = 0
for legacyRow in result.rows {
if Task.isCancelled {
continuation.yield(.truncated(reason: .cancelled))
break
}
if emitted >= options.maxRows {
continuation.yield(.truncated(reason: .rowCap(options.maxRows)))
break
}
let cells = legacyRow.enumerated().map { index, value -> Cell in
let typeName = index < result.columns.count ? result.columns[index].typeName : nil
return Cell.from(legacyValue: value, columnTypeName: typeName, options: options)
}
continuation.yield(.row(Row(cells: cells)))
emitted += 1
}

if let message = result.statusMessage {
continuation.yield(.statusMessage(message))
}
if result.rowsAffected != 0 {
continuation.yield(.rowsAffected(result.rowsAffected))
}
if result.isTruncated && emitted < options.maxRows {
continuation.yield(.truncated(reason: .driverLimit("driver returned isTruncated=true")))
}
continuation.finish()
} catch is CancellationError {
continuation.yield(.truncated(reason: .cancelled))
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
continuation.onTermination = { _ in task.cancel() }
}
}
}
113 changes: 113 additions & 0 deletions Packages/TableProCore/Sources/TableProModels/Cell.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import Foundation

public enum Cell: Sendable {
case null
case text(String)
case truncatedText(prefix: String, totalBytes: Int, ref: CellRef?)
case binary(byteCount: Int, ref: CellRef?)
}

public extension Cell {
var displayString: String {
switch self {
case .null:
return "NULL"
case .text(let value):
return value
case .truncatedText(let head, let total, _):
return head + "... (\(byteCountFormatter.string(fromByteCount: Int64(total))))"
case .binary(let count, _):
return "[BLOB \(byteCountFormatter.string(fromByteCount: Int64(count)))]"
}
}

var isLoadable: Bool {
switch self {
case .truncatedText(_, _, let ref), .binary(_, let ref):
return ref != nil
case .text, .null:
return false
}
}

var fullValueRef: CellRef? {
switch self {
case .truncatedText(_, _, let ref), .binary(_, let ref):
return ref
case .text, .null:
return nil
}
}
}

public struct CellRef: Sendable, Hashable {
public let table: String
public let column: String
public let primaryKey: [PrimaryKeyComponent]

public init(table: String, column: String, primaryKey: [PrimaryKeyComponent]) {
self.table = table
self.column = column
self.primaryKey = primaryKey
}
}

public struct PrimaryKeyComponent: Sendable, Hashable {
public let column: String
public let value: String

public init(column: String, value: String) {
self.column = column
self.value = value
}
}

public struct Row: Sendable {
public let cells: [Cell]

public init(cells: [Cell]) {
self.cells = cells
}
}

public extension Row {
var legacyValues: [String?] {
cells.map { cell -> String? in
switch cell {
case .null:
return nil
case .text(let value):
return value
case .truncatedText, .binary:
return cell.displayString
}
}
}
}

public extension Cell {
static func from(legacyValue value: String?, columnTypeName: String?, options: StreamOptions, ref: CellRef? = nil) -> Cell {
guard let value else { return .null }
let bytes = value.utf8.count
let upper = (columnTypeName ?? "").uppercased()
let isBinary = upper.contains("BLOB") || upper.contains("BYTEA") || upper.contains("BINARY") || upper.contains("VARBINARY") || upper.contains("IMAGE")

if isBinary {
return .binary(byteCount: bytes, ref: ref)
}

if bytes > options.textTruncationBytes {
let prefixSlice = value.prefix(options.textTruncationBytes)
return .truncatedText(prefix: String(prefixSlice), totalBytes: bytes, ref: ref)
}

return .text(value)
}
}

private let byteCountFormatter: ByteCountFormatter = {
let formatter = ByteCountFormatter()
formatter.allowedUnits = [.useBytes, .useKB, .useMB, .useGB]
formatter.countStyle = .binary
return formatter
}()
47 changes: 47 additions & 0 deletions Packages/TableProCore/Sources/TableProModels/StreamingResult.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import Foundation

public enum StreamElement: Sendable {
case columns([ColumnInfo])
case row(Row)
case rowsAffected(Int)
case statusMessage(String)
case truncated(reason: TruncationReason)
}

public enum TruncationReason: Sendable {
case rowCap(Int)
case memoryPressure
case cancelled
case driverLimit(String)
}

public struct StreamOptions: Sendable {
public let textTruncationBytes: Int
public let inlineBinary: Bool
public let maxRows: Int
public let lazyContext: LazyContext?

public init(
textTruncationBytes: Int = 4_096,
inlineBinary: Bool = false,
maxRows: Int = 100_000,
lazyContext: LazyContext? = nil
) {
self.textTruncationBytes = textTruncationBytes
self.inlineBinary = inlineBinary
self.maxRows = maxRows
self.lazyContext = lazyContext
}

public static let `default` = StreamOptions()
}

public struct LazyContext: Sendable {
public let table: String
public let primaryKeyColumns: [String]

public init(table: String, primaryKeyColumns: [String]) {
self.table = table
self.primaryKeyColumns = primaryKeyColumns
}
}
144 changes: 144 additions & 0 deletions TableProMobile/TableProMobile/Drivers/MySQLDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,50 @@ final class MySQLDriver: DatabaseDriver, @unchecked Sendable {
// No-op for mobile.
}

func executeStreaming(query: String, options: StreamOptions) -> AsyncThrowingStream<StreamElement, Error> {
let actor = self.actor
return AsyncThrowingStream { continuation in
let task = Task {
do {
let beginResult = try await actor.beginStream(query: query)
switch beginResult {
case .noResult(let affectedRows):
if affectedRows != 0 {
continuation.yield(.rowsAffected(affectedRows))
}
continuation.finish()
return
case .rowSet(let columns):
continuation.yield(.columns(columns))
var emitted = 0
while !Task.isCancelled, emitted < options.maxRows {
guard let cells = try await actor.fetchNextRow(options: options, columns: columns) else {
break
}
continuation.yield(.row(Row(cells: cells)))
emitted += 1
}
if Task.isCancelled {
continuation.yield(.truncated(reason: .cancelled))
} else if emitted >= options.maxRows {
continuation.yield(.truncated(reason: .rowCap(options.maxRows)))
}
await actor.endStream()
continuation.finish()
}
} catch is CancellationError {
await actor.endStream()
continuation.yield(.truncated(reason: .cancelled))
continuation.finish()
} catch {
await actor.endStream()
continuation.finish(throwing: error)
}
}
continuation.onTermination = { _ in task.cancel() }
}
}

// MARK: - Schema

func fetchTables(schema: String?) async throws -> [TableInfo] {
Expand Down Expand Up @@ -351,6 +395,106 @@ private actor MySQLActor {
rowsAffected: affected, executionTime: Date().timeIntervalSince(start), isTruncated: isTruncated
)
}

// MARK: - Streaming

private var streamingResult: UnsafeMutablePointer<MYSQL_RES>?
private var streamingColumns: [ColumnInfo] = []

func beginStream(query: String) throws -> MySQLBeginStreamResult {
guard let mysql else { throw MySQLError.notConnected }
if streamingResult != nil {
endStream()
}

guard mysql_real_query(mysql, query, UInt(query.utf8.count)) == 0 else {
throw MySQLError.queryFailed(String(cString: mysql_error(mysql)))
}

guard let result = mysql_use_result(mysql) else {
if mysql_field_count(mysql) != 0 {
throw MySQLError.queryFailed(String(cString: mysql_error(mysql)))
}
let raw = mysql_affected_rows(mysql)
let affected = raw == .max ? 0 : Int(clamping: raw)
return .noResult(affectedRows: affected)
}

streamingResult = result

let fieldCount = Int(mysql_num_fields(result))
var columns: [ColumnInfo] = []
if let fields = mysql_fetch_fields(result) {
for i in 0..<fieldCount {
let field = fields[i]
let name = field.name.map { String(cString: $0) } ?? ""
columns.append(ColumnInfo(
name: name,
typeName: mysqlFieldTypeName(field.type.rawValue),
isPrimaryKey: false,
isNullable: true,
defaultValue: nil,
comment: nil,
characterMaxLength: nil,
ordinalPosition: i
))
}
}
streamingColumns = columns
return .rowSet(columns)
}

func fetchNextRow(options: StreamOptions, columns: [ColumnInfo]) -> [Cell]? {
guard let result = streamingResult else { return nil }
guard let row = mysql_fetch_row(result) else { return nil }

let lengths = mysql_fetch_lengths(result)
var cells: [Cell] = []
cells.reserveCapacity(columns.count)

for i in 0..<columns.count {
if let value = row[i] {
let len = Int(clamping: lengths?[i] ?? 0)
let data = Data(bytes: value, count: len)
let str = String(data: data, encoding: .utf8) ?? String(cString: value)
let cell = Cell.from(
legacyValue: str,
columnTypeName: columns[i].typeName,
options: options,
ref: makeCellRef(column: columns[i].name, row: row, options: options, columns: columns)
)
cells.append(cell)
} else {
cells.append(.null)
}
}
return cells
}

func endStream() {
guard let result = streamingResult else { return }
while mysql_fetch_row(result) != nil {}
mysql_free_result(result)
streamingResult = nil
streamingColumns = []
}

private func makeCellRef(column: String, row: MYSQL_ROW, options: StreamOptions, columns: [ColumnInfo]) -> CellRef? {
guard let lazyContext = options.lazyContext, !lazyContext.primaryKeyColumns.isEmpty else { return nil }

var pkComponents: [PrimaryKeyComponent] = []
for pkColumn in lazyContext.primaryKeyColumns {
guard let columnIndex = columns.firstIndex(where: { $0.name == pkColumn }) else { return nil }
guard let cValue = row[columnIndex] else { return nil }
pkComponents.append(PrimaryKeyComponent(column: pkColumn, value: String(cString: cValue)))
}
return CellRef(table: lazyContext.table, column: column, primaryKey: pkComponents)
}
}

enum MySQLBeginStreamResult: Sendable {
case rowSet([ColumnInfo])
case noResult(affectedRows: Int)
}

// MARK: - MySQL Field Type Names
Expand Down
Loading
Loading