mirror of
https://github.com/make-42/xyosc
synced 2025-01-18 18:57:10 +01:00
941 lines
21 KiB
Go
941 lines
21 KiB
Go
// Copyright 2019 smallnest. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package ringbuffer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
"unsafe"
|
|
)
|
|
|
|
// Sentinel errors reported by RingBuffer operations.
var (
	// ErrTooMuchDataToWrite reports that a write carried more bytes than
	// the buffer currently has room for.
	ErrTooMuchDataToWrite = errors.New("too much data to write")

	// ErrIsFull reports a write attempt on a full buffer in non-blocking mode.
	ErrIsFull = errors.New("ringbuffer is full")

	// ErrIsEmpty reports a read attempt on an empty buffer in non-blocking mode.
	ErrIsEmpty = errors.New("ringbuffer is empty")

	// ErrIsNotEmpty reports that unread data remains in the buffer in
	// non-blocking mode.
	ErrIsNotEmpty = errors.New("ringbuffer is not empty")

	// ErrAcquireLock reports that a Try operation could not take the lock.
	ErrAcquireLock = errors.New("unable to acquire lock")

	// ErrWriteOnClosed reports a write to a ring buffer that was already closed.
	ErrWriteOnClosed = errors.New("write on closed ringbuffer")

	// ErrReaderClosed is the error recorded once a ReadCloser has closed
	// the ring buffer.
	ErrReaderClosed = errors.New("reader closed")
)
|
|
|
|
// RingBuffer is a circular byte buffer that can be used as an io.Reader and
// an io.Writer. It behaves like a buffered pipe: bytes written into it can be
// read back out, typically from another goroutine. Reads and writes may be
// issued concurrently; the internal state is guarded by mu.
type RingBuffer struct {
	// buf is the backing storage and size is its length.
	buf  []byte
	size int

	// r is the index of the next byte to read.
	r int
	// w is the index of the next byte to write.
	w int
	// isFull tells a full buffer apart from an empty one when r == w.
	isFull bool

	// err is the recorded error; io.EOF marks a closed writer.
	err error
	// block selects blocking mode for operations that would otherwise
	// fail with ErrIsEmpty or ErrIsFull.
	block bool

	// rTimeout bounds how long writers wait for a read to happen.
	rTimeout time.Duration
	// wTimeout bounds how long readers wait for a write to happen.
	wTimeout time.Duration

	mu sync.Mutex
	wg sync.WaitGroup

	// readCond is signaled after data has been read.
	readCond *sync.Cond
	// writeCond is signaled after data has been written.
	writeCond *sync.Cond
}
|
|
|
|
// New returns a new RingBuffer whose buffer has the given size.
|
|
func New(size int) *RingBuffer {
|
|
return &RingBuffer{
|
|
buf: make([]byte, size),
|
|
size: size,
|
|
}
|
|
}
|
|
|
|
// NewBuffer returns a new RingBuffer whose buffer is provided.
|
|
func NewBuffer(b []byte) *RingBuffer {
|
|
return &RingBuffer{
|
|
buf: b,
|
|
size: len(b),
|
|
}
|
|
}
|
|
|
|
// SetBlocking sets the blocking mode of the ring buffer.
|
|
// If block is true, Read and Write will block when there is no data to read or no space to write.
|
|
// If block is false, Read and Write will return ErrIsEmpty or ErrIsFull immediately.
|
|
// By default, the ring buffer is not blocking.
|
|
// This setting should be called before any Read or Write operation or after a Reset.
|
|
func (r *RingBuffer) SetBlocking(block bool) *RingBuffer {
|
|
r.block = block
|
|
if block {
|
|
r.readCond = sync.NewCond(&r.mu)
|
|
r.writeCond = sync.NewCond(&r.mu)
|
|
}
|
|
return r
|
|
}
|
|
|
|
// WithCancel sets a context to cancel the ring buffer.
|
|
// When the context is canceled, the ring buffer will be closed with the context error.
|
|
// A goroutine will be started and run until the provided context is canceled.
|
|
func (r *RingBuffer) WithCancel(ctx context.Context) *RingBuffer {
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
r.CloseWithError(ctx.Err())
|
|
}
|
|
}()
|
|
return r
|
|
}
|
|
|
|
// WithTimeout will set a blocking read/write timeout.
|
|
// If no reads or writes occur within the timeout,
|
|
// the ringbuffer will be closed and context.DeadlineExceeded will be returned.
|
|
// A timeout of 0 or less will disable timeouts (default).
|
|
func (r *RingBuffer) WithTimeout(d time.Duration) *RingBuffer {
|
|
r.mu.Lock()
|
|
r.rTimeout = d
|
|
r.wTimeout = d
|
|
r.mu.Unlock()
|
|
return r
|
|
}
|
|
|
|
// WithReadTimeout will set a blocking read timeout.
|
|
// Reads refers to any call that reads data from the buffer.
|
|
// If no writes occur within the timeout,
|
|
// the ringbuffer will be closed and context.DeadlineExceeded will be returned.
|
|
// A timeout of 0 or less will disable timeouts (default).
|
|
func (r *RingBuffer) WithReadTimeout(d time.Duration) *RingBuffer {
|
|
r.mu.Lock()
|
|
// Read operations wait for writes to complete,
|
|
// therefore we set the wTimeout.
|
|
r.wTimeout = d
|
|
r.mu.Unlock()
|
|
return r
|
|
}
|
|
|
|
// WithWriteTimeout will set a blocking write timeout.
|
|
// Write refers to any call that writes data into the buffer.
|
|
// If no reads occur within the timeout,
|
|
// the ringbuffer will be closed and context.DeadlineExceeded will be returned.
|
|
// A timeout of 0 or less will disable timeouts (default).
|
|
func (r *RingBuffer) WithWriteTimeout(d time.Duration) *RingBuffer {
|
|
r.mu.Lock()
|
|
// Write operations wait for reads to complete,
|
|
// therefore we set the rTimeout.
|
|
r.rTimeout = d
|
|
r.mu.Unlock()
|
|
return r
|
|
}
|
|
|
|
func (r *RingBuffer) setErr(err error, locked bool) error {
|
|
if !locked {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
}
|
|
if r.err != nil && r.err != io.EOF {
|
|
return r.err
|
|
}
|
|
|
|
switch err {
|
|
// Internal errors are transient
|
|
case nil, ErrIsEmpty, ErrIsFull, ErrAcquireLock, ErrTooMuchDataToWrite, ErrIsNotEmpty:
|
|
return err
|
|
default:
|
|
r.err = err
|
|
if r.block {
|
|
r.readCond.Broadcast()
|
|
r.writeCond.Broadcast()
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (r *RingBuffer) readErr(locked bool) error {
|
|
if !locked {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
}
|
|
if r.err != nil {
|
|
if r.err == io.EOF {
|
|
if r.w == r.r && !r.isFull {
|
|
return io.EOF
|
|
}
|
|
return nil
|
|
}
|
|
return r.err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
|
|
// Even if Read returns n < len(p), it may use all of p as scratch space during the call.
|
|
// If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more.
|
|
// When Read encounters an error or end-of-file condition after successfully reading n > 0 bytes, it returns the number of bytes read.
|
|
// It may return the (non-nil) error from the same call or return the error (and n == 0) from a subsequent call.
|
|
// Callers should always process the n > 0 bytes returned before considering the error err.
|
|
// Doing so correctly handles I/O errors that happen after reading some bytes and also both of the allowed EOF behaviors.
|
|
func (r *RingBuffer) Read(p []byte) (n int, err error) {
|
|
if len(p) == 0 {
|
|
return 0, r.readErr(false)
|
|
}
|
|
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if err := r.readErr(true); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
r.wg.Add(1)
|
|
defer r.wg.Done()
|
|
n, err = r.read(p)
|
|
for err == ErrIsEmpty && r.block {
|
|
if !r.waitWrite() {
|
|
return 0, context.DeadlineExceeded
|
|
}
|
|
if err = r.readErr(true); err != nil {
|
|
break
|
|
}
|
|
n, err = r.read(p)
|
|
}
|
|
if r.block && n > 0 {
|
|
r.readCond.Broadcast()
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// TryRead read up to len(p) bytes into p like Read, but it is never blocking.
|
|
// If it does not succeed to acquire the lock, it returns ErrAcquireLock.
|
|
func (r *RingBuffer) TryRead(p []byte) (n int, err error) {
|
|
ok := r.mu.TryLock()
|
|
if !ok {
|
|
return 0, ErrAcquireLock
|
|
}
|
|
defer r.mu.Unlock()
|
|
if err := r.readErr(true); err != nil {
|
|
return 0, err
|
|
}
|
|
if len(p) == 0 {
|
|
return 0, r.readErr(true)
|
|
}
|
|
|
|
n, err = r.read(p)
|
|
if r.block && n > 0 {
|
|
r.readCond.Broadcast()
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (r *RingBuffer) read(p []byte) (n int, err error) {
|
|
if r.w == r.r && !r.isFull {
|
|
return 0, ErrIsEmpty
|
|
}
|
|
|
|
if r.w > r.r {
|
|
n = r.w - r.r
|
|
if n > len(p) {
|
|
n = len(p)
|
|
}
|
|
copy(p, r.buf[r.r:r.r+n])
|
|
r.r = (r.r + n) % r.size
|
|
return
|
|
}
|
|
|
|
n = r.size - r.r + r.w
|
|
if n > len(p) {
|
|
n = len(p)
|
|
}
|
|
|
|
if r.r+n <= r.size {
|
|
copy(p, r.buf[r.r:r.r+n])
|
|
} else {
|
|
c1 := r.size - r.r
|
|
copy(p, r.buf[r.r:r.size])
|
|
c2 := n - c1
|
|
copy(p[c1:], r.buf[0:c2])
|
|
}
|
|
r.r = (r.r + n) % r.size
|
|
|
|
r.isFull = false
|
|
|
|
return n, r.readErr(true)
|
|
}
|
|
|
|
// Returns true if a read may have happened.
|
|
// Returns false if waited longer than rTimeout.
|
|
// Must be called when locked and returns locked.
|
|
func (r *RingBuffer) waitRead() (ok bool) {
|
|
if r.rTimeout <= 0 {
|
|
r.readCond.Wait()
|
|
return true
|
|
}
|
|
start := time.Now()
|
|
defer time.AfterFunc(r.rTimeout, r.readCond.Broadcast).Stop()
|
|
|
|
r.readCond.Wait()
|
|
if time.Since(start) >= r.rTimeout {
|
|
r.setErr(context.DeadlineExceeded, true)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ReadByte reads and returns the next byte from the input or ErrIsEmpty.
|
|
func (r *RingBuffer) ReadByte() (b byte, err error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if err = r.readErr(true); err != nil {
|
|
return 0, err
|
|
}
|
|
for r.w == r.r && !r.isFull {
|
|
if r.block {
|
|
if !r.waitWrite() {
|
|
return 0, context.DeadlineExceeded
|
|
}
|
|
err = r.readErr(true)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
continue
|
|
}
|
|
return 0, ErrIsEmpty
|
|
}
|
|
b = r.buf[r.r]
|
|
r.r++
|
|
if r.r == r.size {
|
|
r.r = 0
|
|
}
|
|
|
|
r.isFull = false
|
|
return b, r.readErr(true)
|
|
}
|
|
|
|
// Write writes len(p) bytes from p to the underlying buf.
|
|
// It returns the number of bytes written from p (0 <= n <= len(p))
|
|
// and any error encountered that caused the write to stop early.
|
|
// If blocking n < len(p) will be returned only if an error occurred.
|
|
// Write returns a non-nil error if it returns n < len(p).
|
|
// Write will not modify the slice data, even temporarily.
|
|
func (r *RingBuffer) Write(p []byte) (n int, err error) {
|
|
if len(p) == 0 {
|
|
return 0, r.setErr(nil, false)
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if err := r.err; err != nil {
|
|
if err == io.EOF {
|
|
err = ErrWriteOnClosed
|
|
}
|
|
return 0, err
|
|
}
|
|
wrote := 0
|
|
for len(p) > 0 {
|
|
n, err = r.write(p)
|
|
wrote += n
|
|
if !r.block || err == nil {
|
|
break
|
|
}
|
|
err = r.setErr(err, true)
|
|
if r.block && (err == ErrIsFull || err == ErrTooMuchDataToWrite) {
|
|
r.writeCond.Broadcast()
|
|
r.waitRead()
|
|
p = p[n:]
|
|
err = nil
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
if r.block && wrote > 0 {
|
|
r.writeCond.Broadcast()
|
|
}
|
|
|
|
return wrote, r.setErr(err, true)
|
|
}
|
|
|
|
// waitWrite will wait for a write event.
|
|
// Returns true if a write may have happened.
|
|
// Returns false if waited longer than wTimeout.
|
|
// Must be called when locked and returns locked.
|
|
func (r *RingBuffer) waitWrite() (ok bool) {
|
|
if r.wTimeout <= 0 {
|
|
r.writeCond.Wait()
|
|
return true
|
|
}
|
|
|
|
start := time.Now()
|
|
defer time.AfterFunc(r.wTimeout, r.writeCond.Broadcast).Stop()
|
|
|
|
r.writeCond.Wait()
|
|
if time.Since(start) >= r.wTimeout {
|
|
r.setErr(context.DeadlineExceeded, true)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ReadFrom will fulfill the write side of the ringbuffer.
|
|
// This will do writes directly into the buffer,
|
|
// therefore avoiding a mem-copy when using the Write.
|
|
//
|
|
// ReadFrom will not automatically close the buffer even after returning.
|
|
// For that call CloseWriter().
|
|
//
|
|
// ReadFrom reads data from r until EOF or error.
|
|
// The return value n is the number of bytes read.
|
|
// Any error except EOF encountered during the read is also returned,
|
|
// and the error will cause the Read side to fail as well.
|
|
// ReadFrom only available in blocking mode.
|
|
func (r *RingBuffer) ReadFrom(rd io.Reader) (n int64, err error) {
|
|
if !r.block {
|
|
return 0, errors.New("RingBuffer: ReadFrom only available in blocking mode")
|
|
}
|
|
zeroReads := 0
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
for {
|
|
if err = r.readErr(true); err != nil {
|
|
return n, err
|
|
}
|
|
if r.isFull {
|
|
// Wait for a read
|
|
if !r.waitRead() {
|
|
return 0, context.DeadlineExceeded
|
|
}
|
|
continue
|
|
}
|
|
|
|
var toRead []byte
|
|
if r.w >= r.r {
|
|
// After reader, read until end of buffer
|
|
toRead = r.buf[r.w:]
|
|
} else {
|
|
// Before reader, read until reader.
|
|
toRead = r.buf[r.w:r.r]
|
|
}
|
|
// Unlock while reading
|
|
r.mu.Unlock()
|
|
nr, rerr := rd.Read(toRead)
|
|
r.mu.Lock()
|
|
if rerr != nil && rerr != io.EOF {
|
|
err = r.setErr(err, true)
|
|
break
|
|
}
|
|
if nr == 0 && rerr == nil {
|
|
zeroReads++
|
|
if zeroReads >= 100 {
|
|
err = r.setErr(io.ErrNoProgress, true)
|
|
}
|
|
continue
|
|
}
|
|
zeroReads = 0
|
|
r.w += nr
|
|
if r.w == r.size {
|
|
r.w = 0
|
|
}
|
|
r.isFull = r.r == r.w && nr > 0
|
|
n += int64(nr)
|
|
r.writeCond.Broadcast()
|
|
if rerr == io.EOF {
|
|
// We do not close.
|
|
break
|
|
}
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// WriteTo writes data to w until there's no more data to write or
|
|
// when an error occurs. The return value n is the number of bytes
|
|
// written. Any error encountered during the write is also returned.
|
|
//
|
|
// If a non-nil error is returned the write side will also see the error.
|
|
func (r *RingBuffer) WriteTo(w io.Writer) (n int64, err error) {
|
|
if !r.block {
|
|
return 0, errors.New("RingBuffer: WriteTo only available in blocking mode")
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
// Don't write more than half, to unblock reads earlier.
|
|
maxWrite := len(r.buf) / 2
|
|
// But write at least 8K if possible
|
|
if maxWrite < 8<<10 {
|
|
maxWrite = len(r.buf)
|
|
}
|
|
for {
|
|
if err = r.readErr(true); err != nil {
|
|
break
|
|
}
|
|
if r.r == r.w && !r.isFull {
|
|
// Wait for a write to make space
|
|
if !r.waitWrite() {
|
|
return 0, context.DeadlineExceeded
|
|
}
|
|
continue
|
|
}
|
|
|
|
var toWrite []byte
|
|
if r.r >= r.w {
|
|
// After writer, we can write until end of buffer
|
|
toWrite = r.buf[r.r:]
|
|
} else {
|
|
// Before reader, we can read until writer.
|
|
toWrite = r.buf[r.r:r.w]
|
|
}
|
|
if len(toWrite) > maxWrite {
|
|
toWrite = toWrite[:maxWrite]
|
|
}
|
|
// Unlock while reading
|
|
r.mu.Unlock()
|
|
nr, werr := w.Write(toWrite)
|
|
r.mu.Lock()
|
|
if werr != nil {
|
|
err = r.setErr(werr, true)
|
|
break
|
|
}
|
|
if nr != len(toWrite) {
|
|
err = r.setErr(io.ErrShortWrite, true)
|
|
break
|
|
}
|
|
r.r += nr
|
|
if r.r == r.size {
|
|
r.r = 0
|
|
}
|
|
r.isFull = false
|
|
n += int64(nr)
|
|
r.readCond.Broadcast()
|
|
}
|
|
if err == io.EOF {
|
|
err = nil
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Copy will pipe all data from the reader to the writer through the ringbuffer.
|
|
// The ringbuffer will switch to blocking mode.
|
|
// Reads and writes will be done async.
|
|
// No internal mem-copies are used for the transfer.
|
|
//
|
|
// Calling CloseWithError will cancel the transfer and make the function return when
|
|
// any ongoing reads or writes have finished.
|
|
//
|
|
// Calling Read or Write functions concurrently with running this will lead to unpredictable results.
|
|
func (r *RingBuffer) Copy(dst io.Writer, src io.Reader) (written int64, err error) {
|
|
r.SetBlocking(true)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
r.ReadFrom(src)
|
|
r.CloseWriter()
|
|
}()
|
|
defer wg.Wait()
|
|
return r.WriteTo(dst)
|
|
}
|
|
|
|
// TryWrite writes len(p) bytes from p to the underlying buf like Write, but it is not blocking.
|
|
// If it does not succeed to acquire the lock, it returns ErrAcquireLock.
|
|
func (r *RingBuffer) TryWrite(p []byte) (n int, err error) {
|
|
if len(p) == 0 {
|
|
return 0, r.setErr(nil, false)
|
|
}
|
|
ok := r.mu.TryLock()
|
|
if !ok {
|
|
return 0, ErrAcquireLock
|
|
}
|
|
defer r.mu.Unlock()
|
|
if err := r.err; err != nil {
|
|
if err == io.EOF {
|
|
err = ErrWriteOnClosed
|
|
}
|
|
return 0, err
|
|
}
|
|
|
|
n, err = r.write(p)
|
|
if r.block && n > 0 {
|
|
r.writeCond.Broadcast()
|
|
}
|
|
return n, r.setErr(err, true)
|
|
}
|
|
|
|
func (r *RingBuffer) write(p []byte) (n int, err error) {
|
|
if r.isFull {
|
|
return 0, ErrIsFull
|
|
}
|
|
|
|
var avail int
|
|
if r.w >= r.r {
|
|
avail = r.size - r.w + r.r
|
|
} else {
|
|
avail = r.r - r.w
|
|
}
|
|
|
|
if len(p) > avail {
|
|
err = ErrTooMuchDataToWrite
|
|
p = p[:avail]
|
|
}
|
|
n = len(p)
|
|
|
|
if r.w >= r.r {
|
|
c1 := r.size - r.w
|
|
if c1 >= n {
|
|
copy(r.buf[r.w:], p)
|
|
r.w += n
|
|
} else {
|
|
copy(r.buf[r.w:], p[:c1])
|
|
c2 := n - c1
|
|
copy(r.buf[0:], p[c1:])
|
|
r.w = c2
|
|
}
|
|
} else {
|
|
copy(r.buf[r.w:], p)
|
|
r.w += n
|
|
}
|
|
|
|
if r.w == r.size {
|
|
r.w = 0
|
|
}
|
|
if r.w == r.r {
|
|
r.isFull = true
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
// WriteByte writes one byte into buffer, and returns ErrIsFull if the buffer is full.
|
|
func (r *RingBuffer) WriteByte(c byte) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if err := r.err; err != nil {
|
|
if err == io.EOF {
|
|
err = ErrWriteOnClosed
|
|
}
|
|
return err
|
|
}
|
|
err := r.writeByte(c)
|
|
for err == ErrIsFull && r.block {
|
|
if !r.waitRead() {
|
|
return context.DeadlineExceeded
|
|
}
|
|
err = r.setErr(r.writeByte(c), true)
|
|
}
|
|
if r.block && err == nil {
|
|
r.writeCond.Broadcast()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// TryWriteByte writes one byte into buffer without blocking.
|
|
// If it does not succeed to acquire the lock, it returns ErrAcquireLock.
|
|
func (r *RingBuffer) TryWriteByte(c byte) error {
|
|
ok := r.mu.TryLock()
|
|
if !ok {
|
|
return ErrAcquireLock
|
|
}
|
|
defer r.mu.Unlock()
|
|
if err := r.err; err != nil {
|
|
if err == io.EOF {
|
|
err = ErrWriteOnClosed
|
|
}
|
|
return err
|
|
}
|
|
|
|
err := r.writeByte(c)
|
|
if err == nil && r.block {
|
|
r.writeCond.Broadcast()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (r *RingBuffer) writeByte(c byte) error {
|
|
if r.err != nil {
|
|
return r.err
|
|
}
|
|
if r.w == r.r && r.isFull {
|
|
return ErrIsFull
|
|
}
|
|
r.buf[r.w] = c
|
|
r.w++
|
|
|
|
if r.w == r.size {
|
|
r.w = 0
|
|
}
|
|
if r.w == r.r {
|
|
r.isFull = true
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Length returns the number of bytes that can be read without blocking.
|
|
func (r *RingBuffer) Length() int {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if r.w == r.r {
|
|
if r.isFull {
|
|
return r.size
|
|
}
|
|
return 0
|
|
}
|
|
|
|
if r.w > r.r {
|
|
return r.w - r.r
|
|
}
|
|
|
|
return r.size - r.r + r.w
|
|
}
|
|
|
|
// Capacity returns the size of the underlying buffer.
|
|
func (r *RingBuffer) Capacity() int {
|
|
return r.size
|
|
}
|
|
|
|
// Free returns the number of bytes that can be written without blocking.
|
|
func (r *RingBuffer) Free() int {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if r.w == r.r {
|
|
if r.isFull {
|
|
return 0
|
|
}
|
|
return r.size
|
|
}
|
|
|
|
if r.w < r.r {
|
|
return r.r - r.w
|
|
}
|
|
|
|
return r.size - r.w + r.r
|
|
}
|
|
|
|
// WriteString writes the contents of the string s to buffer, which accepts a slice of bytes.
|
|
func (r *RingBuffer) WriteString(s string) (n int, err error) {
|
|
x := (*[2]uintptr)(unsafe.Pointer(&s))
|
|
h := [3]uintptr{x[0], x[1], x[1]}
|
|
buf := *(*[]byte)(unsafe.Pointer(&h))
|
|
return r.Write(buf)
|
|
}
|
|
|
|
// Bytes returns all available read bytes.
|
|
// It does not move the read pointer and only copy the available data.
|
|
// If the dst is big enough, it will be used as destination,
|
|
// otherwise a new buffer will be allocated.
|
|
func (r *RingBuffer) Bytes(dst []byte) []byte {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
getDst := func(n int) []byte {
|
|
if cap(dst) < n {
|
|
return make([]byte, n)
|
|
}
|
|
return dst[:n]
|
|
}
|
|
|
|
if r.w == r.r {
|
|
if r.isFull {
|
|
buf := getDst(r.size)
|
|
copy(buf, r.buf[r.r:])
|
|
copy(buf[r.size-r.r:], r.buf[:r.w])
|
|
return buf
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if r.w > r.r {
|
|
buf := getDst(r.w - r.r)
|
|
copy(buf, r.buf[r.r:r.w])
|
|
return buf
|
|
}
|
|
|
|
n := r.size - r.r + r.w
|
|
buf := getDst(n)
|
|
|
|
if r.r+n < r.size {
|
|
copy(buf, r.buf[r.r:r.r+n])
|
|
} else {
|
|
c1 := r.size - r.r
|
|
copy(buf, r.buf[r.r:r.size])
|
|
c2 := n - c1
|
|
copy(buf[c1:], r.buf[0:c2])
|
|
}
|
|
|
|
return buf
|
|
}
|
|
|
|
// IsFull returns true when the ringbuffer is full.
|
|
func (r *RingBuffer) IsFull() bool {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
return r.isFull
|
|
}
|
|
|
|
// IsEmpty returns true when the ringbuffer is empty.
|
|
func (r *RingBuffer) IsEmpty() bool {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
return !r.isFull && r.w == r.r
|
|
}
|
|
|
|
// CloseWithError closes the writer; reads will return
|
|
// no bytes and the error err, or EOF if err is nil.
|
|
//
|
|
// CloseWithError never overwrites the previous error if it exists
|
|
// and always returns nil.
|
|
func (r *RingBuffer) CloseWithError(err error) {
|
|
if err == nil {
|
|
err = io.EOF
|
|
}
|
|
r.setErr(err, false)
|
|
}
|
|
|
|
// CloseWriter closes the writer.
|
|
// Reads will return any remaining bytes and io.EOF.
|
|
func (r *RingBuffer) CloseWriter() {
|
|
r.setErr(io.EOF, false)
|
|
}
|
|
|
|
// Flush waits for the buffer to be empty and fully read.
|
|
// If not blocking ErrIsNotEmpty will be returned if the buffer still contains data.
|
|
func (r *RingBuffer) Flush() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
for r.w != r.r || r.isFull {
|
|
err := r.readErr(true)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
if !r.block {
|
|
return ErrIsNotEmpty
|
|
}
|
|
if !r.waitRead() {
|
|
return context.DeadlineExceeded
|
|
}
|
|
}
|
|
|
|
err := r.readErr(true)
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Reset the read pointer and writer pointer to zero.
|
|
func (r *RingBuffer) Reset() {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
// Set error so any readers/writers will return immediately.
|
|
r.setErr(errors.New("reset called"), true)
|
|
if r.block {
|
|
r.readCond.Broadcast()
|
|
r.writeCond.Broadcast()
|
|
}
|
|
|
|
// Unlock the mutex so readers/writers can finish.
|
|
r.mu.Unlock()
|
|
r.wg.Wait()
|
|
r.mu.Lock()
|
|
r.r = 0
|
|
r.w = 0
|
|
r.err = nil
|
|
r.isFull = false
|
|
}
|
|
|
|
// WriteCloser returns a WriteCloser that writes to the ring buffer.
|
|
// When the returned WriteCloser is closed, it will wait for all data to be read before returning.
|
|
func (r *RingBuffer) WriteCloser() io.WriteCloser {
|
|
return &writeCloser{RingBuffer: r}
|
|
}
|
|
|
|
type writeCloser struct {
|
|
*RingBuffer
|
|
}
|
|
|
|
// Close provides a close method for the WriteCloser.
|
|
func (wc *writeCloser) Close() error {
|
|
wc.CloseWriter()
|
|
return wc.Flush()
|
|
}
|
|
|
|
// ReadCloser returns a io.ReadCloser that reads to the ring buffer.
|
|
// When the returned ReadCloser is closed, ErrReaderClosed will be returned on any writes done afterwards.
|
|
func (r *RingBuffer) ReadCloser() io.ReadCloser {
|
|
return &readCloser{RingBuffer: r}
|
|
}
|
|
|
|
type readCloser struct {
|
|
*RingBuffer
|
|
}
|
|
|
|
// Close provides a close method for the ReadCloser.
|
|
func (rc *readCloser) Close() error {
|
|
rc.CloseWithError(ErrReaderClosed)
|
|
err := rc.readErr(false)
|
|
if err == ErrReaderClosed {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Peek reads up to len(p) bytes into p without moving the read pointer.
|
|
func (r *RingBuffer) Peek(p []byte) (n int, err error) {
|
|
if len(p) == 0 {
|
|
return 0, r.readErr(false)
|
|
}
|
|
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if err := r.readErr(true); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return r.peek(p)
|
|
}
|
|
|
|
func (r *RingBuffer) peek(p []byte) (n int, err error) {
|
|
if r.w == r.r && !r.isFull {
|
|
return 0, ErrIsEmpty
|
|
}
|
|
|
|
if r.w > r.r {
|
|
n = r.w - r.r
|
|
if n > len(p) {
|
|
n = len(p)
|
|
}
|
|
copy(p, r.buf[r.r:r.r+n])
|
|
return
|
|
}
|
|
|
|
n = r.size - r.r + r.w
|
|
if n > len(p) {
|
|
n = len(p)
|
|
}
|
|
|
|
if r.r+n <= r.size {
|
|
copy(p, r.buf[r.r:r.r+n])
|
|
} else {
|
|
c1 := r.size - r.r
|
|
copy(p, r.buf[r.r:r.size])
|
|
c2 := n - c1
|
|
copy(p[c1:], r.buf[0:c2])
|
|
}
|
|
|
|
return n, r.readErr(true)
|
|
}
|