package docbuf import ( "encoding/base64" "errors" "io" ) const ( DemoteFromEmptyCacheError = "Can't demote from empty cache" PromoteToFullCacheError = "Can't promote to full cache" WriteEmptyDocumentError = "Not writing empty document to store" PartialWriteError = "Couldnt complete document write" BadReadRequestError = "Can't read less than one documents" BadReadLengthError = "Unexpected read length" PopFromEmptyBufferError = "Can't pop from empty buffer" RemoveOverCapacityError = "Not enough documents in buffer to remove" ) type DocumentBuffer interface { /* Push * Add a document to the buffer. * If cache is full oldest document is written to backing writer. */ Push(string) error /* Pop * removes a document from the cache and pulls a document back * out of the backing ReaderWriter. */ Pop() (string, error) /* Remove * removes N documents according to arg. * backfills cache by pulling documents back out of backing * ReaderWriter. */ Remove(int) ([]string, error) /* Flush * empties cache by placing all cached documnent into backing * ReaderWriter. */ Flush() error /* Peek * views first document in either cache or backing ReaderWriter */ Peek() (string, error) /* Read * returns N documents from the cache and backing store without * removing them from either data structure. */ Read(int) ([]string, error) /* Apply * Takes a callable/closure/function that accepts a string (document) * and returns a bool. Apply returns an error, or nil. * Apply will begin applying this callable to each doc from most recent * to least recent so long as the callable continues to return true. * If the callable returns false, Apply ceases iteration and returns. */ Apply(func(string)bool) error /* Close * flushes and then prepares backing store for closure */ Close() (int64, error) /* Cached * returns current length of cache */ Cached() int } type DocBuf struct { cache []string store io.ReadWriteSeeker cacheSize int endIndex int64 } // put an element from cache into store func (b *DocBuf) demote() error { if len(b.cache) < 1 { return errors.New(DemoteFromEmptyCacheError) } demoted := b.cache[len(b.cache)-1] b.cache = b.cache[:len(b.cache)-1] return b.writeToBackingStore(demoted) } /* bounds here refers to checking the capacity of the internal memory cache * Remove() includes a use case where a user might want to evict more events * than the internal memory cache can possibly contain, in which case we use * promote and turn off bounds checking in order to overpopulate memory cache */ func (b *DocBuf) promote(bounds bool) error { if bounds && len(b.cache) >= b.cacheSize { return errors.New(PromoteToFullCacheError) } doc, err := b.readDocumentsFromDisk(1, true, false) if err != nil { return err } if len(doc) > 0 { b.cache = append(b.cache, doc[0]) } return nil } // encode as base64 so as to avoid needing to escape things // then documents can be separated by newlines func (b *DocBuf) writeToBackingStore(doc string) error { if len(doc) == 0 { return errors.New(WriteEmptyDocumentError) } str := base64.StdEncoding.EncodeToString([]byte(doc)) // seek to end index c, err := b.store.Seek(int64(b.endIndex), io.SeekStart); if err != nil { return err } // separate documents with a newline if c > 0 { str = "\n" + str } n, err := b.store.Write([]byte(str)) if err != nil || n < len(doc) { if n < len(doc) { return errors.Join( errors.New(PartialWriteError), err) } return err } b.endIndex += int64(n) return nil } func (b *DocBuf) readDocumentsFromDisk( count int, truncate bool, continues bool, ) ([]string, error) { docs := []string{} cursor := int64(0) var err error if count < 1 { return docs, errors.New(BadReadRequestError) } if continues { cursor, err = b.store.Seek(0, io.SeekCurrent) if b.endIndex == 0 || err != nil || cursor == 0 { return docs, err } } else { // catch store is empty cursor, err = b.store.Seek(0, io.SeekEnd) if b.endIndex == 0 || err != nil || cursor == 0 { return docs, err } // self repair? if b.endIndex > cursor { b.endIndex = cursor } cursor = b.endIndex - 1 } for len(docs) < count && cursor >= 0 { doc := "" char := make([]byte, 1) char[0] = 0 // read bytes backwards from file for cursor >= 0 { // set cursor if _, err := b.store.Seek(cursor, io.SeekStart); err != nil { return docs, err } // read one byte n, err := b.store.Read(char) if err != nil { return docs, err } if n != 1 { return docs, errors.New(BadReadLengthError) } // break on newline if char[0] == 10 { break } cursor -= 1 doc = string(char) + doc } /* parse document and add to account (docs) * each read will stop on the newline so make sure * we dont try to account for an empty document */ if len(doc) > 0 && doc != "\n" { str, err := base64.StdEncoding.DecodeString(doc) if err != nil { return docs, err } doc = string(str) docs = append(docs, doc) } if len(docs) < count { cursor -= 1 } } if truncate { if cursor < 0 { cursor = 0 } b.endIndex = cursor } return docs, nil } /* WARNING: this constructor will promote items * from storage, resulting in a modified backing * as well as a prefilled cache. */ func NewDocumentBuffer( cacheSize int, backingStore io.ReadWriteSeeker, ) (DocumentBuffer, error) { newBuf := DocBuf { cache: make([]string, 0, cacheSize), store: backingStore, cacheSize: cacheSize, } c, err := backingStore.Seek(0, io.SeekEnd) newBuf.endIndex = c if err != nil { return &newBuf, err } // prefill cache for range cacheSize { newBuf.promote(true) } return &newBuf, nil } /* Push * Add a document to the buffer. * If cache is full oldest document is written to backing writer. */ func (b *DocBuf) Push(doc string) error { if b.cacheSize == 0 { b.writeToBackingStore(doc) return nil } if len(b.cache) >= b.cacheSize { if err := b.demote(); err != nil { return err } } tc := b.cache b.cache = append(make([]string, 0, b.cacheSize), doc) b.cache = append(b.cache, tc...) return nil } /* Pop * removes a document from the cache and pulls a document back * out of the backing ReaderWriter. */ func (b *DocBuf) Pop() (string, error) { if b.cacheSize == 0 { d, e := b.readDocumentsFromDisk(1, true, false) if len(d) > 0 { return d[0], e } return "", e } if len(b.cache) < 1 { if err := b.promote(true); err != nil { return "", err } } if len(b.cache) < 1 { return "", errors.New(PopFromEmptyBufferError) } candidate := b.cache[0] b.cache = b.cache[1:] e := b.promote(true) return candidate, e } /* Remove * removes N newest documents in order of newest to oldest. * backfills cache by pulling documents back out of backing * ReaderWriter (again, newest first). */ func (b *DocBuf) Remove(count int) ([]string, error) { delta := count - b.cacheSize if delta > 0 { for range delta { if err := b.promote(false); err != nil { return []string{}, err } } } if count > len(b.cache) { tmp := b.cache b.cache = make([]string, 0, b.cacheSize) return tmp, errors.New(RemoveOverCapacityError) } candidates := b.cache[:count] b.cache = b.cache[count:] // refill cache delta = b.cacheSize - len(b.cache) if delta > 0 { for range delta { if err := b.promote(true); err != nil { return []string{}, err } } } return candidates, nil } /* Flush * empties cache by placing all cached documnent into backing * ReaderWriter. */ func (b *DocBuf) Flush() error { for range len(b.cache) { if err := b.demote(); err != nil { return err } } return nil } /* Peek * views first document in either cache or backing ReaderWriter */ func (b *DocBuf) Peek() (string, error) { if len(b.cache) > 0 { return b.cache[0], nil } else { d, e := b.readDocumentsFromDisk(1, false, false) if len(d) < 1 { return "", e } return d[0], e } } /* Read * returns N documents from the cache and backing store without * removing them from either data structure. */ func (b *DocBuf) Read(count int) ([]string, error) { delta := count - len(b.cache) candidates := b.cache[:count - (delta)] if delta > 0 { fromStorage, err := b.readDocumentsFromDisk(delta, false, false) if err != nil { return candidates, err } candidates = append(candidates, fromStorage...) } return candidates, nil } /* Apply * Takes a callable/closure/function that accepts a string (document) * and returns a bool. Apply returns an error, or nil. * Apply will begin applying this callable to each doc from most recent * to least recent so long as the callable continues to return true. * If the callable returns false, Apply ceases iteration and returns. */ func (b *DocBuf) Apply(f func(string)bool) error { // iterate over internal cache applying function for _, i := range b.cache { if !f(i) { return nil } } // begin iterating with readDocumentsFromDisk first := true flag := true for flag { doc, err := b.readDocumentsFromDisk(1, false, !first) if err != nil { return err } if len(doc) == 0 { return nil } else if len(doc) != 1 { return errors.New("improper read from buffer") } first = false flag = f(doc[0]) if flag { var c, d int64 var err error // since continue is on we need to bring the cursor back one if c, err = b.store.Seek(0, io.SeekCurrent); err != nil { return err // dont seek earlier than 0 } else if c < 1 { break } if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil { return err } if c == d { return errors.New("Seek failure!") } } } return nil } /* Close * flushes and then prepares backing store for closure. * returns the end index into the underlying stream... * it is the callers responsibility to truncate. */ func (b *DocBuf) Close() (int64, error) { if err := b.Flush(); err != nil { return b.endIndex, err } return b.endIndex, nil } /* Cached * returns current length of cache */ func (b *DocBuf) Cached() int { return len(b.cache) }