a1ea34bc01
Consolidate data and config into separate folders that will be expected to be mapped in the container Reviewed-on: #1 Co-authored-by: Huy Nguyen <ecenshu@gmail.com> Co-committed-by: Huy Nguyen <ecenshu@gmail.com>
259 lines
8.8 KiB
C#
259 lines
8.8 KiB
C#
using System.Buffers;
|
||
|
||
namespace TinfoilVibeServer.Utilities;
|
||
|
||
/// <summary>
/// A read-only, seekable wrapper around a (typically non-seekable) stream.
/// Source data is buffered on demand into fixed-size chunks rented from
/// <see cref="ArrayPool{T}.Shared"/>, so callers can seek back and forth over
/// everything read so far without consuming the whole source up front.
/// </summary>
/// <remarks>
/// Everything read from the source stays buffered until the wrapper is disposed.
/// The type is not thread-safe.
/// </remarks>
public sealed class SeekableBufferedStream : Stream
{
    private const int DefaultChunkSize = 128 * 1024 * 1024; // 128 MiB

    private readonly Stream _source;
    private readonly ArrayPool<byte> _pool;
    private readonly int _chunkSize;
    private readonly bool _disposeSource;

    // Rented chunks. Every chunk except the last holds exactly _chunkSize bytes;
    // the last one holds (_bufferedLength % _chunkSize) bytes (or is full/empty).
    // Only the first _chunkSize bytes of a rented array are ever used, because
    // ArrayPool may hand back a larger array than requested.
    private readonly List<byte[]> _blocks = new();
    private readonly long _specifiedLength; // 0 means "unknown"
    private long _bufferedLength;           // total number of bytes buffered so far
    private long _position;                 // current logical position in the stream
    private bool _eof;                      // true when the source stream has been exhausted
    private bool _disposed;

    #region ctor / dispose

