459 lines
9.8 KiB
Go
459 lines
9.8 KiB
Go
package docbuf
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"errors"
|
|
"io"
|
|
)
|
|
|
|
// Messages for the errors produced by DocBuf. They are plain strings, and each
// call site wraps one with errors.New, so there are no sentinel error values
// to compare against with errors.Is; callers have to inspect message text.
const (
	// Moving documents between the cache and the backing store.
	DemoteFromEmptyCacheError = "Can't demote from empty cache"
	PromoteToFullCacheError   = "Can't promote to full cache"

	// Writing to / reading from the backing store.
	WriteEmptyDocumentError = "Not writing empty document to store"
	PartialWriteError       = "Couldnt complete document write"
	BadReadRequestError     = "Can't read less than one documents"
	BadReadLengthError      = "Unexpected read length"

	// Public buffer operations.
	PopFromEmptyBufferError = "Can't pop from empty buffer"
	RemoveOverCapacityError = "Not enough documents in buffer to remove"
)
|
|
|
|
// DocumentBuffer is a last-in-first-out collection of string documents. The
// newest documents are held in a bounded in-memory cache; older ones spill
// over into a backing store.
type DocumentBuffer interface {
	// Push adds doc as the newest document. If the cache is full, the oldest
	// cached document is written to the backing store first.
	Push(doc string) error

	// Pop removes and returns the newest document, then pulls a document back
	// out of the backing store to refill the cache.
	Pop() (string, error)

	// Remove removes and returns the count newest documents, newest first,
	// and backfills the cache from the backing store.
	Remove(count int) ([]string, error)

	// Flush empties the cache by writing every cached document into the
	// backing store.
	Flush() error

	// Peek returns the newest document, from the cache or else from the
	// backing store, without removing it.
	Peek() (string, error)

	// Read returns count documents, newest first, from the cache and then
	// the backing store, without removing them from either.
	Read(count int) ([]string, error)

	// Apply calls f on each document from most recent to least recent for as
	// long as f returns true. When f returns false, Apply stops iterating and
	// returns nil; otherwise it returns nil or an error.
	Apply(f func(doc string) bool) error

	// Close flushes the cache and prepares the backing store for closure. It
	// returns the end index of the live data in the store; truncating the
	// store to that length is the caller's responsibility.
	Close() (int64, error)

	// Cached returns the current number of documents in the cache.
	Cached() int
}
|
|
|
|
// DocBuf is the DocumentBuffer implementation returned by NewDocumentBuffer.
//
// Documents are held newest-first in cache. When the cache is full, the
// oldest cached documents are appended to store as newline-separated base64
// text. Only the bytes before endIndex in store are live.
type DocBuf struct {
	// cache holds the most recent documents; index 0 is the newest.
	cache []string

	// store receives documents evicted from the cache, oldest at the start.
	store io.ReadWriteSeeker

	// cacheSize is how many documents the cache holds before Push spills the
	// oldest one into store. Remove may exceed it temporarily.
	cacheSize int

	// endIndex is the byte offset in store just past the last live document.
	// Bytes after it are stale and get overwritten by the next write.
	endIndex int64
}
|
|
|
|
// put an element from cache into store
|
|
func (b *DocBuf) demote() error {
|
|
if len(b.cache) < 1 {
|
|
return errors.New(DemoteFromEmptyCacheError)
|
|
}
|
|
|
|
demoted := b.cache[len(b.cache)-1]
|
|
b.cache = b.cache[:len(b.cache)-1]
|
|
return b.writeToBackingStore(demoted)
|
|
}
|
|
|
|
/* bounds here refers to checking the capacity of the internal memory cache
|
|
* Remove() includes a use case where a user might want to evict more events
|
|
* than the internal memory cache can possibly contain, in which case we use
|
|
* promote and turn off bounds checking in order to overpopulate memory cache
|
|
*/
|
|
func (b *DocBuf) promote(bounds bool) error {
|
|
if bounds && len(b.cache) >= b.cacheSize {
|
|
return errors.New(PromoteToFullCacheError)
|
|
}
|
|
|
|
doc, err := b.readDocumentsFromDisk(1, true, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(doc) > 0 {
|
|
b.cache = append(b.cache, doc[0])
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// encode as base64 so as to avoid needing to escape things
|
|
// then documents can be separated by newlines
|
|
func (b *DocBuf) writeToBackingStore(doc string) error {
|
|
if len(doc) == 0 {
|
|
return errors.New(WriteEmptyDocumentError)
|
|
}
|
|
|
|
str := base64.StdEncoding.EncodeToString([]byte(doc))
|
|
|
|
// seek to end index
|
|
c, err := b.store.Seek(int64(b.endIndex), io.SeekStart);
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// separate documents with a newline
|
|
if c > 0 {
|
|
str = "\n" + str
|
|
}
|
|
|
|
n, err := b.store.Write([]byte(str))
|
|
if err != nil || n < len(doc) {
|
|
if n < len(doc) {
|
|
return errors.Join(
|
|
errors.New(PartialWriteError),
|
|
err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
b.endIndex += int64(n)
|
|
|
|
return nil
|
|
}
|
|
|
|
// readDocumentsFromDisk decodes up to count documents from the backing store,
// walking backwards one byte at a time so the newest stored document comes
// first in the result.
//
// Stored documents are base64 text separated by single newlines (see
// writeToBackingStore), so the walk splits on '\n' and base64-decodes each
// piece.
//
//   - count must be at least 1, otherwise BadReadRequestError is returned.
//   - truncate moves b.endIndex back onto the newline that precedes the
//     oldest returned document (or to 0), logically removing the returned
//     documents from the store without shrinking the underlying stream.
//   - continues starts the walk at the store's current seek position instead
//     of at b.endIndex-1. Apply uses this to resume a walk.
//
// Fewer than count documents are returned, with a nil error, when the start of
// the store is reached first. An empty slice is returned when there is nothing
// before the starting position. On error, the documents decoded so far are
// returned along with the error. After the walk runs, the store's seek
// position is left just past the last byte it read; Apply depends on this.
func (b *DocBuf) readDocumentsFromDisk(
	count int,
	truncate bool,
	continues bool,
) ([]string, error) {
	docs := []string{}
	cursor := int64(0)
	var err error

	if count < 1 {
		return docs, errors.New(BadReadRequestError)
	}

	if continues {
		// Resume from wherever the previous walk left the store. Position 0
		// means there is nothing older left to read.
		cursor, err = b.store.Seek(0, io.SeekCurrent)
		if b.endIndex == 0 || err != nil || cursor == 0 {
			return docs, err
		}

	} else {
		// catch store is empty
		cursor, err = b.store.Seek(0, io.SeekEnd)
		if b.endIndex == 0 || err != nil || cursor == 0 {
			return docs, err
		}

		// self repair?
		// endIndex pointing past the physical end of the stream is clamped
		// to the stream size.
		if b.endIndex > cursor {
			b.endIndex = cursor
		}
		// endIndex is exclusive, so the last live byte is endIndex-1.
		cursor = b.endIndex - 1
	}

	for len(docs) < count && cursor >= 0 {
		doc := ""
		char := make([]byte, 1)
		char[0] = 0

		// read bytes backwards from file
		for cursor >= 0 {
			// set cursor
			if _, err := b.store.Seek(cursor, io.SeekStart); err != nil {
				return docs, err
			}

			// read one byte
			n, err := b.store.Read(char)
			if err != nil {
				return docs, err
			}
			if n != 1 {
				return docs, errors.New(BadReadLengthError)
			}

			// break on newline
			// (cursor is left pointing at the separator)
			if char[0] == 10 {
				break
			}

			cursor -= 1
			// Prepend because the walk runs backwards. NOTE(review): this
			// builds a new string per byte, which is quadratic in the
			// encoded document length.
			doc = string(char) + doc
		}

		/* parse document and add to account (docs)
		 * each read will stop on the newline so make sure
		 * we dont try to account for an empty document
		 * (the newline itself is never appended to doc, so doc is empty
		 * when the walk started on a separator, e.g. when resuming via
		 * continues from the position Apply sets up)
		 */
		if len(doc) > 0 && doc != "\n" {
			str, err := base64.StdEncoding.DecodeString(doc)
			if err != nil {
				return docs, err
			}
			doc = string(str)
			docs = append(docs, doc)
		}

		// Step over the separating newline before reading the next (older)
		// document. When done, cursor stays on that newline so truncate can
		// use it as the new endIndex.
		if len(docs) < count {
			cursor -= 1
		}
	}

	if truncate {
		// cursor is -1 (or lower) when the walk ran off the start of the store.
		if cursor < 0 {
			cursor = 0
		}
		b.endIndex = cursor
	}
	return docs, nil
}
|
|
|
|
|
|
/* WARNING: this constructor will promote items
|
|
* from storage, resulting in a modified backing
|
|
* as well as a prefilled cache.
|
|
*/
|
|
func NewDocumentBuffer(
|
|
cacheSize int,
|
|
backingStore io.ReadWriteSeeker,
|
|
) (DocumentBuffer, error) {
|
|
newBuf := DocBuf {
|
|
cache: make([]string, 0, cacheSize),
|
|
store: backingStore,
|
|
cacheSize: cacheSize,
|
|
}
|
|
|
|
c, err := backingStore.Seek(0, io.SeekEnd)
|
|
newBuf.endIndex = c
|
|
if err != nil {
|
|
return &newBuf, err
|
|
}
|
|
|
|
// prefill cache
|
|
for range cacheSize {
|
|
newBuf.promote(true)
|
|
}
|
|
return &newBuf, nil
|
|
}
|
|
|
|
/* Push
|
|
* Add a document to the buffer.
|
|
* If cache is full oldest document is written to backing writer.
|
|
*/
|
|
func (b *DocBuf) Push(doc string) error {
|
|
if len(b.cache) >= b.cacheSize {
|
|
if err := b.demote(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
tc := b.cache
|
|
b.cache = append(make([]string, 0, b.cacheSize), doc)
|
|
b.cache = append(b.cache, tc...)
|
|
return nil
|
|
}
|
|
|
|
/* Pop
|
|
* removes a document from the cache and pulls a document back
|
|
* out of the backing ReaderWriter.
|
|
*/
|
|
func (b *DocBuf) Pop() (string, error) {
|
|
if len(b.cache) < 1 {
|
|
if err := b.promote(true); err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
|
|
if len(b.cache) < 1 {
|
|
return "", errors.New(PopFromEmptyBufferError)
|
|
}
|
|
|
|
candidate := b.cache[0]
|
|
b.cache = b.cache[1:]
|
|
e := b.promote(true)
|
|
|
|
return candidate, e
|
|
}
|
|
|
|
/* Remove
|
|
* removes N newest documents in order of newest to oldest.
|
|
* backfills cache by pulling documents back out of backing
|
|
* ReaderWriter (again, newest first).
|
|
*/
|
|
func (b *DocBuf) Remove(count int) ([]string, error) {
|
|
delta := count - b.cacheSize
|
|
if delta > 0 {
|
|
for range delta {
|
|
if err := b.promote(false); err != nil {
|
|
return []string{}, err
|
|
}
|
|
}
|
|
}
|
|
|
|
if count > len(b.cache) {
|
|
tmp := b.cache
|
|
b.cache = make([]string, 0, b.cacheSize)
|
|
return tmp, errors.New(RemoveOverCapacityError)
|
|
}
|
|
|
|
candidates := b.cache[:count]
|
|
b.cache = b.cache[count:]
|
|
|
|
// refill cache
|
|
delta = b.cacheSize - len(b.cache)
|
|
if delta > 0 {
|
|
for range delta {
|
|
if err := b.promote(true); err != nil {
|
|
return []string{}, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return candidates, nil
|
|
}
|
|
|
|
/* Flush
|
|
* empties cache by placing all cached documnent into backing
|
|
* ReaderWriter.
|
|
*/
|
|
func (b *DocBuf) Flush() error {
|
|
for range len(b.cache) {
|
|
if err := b.demote(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* Peek
|
|
* views first document in either cache or backing ReaderWriter
|
|
*/
|
|
func (b *DocBuf) Peek() (string, error) {
|
|
if len(b.cache) > 0 {
|
|
return b.cache[0], nil
|
|
} else {
|
|
d, e := b.readDocumentsFromDisk(1, false, false)
|
|
if len(d) < 1 {
|
|
return "", e
|
|
}
|
|
return d[0], e
|
|
}
|
|
}
|
|
|
|
/* Read
|
|
* returns N documents from the cache and backing store without
|
|
* removing them from either data structure.
|
|
*/
|
|
func (b *DocBuf) Read(count int) ([]string, error) {
|
|
delta := count - len(b.cache)
|
|
candidates := b.cache[:count - (delta)]
|
|
|
|
if delta > 0 {
|
|
fromStorage, err := b.readDocumentsFromDisk(delta, false, false)
|
|
if err != nil {
|
|
return candidates, err
|
|
}
|
|
|
|
candidates = append(candidates, fromStorage...)
|
|
}
|
|
|
|
return candidates, nil
|
|
}
|
|
|
|
/* Apply
 * Takes a callable/closure/function that accepts a string (document)
 * and returns a bool. Apply returns an error, or nil.
 * Apply will begin applying this callable to each doc from most recent
 * to least recent so long as the callable continues to return true.
 * If the callable returns false, Apply ceases iteration and returns.
 *
 * Cached documents are visited first (index 0 is the newest). Then the
 * backing store is walked backwards from endIndex, one document per
 * readDocumentsFromDisk call; the store itself is not modified.
 * Between store reads, the walk relies on the store's current seek
 * position. NOTE(review): f therefore should not call buffer methods
 * that seek the store (Push, Pop, Flush, Peek, ...) during iteration;
 * confirm callers never do.
 */
func (b *DocBuf) Apply(f func(string) bool) error {
	// iterate over internal cache applying function
	for _, i := range b.cache {
		if !f(i) {
			return nil
		}
	}

	// begin iterating with readDocumentsFromDisk
	// The first read starts at endIndex; later reads (continues=true) resume
	// from wherever the previous read left the store's seek position.
	first := true
	flag := true
	for flag {
		doc, err := b.readDocumentsFromDisk(1, false, !first)
		if err != nil {
			return err
		}
		// No document: nothing older is left in the store.
		if len(doc) == 0 {
			return nil
		} else if len(doc) != 1 {
			return errors.New("improper read from buffer")
		}

		first = false
		flag = f(doc[0])

		if flag {
			var c, d int64
			var err error
			// since continue is on we need to bring the cursor back one
			// After a read, the position is one past the last byte consumed:
			// the newline before this document, or byte 0. Stepping back lands
			// on it. The next continued read then sees an empty "document" at
			// the separator, skips it, and decodes the previous one; at 0 it
			// returns nothing and the loop ends.
			if c, err = b.store.Seek(0, io.SeekCurrent); err != nil {
				return err
				// dont seek earlier than 0
			} else if c < 1 {
				break
			}
			if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil {
				return err
			}

			// The store did not move backwards, so the walk cannot resume
			// correctly; report it instead of continuing.
			if c == d {
				return errors.New("Seek failure!")
			}
		}
	}

	return nil
}
|
|
|
|
/* Close
|
|
* flushes and then prepares backing store for closure.
|
|
* returns the end index into the underlying stream...
|
|
* it is the callers responsibility to truncate.
|
|
*/
|
|
func (b *DocBuf) Close() (int64, error) {
|
|
if err := b.Flush(); err != nil {
|
|
return b.endIndex, err
|
|
}
|
|
|
|
return b.endIndex, nil
|
|
}
|
|
|
|
/* Cached
|
|
* returns current length of cache
|
|
*/
|
|
func (b *DocBuf) Cached() int {
|
|
return len(b.cache)
|
|
}
|