0. tests for event replay

Tests are now included for the event replay features. This includes many fixes on top of the last commit, as well as some tweaks to config module values.

1. activity module

An activity module is created to track useractive events and provide a counter for them. It also encapsulates the logic to discard old useractive events.

2. web module

A web module is created. This module serves a static webpage showing runtime information. Currently it only shows a snapshot of the user activity data. It is my intention that it eventually also shows an audit log, known users and channels, uptime, and more. Future work will also be needed to use HTML templating so that it doesn't look so... basic. Live updates to the information may also be desired.

Signed-off-by: Ava Affine <ava@sunnypup.io>
package docbuf

import (
	"encoding/base64"
	"errors"
	"io"
)

const (
	DemoteFromEmptyCacheError = "Can't demote from empty cache"
	PromoteToFullCacheError   = "Can't promote to full cache"
	WriteEmptyDocumentError   = "Not writing empty document to store"
	PartialWriteError         = "Couldn't complete document write"
	BadReadRequestError       = "Can't read fewer than one document"
	BadReadLengthError        = "Unexpected read length"
	PopFromEmptyBufferError   = "Can't pop from empty buffer"
	RemoveOverCapacityError   = "Not enough documents in buffer to remove"
)

type DocumentBuffer interface {
	/* Push
	 * Add a document to the buffer.
	 * If the cache is full, the oldest document is written to the backing writer.
	 */
	Push(string) error
	/* Pop
	 * removes a document from the cache and pulls a document back
	 * out of the backing ReaderWriter.
	 */
	Pop() (string, error)
	/* Remove
	 * removes the N newest documents, newest first.
	 * backfills cache by pulling documents back out of backing
	 * ReaderWriter.
	 */
	Remove(int) ([]string, error)
	/* Flush
	 * empties cache by placing all cached documents into backing
	 * ReaderWriter.
	 */
	Flush() error
	/* Peek
	 * views the first document in either the cache or the backing ReaderWriter
	 */
	Peek() (string, error)
	/* Read
	 * returns N documents from the cache and backing store without
	 * removing them from either data structure.
	 */
	Read(int) ([]string, error)
	/* Apply
	 * Takes a callable/closure/function that accepts a string (document)
	 * and returns a bool. Apply returns an error, or nil.
	 * Apply will begin applying this callable to each doc from most recent
	 * to least recent so long as the callable continues to return true.
	 * If the callable returns false, Apply ceases iteration and returns.
	 */
	Apply(func(string) bool) error
	/* Close
	 * flushes and then prepares the backing store for closure
	 */
	Close() (int64, error)
	/* Cached
	 * returns current length of cache
	 */
	Cached() int
}
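
/* Usage sketch (illustrative addition, not from the original file): any
 * io.ReadWriteSeeker works as the backing store; an *os.File is the obvious
 * choice. Assuming an "events.log" file:
 *
 *	f, _ := os.OpenFile("events.log", os.O_RDWR|os.O_CREATE, 0o644)
 *	buf, err := NewDocumentBuffer(16, f)   // prefills the cache from disk
 *	_ = buf.Push(`{"event":"useractive"}`) // newest document sits at the front
 *	latest, _ := buf.Peek()
 *	end, _ := buf.Close()                  // flushes; caller truncates
 *	_ = f.Truncate(end)
 */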

type DocBuf struct {
	cache     []string
	store     io.ReadWriteSeeker
	cacheSize int
	endIndex  int64
}

// put an element from cache into store
func (b *DocBuf) demote() error {
	if len(b.cache) < 1 {
		return errors.New(DemoteFromEmptyCacheError)
	}

	demoted := b.cache[len(b.cache)-1]
	b.cache = b.cache[:len(b.cache)-1]
	return b.writeToBackingStore(demoted)
}

/* bounds here refers to checking the capacity of the internal memory cache.
 * Remove() includes a use case where a user might want to evict more events
 * than the internal memory cache can possibly contain, in which case we use
 * promote with bounds checking turned off in order to overpopulate the cache.
 */
func (b *DocBuf) promote(bounds bool) error {
	if bounds && len(b.cache) >= b.cacheSize {
		return errors.New(PromoteToFullCacheError)
	}

	doc, err := b.readDocumentsFromDisk(1, true, false)
	if err != nil {
		return err
	}

	if len(doc) > 0 {
		b.cache = append(b.cache, doc[0])
	}
	return nil
}

// encode as base64 so as to avoid needing to escape things;
// documents can then be separated by newlines
func (b *DocBuf) writeToBackingStore(doc string) error {
	if len(doc) == 0 {
		return errors.New(WriteEmptyDocumentError)
	}

	str := base64.StdEncoding.EncodeToString([]byte(doc))

	// seek to end index
	c, err := b.store.Seek(b.endIndex, io.SeekStart)
	if err != nil {
		return err
	}

	// separate documents with a newline
	if c > 0 {
		str = "\n" + str
	}

	// compare against the encoded length, not the raw document length
	n, err := b.store.Write([]byte(str))
	if err != nil || n < len(str) {
		if n < len(str) {
			return errors.Join(
				errors.New(PartialWriteError),
				err)
		}
		return err
	}

	b.endIndex += int64(n)

	return nil
}

func (b *DocBuf) readDocumentsFromDisk(
	count int,
	truncate bool,
	continues bool,
) ([]string, error) {
	docs := []string{}
	cursor := int64(0)
	var err error

	if count < 1 {
		return docs, errors.New(BadReadRequestError)
	}

	if continues {
		cursor, err = b.store.Seek(0, io.SeekCurrent)
		if b.endIndex == 0 || err != nil || cursor == 0 {
			return docs, err
		}
	} else {
		// catch an empty store
		cursor, err = b.store.Seek(0, io.SeekEnd)
		if b.endIndex == 0 || err != nil || cursor == 0 {
			return docs, err
		}

		// self repair: never trust an end index past the actual end
		if b.endIndex > cursor {
			b.endIndex = cursor
		}
		cursor = b.endIndex - 1
	}

	for len(docs) < count && cursor >= 0 {
		doc := ""
		char := make([]byte, 1)

		// read bytes backwards from the file
		for cursor >= 0 {
			// set cursor
			if _, err := b.store.Seek(cursor, io.SeekStart); err != nil {
				return docs, err
			}

			// read one byte
			n, err := b.store.Read(char)
			if err != nil {
				return docs, err
			}
			if n != 1 {
				return docs, errors.New(BadReadLengthError)
			}

			// break on newline
			if char[0] == '\n' {
				break
			}

			cursor -= 1
			doc = string(char) + doc
		}

		/* parse the document and add it to the account (docs);
		 * each read stops on the newline, so make sure
		 * we don't try to account for an empty document
		 */
		if len(doc) > 0 && doc != "\n" {
			str, err := base64.StdEncoding.DecodeString(doc)
			if err != nil {
				return docs, err
			}
			doc = string(str)
			docs = append(docs, doc)
		}

		if len(docs) < count {
			cursor -= 1
		}
	}

	if truncate {
		if cursor < 0 {
			cursor = 0
		}
		b.endIndex = cursor
	}
	return docs, nil
}

/* WARNING: this constructor will promote items
 * from storage, resulting in a modified backing store
 * as well as a prefilled cache.
 */
func NewDocumentBuffer(
	cacheSize int,
	backingStore io.ReadWriteSeeker,
) (DocumentBuffer, error) {
	newBuf := DocBuf{
		cache:     make([]string, 0, cacheSize),
		store:     backingStore,
		cacheSize: cacheSize,
	}

	c, err := backingStore.Seek(0, io.SeekEnd)
	newBuf.endIndex = c
	if err != nil {
		return &newBuf, err
	}

	// prefill the cache; stop early if the store errors out
	for range cacheSize {
		if err := newBuf.promote(true); err != nil {
			break
		}
	}
	return &newBuf, nil
}
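
/* Reopen sketch (illustrative addition): constructing over a non-empty store
 * prefills the cache with the newest documents, so they are immediately
 * available without touching the disk again:
 *
 *	buf, _ := NewDocumentBuffer(16, f) // f holds previously flushed documents
 *	doc, _ := buf.Peek()               // newest document, served from memory
 */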

/* Push
 * Add a document to the buffer.
 * If the cache is full, the oldest document is written to the backing writer.
 */
func (b *DocBuf) Push(doc string) error {
	if b.cacheSize == 0 {
		// no cache at all: write straight through, propagating any error
		return b.writeToBackingStore(doc)
	}

	if len(b.cache) >= b.cacheSize {
		if err := b.demote(); err != nil {
			return err
		}
	}

	// prepend: the newest document always lives at cache[0]
	tc := b.cache
	b.cache = append(make([]string, 0, b.cacheSize), doc)
	b.cache = append(b.cache, tc...)
	return nil
}

/* Pop
 * removes a document from the cache and pulls a document back
 * out of the backing ReaderWriter.
 */
func (b *DocBuf) Pop() (string, error) {
	if b.cacheSize == 0 {
		d, e := b.readDocumentsFromDisk(1, true, false)
		if len(d) > 0 {
			return d[0], e
		}
		return "", e
	}

	if len(b.cache) < 1 {
		if err := b.promote(true); err != nil {
			return "", err
		}
	}

	if len(b.cache) < 1 {
		return "", errors.New(PopFromEmptyBufferError)
	}

	// hand out the newest document and backfill from the store
	candidate := b.cache[0]
	b.cache = b.cache[1:]
	e := b.promote(true)

	return candidate, e
}

/* Remove
 * removes N newest documents in order of newest to oldest.
 * backfills cache by pulling documents back out of backing
 * ReaderWriter (again, newest first).
 */
func (b *DocBuf) Remove(count int) ([]string, error) {
	// overpopulate the cache when asked for more than it can hold
	delta := count - b.cacheSize
	if delta > 0 {
		for range delta {
			if err := b.promote(false); err != nil {
				return []string{}, err
			}
		}
	}

	if count > len(b.cache) {
		tmp := b.cache
		b.cache = make([]string, 0, b.cacheSize)
		return tmp, errors.New(RemoveOverCapacityError)
	}

	candidates := b.cache[:count]
	b.cache = b.cache[count:]

	// refill cache
	delta = b.cacheSize - len(b.cache)
	if delta > 0 {
		for range delta {
			if err := b.promote(true); err != nil {
				return []string{}, err
			}
		}
	}

	return candidates, nil
}
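
/* Remove sketch (illustrative addition): with cacheSize 4 and ten documents
 * total, Remove(6) first promotes two extra documents with bounds checking
 * off, hands back the six newest, then refills the cache from the store:
 *
 *	newest, err := buf.Remove(6) // newest-first slice of six documents
 */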

/* Flush
 * empties cache by placing all cached documents into backing
 * ReaderWriter.
 */
func (b *DocBuf) Flush() error {
	for range len(b.cache) {
		if err := b.demote(); err != nil {
			return err
		}
	}

	return nil
}

/* Peek
 * views the first document in either the cache or the backing ReaderWriter
 */
func (b *DocBuf) Peek() (string, error) {
	if len(b.cache) > 0 {
		return b.cache[0], nil
	}

	d, e := b.readDocumentsFromDisk(1, false, false)
	if len(d) < 1 {
		return "", e
	}
	return d[0], e
}

/* Read
 * returns N documents from the cache and backing store without
 * removing them from either data structure.
 */
func (b *DocBuf) Read(count int) ([]string, error) {
	if count < 1 {
		return []string{}, errors.New(BadReadRequestError)
	}

	// serve entirely from cache when possible
	if count <= len(b.cache) {
		return b.cache[:count], nil
	}

	// otherwise take the whole cache and decode the remainder from the store
	candidates := b.cache
	fromStorage, err := b.readDocumentsFromDisk(count-len(b.cache), false, false)
	if err != nil {
		return candidates, err
	}

	return append(candidates, fromStorage...), nil
}
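
/* Read sketch (illustrative addition): a non-destructive view of the newest
 * documents; anything beyond the cache is decoded from the store:
 *
 *	three, err := buf.Read(3) // newest first; buffer left untouched
 */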

/* Apply
 * Takes a callable/closure/function that accepts a string (document)
 * and returns a bool. Apply returns an error, or nil.
 * Apply will begin applying this callable to each doc from most recent
 * to least recent so long as the callable continues to return true.
 * If the callable returns false, Apply ceases iteration and returns.
 */
func (b *DocBuf) Apply(f func(string) bool) error {
	// iterate over the internal cache, applying the function
	for _, i := range b.cache {
		if !f(i) {
			return nil
		}
	}

	// continue iterating via readDocumentsFromDisk, one document at a time
	first := true
	flag := true
	for flag {
		doc, err := b.readDocumentsFromDisk(1, false, !first)
		if err != nil {
			return err
		}
		if len(doc) == 0 {
			return nil
		} else if len(doc) != 1 {
			return errors.New("improper read from buffer")
		}

		first = false
		flag = f(doc[0])

		if flag {
			var c, d int64
			var err error
			// since continues is on, we need to bring the cursor back one
			if c, err = b.store.Seek(0, io.SeekCurrent); err != nil {
				return err
			} else if c < 1 {
				// don't seek earlier than 0
				break
			}
			if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil {
				return err
			}

			if c == d {
				return errors.New("Seek failure!")
			}
		}
	}

	return nil
}
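
/* Apply sketch (illustrative addition): collect the five most recent
 * documents without removing anything from the buffer:
 *
 *	recent := []string{}
 *	err := buf.Apply(func(doc string) bool {
 *		recent = append(recent, doc)
 *		return len(recent) < 5
 *	})
 */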

/* Close
 * flushes and then prepares backing store for closure.
 * returns the end index into the underlying stream...
 * it is the caller's responsibility to truncate.
 */
func (b *DocBuf) Close() (int64, error) {
	if err := b.Flush(); err != nil {
		return b.endIndex, err
	}

	return b.endIndex, nil
}

/* Cached
 * returns current length of cache
 */
func (b *DocBuf) Cached() int {
	return len(b.cache)
}