    /// <summary>
    /// Creates a new instance.
    /// </summary>
    /// <param name="source">The underlying source stream. Must be readable.</param>
    /// <param name="specifiedLength">
    /// Length of the underlying stream if known in advance; <c>0</c> (the default) means unknown.
    /// </param>
    /// <param name="chunkSize">Size of each buffer chunk (bytes). 128 MiB by default.</param>
    /// <param name="disposeSource">If true, disposing this wrapper will also dispose the source stream.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="source"/> is not readable.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="chunkSize"/> is not positive or <paramref name="specifiedLength"/> is negative.
    /// </exception>
    public SeekableBufferedStream(Stream source, long specifiedLength = 0, int chunkSize = DefaultChunkSize, bool disposeSource = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Source stream must be readable.", nameof(source));
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize), "Chunk size must be positive.");

        // Zero is the documented default and means "length unknown"; only negatives are invalid.
        if (specifiedLength < 0) throw new ArgumentOutOfRangeException(nameof(specifiedLength), "Specified length must not be negative.");

        _source = source;
        _specifiedLength = specifiedLength;
        _pool = ArrayPool<byte>.Shared;
        _chunkSize = chunkSize;
        _disposeSource = disposeSource;
    }

    /// <summary>
    /// Returns all rented chunks to the pool and, if requested at construction,
    /// disposes the source stream. Safe to call more than once.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_disposed)
        {
            _disposed = true;
            ReturnBlocks();

            if (_disposeSource)
            {
                _source.Dispose();
            }
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Dispose(bool)"/>; disposes the source
    /// stream asynchronously when this wrapper owns it.
    /// </summary>
    public override async ValueTask DisposeAsync()
    {
        if (!_disposed)
        {
            _disposed = true;
            ReturnBlocks();

            if (_disposeSource)
            {
                await _source.DisposeAsync().ConfigureAwait(false);
            }
        }

        // The base implementation runs Dispose(); our override is a no-op by now,
        // but the base class still gets to finish its own cleanup.
        await base.DisposeAsync().ConfigureAwait(false);
    }

    #endregion

    #region helpers

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(SeekableBufferedStream));
        }
    }

    /// <summary>Hands every rented chunk back to the pool (cleared) and forgets them.</summary>
    private void ReturnBlocks()
    {
        foreach (byte[] block in _blocks)
        {
            _pool.Return(block, clearArray: true);
        }

        _blocks.Clear();
    }

    private static void ValidateReadArguments(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));

        // Written as a subtraction so that offset + count cannot overflow.
        if (count < 0 || buffer.Length - offset < count) throw new ArgumentOutOfRangeException(nameof(count));
    }

    /// <summary>
    /// Returns the chunk that the next source byte must be written to, renting a new
    /// one when the previous chunk is full. <paramref name="used"/> receives the number
    /// of bytes already stored in that chunk.
    /// </summary>
    private byte[] GetTailBlock(out int used)
    {
        int index = (int)(_bufferedLength / _chunkSize);
        used = (int)(_bufferedLength % _chunkSize);

        if (index == _blocks.Count)
        {
            _blocks.Add(_pool.Rent(_chunkSize));
        }

        return _blocks[index];
    }

    /// <summary>
    /// Ensures that at least <paramref name="requiredOffset"/> bytes are buffered.
    /// Reads from the source stream until the requested offset is reached or EOF is hit.
    /// </summary>
    private void EnsureBuffered(long requiredOffset)
    {
        ThrowIfDisposed();

        while (!_eof && _bufferedLength < requiredOffset)
        {
            // Keep filling the current chunk instead of renting a fresh one per Read call:
            // sources that return short reads would otherwise pin a whole chunk for a few bytes.
            byte[] tail = GetTailBlock(out int used);
            int read = _source.Read(tail, used, _chunkSize - used);
            if (read == 0)
            {
                _eof = true;
                break;
            }

            _bufferedLength += read;
        }
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="EnsureBuffered"/>; reads the source with
    /// <see cref="Stream.ReadAsync(Memory{byte}, CancellationToken)"/>.
    /// </summary>
    private async ValueTask EnsureBufferedAsync(long requiredOffset, CancellationToken cancellationToken)
    {
        ThrowIfDisposed();

        while (!_eof && _bufferedLength < requiredOffset)
        {
            byte[] tail = GetTailBlock(out int used);
            int read = await _source.ReadAsync(tail.AsMemory(used, _chunkSize - used), cancellationToken).ConfigureAwait(false);
            if (read == 0)
            {
                _eof = true;
                break;
            }

            _bufferedLength += read;
        }
    }

    /// <summary>
    /// Computes the exclusive end offset of a read of <paramref name="count"/> bytes from
    /// the current position, capped at the caller-specified length when one was given.
    /// </summary>
    private long GetReadLimit(int count)
    {
        long limit = _position + count;
        return _specifiedLength > 0 ? Math.Min(limit, _specifiedLength) : limit;
    }

    /// <summary>
    /// Copies already-buffered bytes starting at the current position into
    /// <paramref name="destination"/>, never going past <paramref name="limit"/> or past
    /// the data that was actually buffered, and advances the position.
    /// </summary>
    /// <returns>The number of bytes copied (0 when nothing is available).</returns>
    private int CopyBuffered(Span<byte> destination, long limit)
    {
        // Clamp to what is really buffered: the source may have ended before the
        // advertised length, in which case we return a short read instead of throwing.
        long end = Math.Min(limit, _bufferedLength);
        long remaining = Math.Min(destination.Length, end - _position);
        if (remaining <= 0)
        {
            return 0;
        }

        int total = (int)remaining;
        int copied = 0;
        while (copied < total)
        {
            // All chunks are filled to _chunkSize before the next one starts,
            // so the chunk index and offset follow directly from the position.
            int blockIndex = (int)(_position / _chunkSize);
            int blockOffset = (int)(_position % _chunkSize);
            int toCopy = Math.Min(_chunkSize - blockOffset, total - copied);

            _blocks[blockIndex].AsSpan(blockOffset, toCopy).CopyTo(destination.Slice(copied, toCopy));

            _position += toCopy;
            copied += toCopy;
        }

        return copied;
    }

    #endregion

    #region Stream overrides

    public override bool CanRead => !_disposed;

    public override bool CanSeek => !_disposed;

    public override bool CanWrite => false;

    /// <summary>
    /// Gets the stream length: the caller-specified length if one was given, otherwise the
    /// buffered length once EOF was seen, otherwise the source's own length if it is seekable.
    /// As a last resort the source is drained (and fully buffered) to discover its length.
    /// </summary>
    public override long Length
    {
        get
        {
            // If we were given a length, we can return that.
            if (_specifiedLength > 0) return _specifiedLength;

            // If we already hit EOF, we know the length.
            if (_eof) return _bufferedLength;

            // If the underlying stream is seekable, we can ask it directly.
            if (_source.CanSeek) return _source.Length;

            // Otherwise we need to drain the source to discover its length.
            EnsureBuffered(long.MaxValue);
            return _bufferedLength;
        }
    }

    /// <summary>Gets or sets the current logical position.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The value is negative or greater than <see cref="Length"/>.
    /// </exception>
    public override long Position
    {
        get => _position;
        set
        {
            if (value < 0) throw new ArgumentOutOfRangeException(nameof(value));
            if (value > Length) throw new ArgumentOutOfRangeException(nameof(value));
            _position = value;
        }
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes from the current position, buffering
    /// more of the source if needed.
    /// </summary>
    /// <returns>The number of bytes read; 0 at the end of the stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateReadArguments(buffer, offset, count);
        return Read(buffer.AsSpan(offset, count));
    }

    /// <summary>
    /// Reads up to <c>buffer.Length</c> bytes from the current position, buffering
    /// more of the source if needed.
    /// </summary>
    /// <returns>The number of bytes read; 0 at the end of the stream.</returns>
    public override int Read(Span<byte> buffer)
    {
        ThrowIfDisposed();

        if (buffer.IsEmpty)
        {
            return 0;
        }

        // Only buffer what this read needs; asking for Length here could drain the whole source.
        long limit = GetReadLimit(buffer.Length);
        if (limit <= _position)
        {
            return 0;
        }

        EnsureBuffered(limit);
        return CopyBuffered(buffer, limit);
    }

    /// <summary>
    /// Moves the logical position. The source is buffered up to the new position
    /// (or to EOF, whichever comes first).
    /// </summary>
    /// <exception cref="IOException">The resulting position would be negative.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        long newPos = origin switch
        {
            SeekOrigin.Begin => offset,
            SeekOrigin.Current => _position + offset,
            SeekOrigin.End => Length + offset,
            _ => throw new ArgumentException("Invalid SeekOrigin", nameof(origin))
        };

        if (newPos < 0) throw new IOException("Attempted to seek before the beginning of the stream.");

        // Make sure we have buffered data up to the new position.
        EnsureBuffered(newPos);
        _position = newPos;
        return _position;
    }

    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Flush() { /* No-op - read-only stream */ }

    public override void Write(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException();

    public override void WriteByte(byte value) => throw new NotSupportedException();

    #endregion

    #region async reads

    /// <summary>
    /// Asynchronously reads up to <c>destination.Length</c> bytes from the current position
    /// into <paramref name="destination"/>, reading the source asynchronously if more data
    /// has to be buffered.
    /// </summary>
    /// <returns>The number of bytes read; 0 at the end of the stream.</returns>
    public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        cancellationToken.ThrowIfCancellationRequested();

        if (destination.IsEmpty)
        {
            return 0;
        }

        long limit = GetReadLimit(destination.Length);
        if (limit <= _position)
        {
            return 0;
        }

        await EnsureBufferedAsync(limit, cancellationToken).ConfigureAwait(false);

        // Copy FROM the buffered chunks INTO the caller's memory.
        return CopyBuffered(destination.Span, limit);
    }

    /// <summary>
    /// Array-based overload; forwards to
    /// <see cref="ReadAsync(Memory{byte}, CancellationToken)"/>.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ValidateReadArguments(buffer, offset, count);
        return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
    }

    #endregion
}