Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion native/rustycsv/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions native/rustycsv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ crate-type = ["cdylib", "lib"]
[dependencies]
rustler = { git = "https://github.com/rusterlium/rustler.git", branch = "master" }
rayon = "1.10" # Data parallelism for parallel parsing
thiserror = "2" # Idiomatic error types for library code
mimalloc = { version = "0.1", default-features = false, optional = true }

[target.'cfg(target_env = "musl")'.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions native/rustycsv/src/core/newlines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/// checks against the custom patterns.

#[derive(Debug, Clone)]
#[must_use]
pub struct Newlines {
/// Newline patterns sorted longest-first for greedy matching.
pub patterns: Vec<Vec<u8>>,
Expand Down Expand Up @@ -32,6 +33,7 @@ impl Newlines {
}

/// Length in bytes of the longest configured newline pattern.
///
/// Falls back to `1` when `patterns` is empty. Streaming code uses this
/// value for chunk-boundary safety.
#[must_use]
pub fn max_pattern_len(&self) -> usize {
    let longest = self.patterns.iter().map(Vec::len).max();
    longest.unwrap_or(1)
}
Expand Down
84 changes: 84 additions & 0 deletions native/rustycsv/src/core/simd_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

/// A newline terminator position.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use]
pub struct RowEnd {
/// Byte position of terminator start (\n or \r in \r\n).
pub pos: u32,
Expand All @@ -14,6 +15,7 @@ pub struct RowEnd {

/// Structural index: positions of all unquoted separators and row endings.
#[derive(Debug)]
#[must_use]
pub struct StructuralIndex {
/// Positions of unquoted field separators (commas, tabs, etc.).
pub field_seps: Vec<u32>,
Expand Down Expand Up @@ -58,6 +60,7 @@ impl StructuralIndex {

/// Number of rows.
#[inline]
#[must_use]
pub fn row_count(&self) -> usize {
let n = self.row_ends.len();
// If there's content after the last row_end (no trailing newline), there's one more row.
Expand Down Expand Up @@ -130,6 +133,7 @@ impl<'a> Iterator for RowIter<'a> {
let start = self.pos;
let end = self.index.input_len;
self.pos = end;
self.row_idx += 1;
Some((start, end, end))
} else {
None
Expand All @@ -144,6 +148,8 @@ impl<'a> Iterator for RowIter<'a> {
}
}

/// `len()` is derived from `size_hint`, so this impl is only correct while
/// `RowIter::size_hint` reports the exact number of rows left, including a
/// trailing row that has no terminating newline.
impl ExactSizeIterator for RowIter<'_> {}

/// A single row from the cursor-based iterator, with its field bounds.
pub struct Row<'a> {
pub start: u32,
Expand Down Expand Up @@ -177,6 +183,7 @@ impl<'a> Iterator for RowFieldIter<'a> {
let start = self.pos;
let end = self.index.input_len;
self.pos = end;
self.row_idx += 1;
(start, end)
} else {
return None;
Expand Down Expand Up @@ -210,6 +217,8 @@ impl<'a> Iterator for RowFieldIter<'a> {
}
}

/// `len()` is derived from `size_hint`, so this impl is only correct while
/// `RowFieldIter::size_hint` reports the exact number of rows left, including
/// a trailing row that has no terminating newline.
impl ExactSizeIterator for RowFieldIter<'_> {}

/// Iterator over fields in a single row.
pub struct FieldIter<'a> {
seps: &'a [u32],
Expand Down Expand Up @@ -249,11 +258,16 @@ impl<'a> Iterator for FieldIter<'a> {

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
    // A finished iterator yields nothing more; otherwise a row has one more
    // field than it has separators, minus the fields already handed out.
    let remaining = if self.done {
        0
    } else {
        (self.seps.len() + 1).saturating_sub(self.idx)
    };
    (remaining, Some(remaining))
}
}

/// `len()` is derived from `size_hint`, so this impl is only correct while
/// `FieldIter::size_hint` reports the exact number of fields left (zero once
/// the iterator is done).
impl ExactSizeIterator for FieldIter<'_> {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -373,4 +387,74 @@ mod tests {
assert_eq!(cursor[0], vec![(0, 1), (2, 3)]); // a,b
assert_eq!(cursor[1], vec![(4, 5)]); // c
}

// --- ExactSizeIterator correctness tests ---

#[test]
fn row_iter_exact_size_with_trailing_row() {
    // Input "a\nb": one terminated row followed by an unterminated one.
    let index = make_index(vec![], vec![RowEnd { pos: 1, len: 1 }], 3);
    let mut rows = index.rows();

    // `len()` must count down by one for every row consumed: 2, then 1.
    for remaining in [2, 1] {
        assert_eq!(rows.len(), remaining);
        let _ = rows.next();
    }

    // Both rows (including the trailing one) are gone now.
    assert_eq!(rows.len(), 0);
    assert!(rows.next().is_none());
}

#[test]
fn row_iter_exact_size_no_trailing_row() {
    // Input "a\n": a single row that ends with its newline, so there is
    // no extra trailing row to account for.
    let index = make_index(vec![], vec![RowEnd { pos: 1, len: 1 }], 2);
    let mut rows = index.rows();

    assert_eq!(rows.len(), 1);
    let _ = rows.next(); // the only row
    assert_eq!(rows.len(), 0);
    assert!(rows.next().is_none());
}

#[test]
fn row_field_iter_exact_size_with_trailing_row() {
    // Input "a\nb": one terminated row followed by an unterminated one.
    let index = make_index(vec![], vec![RowEnd { pos: 1, len: 1 }], 3);
    let mut rows = index.rows_with_fields();

    // `len()` must count down by one for every row consumed: 2, then 1.
    for remaining in [2, 1] {
        assert_eq!(rows.len(), remaining);
        let _ = rows.next();
    }

    // The trailing row has been consumed as well.
    assert_eq!(rows.len(), 0);
    assert!(rows.next().is_none());
}

#[test]
fn field_iter_exact_size_single_field() {
    // Row "abc": no separators, hence exactly one field.
    let index = make_index(vec![], vec![RowEnd { pos: 3, len: 1 }], 4);
    let mut row_fields = index.fields_in_row(0, 3);

    assert_eq!(row_fields.len(), 1);
    let _ = row_fields.next(); // the single field
    assert_eq!(row_fields.len(), 0);
    assert!(row_fields.next().is_none());
}

#[test]
fn field_iter_exact_size_multiple_fields() {
    // Row "a,b,c": separators at byte 1 and byte 3, hence three fields.
    let index = make_index(vec![1, 3], vec![RowEnd { pos: 5, len: 1 }], 6);
    let mut row_fields = index.fields_in_row(0, 5);

    // Consume "a", "b", "c" in turn; `len()` must read 3, 2, 1 beforehand.
    for remaining in [3, 2, 1] {
        assert_eq!(row_fields.len(), remaining);
        let _ = row_fields.next();
    }

    // After the last field the iterator is done and reports zero.
    assert_eq!(row_fields.len(), 0);
    assert!(row_fields.next().is_none());
}
}
22 changes: 10 additions & 12 deletions native/rustycsv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,23 +1575,21 @@ fn encode_string_parallel<'a>(
continue;
}

let utf8_field: Vec<u8> = if needs_quoting {
if needs_quoting {
let mut buf = Vec::with_capacity(field.len() + 8);
write_quoted_field(&mut buf, field, esc);
buf
if needs_encoding {
let encoded = encode_utf8_to_target(&buf, encoding);
out.extend_from_slice(&encoded);
} else {
out.extend_from_slice(&buf);
}
} else if needs_encoding {
field.clone()
} else {
// No formula, no quoting, no encoding — direct extend
out.extend_from_slice(field);
continue;
};

if needs_encoding {
let encoded = encode_utf8_to_target(&utf8_field, encoding);
// Encode directly — no clone needed
let encoded = encode_utf8_to_target(field, encoding);
out.extend_from_slice(&encoded);
} else {
out.extend_from_slice(&utf8_field);
out.extend_from_slice(field);
}
}
out.extend_from_slice(&ls_encoded);
Expand Down
1 change: 1 addition & 0 deletions native/rustycsv/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl StreamingParserEnum {
}

/// Wrapper for StreamingParser that can be stored in a ResourceArc
#[must_use]
pub struct StreamingParserResource {
/// The wrapped parser state; all access goes through this `Mutex`.
pub inner: Mutex<StreamingParserEnum>,
}
Expand Down
30 changes: 26 additions & 4 deletions native/rustycsv/src/strategy/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::borrow::Cow;

use super::streaming::shrink_excess;
use crate::core::newlines::{match_newline, Newlines};

// ============================================================================
Expand Down Expand Up @@ -187,6 +188,7 @@ fn parse_row_general<'a>(

/// Field boundary for general parsing
#[derive(Debug, Clone, Copy)]
#[must_use]
pub struct GeneralFieldBound {
pub start: usize,
pub end: usize,
Expand Down Expand Up @@ -546,6 +548,7 @@ fn parse_row_boundaries_general(
// ============================================================================

/// Streaming parser that handles multi-byte separators and escapes
#[must_use]
pub struct GeneralStreamingParser {
buffer: Vec<u8>,
complete_rows: Vec<Vec<Vec<u8>>>,
Expand Down Expand Up @@ -658,22 +661,28 @@ impl GeneralStreamingParser {
self.buffer.drain(0..self.partial_row_start);
self.scan_pos -= self.partial_row_start;
self.partial_row_start = 0;
shrink_excess(&mut self.buffer);
}
}

pub fn take_rows(&mut self, max: usize) -> Vec<Vec<Vec<u8>>> {
let take_count = max.min(self.complete_rows.len());
self.complete_rows.drain(0..take_count).collect()
let rows: Vec<_> = self.complete_rows.drain(0..take_count).collect();
shrink_excess(&mut self.complete_rows);
rows
}

/// Returns the number of rows currently stored in `complete_rows`.
#[must_use]
pub fn available_rows(&self) -> usize {
self.complete_rows.len()
}

/// Returns `true` when `partial_row_start` lies before the end of `buffer`,
/// i.e. the buffer still holds bytes at or after that offset
/// (presumably an unfinished row; confirm against the feed logic).
#[must_use]
pub fn has_partial(&self) -> bool {
self.partial_row_start < self.buffer.len()
}

/// Returns the current length of `buffer` in bytes (its length, not its capacity).
#[must_use]
pub fn buffer_size(&self) -> usize {
self.buffer.len()
}
Expand All @@ -684,8 +693,11 @@ impl GeneralStreamingParser {
if !row.is_empty() {
self.complete_rows.push(row);
}
self.partial_row_start = self.buffer.len();
}
// Release the buffer — parsing is done
self.buffer = Vec::new();
self.partial_row_start = 0;
self.scan_pos = 0;
std::mem::take(&mut self.complete_rows)
}
}
Expand Down Expand Up @@ -1199,6 +1211,7 @@ fn parse_row_boundaries_general_with_newlines(
}

/// Streaming parser with custom newline support.
#[must_use]
pub struct GeneralStreamingParserNewlines {
buffer: Vec<u8>,
complete_rows: Vec<Vec<Vec<u8>>>,
Expand Down Expand Up @@ -1317,22 +1330,28 @@ impl GeneralStreamingParserNewlines {
self.buffer.drain(0..self.partial_row_start);
self.scan_pos -= self.partial_row_start;
self.partial_row_start = 0;
shrink_excess(&mut self.buffer);
}
}

pub fn take_rows(&mut self, max: usize) -> Vec<Vec<Vec<u8>>> {
let take_count = max.min(self.complete_rows.len());
self.complete_rows.drain(0..take_count).collect()
let rows: Vec<_> = self.complete_rows.drain(0..take_count).collect();
shrink_excess(&mut self.complete_rows);
rows
}

/// Returns the number of rows currently stored in `complete_rows`.
#[must_use]
pub fn available_rows(&self) -> usize {
self.complete_rows.len()
}

/// Returns `true` when `partial_row_start` lies before the end of `buffer`,
/// i.e. the buffer still holds bytes at or after that offset
/// (presumably an unfinished row; confirm against the feed logic).
#[must_use]
pub fn has_partial(&self) -> bool {
self.partial_row_start < self.buffer.len()
}

/// Returns the current length of `buffer` in bytes (its length, not its capacity).
#[must_use]
pub fn buffer_size(&self) -> usize {
self.buffer.len()
}
Expand All @@ -1343,8 +1362,11 @@ impl GeneralStreamingParserNewlines {
if !row.is_empty() {
self.complete_rows.push(row);
}
self.partial_row_start = self.buffer.len();
}
// Release the buffer — parsing is done
self.buffer = Vec::new();
self.partial_row_start = 0;
self.scan_pos = 0;
std::mem::take(&mut self.complete_rows)
}
}
Expand Down
4 changes: 4 additions & 0 deletions native/rustycsv/src/strategy/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub(crate) fn get_pool() -> Option<&'static rayon::ThreadPool> {
rayon::ThreadPoolBuilder::new()
.num_threads(recommended_threads())
.thread_name(|i| format!("rustycsv-{i}"))
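// Pin the per-thread stack to 2 MiB explicitly. Rayon spawns workers via
// std::thread, whose default for spawned threads is already 2 MiB (not
// 8 MiB), so this mainly keeps RUST_MIN_STACK or a future default change
// from inflating the persistent workers. CSV field extraction has shallow
// call stacks and does not need more.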
.stack_size(2 * 1024 * 1024)
.build()
.ok()
})
Expand Down
Loading