Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions crates/bindings-csharp/Runtime/Internal/ITable.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
namespace SpacetimeDB.Internal;

using System.Buffers;
using SpacetimeDB.BSATN;

internal abstract class RawTableIterBase<T>
where T : IStructuralReadWrite, new()
{
public sealed class Enumerator(FFI.RowIter handle) : IDisposable
{
byte[] buffer = new byte[0x20_000];
public byte[] Current { get; private set; } = [];
private const int InitialBufferSize = 1024;
private byte[]? buffer = ArrayPool<byte>.Shared.Rent(InitialBufferSize);
public ArraySegment<byte> Current { get; private set; } = ArraySegment<byte>.Empty;

public bool MoveNext()
{
Expand All @@ -17,6 +19,11 @@ public bool MoveNext()
return false;
}

if (buffer is null)
{
return false;
}

uint buffer_len;
while (true)
{
Expand All @@ -38,11 +45,10 @@ public bool MoveNext()
{
// Iterator advanced and may also be `EXHAUSTED`.
// When `OK`, we'll need to advance the iterator in the next call to `MoveNext`.
// In both cases, copy over the row data to `Current` from the scratch `buffer`.
// In both cases, update `Current` to point at the valid range in the scratch `buffer`.
case Errno.EXHAUSTED
or Errno.OK:
Current = new byte[buffer_len];
Array.Copy(buffer, 0, Current, 0, buffer_len);
Current = new ArraySegment<byte>(buffer, 0, (int)buffer_len);
return buffer_len != 0;
// Couldn't find the iterator, error!
case Errno.NO_SUCH_ITER:
Expand All @@ -51,7 +57,8 @@ public bool MoveNext()
// Grow `buffer` and try again.
// The `buffer_len` will have been updated with the necessary size.
case Errno.BUFFER_TOO_SMALL:
buffer = new byte[buffer_len];
ArrayPool<byte>.Shared.Return(buffer);
buffer = ArrayPool<byte>.Shared.Rent((int)buffer_len);
continue;
default:
throw new UnknownException(ret);
Expand All @@ -66,6 +73,12 @@ public void Dispose()
FFI.row_iter_bsatn_close(handle);
handle = FFI.RowIter.INVALID;
}

if (buffer is not null)
{
ArrayPool<byte>.Shared.Return(buffer);
buffer = null;
}
}

public void Reset()
Expand All @@ -87,7 +100,13 @@ public IEnumerable<T> Parse()
{
foreach (var chunk in this)
{
using var stream = new MemoryStream(chunk);
using var stream = new MemoryStream(
chunk.Array!,
chunk.Offset,
chunk.Count,
writable: false,
publiclyVisible: true
);
using var reader = new BinaryReader(stream);
while (stream.Position < stream.Length)
{
Expand Down
Loading