Files
TinfoilVibeServer/TinfoilVibeServer/Utilities/RewindableStream.cs
T
ecenshu 0e2fec8c01
Build & Push Docker image / build-and-push (push) Successful in 14m38s
ci / build_linux (push) Successful in 4m43s
Skip hashed and same location files
Explicit usings
Multipart rar handling
2025-11-23 21:05:58 +10:30

299 lines
9.6 KiB
C#
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace TinfoilVibeServer.Utilities;
/// <summary>
/// Wraps a forward-only (non-seekable) stream so that it can be read and seeked.
/// The wrapper keeps a sliding window holding the most recently read bytes
/// (at most <c>bufferLimit</c> of them). Seeking or reading inside that window
/// is served from memory; moving forward past the window reads (and discards)
/// data from the current source; moving backwards to data that has already
/// been evicted disposes the current source, obtains a new instance from the
/// supplied factory, and reads forward from the start again.
/// </summary>
/// <remarks>
/// The class performs no synchronization of its own. It assumes that the
/// initial source and every stream returned by the factory are positioned at
/// the first byte of the same content.
/// </remarks>
public sealed class RewindableStream : Stream
{
    private readonly Func<Stream> _reopenFactory; // returns a fresh instance of the underlying stream
    private readonly int _bufferLimit;            // maximum number of bytes kept in the window
    private readonly MemoryStream _buffer;        // the window of cached bytes
    private Stream _source;                       // current underlying stream; its cursor sits at _bufferStart + _buffer.Length
    private long _bufferStart;                    // absolute position of the first byte held in _buffer
    private long _position;                       // current read position of the virtual stream
    private long? _length;                        // total length once known (null = not discovered yet)
    private bool _disposed;

