Documentation
¶
Index ¶
- type Buffer
- func (b *Buffer) Close() error
- func (b *Buffer) Keep(keep int)
- func (b *Buffer) Len() int
- func (b *Buffer) NextReader() io.ReadCloser
- func (b *Buffer) NextReaderFromNow() io.ReadCloser
- func (b *Buffer) NumReaders() int
- func (b *Buffer) OnLastReaderClose(runOnLastClose func() error)
- func (b *Buffer) Write(p []byte) (int, error)
- type Reader
- type Writer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is used to provide multiple readers with access to a shared buffer. Readers may join/leave at any time, however a joining reader will only see whats currently in the buffer onwards. Data is evicted from the buffer once all active readers have read that section.
Example ¶
// Start a new buffer buf := New() // Create two readers r1, r2 := buf.NextReader(), buf.NextReader() // Broadcast a message io.WriteString(buf, "Hello World\n") // Wait var grp sync.WaitGroup grp.Add(4) // Read fast go func() { defer grp.Done() io.Copy(os.Stdout, r1) // "Hello World\n" }() // Read slow go func() { defer grp.Done() <-time.After(100 * time.Millisecond) io.CopyN(os.Stdout, r2, 5) // "Hello" <-time.After(time.Second) io.Copy(os.Stdout, r2) // "World\n" }() // Both readers will read the entire buffer! The slow reader // won't block the fast one from reading ahead either. // Late reader // Since this reader joins after all existing readers have Read "Hello" // "Hello" has already been cleared from the Buffer, this Reader will only see // "World\n" and beyond. go func() { defer grp.Done() <-time.After(500 * time.Millisecond) r3 := buf.NextReader() io.Copy(os.Stdout, r3) // "World\n" }() // Short Reader // **Important!** if your reader isn't going to read until the buffer is empty // you'll need to call Close() when you are done with it to tell the buffer // it's done reading data. go func() { defer grp.Done() <-time.After(100 * time.Millisecond) r4 := buf.NextReader() io.CopyN(os.Stdout, r4, 5) // "Hello" r4.Close() // tell the buffer you're done reading }() // **Important!** mark close so that readers can ret. io.EOF buf.Close() grp.Wait()
Output: Hello World HelloHelloHello World World
func NewCapped ¶ added in v1.1.0
NewCapped creates a new in-memory Buffer whose Write() call blocks to prevent Len() from exceeding the passed capacity
func NewCappedBuffer ¶ added in v1.1.0
NewCappedBuffer creates a new Buffer whose Write() call blocks to prevent Len() from exceeding the passed capacity
func (*Buffer) Close ¶
Close marks the buffer as complete. Readers will return io.EOF instead of blocking when they reach the end of the buffer.
func (*Buffer) Keep ¶ added in v1.2.0
Keep sets the minimum amount of bytes to keep in the buffer even if all other current readers have read those bytes. This allows new readers to join slightly behind. Keep is safe to call concurrently with other methods. Fewer than keep bytes may be in the buffer if less than keep bytes have been written since keep was set. If this buffer has a cap, it is invalid to call this method with a keep >= cap since the buffer would never be able to write new bytes once it reached the cap.
func (*Buffer) Len ¶ added in v1.1.0
Len returns the current size of the buffer. This is safe to call concurrently with all other methods.
func (*Buffer) NextReader ¶
func (b *Buffer) NextReader() io.ReadCloser
NextReader returns a new io.ReadCloser for this shared buffer. Read/Close are safe to call concurrently with the buffers Write/Close methods. Read calls will block if the Buffer is not Closed and contains no data. Note that the returned reader sees all data that is currently in the buffer, data is only dropped out of the buffer once all active readers point to locations in the buffer after that section.
func (*Buffer) NextReaderFromNow ¶ added in v1.1.0
func (b *Buffer) NextReaderFromNow() io.ReadCloser
NextReaderFromNow returns a new io.ReadCloser for this shared buffer. Unlike NextReader(), this reader will only see writes which occur after this reader is returned even if there is other data in the buffer. In other words, this reader points to the end of the buffer.
func (*Buffer) NumReaders ¶ added in v1.1.0
NumReaders returns the number of readers returned by NextReader() which have not called Reader.Close(). This method is safe to call concurrently with all methods.
func (*Buffer) OnLastReaderClose ¶ added in v1.1.0
OnLastReaderClose registers the passed callback to be run after any call to Reader.Close() which drops the NumReaders() to 0. This method is safe to call concurrently with all other methods and Reader methods, however it's only guaranteed to be triggered if it completes before the Reader.Close call which would trigger it.
type Reader ¶
type Reader interface { // Len returns the unread # of bytes in this Reader Len() int // Discard drops the next n bytes from the Reader, as if it were Read() // it returns the # of bytes actually dropped. It may return io.EOF // if all remaining bytes have been discarded. Discard(int) (int, error) // Read bytes into the provided buffer. io.Reader }
Reader provides an io.Reader whose methods MUST be concurrent-safe with the Write method of the Writer from which it was generated. It also MUST be safe for concurrent calls to Writer.Discard for bytes which have already been read by this Reader.
type Writer ¶
type Writer interface { // Len returns the # of bytes buffered for Readers Len() int // Discard drops the next n buffered bytes. It returns the actual number of // bytes dropped and may return io.EOF if all remaining bytes have been // discarded. Discard must be concurrent-safe with methods calls // on generated Readers, when discarding bytes that have been read // by all Readers. Discard(int) (int, error) // NextReader returns a Reader which reads a "snapshot" of the current written bytes // (excluding discarded bytes). The Reader should work independently of the Writer // and be concurrent-safe with the Write method on the Writer. NextReader() Reader // Write writes the given bytes into the Writer's underlying buffer. Which will // be available for reading using NextReader() to grab a snapshot of the current // written bytes. io.Writer }
Writer accepts bytes and generates Readers who consume those bytes. Generated Readers methods must be concurrent-safe with the Write method.
Example ¶
buf := newWriter(make([]byte, 0, 10)) io.Copy(os.Stdout, buf) io.Copy(os.Stdout, io.NewSectionReader(*&buf, 0, 100)) io.WriteString(buf, "Hello ") r := io.NewSectionReader(*&buf, 0, int64(buf.Len())) io.CopyN(os.Stdout, r, 5) io.CopyN(os.Stdout, buf, 5) io.WriteString(buf, "World") r = io.NewSectionReader(*&buf, 0, int64(buf.Len())) io.CopyN(os.Stdout, r, 6) io.WriteString(buf, "abcdefg") io.Copy(os.Stdout, buf) io.Copy(os.Stdout, buf) io.WriteString(buf, "Hello World") r = io.NewSectionReader(*&buf, 0, int64(buf.Len())) io.CopyN(os.Stdout, r, 5) io.CopyN(os.Stdout, buf, 4) io.WriteString(buf, "abcdefg") io.Copy(os.Stdout, buf) io.Copy(os.Stdout, buf)
Output: HelloHello World WorldabcdefgHelloHello Worldabcdefg
func NewMemoryWriter ¶ added in v1.2.2
NewMemoryWriter returns a new Writer for use with NewBuffer that internally stores bytes in a []byte. The given []byte will be the initial buffer used. It's a good idea to preallocate the normal size of your buffer here. ex. NewMemoryWriter(make([]byte, 0, CAPACITY)) where CAPACITY is large enough for Keep() + the number of buffered Write chunks you expect to need to hold in the buffer at any given time. This saves swapping the internal buffer out for a larger one as capacity needs grow, this means less large locking copies of the internal buffer.