From f4029b53346a419500825c6be1aa9a55da8f18e7 Mon Sep 17 00:00:00 2001 From: phoenix Date: Sat, 25 Apr 2026 21:50:14 +0800 Subject: [PATCH] feat: add streaming JSON parsing API - JSONStreamParser: push-based streaming parser for JSON Lines and JSON Array modes - JSON Lines: extracts multiple JSON documents from a byte stream using STOP_WHEN_DONE - JSON Array: state machine to parse elements from a large JSON array one by one - Internal buffer with lazy compaction for efficient memory management - JSONIncrementalReader: accumulates chunks for large single-document parsing - StreamingJSONLinesDecoder / StreamingJSONArrayDecoder: Codable-layer streaming decoders - JSONValueStream / DecodingStream: AsyncSequence adapters for async byte streams - AsyncSequence extensions: .jsonValues() and .decode() convenience methods - Document.streamParse: internal API using yyjson_doc_get_read_size for accurate byte counting - 33 new tests covering JSON Lines, JSON Array, incremental, edge cases, and Codable layer - All 755 existing tests pass with zero regressions --- Sources/ReerJSON/StreamDecoder.swift | 459 ++++++++++++++++++ Sources/ReerJSON/StreamParser.swift | 386 +++++++++++++++ Sources/ReerJSON/Value.swift | 64 +++ Tests/ReerJSONTests/StreamParserTests.swift | 491 ++++++++++++++++++++ 4 files changed, 1400 insertions(+) create mode 100644 Sources/ReerJSON/StreamDecoder.swift create mode 100644 Sources/ReerJSON/StreamParser.swift create mode 100644 Tests/ReerJSONTests/StreamParserTests.swift diff --git a/Sources/ReerJSON/StreamDecoder.swift b/Sources/ReerJSON/StreamDecoder.swift new file mode 100644 index 0000000..40a1256 --- /dev/null +++ b/Sources/ReerJSON/StreamDecoder.swift @@ -0,0 +1,459 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import Foundation + +// MARK: - StreamingJSONLinesDecoder + +/// A streaming decoder for JSON Lines (NDJSON) format. +/// +/// Each top-level JSON value in the stream is decoded into `T`. +/// +/// ```swift +/// var decoder = StreamingJSONLinesDecoder(Item.self) +/// let items1 = try decoder.parseBuffer(chunk1) +/// let items2 = try decoder.parseBuffer(chunk2) +/// let remaining = try decoder.finalize() +/// ``` +public struct StreamingJSONLinesDecoder: @unchecked Sendable { + + private var parser: JSONStreamParser + // ReerJSONDecoder uses internal locking, safe to share. + private let decoder: ReerJSONDecoder + private let type: T.Type + + /// Creates a new JSON Lines streaming decoder. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each value into. + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder`` with custom strategies. + /// If `nil`, a default decoder is used. + public init( + _ type: T.Type, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) { + self.type = type + self.parser = JSONStreamParser(mode: .jsonLines, options: options) + self.decoder = decoder ?? ReerJSONDecoder() + } + + /// Feeds data to the decoder and returns all decoded values. + /// + /// - Parameter data: New data to append. + /// - Returns: An array of decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func parseBuffer(_ data: Data) throws -> [T] { + let values = try parser.parse(data) + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Signals end-of-stream and returns any remaining decoded values. + /// + /// - Returns: An array of remaining decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func finalize() throws -> [T] { + let values = try parser.finalize() + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Resets the decoder to its initial state. + public mutating func reset() { + parser.reset() + } +} + +// MARK: - StreamingJSONArrayDecoder + +/// A streaming decoder for JSON array format. +/// +/// The stream is expected to be a single JSON array. Each element is decoded +/// individually as it becomes available. +/// +/// ```swift +/// var decoder = StreamingJSONArrayDecoder(Item.self) +/// let items1 = try decoder.parseBuffer(chunk1) +/// let items2 = try decoder.parseBuffer(chunk2) +/// let remaining = try decoder.finalize() +/// ``` +public struct StreamingJSONArrayDecoder: @unchecked Sendable { + + private var parser: JSONStreamParser + // ReerJSONDecoder uses internal locking, safe to share. + private let decoder: ReerJSONDecoder + private let type: T.Type + + /// Creates a new JSON array streaming decoder. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each element into. + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder`` with custom strategies. + /// If `nil`, a default decoder is used. + public init( + _ type: T.Type, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) { + self.type = type + self.parser = JSONStreamParser(mode: .jsonArray, options: options) + self.decoder = decoder ?? ReerJSONDecoder() + } + + /// Feeds data to the decoder and returns all decoded elements. + /// + /// - Parameter data: New data to append. + /// - Returns: An array of decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func parseBuffer(_ data: Data) throws -> [T] { + let values = try parser.parse(data) + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Signals end-of-stream and returns any remaining decoded elements. + /// + /// - Returns: An array of remaining decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func finalize() throws -> [T] { + let values = try parser.finalize() + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Resets the decoder to its initial state. + public mutating func reset() { + parser.reset() + } +} + +// MARK: - AsyncSequence Adapters + +/// An `AsyncSequence` that yields ``JSONValue`` items from chunks of `Data`. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public struct JSONValueStream: AsyncSequence, Sendable +where Source.Element == Data { + public typealias Element = JSONValue + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + + public func makeAsyncIterator() -> Iterator { + Iterator(source: source.makeAsyncIterator(), mode: mode, options: options) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [JSONValue] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + + init(source: Source.AsyncIterator, mode: JSONStreamMode, options: JSONReadOptions) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + } + + public mutating func next() async throws -> JSONValue? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + guard let chunk = try await sourceIterator.next() else { + sourceExhausted = true + let remaining = try parser.finalize() + if !remaining.isEmpty { + pending = remaining + pendingIndex = 0 + continue + } + return nil + } + + let values = try parser.parse(chunk) + if !values.isEmpty { + pending = values + pendingIndex = 0 + } + } + } + } +} + +/// An `AsyncSequence` that yields ``JSONValue`` items from an `AsyncSequence` of bytes. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public struct JSONValueByteStream: AsyncSequence, Sendable +where Source.Element == UInt8 { + public typealias Element = JSONValue + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + let chunkSize: Int + + public func makeAsyncIterator() -> Iterator { + Iterator( + source: source.makeAsyncIterator(), + mode: mode, options: options, + chunkSize: chunkSize + ) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [JSONValue] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + let chunkSize: Int + + init( + source: Source.AsyncIterator, + mode: JSONStreamMode, options: JSONReadOptions, + chunkSize: Int + ) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + self.chunkSize = chunkSize + } + + public mutating func next() async throws -> JSONValue? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + var chunk = Data() + chunk.reserveCapacity(chunkSize) + for _ in 0..: + AsyncSequence, @unchecked Sendable +where Source.Element == Data { + public typealias Element = T + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + let decoder: ReerJSONDecoder + let type: T.Type + + public func makeAsyncIterator() -> Iterator { + Iterator( + source: source.makeAsyncIterator(), + mode: mode, options: options, + decoder: decoder, type: type + ) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [T] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + let decoder: ReerJSONDecoder + let type: T.Type + + init( + source: Source.AsyncIterator, + mode: JSONStreamMode, options: JSONReadOptions, + decoder: ReerJSONDecoder, type: T.Type + ) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + self.decoder = decoder + self.type = type + } + + public mutating func next() async throws -> T? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + guard let chunk = try await sourceIterator.next() else { + sourceExhausted = true + let remaining = try parser.finalize() + if !remaining.isEmpty { + pending = try remaining.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + pendingIndex = 0 + continue + } + return nil + } + + let values = try parser.parse(chunk) + if !values.isEmpty { + pending = try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + pendingIndex = 0 + } + } + } + } +} + +// MARK: - AsyncSequence Extensions + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence where Element == Data, Self: Sendable { + + /// Returns an `AsyncSequence` of ``JSONValue`` items parsed from this + /// data stream. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - Returns: A ``JSONValueStream`` yielding parsed values. + public func jsonValues( + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default + ) -> JSONValueStream { + JSONValueStream(source: self, mode: mode, options: options) + } + + /// Returns an `AsyncSequence` that decodes items from this data stream. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each value into. + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder``. If `nil`, uses a default decoder. + /// - Returns: A ``DecodingStream`` yielding decoded values. + public func decode( + _ type: T.Type, + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) -> DecodingStream { + DecodingStream( + source: self, + mode: mode, options: options, + decoder: decoder ?? ReerJSONDecoder(), + type: type + ) + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence where Element == UInt8, Self: Sendable { + + /// Returns an `AsyncSequence` of ``JSONValue`` items parsed from this + /// byte stream. + /// + /// Bytes are batched internally for efficient parsing. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - chunkSize: Number of bytes to batch before parsing. Default is 4096. + /// - Returns: A ``JSONValueByteStream`` yielding parsed values. + public func jsonValues( + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default, + chunkSize: Int = 4096 + ) -> JSONValueByteStream { + JSONValueByteStream( + source: self, mode: mode, + options: options, chunkSize: chunkSize + ) + } +} diff --git a/Sources/ReerJSON/StreamParser.swift b/Sources/ReerJSON/StreamParser.swift new file mode 100644 index 0000000..2fbbf74 --- /dev/null +++ b/Sources/ReerJSON/StreamParser.swift @@ -0,0 +1,386 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import yyjson +import Foundation + +// MARK: - JSONStreamMode + +/// The parsing mode for a stream of JSON data. +public enum JSONStreamMode: Sendable { + /// Each line is an independent JSON value (JSON Lines / NDJSON). + case jsonLines + /// The stream is a single JSON array whose elements are yielded one by one. + case jsonArray +} + +// MARK: - JSONStreamParser + +/// A streaming JSON parser that extracts individual ``JSONValue`` items from +/// a byte stream, supporting both JSON Lines and JSON Array modes. +/// +/// `JSONStreamParser` maintains an internal buffer. You feed data incrementally +/// with ``parse(_:)`` and receive fully-parsed ``JSONValue`` items as they +/// become available. Call ``finalize()`` when the stream ends to flush any +/// remaining buffered data. +/// +/// ## JSON Lines Mode +/// +/// Each top-level JSON value in the buffer is extracted as a separate item. +/// Values may span multiple ``parse(_:)`` calls. +/// +/// ```swift +/// var parser = JSONStreamParser(mode: .jsonLines) +/// let chunk1 = Data("{\"a\":1}\n{\"b\"".utf8) +/// let chunk2 = Data(":2}\n".utf8) +/// let values1 = try parser.parse(chunk1) // [{"a":1}] +/// let values2 = try parser.parse(chunk2) // [{"b":2}] +/// ``` +/// +/// ## JSON Array Mode +/// +/// The stream is expected to be a single JSON array (`[...]`). +/// Each array element is yielded individually. +/// +/// ```swift +/// var parser = JSONStreamParser(mode: .jsonArray) +/// let items = try parser.parse(Data("[1, 2, 3]".utf8)) +/// let remaining = try parser.finalize() +/// // items + remaining contain JSONValues for 1, 2, 3 +/// ``` +public struct JSONStreamParser: Sendable { + + /// The parsing mode. + public let mode: JSONStreamMode + + /// Options for reading JSON. + public let options: JSONReadOptions + + private var buffer: Data + private var readOffset: Int + private var arrayState: ArrayParseState + + /// The number of bytes buffered but not yet consumed. + public var pendingByteCount: Int { + buffer.count - readOffset + } + + /// Creates a new stream parser. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. Note that `.stopWhenDone` is + /// always applied internally and does not need to be specified. + public init(mode: JSONStreamMode, options: JSONReadOptions = .default) { + self.mode = mode + self.options = options + self.buffer = Data() + self.readOffset = 0 + self.arrayState = .expectOpenBracket + } + + /// Feeds data to the parser and returns all complete JSON values found. + /// + /// - Parameter data: New data to append to the internal buffer. + /// - Returns: An array of fully-parsed ``JSONValue`` items. + /// - Throws: ``JSONError`` if malformed JSON is encountered. + public mutating func parse(_ data: Data) throws -> [JSONValue] { + buffer.append(data) + return try drainBuffer() + } + + /// Feeds raw bytes to the parser and returns all complete JSON values found. + /// + /// - Parameter bytes: A buffer pointer to the raw bytes. + /// - Returns: An array of fully-parsed ``JSONValue`` items. + /// - Throws: ``JSONError`` if malformed JSON is encountered. + public mutating func parse(bytes: UnsafeBufferPointer) throws -> [JSONValue] { + if let base = bytes.baseAddress, bytes.count > 0 { + buffer.append(base, count: bytes.count) + } + return try drainBuffer() + } + + /// Signals end-of-stream and returns any remaining JSON values. + /// + /// After calling this method, the parser is in a finished state. + /// Call ``reset()`` to reuse it. + /// + /// - Returns: An array of any remaining ``JSONValue`` items. + /// - Throws: ``JSONError`` if the remaining buffer contains incomplete JSON. + public mutating func finalize() throws -> [JSONValue] { + let results = try drainBuffer() + + skipWhitespace() + if readOffset < buffer.count { + if mode == .jsonArray { + throw JSONError.invalidJSON("Unexpected end of JSON array stream") + } else { + throw JSONError.invalidJSON("Incomplete JSON value at end of stream") + } + } + + if mode == .jsonArray && arrayState != .done && arrayState != .expectOpenBracket { + throw JSONError.invalidJSON("Unexpected end of JSON array stream") + } + + return results + } + + /// Resets the parser to its initial state, discarding all buffered data. + public mutating func reset() { + buffer.removeAll(keepingCapacity: true) + readOffset = 0 + arrayState = .expectOpenBracket + } + + // MARK: - Private Types + + private enum ArrayParseState: Sendable { + case expectOpenBracket + case expectElementOrClose + case expectCommaOrClose + case done + } + + // MARK: - Drain Logic + + private mutating func drainBuffer() throws -> [JSONValue] { + compactIfNeeded() + + switch mode { + case .jsonLines: + return try drainJSONLines() + case .jsonArray: + return try drainJSONArray() + } + } + + private mutating func drainJSONLines() throws -> [JSONValue] { + var results: [JSONValue] = [] + + while true { + skipWhitespace() + guard buffer.count - readOffset > 0 else { break } + + guard let value = try parseOneValue() else { break } + results.append(value) + } + + return results + } + + private mutating func drainJSONArray() throws -> [JSONValue] { + var results: [JSONValue] = [] + + loop: while true { + skipWhitespace() + + switch arrayState { + case .expectOpenBracket: + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + guard byte == UInt8(ascii: "[") else { + throw JSONError.invalidJSON("Expected '[' at start of JSON array stream") + } + readOffset += 1 + arrayState = .expectElementOrClose + + case .expectElementOrClose: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: "]") { + readOffset += 1 + arrayState = .done + break loop + } + guard let value = try parseOneValue() else { break loop } + results.append(value) + arrayState = .expectCommaOrClose + + case .expectCommaOrClose: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: ",") { + readOffset += 1 + arrayState = .expectElementOrClose + } else if byte == UInt8(ascii: "]") { + readOffset += 1 + arrayState = .done + break loop + } else { + throw JSONError.invalidJSON( + "Expected ',' or ']' in JSON array, got '\(Unicode.Scalar(byte))'" + ) + } + + case .done: + break loop + } + } + + return results + } + + // MARK: - Core Parse + + /// Tries to parse one JSON value starting at `readOffset`. + /// Returns `nil` if more data is needed. + private mutating func parseOneValue() throws -> JSONValue? { + let available = buffer.count - readOffset + guard available > 0 else { return nil } + + let paddingSize = Int(YYJSON_PADDING_SIZE) + + // Build a padded copy so yyjson has enough trailing zero bytes. + var padded = Data(count: available + paddingSize) + buffer.withUnsafeBytes { srcBuf in + padded.withUnsafeMutableBytes { dstBuf in + let src = srcBuf.baseAddress!.advanced(by: readOffset) + dstBuf.baseAddress!.copyMemory(from: src, byteCount: available) + } + } + + return try padded.withUnsafeBytes { padBuf -> JSONValue? in + let ptr = padBuf.baseAddress!.assumingMemoryBound(to: UInt8.self) + let result = try Document.streamParse( + bytes: ptr, count: available, options: options + ) + switch result { + case .success(let doc, let consumed): + guard let root = doc.root else { + throw JSONError.invalidData("Document has no root value") + } + readOffset += consumed + return JSONValue(value: root, document: doc) + case .needMoreData: + return nil + } + } + } + + // MARK: - Buffer Helpers + + private mutating func skipWhitespace() { + let startIdx = buffer.startIndex + while readOffset < buffer.count { + let byte = buffer[startIdx + readOffset] + guard byte == 0x20 || byte == 0x09 || byte == 0x0A || byte == 0x0D else { break } + readOffset += 1 + } + } + + private mutating func compactIfNeeded() { + guard readOffset > 0, readOffset > buffer.count / 2 else { return } + buffer.removeSubrange(buffer.startIndex ..< buffer.startIndex + readOffset) + readOffset = 0 + } +} + +// MARK: - JSONIncrementalReader + +/// An incremental reader for large JSON documents. +/// +/// Feed chunks of a single large JSON document with ``feed(_:)``. +/// Data is accumulated internally. Call ``finish()`` to parse the complete +/// document, or use ``feed(_:)`` which attempts a parse after each chunk. +/// +/// ```swift +/// let reader = try JSONIncrementalReader(data: firstChunk) +/// for try await chunk in stream { +/// if let doc = try reader.feed(chunk) { +/// print(doc.root?["key"]?.string ?? "") +/// break +/// } +/// } +/// ``` +/// +/// - Note: For a document already fully in memory, prefer +/// ``JSONDocument/init(data:options:)`` which is faster. +/// This type is for when data arrives in chunks over the network. +public final class JSONIncrementalReader: @unchecked Sendable { + + private var buffer: Data + private let options: JSONReadOptions + private var finished: Bool + + /// Creates a new incremental reader with initial data. + /// + /// - Parameters: + /// - data: The first chunk of JSON data. + /// - options: Options for reading JSON. + public init(data: Data, options: JSONReadOptions = .default) throws { + self.buffer = data + self.options = options + self.finished = false + } + + /// Feeds more data and attempts to parse the accumulated buffer. + /// + /// - Parameter data: Additional JSON data. + /// - Returns: A ``JSONDocument`` if the buffer contains a complete document, + /// or `nil` if more data is needed. + /// - Throws: ``JSONError`` for non-recoverable parse errors. + public func feed(_ data: Data) throws -> JSONDocument? { + guard !finished else { + throw JSONError.invalidJSON("Incremental reader already finished") + } + buffer.append(data) + return try attemptParse() + } + + /// Signals end-of-stream and returns the completed document. + /// + /// All accumulated data is parsed as a single JSON document. + /// + /// - Returns: The parsed ``JSONDocument``. + /// - Throws: ``JSONError`` if the document is incomplete or malformed. + public func finish() throws -> JSONDocument { + guard !finished else { + throw JSONError.invalidJSON("Incremental reader already finished") + } + finished = true + let doc = try Document(data: buffer, options: options) + return JSONDocument(_document: doc) + } + + // MARK: - Private + + private func attemptParse() throws -> JSONDocument? { + // Try parsing the accumulated data. If it's complete, return the doc. + // If incomplete, return nil to request more data. + do { + let doc = try Document(data: buffer, options: options) + finished = true + return JSONDocument(_document: doc) + } catch let error as JSONError { + // If the error indicates incomplete data, we need more + if error.message.contains("unexpected end") + || error.message.contains("Unexpected end") + || error.message.contains("Empty content") { + return nil + } + throw error + } + } +} diff --git a/Sources/ReerJSON/Value.swift b/Sources/ReerJSON/Value.swift index 6305394..f5e4558 100644 --- a/Sources/ReerJSON/Value.swift +++ b/Sources/ReerJSON/Value.swift @@ -105,6 +105,65 @@ internal final class Document: @unchecked Sendable { self.doc = doc } + /// Creates a document by parsing bytes with `STOP_WHEN_DONE`, reporting consumed byte count. + /// + /// - Parameters: + /// - bytes: Pointer to the JSON bytes (must have `YYJSON_PADDING_SIZE` padding). + /// - count: Number of valid bytes (excluding padding). + /// - options: Options for reading the JSON. + /// - consumedBytes: On success, set to the number of input bytes consumed. + /// - Throws: `JSONError` if parsing fails. + /// Result of a streaming parse attempt. + enum StreamParseResult { + case success(Document, consumedBytes: Int) + case needMoreData + } + + /// Attempts to parse one JSON value from bytes with `STOP_WHEN_DONE`. + /// + /// - Parameters: + /// - bytes: Pointer to the JSON bytes (must have `YYJSON_PADDING_SIZE` padding). + /// - count: Number of valid bytes (excluding padding). + /// - options: Options for reading the JSON. + /// - Returns: `.success` with consumed byte count, or `.needMoreData` if incomplete. + /// - Throws: `JSONError` for non-recoverable parse errors. + static func streamParse( + bytes: UnsafePointer, count: Int, + options: JSONReadOptions + ) throws -> StreamParseResult { + guard count > 0 else { return .needMoreData } + + var error = yyjson_read_err() + var flags = options.yyjsonFlags + flags |= YYJSON_READ_STOP_WHEN_DONE + flags &= ~yyjson_read_flag(YYJSON_READ_INSITU) + + let ptr = UnsafeMutablePointer( + mutating: UnsafeRawPointer(bytes).assumingMemoryBound(to: CChar.self) + ) + let result = yyjson_read_opts(ptr, count, flags, nil, &error) + + if let doc = result { + let consumed = yyjson_doc_get_read_size(doc) + let document = Document(alreadyParsed: doc) + return .success(document, consumedBytes: consumed) + } + + // Incomplete data — need more + if error.code == YYJSON_READ_ERROR_UNEXPECTED_END + || error.code == YYJSON_READ_ERROR_EMPTY_CONTENT { + return .needMoreData + } + + throw JSONError(parsing: error) + } + + /// Adopts an already-parsed yyjson_doc, taking ownership. + init(alreadyParsed doc: UnsafeMutablePointer) { + self.doc = doc + self.retainedData = nil + } + deinit { yyjson_doc_free(doc) } @@ -149,6 +208,11 @@ internal final class Document: @unchecked Sendable { public struct JSONDocument: ~Copyable, @unchecked Sendable { internal let _document: Document + /// Creates a document from a pre-parsed internal document. + internal init(_document: Document) { + self._document = _document + } + /// Creates a document by parsing JSON data. /// /// - Parameters: diff --git a/Tests/ReerJSONTests/StreamParserTests.swift b/Tests/ReerJSONTests/StreamParserTests.swift new file mode 100644 index 0000000..ecfb86b --- /dev/null +++ b/Tests/ReerJSONTests/StreamParserTests.swift @@ -0,0 +1,491 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import XCTest +@testable import ReerJSON + +// MARK: - Test Helpers + +private struct Item: Codable, Equatable, Sendable { + let id: Int + let name: String +} + +// MARK: - JSON Lines Tests + +final class JSONStreamParserJSONLinesTests: XCTestCase { + + func testSingleCompleteChunk() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0]["id"]?.int64, 1) + XCTAssertEqual(values[1]["name"]?.string, "b") + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkValues() throws { + var parser = JSONStreamParser(mode: .jsonLines) + + let chunk1 = Data("{\"id\":1}\n{\"id\"".utf8) + let values1 = try parser.parse(chunk1) + XCTAssertEqual(values1.count, 1) + XCTAssertEqual(values1[0]["id"]?.int64, 1) + + let chunk2 = Data(":2}\n".utf8) + let values2 = try parser.parse(chunk2) + XCTAssertEqual(values2.count, 1) + XCTAssertEqual(values2[0]["id"]?.int64, 2) + + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testEmptyLines() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("\n\n{\"x\":1}\n\n\n{\"x\":2}\n\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testWhitespacePadding() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data(" {\"a\":1} \n {\"a\":2} ".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testVariousTypes() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("42\n\"hello\"\ntrue\nnull\n[1,2]\n{\"k\":\"v\"}\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 6) + XCTAssertEqual(values[0].int64, 42) + XCTAssertEqual(values[1].string, "hello") + XCTAssertEqual(values[2].bool, true) + XCTAssertTrue(values[3].isNull) + XCTAssertEqual(values[4].array?.count, 2) + XCTAssertEqual(values[5]["k"]?.string, "v") + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testEmptyDataParse() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let values = try parser.parse(Data()) + XCTAssertTrue(values.isEmpty) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testIncompleteJSONAtFinalize() throws { + var parser = JSONStreamParser(mode: .jsonLines) + _ = try parser.parse(Data("{\"id\":1}".utf8)) + _ = try parser.parse(Data("{\"incomplete".utf8)) + XCTAssertThrowsError(try parser.finalize()) + } +} + +// MARK: - JSON Array Tests + +final class JSONStreamParserJSONArrayTests: XCTestCase { + + func testNormalArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[1, 2, 3]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0].int64, 1) + XCTAssertEqual(values[1].int64, 2) + XCTAssertEqual(values[2].int64, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testNestedObjects() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[{\"a\":1},{\"b\":[2,3]},{\"c\":{\"d\":4}}]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0]["a"]?.int64, 1) + XCTAssertEqual(values[1]["b"]?.array?.count, 2) + XCTAssertEqual(values[2]["c"]?["d"]?.int64, 4) + } + + func testNestedArrays() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[[1,2],[3,[4,5]]]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].array?.count, 2) + } + + func testEmptyArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[]".utf8) + let values = try parser.parse(data) + XCTAssertTrue(values.isEmpty) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testTrailingCommaWithOption() throws { + var parser = JSONStreamParser(mode: .jsonArray, options: .allowTrailingCommas) + let data = Data("[1, 2, 3,]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + } + + func testCrossChunkArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + + let chunk1 = Data("[{\"id\":1},".utf8) + let values1 = try parser.parse(chunk1) + XCTAssertEqual(values1.count, 1) + XCTAssertEqual(values1[0]["id"]?.int64, 1) + + let chunk2 = Data("{\"id\":2}]".utf8) + let values2 = try parser.parse(chunk2) + XCTAssertEqual(values2.count, 1) + XCTAssertEqual(values2[0]["id"]?.int64, 2) + + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testArrayWithWhitespace() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data(" [ 1 , 2 , 3 ] ".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testMissingOpenBracket() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("1, 2, 3]".utf8))) + } + + func testIncompleteArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + _ = try parser.parse(Data("[1, 2".utf8)) + XCTAssertThrowsError(try parser.finalize()) + } + + func testStringElements() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[\"hello\", \"world\"]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].string, "hello") + XCTAssertEqual(values[1].string, "world") + } + + func testMixedTypes() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[1, \"two\", true, null, {\"k\":\"v\"}, [3]]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 6) + XCTAssertEqual(values[0].int64, 1) + XCTAssertEqual(values[1].string, "two") + XCTAssertEqual(values[2].bool, true) + XCTAssertTrue(values[3].isNull) + } +} + +// MARK: - Incremental Reader Tests + +final class JSONIncrementalReaderTests: XCTestCase { + + func testSingleChunk() throws { + let reader = try JSONIncrementalReader(data: Data("{\"key\":\"value\"}".utf8)) + let doc = try reader.finish() + XCTAssertEqual(doc.root?["key"]?.string, "value") + } + + func testMultipleChunks() throws { + let reader = try JSONIncrementalReader(data: Data("{\"ke".utf8)) + // First feed should need more data + do { + if let doc = try reader.feed(Data("y\":\"val".utf8)) { + XCTFail("Should need more data, got doc with root: \(String(describing: doc.root))") + } + } + // Second feed should complete + if let doc = try reader.feed(Data("ue\"}".utf8)) { + XCTAssertEqual(doc.root?["key"]?.string, "value") + } else { + XCTFail("Should have completed parsing") + } + } + + func testLargerDocument() throws { + var items: [[String: Any]] = [] + for i in 0..<100 { + items.append(["id": i, "name": "item_\(i)"]) + } + let jsonData = try JSONSerialization.data(withJSONObject: items) + + let chunkSize = 64 + let firstChunk = Data(jsonData[0.. 0) + } + + func testVeryLargeObject() throws { + var parser = JSONStreamParser(mode: .jsonLines) + var json = "{\"data\":\"" + for _ in 0..<10_000 { + json += "x" + } + json += "\"}\n" + let values = try parser.parse(Data(json.utf8)) + XCTAssertEqual(values.count, 1) + } + + func testArrayResetAndReuse() throws { + var parser = JSONStreamParser(mode: .jsonArray) + _ = try parser.parse(Data("[1,2]".utf8)) + _ = try parser.finalize() + + parser.reset() + let values = try parser.parse(Data("[3,4]".utf8)) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].int64, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } +} + +// MARK: - Codable Decoder Tests + +final class StreamingDecoderTests: XCTestCase { + + func testJSONLinesDecoder() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + let data = Data("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n".utf8) + let items = try decoder.parseBuffer(data) + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONLinesDecoderCrossChunk() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + let items1 = try decoder.parseBuffer(Data("{\"id\":1,\"name\":\"a\"}\n{\"id\"".utf8)) + XCTAssertEqual(items1, [Item(id: 1, name: "a")]) + let items2 = try decoder.parseBuffer(Data(":2,\"name\":\"b\"}\n".utf8)) + XCTAssertEqual(items2, [Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONArrayDecoder() throws { + var decoder = StreamingJSONArrayDecoder(Item.self) + let data = Data("[{\"id\":1,\"name\":\"a\"},{\"id\":2,\"name\":\"b\"}]".utf8) + let items = try decoder.parseBuffer(data) + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONArrayDecoderCrossChunk() throws { + var decoder = StreamingJSONArrayDecoder(Item.self) + let items1 = try decoder.parseBuffer(Data("[{\"id\":1,\"name\":\"a\"},".utf8)) + XCTAssertEqual(items1, [Item(id: 1, name: "a")]) + let items2 = try decoder.parseBuffer(Data("{\"id\":2,\"name\":\"b\"}]".utf8)) + XCTAssertEqual(items2, [Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testDecoderReset() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + _ = try decoder.parseBuffer(Data("{\"id\":1,\"name\":\"a\"}\n".utf8)) + decoder.reset() + let items = try decoder.parseBuffer(Data("{\"id\":2,\"name\":\"b\"}\n".utf8)) + XCTAssertEqual(items, [Item(id: 2, name: "b")]) + } +} + +// MARK: - AsyncSequence Tests + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class AsyncStreamTests: XCTestCase { + + func testJSONValueStream() async throws { + let chunks: [Data] = [ + Data("{\"id\":1}\n{\"id\"".utf8), + Data(":2}\n{\"id\":3}\n".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var values: [JSONValue] = [] + for try await value in stream.jsonValues(mode: .jsonLines) { + values.append(value) + } + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0]["id"]?.int64, 1) + XCTAssertEqual(values[1]["id"]?.int64, 2) + XCTAssertEqual(values[2]["id"]?.int64, 3) + } + + func testDecodingStream() async throws { + let chunks: [Data] = [ + Data("{\"id\":1,\"name\":\"a\"}\n".utf8), + Data("{\"id\":2,\"name\":\"b\"}\n".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var items: [Item] = [] + for try await item in stream.decode(Item.self, mode: .jsonLines) { + items.append(item) + } + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + } + + func testJSONArrayValueStream() async throws { + let chunks: [Data] = [ + Data("[{\"id\":1,\"name\":\"a\"},".utf8), + Data("{\"id\":2,\"name\":\"b\"}]".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var values: [JSONValue] = [] + for try await value in stream.jsonValues(mode: .jsonArray) { + values.append(value) + } + XCTAssertEqual(values.count, 2) + } + + func testDecodingStreamArrayMode() async throws { + let chunks: [Data] = [ + Data("[{\"id\":1,\"name\":\"x\"},".utf8), + Data("{\"id\":2,\"name\":\"y\"}]".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var items: [Item] = [] + for try await item in stream.decode(Item.self, mode: .jsonArray) { + items.append(item) + } + XCTAssertEqual(items, [Item(id: 1, name: "x"), Item(id: 2, name: "y")]) + } + + func testEmptyStream() async throws { + let stream = AsyncStream { continuation in + continuation.finish() + } + + var count = 0 + for try await _ in stream.jsonValues(mode: .jsonLines) { + count += 1 + } + XCTAssertEqual(count, 0) + } +}