From f6e769bab6d1af62b8086db12b2af938d282f743 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 30 Jan 2026 10:32:51 +0800 Subject: [PATCH 1/7] Json array support --- Cargo.lock | 4 + Cargo.toml | 2 + .../custom_data_source/csv_json_opener.rs | 1 + datafusion/common/src/config.rs | 16 + datafusion/core/src/dataframe/mod.rs | 2 +- .../core/src/datasource/file_format/json.rs | 252 +++++++++- .../src/datasource/file_format/options.rs | 61 ++- .../core/src/datasource/listing/table.rs | 3 +- .../core/src/datasource/physical_plan/json.rs | 12 +- datafusion/core/src/execution/context/json.rs | 10 +- datafusion/core/src/prelude.rs | 2 +- datafusion/core/tests/data/json_array.json | 5 + .../core/tests/data/json_empty_array.json | 1 + datafusion/core/tests/dataframe/mod.rs | 9 +- datafusion/datasource-json/Cargo.toml | 2 + datafusion/datasource-json/src/file_format.rs | 150 +++++- datafusion/datasource-json/src/mod.rs | 1 + datafusion/datasource-json/src/source.rs | 133 ++++- datafusion/datasource-json/src/utils.rs | 475 ++++++++++++++++++ .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 18 + .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/file_formats.rs | 2 + .../tests/cases/roundtrip_logical_plan.rs | 4 +- datafusion/sqllogictest/test_files/json.slt | 28 ++ 28 files changed, 1122 insertions(+), 80 deletions(-) create mode 100644 datafusion/core/tests/data/json_array.json create mode 100644 datafusion/core/tests/data/json_empty_array.json create mode 100644 datafusion/datasource-json/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 1f28687f4f839..3de1724fcdb3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2036,6 +2036,8 @@ dependencies = [ "futures", "object_store", "tokio", + "tokio-stream", + "tokio-util", ] [[package]] @@ -6149,6 +6151,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -6159,6 +6162,7 @@ checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 102749a55ae4b..19a6e48668488 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,8 @@ strum_macros = "0.27.2" tempfile = "3" testcontainers-modules = { version = "0.14" } tokio = { version = "1.48", features = ["macros", "rt", "sync"] } +tokio-stream = "0.1" +tokio-util = "0.7" url = "2.5.7" uuid = "1.20" zstd = { version = "0.13", default-features = false } diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 347f1a0464716..fc1130313e00c 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> { projected, FileCompressionType::UNCOMPRESSED, Arc::new(object_store), + true, ); let scan_config = FileScanConfigBuilder::new( diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index a18861aa3a695..c55a24f4e8442 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3065,6 +3065,22 @@ config_namespace! { /// If not specified, the default level for the compression algorithm is used. 
pub compression_level: Option, default = None pub schema_infer_max_rec: Option, default = None + /// The JSON format to use when reading files. + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub newline_delimited: bool, default = true } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index fadc6ad792556..2292f5855bfde 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -512,7 +512,7 @@ impl DataFrame { /// # #[tokio::main] /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); - /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?; + /// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?; /// // expand into multiple columns if it's json array, flatten field name if it's nested structure /// let df = df.unnest_columns(&["b","c","d"])?; /// let expected = vec![ diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index cb2e9d787ee92..f8737031eab6a 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -25,7 +25,7 @@ mod tests { use super::*; use crate::datasource::file_format::test_util::scan_format; - use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext}; + use crate::prelude::{SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; use arrow::array::RecordBatch; use arrow_schema::Schema; @@ -46,12 +46,54 @@ mod tests { use datafusion_common::internal_err; use datafusion_common::stats::Precision; + use crate::execution::options::JsonReadOptions; use datafusion_common::Result; + use datafusion_datasource::file_compression_type::FileCompressionType; use futures::StreamExt; use insta::assert_snapshot; use object_store::local::LocalFileSystem; use regex::Regex; use rstest::rstest; + // ==================== Test Helpers ==================== + + /// Create a temporary JSON file and return (TempDir, path) + fn create_temp_json(content: &str) -> (tempfile::TempDir, String) { + let tmp_dir = tempfile::TempDir::new().unwrap(); + let path = format!("{}/test.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, content).unwrap(); + (tmp_dir, path) + } + + /// Infer schema from JSON array format file + async fn infer_json_array_schema( + content: &str, + ) -> Result { + let (_tmp_dir, path) = create_temp_json(content); + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + let format = JsonFormat::default().with_newline_delimited(false); + format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await + } + + /// Register a JSON array table and run a query + async fn query_json_array(content: &str, query: &str) -> Result> { + let (_tmp_dir, path) = create_temp_json(content); + let ctx = SessionContext::new(); + let options = JsonReadOptions::default().newline_delimited(false); + ctx.register_json("test_table", &path, options).await?; + ctx.sql(query).await?.collect().await + } + + /// Register a JSON array table and run a query, return formatted string + async fn 
query_json_array_str(content: &str, query: &str) -> Result { + let result = query_json_array(content, query).await?; + Ok(batches_to_string(&result)) + } + + // ==================== Existing Tests ==================== #[tokio::test] async fn read_small_batches() -> Result<()> { @@ -208,7 +250,7 @@ mod tests { let ctx = SessionContext::new_with_config(config); let table_path = "tests/data/1.json"; - let options = NdJsonReadOptions::default(); + let options = JsonReadOptions::default(); ctx.register_json("json_parallel", table_path, options) .await?; @@ -240,7 +282,7 @@ mod tests { let ctx = SessionContext::new_with_config(config); let table_path = "tests/data/empty.json"; - let options = NdJsonReadOptions::default(); + let options = JsonReadOptions::default(); ctx.register_json("json_parallel_empty", table_path, options) .await?; @@ -314,7 +356,6 @@ mod tests { .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into()); let mut all_batches = RecordBatch::new_empty(schema.clone()); - // We get RequiresMoreData after 2 batches because of how json::Decoder works for _ in 0..2 { let output = deserializer.next()?; let DeserializerOutput::RecordBatch(batch) = output else { @@ -358,7 +399,6 @@ mod tests { let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; - // Expected the file to exist and be empty assert!(std::path::Path::new(&path).exists()); let metadata = std::fs::metadata(&path)?; assert_eq!(metadata.len(), 0); @@ -385,10 +425,210 @@ mod tests { let df = ctx.read_batch(empty_batch.clone())?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; - // Expected the file to exist and be empty assert!(std::path::Path::new(&path).exists()); let metadata = std::fs::metadata(&path)?; assert_eq!(metadata.len(), 0); Ok(()) } + + // ==================== JSON Array Format Tests ==================== + + #[tokio::test] + async fn test_json_array_schema_inference() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#, + ) + .await?; + + let fields: Vec<_> = schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect(); + assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_empty() -> Result<()> { + let schema = infer_json_array_schema("[]").await?; + assert_eq!(schema.fields().len(), 0); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_nested_struct() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#, + ) + .await?; + + let info_field = schema.field_with_name("info").unwrap(); + assert!(matches!(info_field.data_type(), DataType::Struct(_))); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_list_type() -> Result<()> { + let schema = + infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?; + + let tags_field = schema.field_with_name("tags").unwrap(); + assert!(matches!(tags_field.data_type(), DataType::List(_))); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_basic_query() -> Result<()> { + let result = query_json_array_str( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#, + "SELECT a, b FROM test_table ORDER BY a", + ) + .await?; + + assert_snapshot!(result, @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + | 3 
| test | + +---+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_with_nulls() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#, + "SELECT id, name FROM test_table ORDER BY id", + ) + .await?; + + assert_snapshot!(result, @r" + +----+---------+ + | id | name | + +----+---------+ + | 1 | Alice | + | 2 | | + | 3 | Charlie | + +----+---------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_unnest() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#, + "SELECT id, unnest(values) as value FROM test_table ORDER BY id, value", + ) + .await?; + + assert_snapshot!(result, @r" + +----+-------+ + | id | value | + +----+-------+ + | 1 | 10 | + | 1 | 20 | + | 1 | 30 | + | 2 | 40 | + | 2 | 50 | + +----+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_unnest_struct() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#, + "SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product", + ) + .await?; + + assert_snapshot!(result, @r" + +----+---------+-----+ + | id | product | qty | + +----+---------+-----+ + | 1 | A | 2 | + | 1 | B | 3 | + | 2 | C | 1 | + +----+---------+-----+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_nested_struct_access() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#, + "SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id", + ) + .await?; + + assert_snapshot!(result, @r" + +----+-------------+-------+ + | id | dept_name | head | + +----+-------------+-------+ + | 1 | Engineering | Alice | + | 2 | Sales | Bob | + +----+-------------+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_with_compression() -> Result<()> { + use flate2::Compression; + use flate2::write::GzEncoder; + use std::io::Write; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); + + let file = std::fs::File::create(&path)?; + let mut encoder = GzEncoder::new(file, Compression::default()); + encoder.write_all( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(), + )?; + encoder.finish()?; + + let ctx = SessionContext::new(); + let options = JsonReadOptions::default() + .newline_delimited(false) + .file_compression_type(FileCompressionType::GZIP) + .file_extension(".json.gz"); + + ctx.register_json("test_table", &path, options).await?; + let result = ctx + .sql("SELECT a, b FROM test_table ORDER BY a") + .await? 
+ .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + +---+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_list_of_structs() -> Result<()> { + let batches = query_json_array( + r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#, + "SELECT id, items FROM test_table ORDER BY id", + ) + .await?; + + assert_eq!(1, batches.len()); + assert_eq!(2, batches[0].num_rows()); + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 146c5f6f5fd0f..bd0ac36087381 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -442,14 +442,23 @@ impl<'a> AvroReadOptions<'a> { } } -/// Options that control the reading of Line-delimited JSON files (NDJson) +#[deprecated( + since = "53.0.0", + note = "Use `JsonReadOptions` instead. This alias will be removed in a future version." +)] +#[doc = "Deprecated: Use [`JsonReadOptions`] instead."] +pub type NdJsonReadOptions<'a> = JsonReadOptions<'a>; + +/// Options that control the reading of JSON files. +/// +/// Supports both newline-delimited JSON (NDJSON) and JSON array formats. /// /// Note this structure is supplied when a datasource is created and -/// can not not vary from statement to statement. For settings that +/// can not vary from statement to statement. For settings that /// can vary statement to statement see /// [`ConfigOptions`](crate::config::ConfigOptions). #[derive(Clone)] -pub struct NdJsonReadOptions<'a> { +pub struct JsonReadOptions<'a> { /// The data source schema. pub schema: Option<&'a Schema>, /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`. @@ -465,9 +474,25 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, + /// Whether to read as newline-delimited JSON (default: true). + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub newline_delimited: bool, } -impl Default for NdJsonReadOptions<'_> { +impl Default for JsonReadOptions<'_> { fn default() -> Self { Self { schema: None, @@ -477,11 +502,12 @@ impl Default for NdJsonReadOptions<'_> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], + newline_delimited: true, } } } -impl<'a> NdJsonReadOptions<'a> { +impl<'a> JsonReadOptions<'a> { /// Specify table_partition_cols for partition pruning pub fn table_partition_cols( mut self, @@ -529,6 +555,26 @@ impl<'a> NdJsonReadOptions<'a> { self.schema_infer_max_records = schema_infer_max_records; self } + + /// Set whether to read as newline-delimited JSON. 
+ /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub fn newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; + self + } } #[async_trait] @@ -654,7 +700,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { } #[async_trait] -impl ReadOptions<'_> for NdJsonReadOptions<'_> { +impl ReadOptions<'_> for JsonReadOptions<'_> { fn to_listing_options( &self, config: &SessionConfig, @@ -663,7 +709,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { let file_format = JsonFormat::default() .with_options(table_options.json) .with_schema_infer_max_rec(self.schema_infer_max_records) - .with_file_compression_type(self.file_compression_type.to_owned()); + .with_file_compression_type(self.file_compression_type.to_owned()) + .with_newline_delimited(self.newline_delimited); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 4e33f3cad51a4..5dd11739c1f57 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -110,6 +110,7 @@ mod tests { #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::listing::table::ListingTableConfigExt; + use crate::execution::options::JsonReadOptions; use crate::prelude::*; use crate::{ datasource::{ @@ -808,7 +809,7 @@ mod tests { .register_json( "t", tmp_dir.path().to_str().unwrap(), - NdJsonReadOptions::default() + JsonReadOptions::default() .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 8de6a60258f08..b70791c7b2390 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -32,7 +32,7 @@ mod tests { use crate::dataframe::DataFrameWriteOptions; use crate::execution::SessionState; - use crate::prelude::{CsvReadOptions, NdJsonReadOptions, SessionContext}; + use crate::prelude::{CsvReadOptions, JsonReadOptions, SessionContext}; use crate::test::partitioned_file_groups; use datafusion_common::Result; use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array}; @@ -136,7 +136,7 @@ mod tests { .get_ext_with_compression(&file_compression_type) .unwrap(); - let read_options = NdJsonReadOptions::default() + let read_options = JsonReadOptions::default() .file_extension(ext.as_str()) .file_compression_type(file_compression_type.to_owned()); let frame = ctx.read_json(path, read_options).await.unwrap(); @@ -389,7 +389,7 @@ mod tests { let path = format!("{TEST_DATA_BASE}/1.json"); // register json file with the execution context - ctx.register_json("test", path.as_str(), NdJsonReadOptions::default()) + ctx.register_json("test", path.as_str(), JsonReadOptions::default()) .await?; // register a local file system object store for /tmp directory @@ -431,7 +431,7 @@ mod tests { } // register each partition as well as the top level dir - let json_read_option = NdJsonReadOptions::default(); + let json_read_option = JsonReadOptions::default(); ctx.register_json( 
"part0", &format!("{out_dir}/{part_0_name}"), @@ -511,7 +511,7 @@ mod tests { async fn read_test_data(schema_infer_max_records: usize) -> Result { let ctx = SessionContext::new(); - let options = NdJsonReadOptions { + let options = JsonReadOptions { schema_infer_max_records, ..Default::default() }; @@ -587,7 +587,7 @@ mod tests { .get_ext_with_compression(&file_compression_type) .unwrap(); - let read_option = NdJsonReadOptions::default() + let read_option = JsonReadOptions::default() .file_compression_type(file_compression_type) .file_extension(ext.as_str()); diff --git a/datafusion/core/src/execution/context/json.rs b/datafusion/core/src/execution/context/json.rs index e9d799400863d..f7df2ad7a1cd6 100644 --- a/datafusion/core/src/execution/context/json.rs +++ b/datafusion/core/src/execution/context/json.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. +use super::super::options::ReadOptions; +use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext}; +use crate::execution::options::JsonReadOptions; use datafusion_common::TableReference; use datafusion_datasource_json::source::plan_to_json; use std::sync::Arc; -use super::super::options::{NdJsonReadOptions, ReadOptions}; -use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext}; - impl SessionContext { /// Creates a [`DataFrame`] for reading an JSON data source. /// @@ -32,7 +32,7 @@ impl SessionContext { pub async fn read_json( &self, table_paths: P, - options: NdJsonReadOptions<'_>, + options: JsonReadOptions<'_>, ) -> Result { self._read_type(table_paths, options).await } @@ -43,7 +43,7 @@ impl SessionContext { &self, table_ref: impl Into, table_path: impl AsRef, - options: NdJsonReadOptions<'_>, + options: JsonReadOptions<'_>, ) -> Result<()> { let listing_options = options .to_listing_options(&self.copied_config(), self.copied_table_options()); diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index 50e4a2649c923..31d9d7eb471f0 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -29,7 +29,7 @@ pub use crate::dataframe; pub use crate::dataframe::DataFrame; pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ - AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, + AvroReadOptions, CsvReadOptions, JsonReadOptions, ParquetReadOptions, }; pub use datafusion_common::Column; diff --git a/datafusion/core/tests/data/json_array.json b/datafusion/core/tests/data/json_array.json new file mode 100644 index 0000000000000..1a8716dbf4beb --- /dev/null +++ b/datafusion/core/tests/data/json_array.json @@ -0,0 +1,5 @@ +[ + {"a": 1, "b": "hello"}, + {"a": 2, "b": "world"}, + {"a": 3, "b": "test"} +] diff --git a/datafusion/core/tests/data/json_empty_array.json b/datafusion/core/tests/data/json_empty_array.json new file mode 100644 index 0000000000000..fe51488c7066f --- /dev/null +++ b/datafusion/core/tests/data/json_empty_array.json @@ -0,0 +1 @@ +[] diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index bab00ced1cb13..6c0452a99bccc 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -57,9 +57,7 @@ use datafusion::error::Result; use datafusion::execution::context::SessionContext; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::{ColumnarValue, Volatility}; -use datafusion::prelude::{ - 
CsvReadOptions, JoinType, NdJsonReadOptions, ParquetReadOptions, -}; +use datafusion::prelude::{CsvReadOptions, JoinType, ParquetReadOptions}; use datafusion::test_util::{ parquet_test_data, populate_csv_partitions, register_aggregate_csv, test_table, test_table_with_cache_factory, test_table_with_name, @@ -94,6 +92,7 @@ use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable}; use datafusion::error::Result as DataFusionResult; +use datafusion::execution::options::JsonReadOptions; use datafusion_functions_window::expr_fn::lag; // Get string representation of the plan @@ -2896,7 +2895,7 @@ async fn write_json_with_order() -> Result<()> { ctx.register_json( "data", test_path.to_str().unwrap(), - NdJsonReadOptions::default().schema(&schema), + JsonReadOptions::default().schema(&schema), ) .await?; @@ -6322,7 +6321,7 @@ async fn register_non_json_file() { .register_json( "data", "tests/data/test_binary.parquet", - NdJsonReadOptions::default(), + JsonReadOptions::default(), ) .await; assert_contains!( diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 37fa8d43a0816..657c40b6b46ee 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -45,6 +45,8 @@ datafusion-session = { workspace = true } futures = { workspace = true } object_store = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true, features = ["sync"] } +tokio-util = { workspace = true, features = ["io", "io-util", "compat"] } # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index a14458b5acd36..881e5f3d873e6 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions +//! 
[`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions use std::any::Any; use std::collections::HashMap; use std::fmt; use std::fmt::Debug; -use std::io::BufReader; +use std::io::{BufReader, Read}; use std::sync::Arc; use crate::source::JsonSource; @@ -31,6 +31,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::json; use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator}; +use bytes::{Buf, Bytes}; use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions}; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::{ @@ -48,6 +49,7 @@ use datafusion_datasource::file_format::{ use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; +use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::BatchSerializer; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join; @@ -57,9 +59,8 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; +use crate::utils::JsonArrayToNdjsonReader; use async_trait::async_trait; -use bytes::{Buf, Bytes}; -use datafusion_datasource::source::DataSourceExec; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -132,7 +133,26 @@ impl Debug for JsonFormatFactory { } } -/// New line delimited JSON `FileFormat` implementation. +/// JSON `FileFormat` implementation supporting both line-delimited and array formats. +/// +/// # Supported Formats +/// +/// ## Line-Delimited JSON (default, `newline_delimited = true`) +/// ```text +/// {"key1": 1, "key2": "val"} +/// {"key1": 2, "key2": "vals"} +/// ``` +/// +/// ## JSON Array Format (`newline_delimited = false`) +/// ```text +/// [ +/// {"key1": 1, "key2": "val"}, +/// {"key1": 2, "key2": "vals"} +/// ] +/// ``` +/// +/// Note: JSON array format is processed using streaming conversion, +/// which is memory-efficient even for large files. #[derive(Debug, Default)] pub struct JsonFormat { options: JsonOptions, @@ -166,6 +186,57 @@ impl JsonFormat { self.options.compression = file_compression_type.into(); self } + + /// Set whether to read as newline-delimited JSON (NDJSON). + /// + /// When `true` (default), expects newline-delimited format: + /// ```text + /// {"a": 1} + /// {"a": 2} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [{"a": 1}, {"a": 2}] + /// ``` + pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.options.newline_delimited = newline_delimited; + self + } + + /// Returns whether this format expects newline-delimited JSON. + pub fn is_newline_delimited(&self) -> bool { + self.options.newline_delimited + } +} + +/// Infer schema from JSON array format using streaming conversion. +/// +/// This function converts JSON array format to NDJSON on-the-fly and uses +/// arrow-json's schema inference. It properly tracks the number of records +/// processed for correct `records_to_read` management. +/// +/// # Returns +/// A tuple of (Schema, records_consumed) where records_consumed is the +/// number of records that were processed for schema inference. 
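+///
+/// # Example
+///
+/// A minimal sketch of how this helper might be called (illustrative only;
+/// `data` is a placeholder for an in-memory JSON array and `10` a small
+/// `max_records` budget):
+///
+/// ```ignore
+/// let data = r#"[{"a": 1}, {"a": 2}]"#;
+/// // Two records are available and both fit the budget, so `consumed` is 2.
+/// let (schema, consumed) = infer_schema_from_json_array(data.as_bytes(), 10)?;
+/// assert_eq!(consumed, 2);
+/// ```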
+fn infer_schema_from_json_array( + reader: R, + max_records: usize, +) -> Result<(Schema, usize)> { + let ndjson_reader = JsonArrayToNdjsonReader::new(reader); + + let iter = ValueIter::new(ndjson_reader, None); + let mut count = 0; + + let schema = infer_json_schema_from_iterator(iter.take_while(|_| { + let should_take = count < max_records; + if should_take { + count += 1; + } + should_take + }))?; + + Ok((schema, count)) } #[async_trait] @@ -202,37 +273,67 @@ impl FileFormat for JsonFormat { .schema_infer_max_rec .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD); let file_compression_type = FileCompressionType::from(self.options.compression); + let newline_delimited = self.options.newline_delimited; + for object in objects { - let mut take_while = || { - let should_take = records_to_read > 0; - if should_take { - records_to_read -= 1; - } - should_take - }; + // Early exit if we've read enough records + if records_to_read == 0 { + break; + } let r = store.as_ref().get(&object.location).await?; - let schema = match r.payload { + + let (schema, records_consumed) = match r.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; - let mut reader = BufReader::new(decoder); - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + let reader = BufReader::new(decoder); + + if newline_delimited { + // NDJSON: use ValueIter directly + let iter = ValueIter::new(reader, None); + let mut count = 0; + let schema = + infer_json_schema_from_iterator(iter.take_while(|_| { + let should_take = count < records_to_read; + if should_take { + count += 1; + } + should_take + }))?; + (schema, count) + } else { + // JSON array format: use streaming converter + infer_schema_from_json_array(reader, records_to_read)? + } } GetResultPayload::Stream(_) => { let data = r.bytes().await?; let decoder = file_compression_type.convert_read(data.reader())?; - let mut reader = BufReader::new(decoder); - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + let reader = BufReader::new(decoder); + + if newline_delimited { + let iter = ValueIter::new(reader, None); + let mut count = 0; + let schema = + infer_json_schema_from_iterator(iter.take_while(|_| { + let should_take = count < records_to_read; + if should_take { + count += 1; + } + should_take + }))?; + (schema, count) + } else { + // JSON array format: use streaming converter + infer_schema_from_json_array(reader, records_to_read)? 
+ } } }; schemas.push(schema); - if records_to_read == 0 { - break; - } + // Correctly decrement records_to_read + records_to_read = records_to_read.saturating_sub(records_consumed); } let schema = Schema::try_merge(schemas)?; @@ -281,7 +382,10 @@ impl FileFormat for JsonFormat { } fn file_source(&self, table_schema: TableSchema) -> Arc { - Arc::new(JsonSource::new(table_schema)) + Arc::new( + JsonSource::new(table_schema) + .with_newline_delimited(self.options.newline_delimited), + ) } } diff --git a/datafusion/datasource-json/src/mod.rs b/datafusion/datasource-json/src/mod.rs index c39ee2cd9377d..7dc0a0c7ba0f9 100644 --- a/datafusion/datasource-json/src/mod.rs +++ b/datafusion/datasource-json/src/mod.rs @@ -22,5 +22,6 @@ pub mod file_format; pub mod source; +pub mod utils; pub use file_format::*; diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 5797054f11b9c..f3213d6cb1152 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Execution plan for reading line-delimited JSON files +//! Execution plan for reading JSON files (line-delimited and array formats) use std::any::Any; use std::io::{BufReader, Read, Seek, SeekFrom}; @@ -23,9 +23,10 @@ use std::sync::Arc; use std::task::Poll; use crate::file_format::JsonDecoder; +use crate::utils::JsonArrayToNdjsonReader; use datafusion_common::error::{DataFusionError, Result}; -use datafusion_common_runtime::JoinSet; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; @@ -47,6 +48,12 @@ use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::io::{StreamReader, SyncIoBridge}; + +// ============================================================================ +// JsonOpener and JsonSource +// ============================================================================ /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] pub struct JsonOpener { @@ -54,21 +61,26 @@ pub struct JsonOpener { projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. + newline_delimited: bool, } impl JsonOpener { - /// Returns a [`JsonOpener`] + /// Returns a [`JsonOpener`] pub fn new( batch_size: usize, projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, + newline_delimited: bool, ) -> Self { Self { batch_size, projected_schema, file_compression_type, object_store, + newline_delimited, } } } @@ -80,6 +92,9 @@ pub struct JsonSource { batch_size: Option, metrics: ExecutionPlanMetricsSet, projection: SplitProjection, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. 
+ newline_delimited: bool, } impl JsonSource { @@ -91,8 +106,18 @@ impl JsonSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), + newline_delimited: true, } } + + /// Set whether to read as newline-delimited JSON. + /// + /// When `true` (default), expects newline-delimited format. + /// When `false`, expects JSON array format `[{...}, {...}]`. + pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; + self + } } impl From for Arc { @@ -120,6 +145,7 @@ impl FileSource for JsonSource { projected_schema, file_compression_type: base_config.file_compression_type, object_store, + newline_delimited: self.newline_delimited, }) as Arc; // Wrap with ProjectionOpener @@ -172,7 +198,7 @@ impl FileSource for JsonSource { } impl FileOpener for JsonOpener { - /// Open a partitioned NDJSON file. + /// Open a partitioned JSON file. /// /// If `file_meta.range` is `None`, the entire file is opened. /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file. @@ -181,11 +207,23 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. + /// + /// Note: JSON array format does not support range-based scanning. fn open(&self, partitioned_file: PartitionedFile) -> Result { let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); + let newline_delimited = self.newline_delimited; + + // JSON array format requires reading the complete file + if !newline_delimited && partitioned_file.range.is_some() { + return Err(DataFusionError::NotImplemented( + "JSON array format does not support range-based file scanning. \ + Disable repartition_file_scans or use newline-delimited JSON format." 
+ .to_string(), + )); + } Ok(Box::pin(async move { let calculated_range = @@ -222,27 +260,76 @@ impl FileOpener for JsonOpener { } }; - let reader = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build(BufReader::new(bytes))?; - - Ok(futures::stream::iter(reader) - .map(|r| r.map_err(Into::into)) - .boxed()) + if newline_delimited { + // NDJSON: use BufReader directly + let reader = BufReader::new(bytes); + let arrow_reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(reader)?; + + Ok(futures::stream::iter(arrow_reader) + .map(|r| r.map_err(Into::into)) + .boxed()) + } else { + // JSON array format: wrap with streaming converter + // JsonArrayToNdjsonReader implements BufRead + let ndjson_reader = JsonArrayToNdjsonReader::new(bytes); + let arrow_reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(ndjson_reader)?; + + Ok(futures::stream::iter(arrow_reader) + .map(|r| r.map_err(Into::into)) + .boxed()) + } } GetResultPayload::Stream(s) => { - let s = s.map_err(DataFusionError::from); - - let decoder = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build_decoder()?; - let input = file_compression_type.convert_stream(s.boxed())?.fuse(); - - let stream = deserialize_stream( - input, - DecoderDeserializer::new(JsonDecoder::new(decoder)), - ); - Ok(stream.map_err(Into::into).boxed()) + if newline_delimited { + // Newline-delimited JSON (NDJSON) streaming reader + let s = s.map_err(DataFusionError::from); + let decoder = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build_decoder()?; + let input = + file_compression_type.convert_stream(s.boxed())?.fuse(); + let stream = deserialize_stream( + input, + DecoderDeserializer::new(JsonDecoder::new(decoder)), + ); + Ok(stream.map_err(Into::into).boxed()) + } else { + // JSON array format: streaming conversion without loading entire file + let s = s.map_err(DataFusionError::from); + let decompressed_stream = + file_compression_type.convert_stream(s.boxed())?; + + // Convert async stream to sync reader for JsonArrayToNdjsonReader + let stream_reader = StreamReader::new( + decompressed_stream.map_err(DataFusionError::from), + ); + let sync_reader = SyncIoBridge::new(stream_reader); + + // Use streaming converter - processes data in chunks without loading entire file + let ndjson_reader = JsonArrayToNdjsonReader::new(sync_reader); + + let arrow_reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(ndjson_reader)?; + + // Process arrow reader in blocking task to avoid blocking async executor + let (tx, rx) = tokio::sync::mpsc::channel(2); + SpawnedTask::spawn_blocking(move || { + for batch_result in arrow_reader { + if tx.blocking_send(batch_result).is_err() { + break; // Receiver dropped + } + } + }); + + Ok(ReceiverStream::new(rx) + .map(|r| r.map_err(Into::into)) + .boxed()) + } } } })) diff --git a/datafusion/datasource-json/src/utils.rs b/datafusion/datasource-json/src/utils.rs new file mode 100644 index 0000000000000..58e55670b8db8 --- /dev/null +++ b/datafusion/datasource-json/src/utils.rs @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility types for JSON processing + +use std::io::{BufRead, Read}; + +// ============================================================================ +// JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter +// ============================================================================ +// +// Architecture: +// +// ```text +// ┌─────────────────────────────────────────────────────────────┐ +// │ JSON Array File (potentially very large, e.g. 1GB) │ +// │ [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}] │ +// └─────────────────────────────────────────────────────────────┘ +// │ +// ▼ read small chunks at a time (e.g. 64KB) +// ┌───────────────────┐ +// │ JsonArrayToNdjson │ ← character substitution only: +// │ Reader │ '[' skip, ',' → '\n', ']' stop +// └───────────────────┘ +// │ +// ▼ outputs NDJSON format +// ┌───────────────────┐ +// │ Arrow Reader │ ← internal buffer, batch parsing +// │ batch_size=1024 │ +// └───────────────────┘ +// │ +// ▼ outputs RecordBatch for every batch_size rows +// ┌───────────────────┐ +// │ RecordBatch │ +// │ (1024 rows) │ +// └───────────────────┘ +// ``` +// +// Memory Efficiency: +// +// | Approach | Memory for 1GB file | Parse count | +// |---------------------------------------|---------------------|-------------| +// | Load entire file + serde_json | ~5GB | 3x | +// | Streaming with JsonArrayToNdjsonReader| ~few MB | 1x | +// + +/// Default buffer size for JsonArrayToNdjsonReader (64KB) +const DEFAULT_BUF_SIZE: usize = 64 * 1024; + +/// Parser state for JSON array streaming +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum JsonArrayState { + /// Initial state, looking for opening '[' + Start, + /// Inside the JSON array, processing objects + InArray, + /// Reached the closing ']', finished + Done, +} + +/// A streaming reader that converts JSON array format to NDJSON format. +/// +/// This reader wraps an underlying reader containing JSON array data +/// `[{...}, {...}, ...]` and transforms it on-the-fly to newline-delimited +/// JSON format that Arrow's JSON reader can process. +/// +/// Implements both `Read` and `BufRead` traits for compatibility with Arrow's +/// `ReaderBuilder::build()` which requires `BufRead`. 
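+///
+/// # Usage sketch
+///
+/// A minimal sketch (illustrative only); the wrapped reader can be any
+/// `Read`, here a byte slice:
+///
+/// ```ignore
+/// use std::io::Read;
+///
+/// let mut ndjson = JsonArrayToNdjsonReader::new(r#"[{"a":1},{"a":2}]"#.as_bytes());
+/// let mut out = String::new();
+/// ndjson.read_to_string(&mut out)?;
+/// assert_eq!(out, "{\"a\":1}\n{\"a\":2}");
+/// // Optionally verify the array was well formed once fully read.
+/// ndjson.validate_complete()?;
+/// ```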
+/// +/// # Transformation Rules +/// +/// - Skip leading `[` and whitespace before it +/// - Convert top-level `,` (between objects) to `\n` +/// - Skip whitespace at top level (between objects) +/// - Stop at trailing `]` +/// - Preserve everything inside objects (including nested `[`, `]`, `,`) +/// - Properly handle strings (ignore special chars inside quotes) +/// +/// # Example +/// +/// ```text +/// Input: [{"a":1}, {"b":[1,2]}, {"c":"x,y"}] +/// Output: {"a":1} +/// {"b":[1,2]} +/// {"c":"x,y"} +/// ``` +pub struct JsonArrayToNdjsonReader { + inner: R, + state: JsonArrayState, + /// Tracks nesting depth of `{` and `[` to identify top-level commas + depth: i32, + /// Whether we're currently inside a JSON string + in_string: bool, + /// Whether the next character is escaped (after `\`) + escape_next: bool, + /// Internal buffer for BufRead implementation + buffer: Vec, + /// Current position in the buffer + pos: usize, + /// Number of valid bytes in the buffer + filled: usize, + /// Whether trailing non-whitespace content was detected after ']' + has_trailing_content: bool, +} + +impl JsonArrayToNdjsonReader { + /// Create a new streaming reader that converts JSON array to NDJSON. + pub fn new(reader: R) -> Self { + Self { + inner: reader, + state: JsonArrayState::Start, + depth: 0, + in_string: false, + escape_next: false, + buffer: vec![0; DEFAULT_BUF_SIZE], + pos: 0, + filled: 0, + has_trailing_content: false, + } + } + + /// Check if the JSON array was properly terminated. + /// + /// This should be called after all data has been read. + /// + /// Returns an error if: + /// - Unbalanced braces/brackets (depth != 0) + /// - Unterminated string + /// - Missing closing `]` + /// - Unexpected trailing content after `]` + pub fn validate_complete(&self) -> std::io::Result<()> { + if self.depth != 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Malformed JSON array: unbalanced braces or brackets", + )); + } + if self.in_string { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Malformed JSON array: unterminated string", + )); + } + if self.state != JsonArrayState::Done { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Incomplete JSON array: expected closing bracket ']'", + )); + } + if self.has_trailing_content { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Malformed JSON: unexpected trailing content after ']'", + )); + } + Ok(()) + } + + /// Process a single byte and return the transformed byte (if any) + fn process_byte(&mut self, byte: u8) -> Option { + match self.state { + JsonArrayState::Start => { + // Looking for the opening '[', skip whitespace + if byte == b'[' { + self.state = JsonArrayState::InArray; + } + // Skip whitespace and the '[' itself + None + } + JsonArrayState::InArray => { + // Handle escape sequences in strings + if self.escape_next { + self.escape_next = false; + return Some(byte); + } + + if self.in_string { + // Inside a string: handle escape and closing quote + match byte { + b'\\' => self.escape_next = true, + b'"' => self.in_string = false, + _ => {} + } + Some(byte) + } else { + // Outside strings: track depth and transform + match byte { + b'"' => { + self.in_string = true; + Some(byte) + } + b'{' | b'[' => { + self.depth += 1; + Some(byte) + } + b'}' => { + self.depth -= 1; + Some(byte) + } + b']' => { + if self.depth == 0 { + // Top-level ']' means end of array + self.state = JsonArrayState::Done; + None + } else { + // Nested ']' inside an object + 
self.depth -= 1; + Some(byte) + } + } + b',' if self.depth == 0 => { + // Top-level comma between objects → newline + Some(b'\n') + } + _ => { + // At depth 0, skip whitespace between objects + if self.depth == 0 && byte.is_ascii_whitespace() { + None + } else { + Some(byte) + } + } + } + } + } + JsonArrayState::Done => { + // After ']', check for non-whitespace trailing content + if !byte.is_ascii_whitespace() { + self.has_trailing_content = true; + } + None + } + } + } + + /// Fill the internal buffer with transformed data + fn fill_internal_buffer(&mut self) -> std::io::Result<()> { + // Read raw data from inner reader + let mut raw_buf = vec![0u8; DEFAULT_BUF_SIZE]; + let mut write_pos = 0; + + loop { + let bytes_read = self.inner.read(&mut raw_buf)?; + if bytes_read == 0 { + break; // EOF + } + + for &byte in &raw_buf[..bytes_read] { + if let Some(transformed) = self.process_byte(byte) + && write_pos < self.buffer.len() + { + self.buffer[write_pos] = transformed; + write_pos += 1; + } + // Note: process_byte is called for all bytes to track state, + // even when buffer is full or in Done state + } + + // Only stop if buffer is full + if write_pos >= self.buffer.len() { + break; + } + } + + self.pos = 0; + self.filled = write_pos; + Ok(()) + } +} + +impl Read for JsonArrayToNdjsonReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // If buffer is empty, fill it + if self.pos >= self.filled { + self.fill_internal_buffer()?; + if self.filled == 0 { + return Ok(0); // EOF + } + } + + // Copy from internal buffer to output + let available = self.filled - self.pos; + let to_copy = std::cmp::min(available, buf.len()); + buf[..to_copy].copy_from_slice(&self.buffer[self.pos..self.pos + to_copy]); + self.pos += to_copy; + Ok(to_copy) + } +} + +impl BufRead for JsonArrayToNdjsonReader { + fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + if self.pos >= self.filled { + self.fill_internal_buffer()?; + } + Ok(&self.buffer[self.pos..self.filled]) + } + + fn consume(&mut self, amt: usize) { + self.pos = std::cmp::min(self.pos + amt, self.filled); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_json_array_to_ndjson_simple() { + let input = r#"[{"a":1}, {"a":2}, {"a":3}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":1}\n{\"a\":2}\n{\"a\":3}"); + } + + #[test] + fn test_json_array_to_ndjson_nested() { + let input = r#"[{"a":{"b":1}}, {"c":[1,2,3]}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":{\"b\":1}}\n{\"c\":[1,2,3]}"); + } + + #[test] + fn test_json_array_to_ndjson_strings_with_special_chars() { + let input = r#"[{"a":"[1,2]"}, {"b":"x,y"}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":\"[1,2]\"}\n{\"b\":\"x,y\"}"); + } + + #[test] + fn test_json_array_to_ndjson_escaped_quotes() { + let input = r#"[{"a":"say \"hello\""}, {"b":1}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":\"say \\\"hello\\\"\"}\n{\"b\":1}"); + } + + #[test] + fn test_json_array_to_ndjson_empty() { + let input = r#"[]"#; + let mut reader = 
JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, ""); + } + + #[test] + fn test_json_array_to_ndjson_single_element() { + let input = r#"[{"a":1}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":1}"); + } + + #[test] + fn test_json_array_to_ndjson_bufread() { + let input = r#"[{"a":1}, {"a":2}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + + let buf = reader.fill_buf().unwrap(); + assert!(!buf.is_empty()); + + let first_len = buf.len(); + reader.consume(first_len); + + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + } + + #[test] + fn test_json_array_to_ndjson_whitespace() { + let input = r#" [ {"a":1} , {"a":2} ] "#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + // Top-level whitespace is skipped, internal whitespace preserved + assert_eq!(output, "{\"a\":1}\n{\"a\":2}"); + } + + #[test] + fn test_validate_complete_valid_json() { + let valid_json = r#"[{"a":1},{"a":2}]"#; + let mut reader = JsonArrayToNdjsonReader::new(valid_json.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + reader.validate_complete().unwrap(); + } + + #[test] + fn test_json_array_with_trailing_junk() { + let input = r#" [ {"a":1} , {"a":2} ] some { junk [ here ] "#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + // Should extract the valid array content + assert_eq!(output, "{\"a\":1}\n{\"a\":2}"); + + // But validation should catch the trailing junk + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("trailing content") + || err_msg.contains("Unexpected trailing"), + "Expected trailing content error, got: {err_msg}" + ); + } + + #[test] + fn test_validate_complete_incomplete_array() { + let invalid_json = r#"[{"a":1},{"a":2}"#; // Missing closing ] + let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("expected closing bracket") + || err_msg.contains("missing closing"), + "Expected missing bracket error, got: {err_msg}" + ); + } + + #[test] + fn test_validate_complete_unbalanced_braces() { + let invalid_json = r#"[{"a":1},{"a":2]"#; // Wrong closing bracket + let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("unbalanced") + || err_msg.contains("expected closing bracket"), + "Expected unbalanced or missing bracket error, got: {err_msg}" + ); + } + + #[test] + fn test_validate_complete_valid_with_trailing_whitespace() { + let input = r#"[{"a":1},{"a":2}] + "#; // Trailing whitespace is OK + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); 
+ reader.read_to_string(&mut output).unwrap(); + + // Whitespace after ] should be allowed + reader.validate_complete().unwrap(); + } +} diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 08bb25bd715b9..f4572b3cd7da7 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -469,6 +469,7 @@ message JsonOptions { CompressionTypeVariant compression = 1; // Compression type optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference optional uint32 compression_level = 3; // Optional compression level + optional bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...] } message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3c41b8cad9ed1..cc6ac2012761c 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1105,6 +1105,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { compression: compression.into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), + newline_delimited: proto_opts.newline_delimited.unwrap_or(true), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index ef0eae1981d93..1139fc71b53af 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4589,6 +4589,9 @@ impl serde::Serialize for JsonOptions { if self.compression_level.is_some() { len += 1; } + if self.newline_delimited.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; if self.compression != 0 { let v = CompressionTypeVariant::try_from(self.compression) @@ -4603,6 +4606,9 @@ impl serde::Serialize for JsonOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } + if let Some(v) = self.newline_delimited.as_ref() { + struct_ser.serialize_field("newlineDelimited", v)?; + } struct_ser.end() } } @@ -4618,6 +4624,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "schemaInferMaxRec", "compression_level", "compressionLevel", + "newline_delimited", + "newlineDelimited", ]; #[allow(clippy::enum_variant_names)] @@ -4625,6 +4633,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { Compression, SchemaInferMaxRec, CompressionLevel, + NewlineDelimited, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4649,6 +4658,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression" => Ok(GeneratedField::Compression), "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), + "newlineDelimited" | "newline_delimited" => Ok(GeneratedField::NewlineDelimited), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4671,6 +4681,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { let mut compression__ = None; let mut schema_infer_max_rec__ = None; let mut compression_level__ = None; + let mut newline_delimited__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Compression => { @@ -4695,12 +4706,19 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::NewlineDelimited => { + if newline_delimited__.is_some() { + return Err(serde::de::Error::duplicate_field("newlineDelimited")); + } + newline_delimited__ = map_.next_value()?; + } } } Ok(JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, compression_level: compression_level__, + newline_delimited: newline_delimited__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 16601dcf46977..0acd66e25e04c 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether to read as newline-delimited JSON (default true). When false, expects JSON array format \[{},...\] + #[prost(bool, optional, tag = "4")] + pub newline_delimited: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index fee3656482005..1f480e5bc793e 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -990,6 +990,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), compression_level: opts.compression_level, + newline_delimited: Some(opts.newline_delimited), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 16601dcf46977..0acd66e25e04c 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether to read as newline-delimited JSON (default true). 
When false, expects JSON array format \[{},...\] + #[prost(bool, optional, tag = "4")] + pub newline_delimited: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 436a06493766d..08f42b0af7290 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -241,6 +241,7 @@ impl JsonOptionsProto { compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), compression_level: options.compression_level, + newline_delimited: Some(options.newline_delimited), } } else { JsonOptionsProto::default() @@ -260,6 +261,7 @@ impl From<&JsonOptionsProto> for JsonOptions { }, schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize), compression_level: proto.compression_level, + newline_delimited: proto.newline_delimited.unwrap_or(true), } } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index e5c218e5ebe2e..4e0907d9e2241 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -28,7 +28,7 @@ use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory}; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use datafusion::execution::options::ArrowReadOptions; +use datafusion::execution::options::{ArrowReadOptions, JsonReadOptions}; use datafusion::optimizer::Optimizer; use datafusion::optimizer::optimize_unions::OptimizeUnions; use datafusion_common::parquet_config::DFParquetWriterVersion; @@ -755,7 +755,7 @@ async fn create_json_scan(ctx: &SessionContext) -> Result Date: Fri, 30 Jan 2026 11:07:55 +0800 Subject: [PATCH 2/7] fmt --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c55a24f4e8442..0ba587bbc6961 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3080,7 +3080,7 @@ config_namespace! 
{ /// {"key1": 2, "key2": "vals"} /// ] /// ``` - pub newline_delimited: bool, default = true + pub newline_delimited: bool, default = true } } From f7176efac1f6186f55a377a91627b335fa9b5000 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 2 Feb 2026 22:45:24 +0800 Subject: [PATCH 3/7] Redesign the solution for array json --- Cargo.lock | 2 + datafusion/datasource-json/Cargo.toml | 2 + datafusion/datasource-json/src/source.rs | 456 +++++++++++++++++++++-- datafusion/datasource-json/src/utils.rs | 365 +++++++++++++++--- 4 files changed, 743 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3de1724fcdb3a..3d861c5d2de26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2034,7 +2034,9 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", + "log", "object_store", + "serde_json", "tokio", "tokio-stream", "tokio-util", diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 657c40b6b46ee..9877ec9d80d66 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -47,6 +47,8 @@ object_store = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } tokio-util = { workspace = true, features = ["io", "io-util", "compat"] } +serde_json = "1.0.149" +log = "0.4.29" # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index f3213d6cb1152..b797d4333d380 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -19,11 +19,12 @@ use std::any::Any; use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; +use std::task::{Context, Poll}; use crate::file_format::JsonDecoder; -use crate::utils::JsonArrayToNdjsonReader; +use crate::utils::{ChannelReader, JsonArrayToNdjsonReader}; use datafusion_common::error::{DataFusionError, Result}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -37,6 +38,7 @@ use datafusion_datasource::{ use datafusion_physical_plan::projection::ProjectionExprs; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use arrow::array::RecordBatch; use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; use datafusion_datasource::file::FileSource; @@ -44,13 +46,52 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_execution::TaskContext; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::{StreamReader, SyncIoBridge}; +/// Channel buffer size for streaming JSON array processing. +/// With ~128KB average chunk size, 128 chunks ≈ 16MB buffer. 
+const CHANNEL_BUFFER_SIZE: usize = 128; + +/// Buffer size for JsonArrayToNdjsonReader (2MB each, 4MB total for input+output) +const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024; + +// ============================================================================ +// JsonArrayStream - Custom stream wrapper to hold SpawnedTask handles +// ============================================================================ + +/// A stream wrapper that holds SpawnedTask handles to keep them alive +/// until the stream is fully consumed or dropped. +/// +/// This ensures cancel-safety: when the stream is dropped, the tasks +/// are properly aborted via SpawnedTask's Drop implementation. +struct JsonArrayStream { + inner: ReceiverStream>, + /// Task that reads from object store and sends bytes to channel. + /// Kept alive until stream is consumed or dropped. + _read_task: SpawnedTask<()>, + /// Task that parses JSON and sends RecordBatches. + /// Kept alive until stream is consumed or dropped. + _parse_task: SpawnedTask<()>, +} + +impl Stream for JsonArrayStream { + type Item = std::result::Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} // ============================================================================ // JsonOpener and JsonSource // ============================================================================ @@ -256,7 +297,7 @@ impl FileOpener for JsonOpener { Some(_) => { file.seek(SeekFrom::Start(result.range.start as _))?; let limit = result.range.end - result.range.start; - file_compression_type.convert_read(file.take(limit as u64))? + file_compression_type.convert_read(file.take(limit))? } }; @@ -272,8 +313,10 @@ impl FileOpener for JsonOpener { .boxed()) } else { // JSON array format: wrap with streaming converter - // JsonArrayToNdjsonReader implements BufRead - let ndjson_reader = JsonArrayToNdjsonReader::new(bytes); + let ndjson_reader = JsonArrayToNdjsonReader::with_capacity( + bytes, + JSON_CONVERTER_BUFFER_SIZE, + ); let arrow_reader = ReaderBuilder::new(schema) .with_batch_size(batch_size) .build(ndjson_reader)?; @@ -298,37 +341,88 @@ impl FileOpener for JsonOpener { ); Ok(stream.map_err(Into::into).boxed()) } else { - // JSON array format: streaming conversion without loading entire file + // JSON array format: streaming conversion with channel-based byte transfer + // + // Architecture: + // 1. Async task reads from object store stream, decompresses, sends to channel + // 2. Blocking task receives bytes, converts JSON array to NDJSON, parses to Arrow + // 3. 
RecordBatches are sent back via another channel + // + // Memory budget (~32MB): + // - sync_channel: CHANNEL_BUFFER_SIZE chunks (~16MB) + // - JsonArrayToNdjsonReader: 2 × JSON_CONVERTER_BUFFER_SIZE (~4MB) + // - Arrow JsonReader internal buffer (~8MB) + // - Miscellaneous (~4MB) + let s = s.map_err(DataFusionError::from); let decompressed_stream = file_compression_type.convert_stream(s.boxed())?; - // Convert async stream to sync reader for JsonArrayToNdjsonReader - let stream_reader = StreamReader::new( - decompressed_stream.map_err(DataFusionError::from), - ); - let sync_reader = SyncIoBridge::new(stream_reader); - - // Use streaming converter - processes data in chunks without loading entire file - let ndjson_reader = JsonArrayToNdjsonReader::new(sync_reader); - - let arrow_reader = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build(ndjson_reader)?; + // Channel for bytes: async producer -> sync consumer + let (byte_tx, byte_rx) = + std::sync::mpsc::sync_channel::( + CHANNEL_BUFFER_SIZE, + ); + + // Channel for results: sync producer -> async consumer + let (result_tx, result_rx) = tokio::sync::mpsc::channel(2); + + // Async task: read from object store stream and send bytes to channel + // Store the SpawnedTask to keep it alive until stream is dropped + let read_task = SpawnedTask::spawn(async move { + tokio::pin!(decompressed_stream); + while let Some(chunk) = decompressed_stream.next().await { + match chunk { + Ok(bytes) => { + if byte_tx.send(bytes).is_err() { + break; // Consumer dropped + } + } + Err(e) => { + log::error!("Error reading JSON stream: {e}"); + break; + } + } + } + // byte_tx dropped here, signals EOF to ChannelReader + }); - // Process arrow reader in blocking task to avoid blocking async executor - let (tx, rx) = tokio::sync::mpsc::channel(2); - SpawnedTask::spawn_blocking(move || { - for batch_result in arrow_reader { - if tx.blocking_send(batch_result).is_err() { - break; // Receiver dropped + // Blocking task: receive bytes from channel and parse JSON + // Store the SpawnedTask to keep it alive until stream is dropped + let parse_task = SpawnedTask::spawn_blocking(move || { + let channel_reader = ChannelReader::new(byte_rx); + let ndjson_reader = JsonArrayToNdjsonReader::with_capacity( + channel_reader, + JSON_CONVERTER_BUFFER_SIZE, + ); + + match ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(ndjson_reader) + { + Ok(arrow_reader) => { + for batch_result in arrow_reader { + if result_tx.blocking_send(batch_result).is_err() + { + break; // Receiver dropped + } + } + } + Err(e) => { + let _ = result_tx.blocking_send(Err(e)); } } + // result_tx dropped here, closes the stream }); - Ok(ReceiverStream::new(rx) - .map(|r| r.map_err(Into::into)) - .boxed()) + // Wrap in JsonArrayStream to keep tasks alive until stream is consumed + let stream = JsonArrayStream { + inner: ReceiverStream::new(result_rx), + _read_task: read_task, + _parse_task: parse_task, + }; + + Ok(stream.map(|r| r.map_err(Into::into)).boxed()) } } } @@ -390,3 +484,307 @@ pub async fn plan_to_json( Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use bytes::Bytes; + use datafusion_datasource::FileRange; + use futures::TryStreamExt; + use object_store::PutPayload; + use object_store::memory::InMemory; + use object_store::path::Path; + + /// Helper to create a test schema + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("name", 
DataType::Utf8, true), + ])) + } + + #[tokio::test] + async fn test_json_array_from_file() -> Result<()> { + // Test reading JSON array format from a file + let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#; + + let store = Arc::new(InMemory::new()); + let path = Path::from("test.json"); + store + .put(&path, PutPayload::from_static(json_data.as_bytes())) + .await?; + + let opener = JsonOpener::new( + 1024, + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, // JSON array format + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_from_stream() -> Result<()> { + // Test reading JSON array format from object store stream (simulates S3) + let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#; + + // Use InMemory store which returns Stream payload + let store = Arc::new(InMemory::new()); + let path = Path::from("test_stream.json"); + store + .put(&path, PutPayload::from_static(json_data.as_bytes())) + .await?; + + let opener = JsonOpener::new( + 2, // small batch size to test multiple batches + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, // JSON array format + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_nested_objects() -> Result<()> { + // Test JSON array with nested objects and arrays + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + + let json_data = r#"[ + {"id": 1, "data": "{\"nested\": true}"}, + {"id": 2, "data": "[1, 2, 3]"} + ]"#; + + let store = Arc::new(InMemory::new()); + let path = Path::from("nested.json"); + store + .put(&path, PutPayload::from_static(json_data.as_bytes())) + .await?; + + let opener = JsonOpener::new( + 1024, + schema, + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + + assert_eq!(batches[0].num_rows(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_empty() -> Result<()> { + // Test empty JSON array + let json_data = "[]"; + + let store = Arc::new(InMemory::new()); + let path = Path::from("empty.json"); + store + .put(&path, PutPayload::from_static(json_data.as_bytes())) + .await?; + + let opener = JsonOpener::new( + 1024, + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_range_not_supported() { + // Test that 
range-based scanning returns error for JSON array format + let store = Arc::new(InMemory::new()); + let path = Path::from("test.json"); + store + .put(&path, PutPayload::from_static(b"[]")) + .await + .unwrap(); + + let opener = JsonOpener::new( + 1024, + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, // JSON array format + ); + + let meta = store.head(&path).await.unwrap(); + let mut file = PartitionedFile::new(path.to_string(), meta.size); + file.range = Some(FileRange { start: 0, end: 10 }); + + let result = opener.open(file); + match result { + Ok(_) => panic!("Expected error for range-based JSON array scanning"), + Err(e) => { + assert!( + e.to_string().contains("does not support range-based"), + "Unexpected error message: {e}" + ); + } + } + } + + #[tokio::test] + async fn test_ndjson_still_works() -> Result<()> { + // Ensure NDJSON format still works correctly + let json_data = + "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n"; + + let store = Arc::new(InMemory::new()); + let path = Path::from("test.ndjson"); + store + .put(&path, PutPayload::from_static(json_data.as_bytes())) + .await?; + + let opener = JsonOpener::new( + 1024, + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + true, // NDJSON format + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_large_file() -> Result<()> { + // Test with a larger JSON array to verify streaming works + let mut json_data = String::from("["); + for i in 0..1000 { + if i > 0 { + json_data.push(','); + } + json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#)); + } + json_data.push(']'); + + let store = Arc::new(InMemory::new()); + let path = Path::from("large.json"); + store + .put(&path, PutPayload::from(Bytes::from(json_data))) + .await?; + + let opener = JsonOpener::new( + 100, // batch size of 100 + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 1000); + + // Should have multiple batches due to batch_size=100 + assert!(batches.len() >= 10); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_stream_cancellation() -> Result<()> { + // Test that cancellation works correctly (tasks are aborted when stream is dropped) + let mut json_data = String::from("["); + for i in 0..10000 { + if i > 0 { + json_data.push(','); + } + json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#)); + } + json_data.push(']'); + + let store = Arc::new(InMemory::new()); + let path = Path::from("cancel_test.json"); + store + .put(&path, PutPayload::from(Bytes::from(json_data))) + .await?; + + let opener = JsonOpener::new( + 10, // small batch size + test_schema(), + FileCompressionType::UNCOMPRESSED, + store.clone(), + false, + ); + + let meta = store.head(&path).await?; + let file = PartitionedFile::new(path.to_string(), meta.size); + + let mut stream = opener.open(file)?.await?; + + // Read only first batch, then drop the stream (simulating 
cancellation) + let first_batch = stream.next().await; + assert!(first_batch.is_some()); + + // Drop the stream - this should abort the spawned tasks via SpawnedTask's Drop + drop(stream); + + // Give tasks time to be aborted + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // If we reach here without hanging, cancellation worked + Ok(()) + } +} diff --git a/datafusion/datasource-json/src/utils.rs b/datafusion/datasource-json/src/utils.rs index 58e55670b8db8..dfc3cccaa4bf5 100644 --- a/datafusion/datasource-json/src/utils.rs +++ b/datafusion/datasource-json/src/utils.rs @@ -18,6 +18,9 @@ //! Utility types for JSON processing use std::io::{BufRead, Read}; +use std::sync::mpsc::Receiver; + +use bytes::Bytes; // ============================================================================ // JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter @@ -27,11 +30,11 @@ use std::io::{BufRead, Read}; // // ```text // ┌─────────────────────────────────────────────────────────────┐ -// │ JSON Array File (potentially very large, e.g. 1GB) │ +// │ JSON Array File (potentially very large, e.g. 33GB) │ // │ [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}] │ // └─────────────────────────────────────────────────────────────┘ // │ -// ▼ read small chunks at a time (e.g. 64KB) +// ▼ read chunks via ChannelReader // ┌───────────────────┐ // │ JsonArrayToNdjson │ ← character substitution only: // │ Reader │ '[' skip, ',' → '\n', ']' stop @@ -40,26 +43,31 @@ use std::io::{BufRead, Read}; // ▼ outputs NDJSON format // ┌───────────────────┐ // │ Arrow Reader │ ← internal buffer, batch parsing -// │ batch_size=1024 │ +// │ batch_size=8192 │ // └───────────────────┘ // │ -// ▼ outputs RecordBatch for every batch_size rows +// ▼ outputs RecordBatch // ┌───────────────────┐ // │ RecordBatch │ -// │ (1024 rows) │ // └───────────────────┘ // ``` // // Memory Efficiency: // -// | Approach | Memory for 1GB file | Parse count | -// |---------------------------------------|---------------------|-------------| -// | Load entire file + serde_json | ~5GB | 3x | -// | Streaming with JsonArrayToNdjsonReader| ~few MB | 1x | +// | Approach | Memory for 33GB file | Parse count | +// |---------------------------------------|----------------------|-------------| +// | Load entire file + serde_json | ~100GB+ | 3x | +// | Streaming with JsonArrayToNdjsonReader| ~32MB (configurable) | 1x | +// +// Design Note: +// +// This implementation uses `inner: R` directly (not `BufReader`) and manages +// its own input buffer. This is critical for compatibility with `SyncIoBridge` +// and `ChannelReader` in `spawn_blocking` contexts. 
// -/// Default buffer size for JsonArrayToNdjsonReader (64KB) -const DEFAULT_BUF_SIZE: usize = 64 * 1024; +/// Default buffer size for JsonArrayToNdjsonReader (2MB for better throughput) +const DEFAULT_BUF_SIZE: usize = 2 * 1024 * 1024; /// Parser state for JSON array streaming #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -99,6 +107,7 @@ enum JsonArrayState { /// {"c":"x,y"} /// ``` pub struct JsonArrayToNdjsonReader { + /// Inner reader - we use R directly (not `BufReader`) for SyncIoBridge compatibility inner: R, state: JsonArrayState, /// Tracks nesting depth of `{` and `[` to identify top-level commas @@ -107,12 +116,18 @@ pub struct JsonArrayToNdjsonReader { in_string: bool, /// Whether the next character is escaped (after `\`) escape_next: bool, - /// Internal buffer for BufRead implementation - buffer: Vec, - /// Current position in the buffer - pos: usize, - /// Number of valid bytes in the buffer - filled: usize, + /// Input buffer - stores raw bytes read from inner reader + input_buffer: Vec, + /// Current read position in input buffer + input_pos: usize, + /// Number of valid bytes in input buffer + input_filled: usize, + /// Output buffer - stores transformed NDJSON bytes + output_buffer: Vec, + /// Current read position in output buffer + output_pos: usize, + /// Number of valid bytes in output buffer + output_filled: usize, /// Whether trailing non-whitespace content was detected after ']' has_trailing_content: bool, } @@ -120,15 +135,26 @@ pub struct JsonArrayToNdjsonReader { impl JsonArrayToNdjsonReader { /// Create a new streaming reader that converts JSON array to NDJSON. pub fn new(reader: R) -> Self { + Self::with_capacity(reader, DEFAULT_BUF_SIZE) + } + + /// Create a new streaming reader with custom buffer size. + /// + /// Larger buffers improve throughput but use more memory. + /// Total memory usage is approximately 2 * capacity (input + output buffers). + pub fn with_capacity(reader: R, capacity: usize) -> Self { Self { inner: reader, state: JsonArrayState::Start, depth: 0, in_string: false, escape_next: false, - buffer: vec![0; DEFAULT_BUF_SIZE], - pos: 0, - filled: 0, + input_buffer: vec![0; capacity], + input_pos: 0, + input_filled: 0, + output_buffer: vec![0; capacity], + output_pos: 0, + output_filled: 0, has_trailing_content: false, } } @@ -171,6 +197,7 @@ impl JsonArrayToNdjsonReader { } /// Process a single byte and return the transformed byte (if any) + #[inline] fn process_byte(&mut self, byte: u8) -> Option { match self.state { JsonArrayState::Start => { @@ -247,70 +274,178 @@ impl JsonArrayToNdjsonReader { } } - /// Fill the internal buffer with transformed data - fn fill_internal_buffer(&mut self) -> std::io::Result<()> { - // Read raw data from inner reader - let mut raw_buf = vec![0u8; DEFAULT_BUF_SIZE]; + /// Refill input buffer from inner reader if needed. + /// Returns true if there's data available, false on EOF. + fn refill_input_if_needed(&mut self) -> std::io::Result { + if self.input_pos >= self.input_filled { + // Input buffer exhausted, read more from inner + let bytes_read = self.inner.read(&mut self.input_buffer)?; + if bytes_read == 0 { + return Ok(false); // EOF + } + self.input_pos = 0; + self.input_filled = bytes_read; + } + Ok(true) + } + + /// Fill the output buffer with transformed data. + /// + /// This method manages its own input buffer, reading from the inner reader + /// as needed. When the output buffer is full, we stop processing but preserve + /// the current position in the input buffer for the next call. 
+ fn fill_output_buffer(&mut self) -> std::io::Result<()> { let mut write_pos = 0; - loop { - let bytes_read = self.inner.read(&mut raw_buf)?; - if bytes_read == 0 { + while write_pos < self.output_buffer.len() { + // Refill input buffer if exhausted + if !self.refill_input_if_needed()? { break; // EOF } - for &byte in &raw_buf[..bytes_read] { - if let Some(transformed) = self.process_byte(byte) - && write_pos < self.buffer.len() - { - self.buffer[write_pos] = transformed; + // Process bytes from input buffer + while self.input_pos < self.input_filled + && write_pos < self.output_buffer.len() + { + let byte = self.input_buffer[self.input_pos]; + self.input_pos += 1; + + if let Some(transformed) = self.process_byte(byte) { + self.output_buffer[write_pos] = transformed; write_pos += 1; } - // Note: process_byte is called for all bytes to track state, - // even when buffer is full or in Done state - } - - // Only stop if buffer is full - if write_pos >= self.buffer.len() { - break; } } - self.pos = 0; - self.filled = write_pos; + self.output_pos = 0; + self.output_filled = write_pos; Ok(()) } } impl Read for JsonArrayToNdjsonReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - // If buffer is empty, fill it - if self.pos >= self.filled { - self.fill_internal_buffer()?; - if self.filled == 0 { + // If output buffer is empty, fill it + if self.output_pos >= self.output_filled { + self.fill_output_buffer()?; + if self.output_filled == 0 { return Ok(0); // EOF } } - // Copy from internal buffer to output - let available = self.filled - self.pos; + // Copy from output buffer to caller's buffer + let available = self.output_filled - self.output_pos; let to_copy = std::cmp::min(available, buf.len()); - buf[..to_copy].copy_from_slice(&self.buffer[self.pos..self.pos + to_copy]); - self.pos += to_copy; + buf[..to_copy].copy_from_slice( + &self.output_buffer[self.output_pos..self.output_pos + to_copy], + ); + self.output_pos += to_copy; Ok(to_copy) } } impl BufRead for JsonArrayToNdjsonReader { fn fill_buf(&mut self) -> std::io::Result<&[u8]> { - if self.pos >= self.filled { - self.fill_internal_buffer()?; + if self.output_pos >= self.output_filled { + self.fill_output_buffer()?; } - Ok(&self.buffer[self.pos..self.filled]) + Ok(&self.output_buffer[self.output_pos..self.output_filled]) } fn consume(&mut self, amt: usize) { - self.pos = std::cmp::min(self.pos + amt, self.filled); + self.output_pos = std::cmp::min(self.output_pos + amt, self.output_filled); + } +} + +// ============================================================================ +// ChannelReader - Sync reader that receives bytes from async channel +// ============================================================================ +// +// Architecture: +// +// ```text +// ┌─────────────────────────────────────────────────────────────────────────┐ +// │ S3 / MinIO (async) │ +// │ (33GB JSON Array File) │ +// └─────────────────────────────────────────────────────────────────────────┘ +// │ +// ▼ async stream (Bytes chunks) +// ┌─────────────────────────────────────────────────────────────────────────┐ +// │ Async Task (tokio runtime) │ +// │ while let Some(chunk) = stream.next().await │ +// │ byte_tx.send(chunk) │ +// └─────────────────────────────────────────────────────────────────────────┘ +// │ +// ▼ std::sync::mpsc::sync_channel +// │ (bounded, ~32MB buffer) +// ▼ +// ┌─────────────────────────────────────────────────────────────────────────┐ +// │ Blocking Task (spawn_blocking) │ +// │ ┌──────────────┐ 
┌────────────────────────┐ ┌──────────────────┐ │ +// │ │ChannelReader │ → │JsonArrayToNdjsonReader │ → │ Arrow JsonReader │ │ +// │ │ (Read) │ │ [{},...] → {}\n{} │ │ (RecordBatch) │ │ +// │ └──────────────┘ └────────────────────────┘ └──────────────────┘ │ +// └─────────────────────────────────────────────────────────────────────────┘ +// │ +// ▼ tokio::sync::mpsc::channel +// ┌─────────────────────────────────────────────────────────────────────────┐ +// │ ReceiverStream (async) │ +// │ → DataFusion execution engine │ +// └─────────────────────────────────────────────────────────────────────────┘ +// ``` +// +// Memory Budget (~32MB total): +// - sync_channel buffer: 128 chunks × ~128KB = ~16MB +// - JsonArrayToNdjsonReader: 2 × 2MB = 4MB +// - Arrow JsonReader internal: ~8MB +// - Miscellaneous: ~4MB +// + +/// A synchronous `Read` implementation that receives bytes from a channel. +/// +/// This enables true streaming between async and sync contexts without +/// loading the entire file into memory. +pub struct ChannelReader { + rx: Receiver, + current: Option, + pos: usize, +} + +impl ChannelReader { + /// Create a new ChannelReader from a receiver. + pub fn new(rx: Receiver) -> Self { + Self { + rx, + current: None, + pos: 0, + } + } +} + +impl Read for ChannelReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + loop { + // If we have current chunk with remaining data, read from it + if let Some(ref chunk) = self.current { + let remaining = chunk.len() - self.pos; + if remaining > 0 { + let to_copy = std::cmp::min(remaining, buf.len()); + buf[..to_copy].copy_from_slice(&chunk[self.pos..self.pos + to_copy]); + self.pos += to_copy; + return Ok(to_copy); + } + } + + // Current chunk exhausted, get next from channel + match self.rx.recv() { + Ok(bytes) => { + self.current = Some(bytes); + self.pos = 0; + // Loop back to read from new chunk + } + Err(_) => return Ok(0), // Channel closed = EOF + } + } } } @@ -472,4 +607,128 @@ mod tests { // Whitespace after ] should be allowed reader.validate_complete().unwrap(); } + + /// Test that data is not lost at buffer boundaries. + /// + /// This test creates input larger than the internal buffer to verify + /// that newline characters are not dropped when they occur at buffer boundaries. 
+ #[test] + fn test_buffer_boundary_no_data_loss() { + // Create objects ~9KB each, so 10 objects = ~90KB + let large_value = "x".repeat(9000); + + let mut objects = vec![]; + for i in 0..10 { + objects.push(format!(r#"{{"id":{i},"data":"{large_value}"}}"#)); + } + + let input = format!("[{}]", objects.join(",")); + + // Use small buffer to force multiple fill cycles + let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + // Verify correct number of newlines (9 newlines separate 10 objects) + let newline_count = output.matches('\n').count(); + assert_eq!( + newline_count, 9, + "Expected 9 newlines separating 10 objects, got {newline_count}" + ); + + // Verify each line is valid JSON + for (i, line) in output.lines().enumerate() { + let parsed: Result = serde_json::from_str(line); + assert!( + parsed.is_ok(), + "Line {} is not valid JSON: {}...", + i, + &line[..100.min(line.len())] + ); + + // Verify the id field matches expected value + let value = parsed.unwrap(); + assert_eq!( + value["id"].as_i64(), + Some(i as i64), + "Object {i} has wrong id" + ); + } + } + + /// Test with real-world-like data format (with leading whitespace and newlines) + #[test] + fn test_real_world_format_large() { + let large_value = "x".repeat(8000); + + // Format similar to real files: opening bracket on its own line, + // each object indented with 2 spaces + let mut objects = vec![]; + for i in 0..10 { + objects.push(format!(r#" {{"id":{i},"data":"{large_value}"}}"#)); + } + + let input = format!("[\n{}\n]", objects.join(",\n")); + + let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + let lines: Vec<&str> = output.lines().collect(); + assert_eq!(lines.len(), 10, "Expected 10 objects"); + + for (i, line) in lines.iter().enumerate() { + assert!( + line.starts_with("{\"id\""), + "Line {} should start with object, got: {}...", + i, + &line[..50.min(line.len())] + ); + } + } + + /// Test ChannelReader + #[test] + fn test_channel_reader() { + let (tx, rx) = std::sync::mpsc::sync_channel(4); + + // Send some chunks + tx.send(Bytes::from("Hello, ")).unwrap(); + tx.send(Bytes::from("World!")).unwrap(); + drop(tx); // Close channel + + let mut reader = ChannelReader::new(rx); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + assert_eq!(output, "Hello, World!"); + } + + /// Test ChannelReader with small reads + #[test] + fn test_channel_reader_small_reads() { + let (tx, rx) = std::sync::mpsc::sync_channel(4); + + tx.send(Bytes::from("ABCDEFGHIJ")).unwrap(); + drop(tx); + + let mut reader = ChannelReader::new(rx); + let mut buf = [0u8; 3]; + + // Read in small chunks + assert_eq!(reader.read(&mut buf).unwrap(), 3); + assert_eq!(&buf, b"ABC"); + + assert_eq!(reader.read(&mut buf).unwrap(), 3); + assert_eq!(&buf, b"DEF"); + + assert_eq!(reader.read(&mut buf).unwrap(), 3); + assert_eq!(&buf, b"GHI"); + + assert_eq!(reader.read(&mut buf).unwrap(), 1); + assert_eq!(&buf[..1], b"J"); + + // EOF + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } } From d3d57d42f39117872d6facacce43e1dbffc0b432 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 2 Feb 2026 22:53:19 +0800 Subject: [PATCH 4/7] fix --- Cargo.lock | 2 -- datafusion/datasource-json/Cargo.toml | 1 - 2 files changed, 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 
3d861c5d2de26..6e2d8853bc7e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2039,7 +2039,6 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tokio-util", ] [[package]] @@ -6164,7 +6163,6 @@ checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 9877ec9d80d66..e9a904fe29274 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -46,7 +46,6 @@ futures = { workspace = true } object_store = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } -tokio-util = { workspace = true, features = ["io", "io-util", "compat"] } serde_json = "1.0.149" log = "0.4.29" From 828deb2ef23f993b0fc3f3866716bc7f005ae5ac Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 2 Feb 2026 23:06:02 +0800 Subject: [PATCH 5/7] fix --- datafusion/datasource-json/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index e9a904fe29274..0f857b68f56c3 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -43,11 +43,11 @@ datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } +log = "0.4.29" object_store = { workspace = true } +serde_json = "1.0.149" tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } -serde_json = "1.0.149" -log = "0.4.29" # Note: add additional linter rules in lib.rs. 
# Rust does not support workspace + new linter rules in subcrates yet From cad08443bd903fcd60fcf46f4c366fd4297a2852 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 10 Feb 2026 10:09:28 +0800 Subject: [PATCH 6/7] Address martin-g review comments --- Cargo.lock | 1 - .../core/src/datasource/file_format/json.rs | 13 ++-- datafusion/datasource-json/Cargo.toml | 3 +- datafusion/datasource-json/src/source.rs | 27 +++++-- datafusion/datasource-json/src/utils.rs | 78 +++++++++++++++---- 5 files changed, 91 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c3365e024f3b..3d115a42229bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2034,7 +2034,6 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "log", "object_store", "serde_json", "tokio", diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index f8737031eab6a..5b3e22705620e 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -59,9 +59,9 @@ mod tests { /// Create a temporary JSON file and return (TempDir, path) fn create_temp_json(content: &str) -> (tempfile::TempDir, String) { let tmp_dir = tempfile::TempDir::new().unwrap(); - let path = format!("{}/test.json", tmp_dir.path().to_string_lossy()); + let path = tmp_dir.path().join("test.json"); std::fs::write(&path, content).unwrap(); - (tmp_dir, path) + (tmp_dir, path.to_string_lossy().to_string()) } /// Infer schema from JSON array format file @@ -395,7 +395,8 @@ mod tests { async fn test_write_empty_json_from_sql() -> Result<()> { let ctx = SessionContext::new(); let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy()); + let path = tmp_dir.path().join("empty_sql.json"); + let path = path.to_string_lossy().to_string(); let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; @@ -421,7 +422,8 @@ mod tests { )?; let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy()); + let path = tmp_dir.path().join("empty_batch.json"); + let path = path.to_string_lossy().to_string(); let df = ctx.read_batch(empty_batch.clone())?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; @@ -586,7 +588,8 @@ mod tests { use std::io::Write; let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); + let path = tmp_dir.path().join("array.json.gz"); + let path = path.to_string_lossy().to_string(); let file = std::fs::File::create(&path)?; let mut encoder = GzEncoder::new(file, Compression::default()); diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 0f857b68f56c3..bd0cead8d2af8 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -43,9 +43,8 @@ datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } -log = "0.4.29" object_store = { workspace = true } -serde_json = "1.0.149" +serde_json = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 
b797d4333d380..93edcaf68ea40 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -358,14 +358,18 @@ impl FileOpener for JsonOpener { let decompressed_stream = file_compression_type.convert_stream(s.boxed())?; - // Channel for bytes: async producer -> sync consumer + // Channel for bytes: async producer -> blocking consumer + // Uses tokio::sync::mpsc so the async send never blocks a + // tokio worker thread; the consumer calls blocking_recv() + // inside spawn_blocking. let (byte_tx, byte_rx) = - std::sync::mpsc::sync_channel::( + tokio::sync::mpsc::channel::( CHANNEL_BUFFER_SIZE, ); // Channel for results: sync producer -> async consumer let (result_tx, result_rx) = tokio::sync::mpsc::channel(2); + let error_tx = result_tx.clone(); // Async task: read from object store stream and send bytes to channel // Store the SpawnedTask to keep it alive until stream is dropped @@ -374,12 +378,16 @@ impl FileOpener for JsonOpener { while let Some(chunk) = decompressed_stream.next().await { match chunk { Ok(bytes) => { - if byte_tx.send(bytes).is_err() { + if byte_tx.send(bytes).await.is_err() { break; // Consumer dropped } } Err(e) => { - log::error!("Error reading JSON stream: {e}"); + let _ = error_tx + .send(Err(arrow::error::ArrowError::ExternalError( + Box::new(e), + ))) + .await; break; } } @@ -391,14 +399,14 @@ impl FileOpener for JsonOpener { // Store the SpawnedTask to keep it alive until stream is dropped let parse_task = SpawnedTask::spawn_blocking(move || { let channel_reader = ChannelReader::new(byte_rx); - let ndjson_reader = JsonArrayToNdjsonReader::with_capacity( + let mut ndjson_reader = JsonArrayToNdjsonReader::with_capacity( channel_reader, JSON_CONVERTER_BUFFER_SIZE, ); match ReaderBuilder::new(schema) .with_batch_size(batch_size) - .build(ndjson_reader) + .build(&mut ndjson_reader) { Ok(arrow_reader) => { for batch_result in arrow_reader { @@ -412,6 +420,13 @@ impl FileOpener for JsonOpener { let _ = result_tx.blocking_send(Err(e)); } } + + // Validate the JSON array was properly formed + if let Err(e) = ndjson_reader.validate_complete() { + let _ = result_tx.blocking_send(Err( + arrow::error::ArrowError::JsonError(e.to_string()), + )); + } // result_tx dropped here, closes the stream }); diff --git a/datafusion/datasource-json/src/utils.rs b/datafusion/datasource-json/src/utils.rs index dfc3cccaa4bf5..bc75799edff73 100644 --- a/datafusion/datasource-json/src/utils.rs +++ b/datafusion/datasource-json/src/utils.rs @@ -18,7 +18,6 @@ //! 
Utility types for JSON processing use std::io::{BufRead, Read}; -use std::sync::mpsc::Receiver; use bytes::Bytes; @@ -130,6 +129,8 @@ pub struct JsonArrayToNdjsonReader { output_filled: usize, /// Whether trailing non-whitespace content was detected after ']' has_trailing_content: bool, + /// Whether leading non-whitespace content was detected before '[' + has_leading_content: bool, } impl JsonArrayToNdjsonReader { @@ -156,6 +157,7 @@ impl JsonArrayToNdjsonReader { output_pos: 0, output_filled: 0, has_trailing_content: false, + has_leading_content: false, } } @@ -169,6 +171,12 @@ impl JsonArrayToNdjsonReader { /// - Missing closing `]` /// - Unexpected trailing content after `]` pub fn validate_complete(&self) -> std::io::Result<()> { + if self.has_leading_content { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Malformed JSON: unexpected leading content before '['", + )); + } if self.depth != 0 { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -204,8 +212,9 @@ impl JsonArrayToNdjsonReader { // Looking for the opening '[', skip whitespace if byte == b'[' { self.state = JsonArrayState::InArray; + } else if !byte.is_ascii_whitespace() { + self.has_leading_content = true; } - // Skip whitespace and the '[' itself None } JsonArrayState::InArray => { @@ -376,7 +385,7 @@ impl BufRead for JsonArrayToNdjsonReader { // │ byte_tx.send(chunk) │ // └─────────────────────────────────────────────────────────────────────────┘ // │ -// ▼ std::sync::mpsc::sync_channel +// ▼ tokio::sync::mpsc::channel // │ (bounded, ~32MB buffer) // ▼ // ┌─────────────────────────────────────────────────────────────────────────┐ @@ -401,19 +410,21 @@ impl BufRead for JsonArrayToNdjsonReader { // - Miscellaneous: ~4MB // -/// A synchronous `Read` implementation that receives bytes from a channel. +/// A synchronous `Read` implementation that receives bytes from an async channel. /// /// This enables true streaming between async and sync contexts without -/// loading the entire file into memory. +/// loading the entire file into memory. Uses `tokio::sync::mpsc::Receiver` +/// with `blocking_recv()` so the async producer never blocks a tokio worker +/// thread, while the sync consumer (running in `spawn_blocking`) safely blocks. pub struct ChannelReader { - rx: Receiver, + rx: tokio::sync::mpsc::Receiver, current: Option, pos: usize, } impl ChannelReader { - /// Create a new ChannelReader from a receiver. - pub fn new(rx: Receiver) -> Self { + /// Create a new ChannelReader from a tokio mpsc receiver. 
+ pub fn new(rx: tokio::sync::mpsc::Receiver) -> Self { Self { rx, current: None, @@ -437,13 +448,13 @@ impl Read for ChannelReader { } // Current chunk exhausted, get next from channel - match self.rx.recv() { - Ok(bytes) => { + match self.rx.blocking_recv() { + Some(bytes) => { self.current = Some(bytes); self.pos = 0; // Loop back to read from new chunk } - Err(_) => return Ok(0), // Channel closed = EOF + None => return Ok(0), // Channel closed = EOF } } } @@ -596,6 +607,39 @@ mod tests { ); } + #[test] + fn test_json_array_with_leading_junk() { + let input = r#"junk[{"a":1}, {"a":2}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + // Should still extract the valid array content + assert_eq!(output, "{\"a\":1}\n{\"a\":2}"); + + // But validation should catch the leading junk + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("leading content"), + "Expected leading content error, got: {err_msg}" + ); + } + + #[test] + fn test_json_array_with_leading_whitespace_ok() { + let input = r#" + [{"a":1}, {"a":2}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":1}\n{\"a\":2}"); + + // Leading whitespace should be fine + reader.validate_complete().unwrap(); + } + #[test] fn test_validate_complete_valid_with_trailing_whitespace() { let input = r#"[{"a":1},{"a":2}] @@ -690,11 +734,11 @@ mod tests { /// Test ChannelReader #[test] fn test_channel_reader() { - let (tx, rx) = std::sync::mpsc::sync_channel(4); + let (tx, rx) = tokio::sync::mpsc::channel(4); - // Send some chunks - tx.send(Bytes::from("Hello, ")).unwrap(); - tx.send(Bytes::from("World!")).unwrap(); + // Send some chunks (try_send is non-async) + tx.try_send(Bytes::from("Hello, ")).unwrap(); + tx.try_send(Bytes::from("World!")).unwrap(); drop(tx); // Close channel let mut reader = ChannelReader::new(rx); @@ -707,9 +751,9 @@ mod tests { /// Test ChannelReader with small reads #[test] fn test_channel_reader_small_reads() { - let (tx, rx) = std::sync::mpsc::sync_channel(4); + let (tx, rx) = tokio::sync::mpsc::channel(4); - tx.send(Bytes::from("ABCDEFGHIJ")).unwrap(); + tx.try_send(Bytes::from("ABCDEFGHIJ")).unwrap(); drop(tx); let mut reader = ChannelReader::new(rx); From c8e2e4bdfb935cfe7002b1fe450d606500327ed6 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 10 Feb 2026 10:19:25 +0800 Subject: [PATCH 7/7] fmt --- datafusion/datasource-json/src/source.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 93edcaf68ea40..867cfe0e98fea 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -362,10 +362,9 @@ impl FileOpener for JsonOpener { // Uses tokio::sync::mpsc so the async send never blocks a // tokio worker thread; the consumer calls blocking_recv() // inside spawn_blocking. 
-                        let (byte_tx, byte_rx) =
-                            tokio::sync::mpsc::channel::<Bytes>(
-                                CHANNEL_BUFFER_SIZE,
-                            );
+                        let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<Bytes>(
+                            CHANNEL_BUFFER_SIZE,
+                        );
 
                         // Channel for results: sync producer -> async consumer
                         let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
@@ -384,9 +383,11 @@ impl FileOpener for JsonOpener {
                                     }
                                     Err(e) => {
                                         let _ = error_tx
-                                            .send(Err(arrow::error::ArrowError::ExternalError(
-                                                Box::new(e),
-                                            )))
+                                            .send(Err(
+                                                arrow::error::ArrowError::ExternalError(
+                                                    Box::new(e),
+                                                ),
+                                            ))
                                             .await;
                                         break;
                                     }
@@ -399,10 +400,11 @@ impl FileOpener for JsonOpener {
                             // Store the SpawnedTask to keep it alive until stream is dropped
                             let parse_task = SpawnedTask::spawn_blocking(move || {
                                 let channel_reader = ChannelReader::new(byte_rx);
-                                let mut ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
-                                    channel_reader,
-                                    JSON_CONVERTER_BUFFER_SIZE,
-                                );
+                                let mut ndjson_reader =
+                                    JsonArrayToNdjsonReader::with_capacity(
+                                        channel_reader,
+                                        JSON_CONVERTER_BUFFER_SIZE,
+                                    );
 
                                 match ReaderBuilder::new(schema)
                                     .with_batch_size(batch_size)
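
For readers following the series, a minimal end-to-end sketch of how the new behavior is intended to be used from the DataFrame API. This is illustrative only, not part of the patch: the `newline_delimited(false)` builder on `JsonReadOptions` is assumed from the options plumbing added in this series, and `tests/data/json_array.json` is the sample array file added in patch 1; `register_json` and `sql` are standard `SessionContext` APIs.

```rust
use datafusion::error::Result;
use datafusion::execution::options::JsonReadOptions;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // File contents (JSON array, not NDJSON):
    // [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
    // Assumed builder: disables the default newline-delimited mode so the
    // reader expects a top-level JSON array.
    let options = JsonReadOptions::default().newline_delimited(false);
    ctx.register_json("people", "tests/data/json_array.json", options)
        .await?;

    let df = ctx.sql("SELECT id, name FROM people ORDER BY id").await?;
    df.show().await?;
    Ok(())
}
```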