100 lines
2.9 KiB
Go
100 lines
2.9 KiB
Go
// Code generated by go run gen.go; DO NOT EDIT.
|
|
|
|
package async
|
|
|
|
// UnboundedInterfaceChan is a channel with an unbounded buffer for caching
// Interface objects. Values are handed in through In and taken out through
// Out; anything accepted but not yet delivered is kept in an internal queue.
// A channel must be closed via the Close method.
type UnboundedInterfaceChan struct {
	// in is the producer side, exposed send-only by In.
	in chan interface{}
	// out is the consumer side, exposed receive-only by Out.
	out chan interface{}
	// close carries the shutdown request issued by Close.
	close chan struct{}
	// q holds values received from in that have not been sent on out yet.
	q []interface{}
}
|
|
|
|
// NewUnboundedInterfaceChan returns a unbounded channel with unlimited capacity.
|
|
func NewUnboundedInterfaceChan() *UnboundedInterfaceChan {
|
|
ch := &UnboundedInterfaceChan{
|
|
// The size of Interface is less than 16 bytes, we use 16 to fit
|
|
// a CPU cache line (L2, 256 Bytes), which may reduce cache misses.
|
|
in: make(chan interface{}, 16),
|
|
out: make(chan interface{}, 16),
|
|
close: make(chan struct{}),
|
|
}
|
|
go ch.processing()
|
|
return ch
|
|
}
|
|
|
|
// In returns the send channel of the given channel, which can be used to
|
|
// send values to the channel.
|
|
func (ch *UnboundedInterfaceChan) In() chan<- interface{} { return ch.in }
|
|
|
|
// Out returns the receive channel of the given channel, which can be used
|
|
// to receive values from the channel.
|
|
func (ch *UnboundedInterfaceChan) Out() <-chan interface{} { return ch.out }
|
|
|
|
// Close closes the channel.
|
|
func (ch *UnboundedInterfaceChan) Close() { ch.close <- struct{}{} }
|
|
|
|
func (ch *UnboundedInterfaceChan) processing() {
|
|
// This is a preallocation of the internal unbounded buffer.
|
|
// The size is randomly picked. But if one changes the size, the
|
|
// reallocation size at the subsequent for loop should also be
|
|
// changed too. Furthermore, there is no memory leak since the
|
|
// queue is garbage collected.
|
|
ch.q = make([]interface{}, 0, 1<<10)
|
|
for {
|
|
select {
|
|
case e, ok := <-ch.in:
|
|
if !ok {
|
|
// We don't want the input channel be accidentally closed
|
|
// via close() instead of Close(). If that happens, it is
|
|
// a misuse, do a panic as warning.
|
|
panic("async: misuse of unbounded channel, In() was closed")
|
|
}
|
|
ch.q = append(ch.q, e)
|
|
case <-ch.close:
|
|
ch.closed()
|
|
return
|
|
}
|
|
for len(ch.q) > 0 {
|
|
select {
|
|
case ch.out <- ch.q[0]:
|
|
ch.q[0] = nil // de-reference earlier to help GC
|
|
ch.q = ch.q[1:]
|
|
case e, ok := <-ch.in:
|
|
if !ok {
|
|
// We don't want the input channel be accidentally closed
|
|
// via close() instead of Close(). If that happens, it is
|
|
// a misuse, do a panic as warning.
|
|
panic("async: misuse of unbounded channel, In() was closed")
|
|
}
|
|
ch.q = append(ch.q, e)
|
|
case <-ch.close:
|
|
ch.closed()
|
|
return
|
|
}
|
|
}
|
|
// If the remaining capacity is too small, we prefer to
|
|
// reallocate the entire buffer.
|
|
if cap(ch.q) < 1<<5 {
|
|
ch.q = make([]interface{}, 0, 1<<10)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ch *UnboundedInterfaceChan) closed() {
|
|
close(ch.in)
|
|
for e := range ch.in {
|
|
ch.q = append(ch.q, e)
|
|
}
|
|
for len(ch.q) > 0 {
|
|
select {
|
|
case ch.out <- ch.q[0]:
|
|
ch.q[0] = nil // de-reference earlier to help GC
|
|
ch.q = ch.q[1:]
|
|
default:
|
|
}
|
|
}
|
|
close(ch.out)
|
|
close(ch.close)
|
|
}
|