Files
TinfoilVibeServer/TinfoilVibeServer/Utilities/SeekableBufferedStream.cs
T
ecenshu 209b766a1f Build Snapshot from archives
Download from archives
Process XCI files in archives
2025-11-07 13:31:37 +10:30

251 lines
8.6 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Buffers;
namespace TinfoilVibeServer.Utilities;
/// <summary>
/// A readonly, seekable wrapper around a nonseekable stream.
/// It buffers the source data on demand in chunks so that you can seek
/// back and forth without reading the whole source at once.
/// </summary>
public sealed class SeekableBufferedStream : Stream
{
private const int DefaultChunkSize = 128 * 1024 * 1024; // 128 MiB
private readonly Stream _source;
private readonly ArrayPool<byte> _pool;
private readonly int _chunkSize;
private readonly bool _disposeSource;
// Buffer block holds a rented byte[] and the number of bytes actually read.
private readonly struct BufferBlock
{
public readonly byte[] Data;
public readonly int Length;
public BufferBlock(byte[] data, int length) { Data = data; Length = length; }
}
private readonly List<BufferBlock> _blocks = new();
private readonly long _specifiedLength = 0;
private long _bufferedLength; // total number of bytes buffered so far
private long _position; // current logical position in the stream
private bool _eof; // true when the source stream has been exhausted
#region ctor / dispose
/// <summary>
/// Creates a new instance.
/// </summary>
/// <param name="source">The underlying source stream. Must be readable.</param>
/// <param name="specifiedLength">Length of underlying stream if known before using</param>
/// <param name="chunkSize">Size of each buffer chunk (bytes). 128 MiB by default.</param>
/// <param name="disposeSource">If true, disposing this wrapper will also dispose the source stream.</param>
public SeekableBufferedStream(Stream source, long specifiedLength = 0, int chunkSize = DefaultChunkSize, bool disposeSource = false)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (!source.CanRead) throw new ArgumentException("Source stream must be readable.", nameof(source));
if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize), "Chunk size must be positive.");
if (specifiedLength <= 0) throw new ArgumentOutOfRangeException(nameof(specifiedLength), "Specified length must be positive.");
_source = source;
_specifiedLength = specifiedLength;
_pool = ArrayPool<byte>.Shared;
_chunkSize = chunkSize;
_disposeSource = disposeSource;
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
foreach (var block in _blocks)
_pool.Return(block.Data, clearArray: true);
_blocks.Clear();
if (_disposeSource)
_source.Dispose();
}
base.Dispose(disposing);
}
#endregion
#region helpers
/// <summary>
/// Ensures that at least <paramref name="requiredOffset"/> bytes are buffered.
/// Reads from the source stream until the requested offset is reached or EOF is hit.
/// </summary>
private void EnsureBuffered(long requiredOffset)
{
if (_eof || _bufferedLength >= requiredOffset)
return;
while (_bufferedLength < requiredOffset && !_eof)
{
var buf = _pool.Rent(_chunkSize);
int read = _source.Read(buf, 0, _chunkSize);
if (read == 0)
{
_eof = true;
_pool.Return(buf, clearArray: true);
break;
}
_blocks.Add(new BufferBlock(buf, read));
_bufferedLength += read;
}
}
/// <summary>
/// Finds the block that contains <paramref name="pos"/> and the offset inside that block.
/// </summary>
private void GetBlockAndOffset(long pos, out int blockIndex, out int offsetInBlock)
{
long accumulated = 0;
for (int i = 0; i < _blocks.Count; i++)
{
int blockLen = _blocks[i].Length;
if (pos < accumulated + blockLen)
{
blockIndex = i;
offsetInBlock = (int)(pos - accumulated);
return;
}
accumulated += blockLen;
}
// This should never happen because we always call EnsureBuffered before accessing.
throw new InvalidOperationException("Requested position is outside buffered range.");
}
#endregion
#region Stream overrides
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => false;
public override long Length
{
get
{
// If we were given a length, we can return that.
if (_specifiedLength > 0) return _specifiedLength;
// If we already hit EOF, we know the length.
if (_eof) return _bufferedLength;
// If the underlying stream is seekable, we can ask it directly.
if (_source.CanSeek)
return _source.Length;
// Otherwise we need to drain the source to discover its length.
while (!_eof)
EnsureBuffered(_bufferedLength + _chunkSize);
return _bufferedLength;
}
}
public override long Position
{
get => _position;
set
{
if (value < 0) throw new ArgumentOutOfRangeException(nameof(value));
if (value > Length) throw new ArgumentOutOfRangeException(nameof(value));
_position = value;
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException();
// If we are already at or beyond the logical end, nothing to read.
if (_position >= Length)
return 0;
// We will read at most `count` bytes but not past the logical end.
long maxRead = Math.Min(count, Length - _position);
EnsureBuffered(_position + maxRead);
int bytesRead = 0;
while (bytesRead < maxRead)
{
GetBlockAndOffset(_position, out int blockIdx, out int blockOffset);
var block = _blocks[blockIdx];
int available = block.Length - blockOffset;
int toCopy = (int)Math.Min(available, maxRead - bytesRead);
Buffer.BlockCopy(block.Data, blockOffset, buffer, offset + bytesRead, toCopy);
_position += toCopy;
bytesRead += toCopy;
}
return bytesRead;
}
public override long Seek(long offset, SeekOrigin origin)
{
long newPos = origin switch
{
SeekOrigin.Begin => offset,
SeekOrigin.Current => _position + offset,
SeekOrigin.End => Length + offset,
_ => throw new ArgumentException("Invalid SeekOrigin", nameof(origin))
};
if (newPos < 0) throw new IOException("Attempted to seek before the beginning of the stream.");
// Make sure we have buffered data up to the new position.
EnsureBuffered(newPos);
_position = newPos;
return _position;
}
public override void SetLength(long value) => throw new NotSupportedException();
public override void Flush() { /* No-op readonly stream */ }
public override void Write(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();
public override void WriteByte(byte value) => throw new NotSupportedException();
#endregion
#region async helpers (optional)
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
// If we are already at or beyond the logical end, nothing to read.
if (_position >= Length)
return 0;
long maxRead = Math.Min(destination.Length, Length - _position);
EnsureBuffered(_position + maxRead);
int bytesRead = 0;
while (bytesRead < maxRead)
{
GetBlockAndOffset(_position, out int blockIdx, out int blockOffset);
var block = _blocks[blockIdx];
int available = block.Length - blockOffset;
int toCopy = (int)Math.Min(available, maxRead - bytesRead);
// We copy synchronously no async source involved
destination.Slice(bytesRead, toCopy).Span
.CopyTo(block.Data.AsSpan(blockOffset, toCopy));
_position += toCopy;
bytesRead += toCopy;
}
return bytesRead;
}
#endregion
}