    /// <summary>
    /// Creates a new seekable wrapper.
    /// </summary>
    /// <param name="source">The initial non-seekable stream.</param>
    /// <param name="reopenFactory">
    /// Factory that returns a *new* instance of the underlying stream.
    /// It is called whenever data that is no longer cached has to be read again.
    /// </param>
    /// <param name="bufferLimit">
    /// The maximum number of bytes to keep cached in memory.
    /// Older data is discarded as new data is read. Default: 64 KiB.
    /// </param>
    /// <param name="length">
    /// The total length of the content, if the caller already knows it.
    /// When omitted, the length is discovered on demand.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="reopenFactory"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="source"/> is not readable.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferLimit"/> is not positive, or <paramref name="length"/> is negative.</exception>
    public RewindableStream(
        Stream source,
        Func<Stream> reopenFactory,
        int bufferLimit = 64 * 1024,
        long? length = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (!source.CanRead)
        {
            throw new ArgumentException("Source stream must be readable", nameof(source));
        }

        if (reopenFactory == null)
        {
            throw new ArgumentNullException(nameof(reopenFactory));
        }

        if (bufferLimit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferLimit));
        }

        if (length.HasValue && length.Value < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(length));
        }

        _source = source;
        _reopenFactory = reopenFactory;
        _bufferLimit = bufferLimit;
        _length = length;
        _buffer = new MemoryStream();
        _bufferStart = 0;
        _position = 0;
        _disposed = false;
    }

    #region Stream overrides

    /// <inheritdoc/>
    public override bool CanRead => !_disposed && _source.CanRead;

    /// <inheritdoc/>
    public override bool CanSeek => !_disposed; // seeking is emulated by this wrapper

    /// <inheritdoc/>
    public override bool CanWrite => false; // read-only wrapper

    /// <summary>
    /// Gets the total length of the content. If the length is not known yet
    /// and the source cannot report it, the source is read through to its end
    /// to find out.
    /// </summary>
    public override long Length => EnsureLengthAsync(CancellationToken.None).GetAwaiter().GetResult();

    /// <summary>
    /// Gets or sets the current read position. Setting it is equivalent to
    /// <c>Seek(value, SeekOrigin.Begin)</c>.
    /// </summary>
    public override long Position
    {
        get => _position;
        set => Seek(value, SeekOrigin.Begin);
    }

    /// <inheritdoc/>
    public override void Flush()
    {
        // Nothing to do: read-only wrapper.
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes at the current position.
    /// Keeps reading until the request is satisfied or the end of the content
    /// is reached.
    /// </summary>
    /// <returns>The number of bytes copied; 0 at end of content.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The offset/count pair does not describe a range inside <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        ValidateReadArguments(buffer, offset, count);
        if (count == 0)
        {
            return 0;
        }

        // The source is always consumed through ReadAsync, so the synchronous
        // entry point has to block on the shared asynchronous implementation.
        return ReadCoreAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Read(byte[], int, int)"/>; the
    /// cancellation token is passed on to the reads issued against the source.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ThrowIfDisposed();
        ValidateReadArguments(buffer, offset, count);
        if (count == 0)
        {
            return Task.FromResult(0);
        }

        return ReadCoreAsync(buffer, offset, count, cancellationToken);
    }

    /// <summary>
    /// Moves the read position. Targets before the cached window restart the
    /// source via the factory; targets after it are reached by reading forward.
    /// </summary>
    /// <returns>The new absolute position.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="origin"/> is not a defined value.</exception>
    /// <exception cref="IOException">The resulting position would be negative.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        ThrowIfDisposed();

        long target = origin switch
        {
            SeekOrigin.Begin => offset,
            SeekOrigin.Current => _position + offset,
            // Seeking relative to the end requires the length to be known first.
            SeekOrigin.End => EnsureLengthAsync(CancellationToken.None).GetAwaiter().GetResult() + offset,
            _ => throw new ArgumentOutOfRangeException(nameof(origin)),
        };

        if (target < 0)
        {
            throw new IOException("Cannot seek to a negative position.");
        }

        // Only data that has already been evicted forces a restart. The source
        // cursor sits at the end of the window, so anything further ahead is
        // reachable by simply reading forward.
        if (target < _bufferStart)
        {
            ReopenFromStart();
        }

        _position = target;

        // Pull the window forward so that it covers the new position (this
        // stops early if the end of the content is reached first).
        EnsureBufferedUpToAsync(_position, CancellationToken.None).GetAwaiter().GetResult();
        return _position;
    }

    /// <inheritdoc/>
    public override void SetLength(long value)
    {
        throw new NotSupportedException("SetLength is not supported on RewindableStream.");
    }

    /// <inheritdoc/>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("Write is not supported on RewindableStream.");
    }

    #endregion

    #region Helper methods

    /// <summary>
    /// Validates the (buffer, offset, count) triple of a read request.
    /// </summary>
    private static void ValidateReadArguments(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        // Written as a subtraction so that offset + count cannot overflow.
        if (count < 0 || buffer.Length - offset < count)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
    }

    /// <summary>
    /// Shared implementation of Read and ReadAsync. Arguments are already validated.
    /// </summary>
    private async Task<int> ReadCoreAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        int totalRead = 0;
        while (count > 0)
        {
            if (_position < _bufferStart)
            {
                // The bytes at the current position have been evicted from the
                // window (for example while discovering the length), so the
                // source has to be restarted and read forward again.
                ReopenFromStart();
            }

            // Never request more than the window can hold in one step;
            // otherwise filling the window would evict the very bytes that
            // are about to be copied out.
            int window = Math.Min(count, _bufferLimit);
            long lastWanted = _position > long.MaxValue - window
                ? long.MaxValue
                : _position + window - 1;
            await EnsureBufferedUpToAsync(lastWanted, cancellationToken).ConfigureAwait(false);

            long available = _bufferStart + _buffer.Length - _position;
            if (available <= 0)
            {
                // End of content: nothing more to read.
                break;
            }

            int toCopy = (int)Math.Min(window, available);
            _buffer.Position = _position - _bufferStart;
            int copied = _buffer.Read(buffer, offset, toCopy);
            if (copied == 0)
            {
                break;
            }

            offset += copied;
            count -= copied;
            totalRead += copied;
            _position += copied;
        }

        return totalRead;
    }

    /// <summary>
    /// Reads from the source until the window contains the byte at the given
    /// absolute position, or until the source reports end of stream. Does
    /// nothing when the window already extends past that position.
    /// </summary>
    private async Task EnsureBufferedUpToAsync(long position, CancellationToken cancellationToken)
    {
        ThrowIfDisposed();

        long bufferEnd = _bufferStart + _buffer.Length;
        if (bufferEnd > position)
        {
            return;
        }

        // The first chunk is the largest one requested, so one scratch array
        // can serve every iteration.
        byte[] chunk = new byte[ChunkSize(position - bufferEnd)];
        while (bufferEnd <= position)
        {
            int toRead = ChunkSize(position - bufferEnd);
            int read = await _source.ReadAsync(chunk, 0, toRead, cancellationToken).ConfigureAwait(false);
            if (read == 0)
            {
                // End of stream: this is the total length, unless one is already known.
                if (!_length.HasValue)
                {
                    _length = bufferEnd;
                }

                break;
            }

            _buffer.Position = _buffer.Length; // append at the end of the window
            _buffer.Write(chunk, 0, read);
            TrimBuffer();

            // Trimming moves _bufferStart forward by exactly what it removes,
            // so the end of the window advances by the number of bytes read.
            bufferEnd += read;
        }
    }

    /// <summary>
    /// Number of bytes to request next, given how far the wanted position lies
    /// beyond the end of the window. Written so that it cannot overflow.
    /// </summary>
    private int ChunkSize(long missing)
    {
        return missing >= _bufferLimit ? _bufferLimit : (int)missing + 1;
    }

    /// <summary>
    /// Drops the oldest bytes so that the window holds at most
    /// <c>_bufferLimit</c> bytes, compacting the backing array in place.
    /// </summary>
    private void TrimBuffer()
    {
        long excess = _buffer.Length - _bufferLimit;
        if (excess <= 0)
        {
            return;
        }

        // At most one chunk (<= _bufferLimit bytes) is appended between trims,
        // so excess always fits in an int.
        byte[] raw = _buffer.GetBuffer();
        Buffer.BlockCopy(raw, (int)excess, raw, 0, _bufferLimit);
        _buffer.SetLength(_bufferLimit);
        _bufferStart += excess; // the first cached byte is now further ahead
    }

    /// <summary>
    /// Disposes the current source, obtains a new one from the factory and
    /// empties the window. The current position is left for the caller to set.
    /// </summary>
    /// <exception cref="InvalidOperationException">The factory returned null.</exception>
    private void ReopenFromStart()
    {
        // The old instance is disposed first, as before, so that at most one
        // source instance is open at any time.
        _source.Dispose();

        Stream fresh = _reopenFactory();
        if (fresh == null)
        {
            throw new InvalidOperationException("The reopen factory returned null.");
        }

        _source = fresh;
        _buffer.SetLength(0);
        _bufferStart = 0;
    }

    /// <summary>
    /// Returns the total length, discovering it first if necessary: a seekable
    /// source is asked directly, any other source is read through to its end.
    /// </summary>
    private async Task<long> EnsureLengthAsync(CancellationToken cancellationToken)
    {
        if (_length.HasValue)
        {
            return _length.Value;
        }

        ThrowIfDisposed();

        if (_source.CanSeek)
        {
            _length = _source.Length;
        }
        else
        {
            // Drain the source through the regular buffering path so that every
            // byte is counted and the window bookkeeping stays consistent.
            // Reaching end of stream records the length. Data evicted along the
            // way is re-read via the factory if it is requested later.
            await EnsureBufferedUpToAsync(long.MaxValue, cancellationToken).ConfigureAwait(false);
            if (!_length.HasValue)
            {
                _length = _bufferStart + _buffer.Length;
            }
        }

        return _length.GetValueOrDefault();
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RewindableStream));
        }
    }

    #endregion

    #region IDisposable

    /// <summary>
    /// Disposes the current source and the cached window.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (!_disposed && disposing)
        {
            _source.Dispose();
            _buffer.Dispose();
        }

        _disposed = true;
        base.Dispose(disposing);
    }

    #endregion
}