teodordelibasic-db
left a comment
Thanks a lot for contributing, and sorry for the delayed review; we've been quite overwhelmed with product stabilization these past few weeks. I'll prioritize this now so that we can get it in, since a few other customers also want to use it. I am nowhere near a C#/.NET expert, so I am reviewing mostly with LLM help.
A few more general comments below as well:
We should integrate the .NET CI into general repo CI. Currently PRs touching only dotnet/** pass CI without any checks and Rust FFI changes don't trigger cross-SDK .NET tests. In .github/workflows/push.yml we should add:
- dotnet output in the changes job
- dotnet path filter (dotnet/** + .github/workflows/ci-dotnet.yml)
- dotnet job calling ci-dotnet.yml
- dotnet in the gate job's needs array + result check loop
- cross-sdk-dotnet job (mirroring cross-sdk-go) triggered when rust/** changes
A couple of missing misc files in the dotnet/ directory that we have in other SDKs:
- CONTRIBUTING.md - guidelines on how to set up a dev loop for making changes to this SDK and contributing them
- NEXT_CHANGELOG.md - used by the release process for unreleased changes
- NOTICE - license/attribution file
As far as I understand, this is a sync-only API. The LLM suggests we could benefit from Task-returning variants of CreateStream, WaitForOffset, Flush, and Close, because ASP.NET Core users calling the current API on request threads will block thread pool threads.
Not a blocker obviously, just want to cross-check with you if it makes sense to add it in the future.
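To make the question concrete, here is a minimal sketch of one possible stopgap, assuming the native calls stay blocking. `BlockingCallOffloader` and `RunAsync` are hypothetical names, not part of the SDK. It moves the blocking call onto a dedicated thread so the request thread is released. It does not make the call itself non-blocking; that would need completion callbacks from the native layer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the SDK): wraps a blocking call in a Task.
public static class BlockingCallOffloader
{
    // LongRunning hints to the scheduler that the work should not occupy a
    // regular thread pool worker. The token only prevents the call from
    // starting; it cannot interrupt a call that is already blocked.
    public static Task RunAsync(Action blockingCall, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(blockingCall);
        return Task.Factory.StartNew(
            blockingCall,
            cancellationToken,
            TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
    }

    // Same idea for blocking calls that return a value (for example a stream handle).
    public static Task<T> RunAsync<T>(Func<T> blockingCall, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(blockingCall);
        return Task.Factory.StartNew(
            blockingCall,
            cancellationToken,
            TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
    }
}
```

A caller could then write something like `await BlockingCallOffloader.RunAsync(() => stream.Flush());`. The trade-off is one extra thread per in-flight call, so this is only a bridge until the FFI layer can signal completion itself.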

    // Allocate an array of CHeader structs in unmanaged memory.
    var headerSize = Marshal.SizeOf<CHeader>();
    var arrayPtr = Marshal.AllocHGlobal(headerSize * headers.Count);
I think there is an allocator mismatch with the Rust free path. Rust's zerobus_free_headers (rust/ffi/src/lib.rs:798) frees this pointer with libc::free(), which expects memory from the C allocator's malloc.
- On Linux/macOS, AllocHGlobal uses malloc, so this works by accident.
- On Windows, AllocHGlobal uses LocalAlloc, which is a different heap than MSVCRT's free. This is undefined behavior: heap corruption, sporadic crashes.
The Go SDK allocates with C.malloc (go/ffi.go:368), which matches the Rust free path.
We can replace it with NativeMemory.Alloc((nuint)(headerSize * headers.Count)), which calls malloc on all platforms.
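A minimal sketch of the malloc-backed allocation, assuming CHeader is two C-string pointers. `CHeaderSketch` and `HeaderArraySketch` are hypothetical stand-ins, since the real struct layout is not shown here. It uses the element-count overload of NativeMemory, which also guards against overflow in the size multiplication.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical mirror of CHeader; the real layout may differ.
[StructLayout(LayoutKind.Sequential)]
public struct CHeaderSketch
{
    public IntPtr Key;
    public IntPtr Value;
}

public static unsafe class HeaderArraySketch
{
    // Allocates `count` zero-initialized headers with the C allocator (calloc),
    // so the block can later be released with C free().
    public static CHeaderSketch* Alloc(int count)
    {
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        return (CHeaderSketch*)NativeMemory.AllocZeroed((nuint)count, (nuint)sizeof(CHeaderSketch));
    }

    // Managed-side release, only for paths where ownership was never
    // handed over to the native library.
    public static void Free(CHeaderSketch* headers) => NativeMemory.Free(headers);
}
```

Once the array has been returned to Rust, the managed side should not free it; `Free` is meant for error paths before the hand-off.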
    var headerPtr = arrayPtr + idx * headerSize;
    var cHeader = new CHeader
    {
        Key = Marshal.StringToCoTaskMemUTF8(key),
This might be the same allocator mismatch. Rust frees these with CString::from_raw() (lib.rs:792-795), which deallocates through Rust's global allocator (libc::free with the default allocator). StringToCoTaskMemUTF8 uses CoTaskMemAlloc, which on Windows is the COM allocator, not malloc.
Go uses C.CString() (go/ffi.go:379-380), which calls malloc.
We can encode strings manually with Encoding.UTF8.GetBytes, allocate with NativeMemory.Alloc, copy bytes in and add the NUL terminator. Something like:
    // Needs an unsafe context because of the raw byte* pointer.
    private static unsafe IntPtr AllocUtf8String(string s)
    {
        var byteCount = Encoding.UTF8.GetByteCount(s);
        var ptr = (byte*)NativeMemory.Alloc((nuint)(byteCount + 1));
        Encoding.UTF8.GetBytes(s, new Span<byte>(ptr, byteCount));
        ptr[byteCount] = 0; // NUL terminator
        return (IntPtr)ptr;
    }

    {
        result.Headers = IntPtr.Zero;
        result.Count = 0;
        result.ErrorMessage = Marshal.StringToCoTaskMemUTF8(ex.Message);
Potentially the same issue: Rust frees error_message with CString::from_raw() (lib.rs:803). We could use the C allocator here too, applying the same NativeMemory.Alloc + manual UTF-8 encoding fix as above.
    /// Attempts to retrieve the bridge handle and callback reference for the stream.
    /// Internal use only.
    /// </summary>
    internal ZerobusStream Recreate(IntPtr newPtr)
I think the old native stream pointer (_ptr) is leaked here. After Recreate():
- _disposed is set to 1, so Dispose(bool) will early-return at line 244
- _ptr still holds the old pointer, but StreamFree is never called for it
- The finalizer also no-ops because _disposed != 0
Rust's zerobus_sdk_recreate_stream takes the old stream by reference - it reads from it but does not free it. The caller should eventually call zerobus_stream_free.
Fix might look something like this:
    internal ZerobusStream Recreate(IntPtr newPtr)
    {
        var disposed = Interlocked.CompareExchange(ref _disposed, 1, 0);
        ObjectDisposedException.ThrowIf(disposed != 0, this);
        var oldPtr = Interlocked.Exchange(ref _ptr, IntPtr.Zero);
        var newStream = _bridgeHandle.IsAllocated
            ? new ZerobusStream(newPtr, _bridgeHandle, _callbackRef!)
            : new ZerobusStream(newPtr);
        // Prevent accidental double-free of the shared handle from the old wrapper
        _bridgeHandle = default;
        // Free the old native stream (don't Close; it's already failed/closed)
        if (oldPtr != IntPtr.Zero)
            NativeMethods.StreamFree(oldPtr);
        return newStream;
    }

    // Pin all record buffers and collect pointers
    var handles = new GCHandle[records.Length];
    var ptrs = stackalloc byte*[records.Length];
stackalloc with user-controlled size can overflow the stack. On 64-bit, this allocates records.Length * 16 bytes. For 100k records that's 1.6MB — exceeding the default 1MB .NET thread stack. A StackOverflowException cannot be caught.
Go doesn't have this issue because CGO uses separate stack management.
We can add a threshold and fall back to heap allocation for large batches:
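    const int StackAllocThreshold = 4096;
    var useStack = records.Length <= StackAllocThreshold;
    // stackalloc only yields a raw pointer as a direct local initializer, not inside ?:
    byte** stackPtrs = stackalloc byte*[useStack ? records.Length : 0];
    byte** ptrs = useStack
        ? stackPtrs
        : (byte**)NativeMemory.Alloc((nuint)(records.Length * sizeof(byte*)));
(And free the heap allocation in the finally block.)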
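The threshold-plus-fallback idea, including the cleanup in a finally block, could be sketched roughly as follows. `PointerTableSketch` and `SumFirstBytes` are hypothetical; the summing loop is only a stand-in for handing the pointer table to the native batch call. The stack buffer is declared as its own local because, as far as I know, a stackalloc expression only converts to a raw pointer when it is the direct initializer of a local.

```csharp
using System;
using System.Runtime.InteropServices;

public static unsafe class PointerTableSketch
{
    // Threshold taken from the suggestion above; tune as needed.
    private const int StackAllocThreshold = 4096;

    // Pins every record, builds a byte* table on the stack (small batches)
    // or the native heap (large batches), and reads through the table.
    public static long SumFirstBytes(byte[][] records)
    {
        ArgumentNullException.ThrowIfNull(records);
        int count = records.Length;
        bool onStack = count <= StackAllocThreshold;

        // Zero-length stack buffer when the heap path is taken.
        byte** stackPtrs = stackalloc byte*[onStack ? count : 0];
        byte** ptrs = onStack
            ? stackPtrs
            : (byte**)NativeMemory.Alloc((nuint)count, (nuint)sizeof(byte*));

        var handles = new GCHandle[count];
        try
        {
            for (int i = 0; i < count; i++)
            {
                handles[i] = GCHandle.Alloc(records[i], GCHandleType.Pinned);
                ptrs[i] = (byte*)handles[i].AddrOfPinnedObject();
            }

            // Stand-in for the native call that would consume `ptrs`.
            long total = 0;
            for (int i = 0; i < count; i++)
            {
                if (records[i].Length > 0) total += ptrs[i][0];
            }
            return total;
        }
        finally
        {
            // Unpin whatever was pinned, then release the heap table if used.
            for (int i = 0; i < count; i++)
            {
                if (handles[i].IsAllocated) handles[i].Free();
            }
            if (!onStack) NativeMemory.Free(ptrs);
        }
    }
}
```

An alternative that avoids the manual free would be renting an array from ArrayPool and pinning it, at the cost of a slightly different pointer type at the call site.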
    // Encode each string as null-terminated UTF-8 and pin
    var encoded = new byte[records.Length][];
    var handles = new GCHandle[records.Length];
    var ptrs = stackalloc byte*[records.Length];
Same stackalloc overflow risk as the proto batch path above.
    for (int i = 0; i < records.Length; i++)
    {
        // Encode with null terminator
        var utf8 = Encoding.UTF8.GetBytes(records[i] + '\0');
More of a nit. The records[i] + '\0' allocates a new string per record just to append a NUL character.
Minor optimization - encode without concatenation and append the NUL byte directly:
    var byteCount = Encoding.UTF8.GetByteCount(records[i]);
    var utf8 = new byte[byteCount + 1];
    Encoding.UTF8.GetBytes(records[i], utf8);
    // utf8[byteCount] is already 0 from array initialization

    return new CStreamConfigurationOptions
    {
        MaxInflightRequests = (nuint)(options.MaxInflightRequests > 0
Setting RecoveryRetries = 0 (meaning "don't retry") silently falls back to the default of 4. The Go SDK has the same bug (comment at go/ffi.go:193). We can add a code comment acknowledging this or use nullable types (uint?) to distinguish "unset" from "explicitly zero".
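A small sketch of the nullable variant, assuming the default of 4 quoted above. `StreamOptionsSketch` and `OptionsMappingSketch` are hypothetical names, and whether the native options struct can carry an explicit zero through the FFI boundary is an assumption that would need checking on the Rust side.

```csharp
using System;

// Hypothetical, trimmed-down options type showing only the field under discussion.
public sealed class StreamOptionsSketch
{
    // null = "not set, use the default"; 0 = "explicitly disable retries".
    public uint? RecoveryRetries { get; init; }
}

public static class OptionsMappingSketch
{
    // Default value as quoted in the review comment above.
    public const uint DefaultRecoveryRetries = 4;

    // Falls back to the default only when the caller left the value unset.
    public static uint ResolveRecoveryRetries(StreamOptionsSketch options)
    {
        ArgumentNullException.ThrowIfNull(options);
        return options.RecoveryRetries ?? DefaultRecoveryRetries;
    }
}
```

With this shape, `new StreamOptionsSketch { RecoveryRetries = 0 }` resolves to 0 rather than silently becoming 4.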
    }

    /// <inheritdoc />
    public void Dispose()
_disposed is a plain bool, and the check-then-set on it is not atomic. If Dispose() is called from two threads simultaneously, both could read _disposed == false before either writes true, and both would then run the cleanup. We can consider aligning to the same Interlocked.CompareExchange pattern used in ZerobusStream.Dispose(bool).
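For reference, a minimal sketch of the compare-and-swap guard on a standalone class. `OnceDisposable` is hypothetical, and the `releaseNative` delegate stands in for whatever native free call the real type makes.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper illustrating a thread-safe, run-once Dispose.
public sealed class OnceDisposable : IDisposable
{
    private readonly Action _releaseNative;
    private int _disposed; // 0 = live, 1 = disposed

    public OnceDisposable(Action releaseNative)
    {
        _releaseNative = releaseNative ?? throw new ArgumentNullException(nameof(releaseNative));
    }

    public bool IsDisposed => Volatile.Read(ref _disposed) != 0;

    public void Dispose()
    {
        // Only the thread that flips 0 -> 1 runs the cleanup;
        // every other caller sees 1 and returns immediately.
        if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0)
            return;
        _releaseNative();
    }
}
```

Interlocked has no bool overloads for this pattern on older runtimes, which is why the flag is an int.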
What changes are proposed in this pull request?
Initial Databricks Zerobus Ingest SDK for .NET to ingest data from .NET applications
How is this tested?