Merge branch 'ava-docbuf-insanity' into 'main'
runtime data persistence See merge request whom/bingobot!16
This commit is contained in:
commit
41c420ebcc
9 changed files with 1425 additions and 146 deletions
|
|
@ -22,3 +22,8 @@ tests-config-pkg:
|
|||
stage: test
|
||||
script:
|
||||
- go test ./internal/config
|
||||
|
||||
tests-docbuf-pkg:
|
||||
stage: test
|
||||
script:
|
||||
- go test ./internal/docbuf
|
||||
|
|
|
|||
|
|
@ -35,6 +35,12 @@ type AppConfig struct {
|
|||
current recommended value is 1000ms.
|
||||
*/
|
||||
VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"`
|
||||
|
||||
/* persistent state file store */
|
||||
PersistentCacheStore string `yaml:"persistent_cache_store"`
|
||||
|
||||
/* number of internal state events to cache in memory */
|
||||
InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"`
|
||||
}
|
||||
|
||||
var config *AppConfig
|
||||
|
|
@ -102,4 +108,6 @@ func setDefaults() {
|
|||
viper.SetDefault("LogAddSource", true)
|
||||
viper.SetDefault("VoiceActivityThresholdSeconds", 600)
|
||||
viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000)
|
||||
viper.SetDefault("PersistentCacheStore", "/tmp/bingobot")
|
||||
viper.SetDefault("InMemoryEventCacheSize", 512)
|
||||
}
|
||||
|
|
|
|||
459
internal/docbuf/docbuf.go
Normal file
459
internal/docbuf/docbuf.go
Normal file
|
|
@ -0,0 +1,459 @@
|
|||
package docbuf
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
const (
|
||||
DemoteFromEmptyCacheError = "Can't demote from empty cache"
|
||||
PromoteToFullCacheError = "Can't promote to full cache"
|
||||
WriteEmptyDocumentError = "Not writing empty document to store"
|
||||
PartialWriteError = "Couldnt complete document write"
|
||||
BadReadRequestError = "Can't read less than one documents"
|
||||
BadReadLengthError = "Unexpected read length"
|
||||
PopFromEmptyBufferError = "Can't pop from empty buffer"
|
||||
RemoveOverCapacityError = "Not enough documents in buffer to remove"
|
||||
)
|
||||
|
||||
type DocumentBuffer interface {
|
||||
/* Push
|
||||
* Add a document to the buffer.
|
||||
* If cache is full oldest document is written to backing writer.
|
||||
*/
|
||||
Push(string) error
|
||||
/* Pop
|
||||
* removes a document from the cache and pulls a document back
|
||||
* out of the backing ReaderWriter.
|
||||
*/
|
||||
Pop() (string, error)
|
||||
/* Remove
|
||||
* removes N documents according to arg.
|
||||
* backfills cache by pulling documents back out of backing
|
||||
* ReaderWriter.
|
||||
*/
|
||||
Remove(int) ([]string, error)
|
||||
/* Flush
|
||||
* empties cache by placing all cached documnent into backing
|
||||
* ReaderWriter.
|
||||
*/
|
||||
Flush() error
|
||||
/* Peek
|
||||
* views first document in either cache or backing ReaderWriter
|
||||
*/
|
||||
Peek() (string, error)
|
||||
/* Read
|
||||
* returns N documents from the cache and backing store without
|
||||
* removing them from either data structure.
|
||||
*/
|
||||
Read(int) ([]string, error)
|
||||
/* Apply
|
||||
* Takes a callable/closure/function that accepts a string (document)
|
||||
* and returns a bool. Apply returns an error, or nil.
|
||||
* Apply will begin applying this callable to each doc from most recent
|
||||
* to least recent so long as the callable continues to return true.
|
||||
* If the callable returns false, Apply ceases iteration and returns.
|
||||
*/
|
||||
Apply(func(string)bool) error
|
||||
/* Close
|
||||
* flushes and then prepares backing store for closure
|
||||
*/
|
||||
Close() (int64, error)
|
||||
/* Cached
|
||||
* returns current length of cache
|
||||
*/
|
||||
Cached() int
|
||||
}
|
||||
|
||||
type DocBuf struct {
|
||||
cache []string
|
||||
store io.ReadWriteSeeker
|
||||
cacheSize int
|
||||
endIndex int64
|
||||
}
|
||||
|
||||
// put an element from cache into store
|
||||
func (b *DocBuf) demote() error {
|
||||
if len(b.cache) < 1 {
|
||||
return errors.New(DemoteFromEmptyCacheError)
|
||||
}
|
||||
|
||||
demoted := b.cache[len(b.cache)-1]
|
||||
b.cache = b.cache[:len(b.cache)-1]
|
||||
return b.writeToBackingStore(demoted)
|
||||
}
|
||||
|
||||
/* bounds here refers to checking the capacity of the internal memory cache
|
||||
* Remove() includes a use case where a user might want to evict more events
|
||||
* than the internal memory cache can possibly contain, in which case we use
|
||||
* promote and turn off bounds checking in order to overpopulate memory cache
|
||||
*/
|
||||
func (b *DocBuf) promote(bounds bool) error {
|
||||
if bounds && len(b.cache) >= b.cacheSize {
|
||||
return errors.New(PromoteToFullCacheError)
|
||||
}
|
||||
|
||||
doc, err := b.readDocumentsFromDisk(1, true, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(doc) > 0 {
|
||||
b.cache = append(b.cache, doc[0])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// encode as base64 so as to avoid needing to escape things
|
||||
// then documents can be separated by newlines
|
||||
func (b *DocBuf) writeToBackingStore(doc string) error {
|
||||
if len(doc) == 0 {
|
||||
return errors.New(WriteEmptyDocumentError)
|
||||
}
|
||||
|
||||
str := base64.StdEncoding.EncodeToString([]byte(doc))
|
||||
|
||||
// seek to end index
|
||||
c, err := b.store.Seek(int64(b.endIndex), io.SeekStart);
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// separate documents with a newline
|
||||
if c > 0 {
|
||||
str = "\n" + str
|
||||
}
|
||||
|
||||
n, err := b.store.Write([]byte(str))
|
||||
if err != nil || n < len(doc) {
|
||||
if n < len(doc) {
|
||||
return errors.Join(
|
||||
errors.New(PartialWriteError),
|
||||
err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
b.endIndex += int64(n)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *DocBuf) readDocumentsFromDisk(
|
||||
count int,
|
||||
truncate bool,
|
||||
continues bool,
|
||||
) ([]string, error) {
|
||||
docs := []string{}
|
||||
cursor := int64(0)
|
||||
var err error
|
||||
|
||||
if count < 1 {
|
||||
return docs, errors.New(BadReadRequestError)
|
||||
}
|
||||
|
||||
if continues {
|
||||
cursor, err = b.store.Seek(0, io.SeekCurrent)
|
||||
if b.endIndex == 0 || err != nil || cursor == 0 {
|
||||
return docs, err
|
||||
}
|
||||
|
||||
} else {
|
||||
// catch store is empty
|
||||
cursor, err = b.store.Seek(0, io.SeekEnd)
|
||||
if b.endIndex == 0 || err != nil || cursor == 0 {
|
||||
return docs, err
|
||||
}
|
||||
|
||||
// self repair?
|
||||
if b.endIndex > cursor {
|
||||
b.endIndex = cursor
|
||||
}
|
||||
cursor = b.endIndex - 1
|
||||
}
|
||||
|
||||
for len(docs) < count && cursor >= 0 {
|
||||
doc := ""
|
||||
char := make([]byte, 1)
|
||||
char[0] = 0
|
||||
|
||||
// read bytes backwards from file
|
||||
for cursor >= 0 {
|
||||
// set cursor
|
||||
if _, err := b.store.Seek(cursor, io.SeekStart); err != nil {
|
||||
return docs, err
|
||||
}
|
||||
|
||||
// read one byte
|
||||
n, err := b.store.Read(char)
|
||||
if err != nil {
|
||||
return docs, err
|
||||
}
|
||||
if n != 1 {
|
||||
return docs, errors.New(BadReadLengthError)
|
||||
}
|
||||
|
||||
// break on newline
|
||||
if char[0] == 10 {
|
||||
break
|
||||
}
|
||||
|
||||
cursor -= 1
|
||||
doc = string(char) + doc
|
||||
}
|
||||
|
||||
/* parse document and add to account (docs)
|
||||
* each read will stop on the newline so make sure
|
||||
* we dont try to account for an empty document
|
||||
*/
|
||||
if len(doc) > 0 && doc != "\n" {
|
||||
str, err := base64.StdEncoding.DecodeString(doc)
|
||||
if err != nil {
|
||||
return docs, err
|
||||
}
|
||||
doc = string(str)
|
||||
docs = append(docs, doc)
|
||||
}
|
||||
|
||||
if len(docs) < count {
|
||||
cursor -= 1
|
||||
}
|
||||
}
|
||||
|
||||
if truncate {
|
||||
if cursor < 0 {
|
||||
cursor = 0
|
||||
}
|
||||
b.endIndex = cursor
|
||||
}
|
||||
return docs, nil
|
||||
}
|
||||
|
||||
|
||||
/* WARNING: this constructor will promote items
|
||||
* from storage, resulting in a modified backing
|
||||
* as well as a prefilled cache.
|
||||
*/
|
||||
func NewDocumentBuffer(
|
||||
cacheSize int,
|
||||
backingStore io.ReadWriteSeeker,
|
||||
) (DocumentBuffer, error) {
|
||||
newBuf := DocBuf {
|
||||
cache: make([]string, 0, cacheSize),
|
||||
store: backingStore,
|
||||
cacheSize: cacheSize,
|
||||
}
|
||||
|
||||
c, err := backingStore.Seek(0, io.SeekEnd)
|
||||
newBuf.endIndex = c
|
||||
if err != nil {
|
||||
return &newBuf, err
|
||||
}
|
||||
|
||||
// prefill cache
|
||||
for range cacheSize {
|
||||
newBuf.promote(true)
|
||||
}
|
||||
return &newBuf, nil
|
||||
}
|
||||
|
||||
/* Push
|
||||
* Add a document to the buffer.
|
||||
* If cache is full oldest document is written to backing writer.
|
||||
*/
|
||||
func (b *DocBuf) Push(doc string) error {
|
||||
if len(b.cache) >= b.cacheSize {
|
||||
if err := b.demote(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
tc := b.cache
|
||||
b.cache = append(make([]string, 0, b.cacheSize), doc)
|
||||
b.cache = append(b.cache, tc...)
|
||||
return nil
|
||||
}
|
||||
|
||||
/* Pop
|
||||
* removes a document from the cache and pulls a document back
|
||||
* out of the backing ReaderWriter.
|
||||
*/
|
||||
func (b *DocBuf) Pop() (string, error) {
|
||||
if len(b.cache) < 1 {
|
||||
if err := b.promote(true); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
if len(b.cache) < 1 {
|
||||
return "", errors.New(PopFromEmptyBufferError)
|
||||
}
|
||||
|
||||
candidate := b.cache[0]
|
||||
b.cache = b.cache[1:]
|
||||
e := b.promote(true)
|
||||
|
||||
return candidate, e
|
||||
}
|
||||
|
||||
/* Remove
|
||||
* removes N newest documents in order of newest to oldest.
|
||||
* backfills cache by pulling documents back out of backing
|
||||
* ReaderWriter (again, newest first).
|
||||
*/
|
||||
func (b *DocBuf) Remove(count int) ([]string, error) {
|
||||
delta := count - b.cacheSize
|
||||
if delta > 0 {
|
||||
for range delta {
|
||||
if err := b.promote(false); err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if count > len(b.cache) {
|
||||
tmp := b.cache
|
||||
b.cache = make([]string, 0, b.cacheSize)
|
||||
return tmp, errors.New(RemoveOverCapacityError)
|
||||
}
|
||||
|
||||
candidates := b.cache[:count]
|
||||
b.cache = b.cache[count:]
|
||||
|
||||
// refill cache
|
||||
delta = b.cacheSize - len(b.cache)
|
||||
if delta > 0 {
|
||||
for range delta {
|
||||
if err := b.promote(true); err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return candidates, nil
|
||||
}
|
||||
|
||||
/* Flush
|
||||
* empties cache by placing all cached documnent into backing
|
||||
* ReaderWriter.
|
||||
*/
|
||||
func (b *DocBuf) Flush() error {
|
||||
for range len(b.cache) {
|
||||
if err := b.demote(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
/* Peek
|
||||
* views first document in either cache or backing ReaderWriter
|
||||
*/
|
||||
func (b *DocBuf) Peek() (string, error) {
|
||||
if len(b.cache) > 0 {
|
||||
return b.cache[0], nil
|
||||
} else {
|
||||
d, e := b.readDocumentsFromDisk(1, false, false)
|
||||
if len(d) < 1 {
|
||||
return "", e
|
||||
}
|
||||
return d[0], e
|
||||
}
|
||||
}
|
||||
|
||||
/* Read
|
||||
* returns N documents from the cache and backing store without
|
||||
* removing them from either data structure.
|
||||
*/
|
||||
func (b *DocBuf) Read(count int) ([]string, error) {
|
||||
delta := count - len(b.cache)
|
||||
candidates := b.cache[:count - (delta)]
|
||||
|
||||
if delta > 0 {
|
||||
fromStorage, err := b.readDocumentsFromDisk(delta, false, false)
|
||||
if err != nil {
|
||||
return candidates, err
|
||||
}
|
||||
|
||||
candidates = append(candidates, fromStorage...)
|
||||
}
|
||||
|
||||
return candidates, nil
|
||||
}
|
||||
|
||||
/* Apply
|
||||
* Takes a callable/closure/function that accepts a string (document)
|
||||
* and returns a bool. Apply returns an error, or nil.
|
||||
* Apply will begin applying this callable to each doc from most recent
|
||||
* to least recent so long as the callable continues to return true.
|
||||
* If the callable returns false, Apply ceases iteration and returns.
|
||||
*/
|
||||
func (b *DocBuf) Apply(f func(string)bool) error {
|
||||
// iterate over internal cache applying function
|
||||
for _, i := range b.cache {
|
||||
if !f(i) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// begin iterating with readDocumentsFromDisk
|
||||
first := true
|
||||
flag := true
|
||||
for flag {
|
||||
doc, err := b.readDocumentsFromDisk(1, false, !first)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(doc) == 0 {
|
||||
return nil
|
||||
} else if len(doc) != 1 {
|
||||
return errors.New("improper read from buffer")
|
||||
}
|
||||
|
||||
first = false
|
||||
flag = f(doc[0])
|
||||
|
||||
if flag {
|
||||
var c, d int64
|
||||
var err error
|
||||
// since continue is on we need to bring the cursor back one
|
||||
if c, err = b.store.Seek(0, io.SeekCurrent); err != nil {
|
||||
return err
|
||||
// dont seek earlier than 0
|
||||
} else if c < 1 {
|
||||
break
|
||||
}
|
||||
if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c == d {
|
||||
return errors.New("Seek failure!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
/* Close
|
||||
* flushes and then prepares backing store for closure.
|
||||
* returns the end index into the underlying stream...
|
||||
* it is the callers responsibility to truncate.
|
||||
*/
|
||||
func (b *DocBuf) Close() (int64, error) {
|
||||
if err := b.Flush(); err != nil {
|
||||
return b.endIndex, err
|
||||
}
|
||||
|
||||
return b.endIndex, nil
|
||||
}
|
||||
|
||||
/* Cached
|
||||
* returns current length of cache
|
||||
*/
|
||||
func (b *DocBuf) Cached() int {
|
||||
return len(b.cache)
|
||||
}
|
||||
337
internal/docbuf/docbuf_test.go
Normal file
337
internal/docbuf/docbuf_test.go
Normal file
|
|
@ -0,0 +1,337 @@
|
|||
package docbuf
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func docu(ctr *int) string {
|
||||
// we add a new line to try to trick the docbuf
|
||||
// into thinking there is an extra doc here.
|
||||
// but there isnt one.
|
||||
s := fmt.Sprintf("%d\n", *ctr)
|
||||
(*ctr) += 1
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func TestPushPop(t *testing.T) {
|
||||
docCtr := 1
|
||||
|
||||
backingStore := NewReadWriteSeekString()
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// does push add 1 to cache
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
// does push past cache demote first document
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || backingStore.Cursor() == 0 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
if backingStore.Contents() != "MQo=" {
|
||||
t.Fatalf("expected oldest doc to be in store, got %s",
|
||||
backingStore.Contents())
|
||||
}
|
||||
|
||||
// does pop promote document from cache
|
||||
doc, err := buf.Pop()
|
||||
if err != nil || doc != "4\n" {
|
||||
t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err)
|
||||
}
|
||||
if buf.Cached() != 3 {
|
||||
t.Fatalf("doc buffer did not promote: %d", buf.Cached())
|
||||
}
|
||||
|
||||
// does pop past empty throw the right error
|
||||
doc, err = buf.Pop()
|
||||
if err != nil || doc != "3\n" {
|
||||
t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err)
|
||||
}
|
||||
doc, err = buf.Pop()
|
||||
if err != nil || doc != "2\n" {
|
||||
t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err)
|
||||
}
|
||||
doc, err = buf.Pop()
|
||||
if err != nil || doc != "1\n" {
|
||||
t.Logf("bs: %s", backingStore.Contents())
|
||||
t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err)
|
||||
}
|
||||
doc, err = buf.Pop()
|
||||
if err == nil ||
|
||||
doc != "" ||
|
||||
err.Error() != "Can't pop from empty buffer" {
|
||||
t.Fatalf("did not get expected doc from cache: %s", doc)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
docCtr := 1
|
||||
backingStore := NewReadWriteSeekString()
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// setup test data
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil ||
|
||||
buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
// tests
|
||||
docs, err := buf.Remove(2)
|
||||
if err != nil ||
|
||||
len(docs) != 2 ||
|
||||
docs[0] != "4\n" ||
|
||||
docs[1] != "3\n" ||
|
||||
buf.Cached() != 2 {
|
||||
t.Fatalf("error removing: %e", err)
|
||||
}
|
||||
|
||||
for range 5 {
|
||||
if err := buf.Push(docu(&docCtr)); err != nil {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
docs, err = buf.Remove(4)
|
||||
if err != nil ||
|
||||
len(docs) != 4 ||
|
||||
docs[0] != "9\n" ||
|
||||
docs[1] != "8\n" ||
|
||||
docs[2] != "7\n" ||
|
||||
docs[3] != "6\n" ||
|
||||
buf.Cached() != 3 {
|
||||
t.Fatalf("error removing: %e", err)
|
||||
}
|
||||
|
||||
docs, err = buf.Remove(3)
|
||||
if err != nil ||
|
||||
len(docs) != 3 ||
|
||||
docs[0] != "5\n" ||
|
||||
docs[1] != "2\n" ||
|
||||
docs[2] != "1\n" ||
|
||||
buf.Cached() != 0 {
|
||||
t.Fatalf("error removing: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlush(t *testing.T) {
|
||||
docCtr := 1
|
||||
expectedDoc := "MQo=\nMgo=\nMwo=\nNAo="
|
||||
backingStore := NewReadWriteSeekString()
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// setup test data
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil ||
|
||||
buf.Cached() != 3 || backingStore.Contents() == "Nao=\n" {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
// test
|
||||
if err := buf.Flush(); err != nil {
|
||||
t.Fatalf("error flushing buffer: %e", err)
|
||||
}
|
||||
|
||||
if backingStore.Contents() != expectedDoc {
|
||||
t.Fatalf("did not get expected document: %s", backingStore.Contents())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeek(t *testing.T) {
|
||||
docCtr := 1
|
||||
backingStore := NewReadWriteSeekString()
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// setup test data
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
// test
|
||||
if d, e := buf.Peek(); e != nil ||
|
||||
d != "1\n" ||
|
||||
buf.Cached() != 1 {
|
||||
t.Fatalf("error peeking: %e", e)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRead(t *testing.T) {
|
||||
docCtr := 1
|
||||
backingStore := NewReadWriteSeekString()
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// setup test data
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil ||
|
||||
buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
// test
|
||||
if docs, err := buf.Read(4); err != nil ||
|
||||
len(docs) != 4 ||
|
||||
docs[0] != "4\n" ||
|
||||
docs[1] != "3\n" ||
|
||||
docs[2] != "2\n" ||
|
||||
docs[3] != "1\n" ||
|
||||
buf.Cached() != 3 ||
|
||||
backingStore.Contents() != "MQo=" {
|
||||
t.Fatalf("error reading: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClose(t *testing.T) {
|
||||
// do pushes and a remove then Close. assure no error
|
||||
// is the int return where I can truncate?
|
||||
docCtr := 1
|
||||
backingStore := NewReadWriteSeekString()
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// setup test data
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
if err := buf.Push(docu(&docCtr)); err != nil ||
|
||||
buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
|
||||
for range 5 {
|
||||
if err := buf.Push(docu(&docCtr)); err != nil {
|
||||
t.Fatalf("error pushing: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
docs, err := buf.Remove(4)
|
||||
if err != nil ||
|
||||
len(docs) != 4 ||
|
||||
buf.Cached() != 3 {
|
||||
t.Fatalf("error removing: %e", err)
|
||||
}
|
||||
|
||||
expectedDoc := "MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="
|
||||
idx, err := buf.Close()
|
||||
if err != nil ||
|
||||
idx != 24 ||
|
||||
buf.Cached() != 0 ||
|
||||
backingStore.Contents() != expectedDoc {
|
||||
t.Fatalf("error closing: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitialize(t *testing.T) {
|
||||
backingStore := NewReadWriteSeekString()
|
||||
backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="))
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// test cached
|
||||
if buf.Cached() != 3 {
|
||||
t.Fatalf("expected 3 docs in cache")
|
||||
}
|
||||
|
||||
// read all docs
|
||||
docs, err := buf.Read(6)
|
||||
if err != nil ||
|
||||
len(docs) != 6 ||
|
||||
docs[0] != "6\n" ||
|
||||
docs[1] != "5\n" ||
|
||||
docs[2] != "4\n" ||
|
||||
docs[3] != "3\n" ||
|
||||
docs[4] != "2\n" ||
|
||||
docs[5] != "1\n" {
|
||||
t.Fatalf("error reading: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApply(t *testing.T) {
|
||||
backingStore := NewReadWriteSeekString()
|
||||
backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="))
|
||||
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||
if e != nil {
|
||||
t.Fatalf("error making documentbuffer: %e", e)
|
||||
}
|
||||
|
||||
// test cached
|
||||
if buf.Cached() != 3 {
|
||||
t.Fatalf("expected 3 docs in cache")
|
||||
}
|
||||
|
||||
count := 0
|
||||
if err := buf.Apply(func(doc string) bool {
|
||||
count += 1
|
||||
return true
|
||||
}); err != nil || count != 6 {
|
||||
t.Fatalf("error applying: %e", err)
|
||||
}
|
||||
|
||||
count = 0
|
||||
if err := buf.Apply(func(doc string) bool {
|
||||
if doc == "2\n" {
|
||||
return false
|
||||
}
|
||||
count += 1
|
||||
return true
|
||||
}); err != nil || count != 4 {
|
||||
t.Fatalf("error applying: %e", err)
|
||||
}
|
||||
}
|
||||
100
internal/docbuf/read_write_seek_string.go
Normal file
100
internal/docbuf/read_write_seek_string.go
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
package docbuf
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
/* WARNING:
|
||||
* This code is meant to assist with testing and mock ups
|
||||
* It is not only not designed to any rigorous standards,
|
||||
* but additionally does not offer any benefit over a static
|
||||
* in memory single layer cache.
|
||||
*/
|
||||
|
||||
type ReadWriteSeekString struct {
|
||||
inner string
|
||||
cursor int
|
||||
}
|
||||
|
||||
func NewReadWriteSeekString() ReadWriteSeekString{
|
||||
return ReadWriteSeekString{
|
||||
inner: "",
|
||||
cursor: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ReadWriteSeekString) Read(
|
||||
buf []byte,
|
||||
) (int, error) {
|
||||
i := 0
|
||||
for ; i < len(buf); i++ {
|
||||
if len(s.inner) <= s.cursor {
|
||||
return i, nil
|
||||
}
|
||||
buf[i] = s.inner[s.cursor]
|
||||
s.cursor += 1
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (s *ReadWriteSeekString) Write(
|
||||
buf []byte,
|
||||
) (int, error) {
|
||||
backfillDelta := s.cursor - (len(s.inner) - 1)
|
||||
if backfillDelta > 0 {
|
||||
for range backfillDelta {
|
||||
s.inner += "\x00"
|
||||
}
|
||||
}
|
||||
|
||||
tmpBuf := ""
|
||||
if s.cursor > 0 {
|
||||
tmpBuf += s.inner[:s.cursor]
|
||||
}
|
||||
tmpBuf += string(buf)
|
||||
if s.cursor + len(buf) < (len(s.inner) - 1) {
|
||||
tmpBuf += s.inner[s.cursor + len(buf):]
|
||||
}
|
||||
|
||||
s.inner = tmpBuf
|
||||
s.cursor += len(buf)
|
||||
return len(buf), nil
|
||||
}
|
||||
|
||||
func (s *ReadWriteSeekString) Seek(
|
||||
offset int64,
|
||||
whence int,
|
||||
) (int64, error) {
|
||||
var tmpCur int64
|
||||
tmpCur = 0
|
||||
|
||||
switch whence {
|
||||
case io.SeekCurrent:
|
||||
tmpCur = int64(s.cursor)
|
||||
case io.SeekEnd:
|
||||
tmpCur = int64(len(s.inner))
|
||||
case io.SeekStart:
|
||||
tmpCur = int64(0)
|
||||
default:
|
||||
return int64(s.cursor),
|
||||
errors.New("invalid whence value")
|
||||
}
|
||||
|
||||
tmpCur += offset
|
||||
if tmpCur < 0 {
|
||||
msg := fmt.Sprintf("seek index (%d) is negative", tmpCur)
|
||||
return int64(s.cursor), errors.New(msg)
|
||||
}
|
||||
s.cursor = int(tmpCur)
|
||||
return tmpCur, nil
|
||||
}
|
||||
|
||||
func (s *ReadWriteSeekString) Contents() string {
|
||||
return s.inner
|
||||
}
|
||||
|
||||
func (s *ReadWriteSeekString) Cursor() int {
|
||||
return s.cursor
|
||||
}
|
||||
165
internal/docbuf/read_write_seek_string_test.go
Normal file
165
internal/docbuf/read_write_seek_string_test.go
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
package docbuf
|
||||
|
||||
import (
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func SetupTestingBuffer(t *testing.T) ReadWriteSeekString{
|
||||
str := NewReadWriteSeekString()
|
||||
n, e := str.Write([]byte("test"))
|
||||
if n != 4 || e != nil {
|
||||
t.Fatalf("Failed to write to buffer: %e", e)
|
||||
}
|
||||
|
||||
return str
|
||||
}
|
||||
|
||||
// can it read
|
||||
func TestRWSSRead(t *testing.T) {
|
||||
b := make([]byte, 4)
|
||||
buf := SetupTestingBuffer(t)
|
||||
c, err := buf.Seek(0, io.SeekStart)
|
||||
if err != nil || c != 0 {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
n, err := buf.Read(b)
|
||||
if n != 4 || err != nil || string(b) != "test" {
|
||||
t.Fatalf("read failed: %e", err)
|
||||
}
|
||||
|
||||
m, err := buf.Seek(-3, io.SeekEnd)
|
||||
if err != nil || m != 1 {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
b = make([]byte, 4)
|
||||
l, err := buf.Read(b)
|
||||
if l != 3 || err != nil || string(b[:3]) != "est" {
|
||||
t.Fatalf("read failed: %e", err)
|
||||
}
|
||||
|
||||
k, err := buf.Seek(0, io.SeekStart)
|
||||
if k != 0 || err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
b = make([]byte, 3)
|
||||
j, err := buf.Read(b)
|
||||
if j != 3 || err != nil || string(b) != "tes" {
|
||||
t.Fatalf("read failed: %e", err)
|
||||
}
|
||||
|
||||
b = make([]byte, 1)
|
||||
i, err := buf.Read(b)
|
||||
if i != 1 || err != nil || string(b) != "t" {
|
||||
t.Fatalf("read failed: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
// can it write
|
||||
func TestRWSSWrite(t *testing.T) {
|
||||
buf := SetupTestingBuffer(t)
|
||||
if buf.Contents() != "test" || buf.Cursor() != 4 {
|
||||
t.Fatalf("write failed: %s", buf.Contents())
|
||||
}
|
||||
|
||||
m, err := buf.Write([]byte("test2"))
|
||||
if m != 5 ||
|
||||
err != nil ||
|
||||
buf.Contents() != "testtest2" ||
|
||||
buf.Cursor() != 9 {
|
||||
t.Fatalf("write failed: %e", err)
|
||||
}
|
||||
|
||||
n, err := buf.Seek(2, io.SeekStart)
|
||||
if n != 2 || err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
o, err := buf.Write([]byte("one"))
|
||||
if o != 3 ||
|
||||
err != nil ||
|
||||
buf.Contents() != "teoneest2" ||
|
||||
buf.Cursor() != 5 {
|
||||
t.Fatalf("write failed: %e", err)
|
||||
}
|
||||
|
||||
p, err := buf.Seek(0, io.SeekEnd)
|
||||
if p != 9 || err != nil {
|
||||
t.Fatalf("seek (%d) failed: %e", p, err)
|
||||
}
|
||||
|
||||
q, err := buf.Write([]byte("two"))
|
||||
if q != 3 ||
|
||||
err != nil ||
|
||||
buf.Contents() != "teoneest2two" ||
|
||||
buf.Cursor() != 12 {
|
||||
t.Fatalf("write failed: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
// if it seeks can it read from new position
|
||||
// if it seeks can it write to new position
|
||||
func TestRWSSSeek(t *testing.T) {
|
||||
buf := SetupTestingBuffer(t)
|
||||
|
||||
if n, err := buf.Seek(0, io.SeekStart);
|
||||
n != 0 ||
|
||||
err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(3, io.SeekStart);
|
||||
n != 3 ||
|
||||
err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(-1, io.SeekStart);
|
||||
n != 3 ||
|
||||
err == nil ||
|
||||
err.Error() != "seek index (-1) is negative" {
|
||||
t.Fatalf("seek should have failed but didnt: %e", err)
|
||||
}
|
||||
|
||||
|
||||
if n, err := buf.Seek(0, io.SeekCurrent);
|
||||
n != 3 ||
|
||||
err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(-2, io.SeekEnd);
|
||||
n != 2 ||
|
||||
err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(-1, io.SeekCurrent);
|
||||
n != 1 ||
|
||||
err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(-2, io.SeekCurrent);
|
||||
n != 1 ||
|
||||
err == nil ||
|
||||
err.Error() != "seek index (-1) is negative" {
|
||||
t.Fatalf("seek should have failed but didnt: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(-1, io.SeekEnd);
|
||||
n != 3 ||
|
||||
err != nil {
|
||||
t.Fatalf("seek failed: %e", err)
|
||||
}
|
||||
|
||||
if n, err := buf.Seek(-5, io.SeekEnd);
|
||||
n != 3 ||
|
||||
err == nil ||
|
||||
err.Error() != "seek index (-1) is negative" {
|
||||
t.Fatalf("seek should have failed but didnt: %e", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -9,10 +9,13 @@ package state
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
"os"
|
||||
"errors"
|
||||
"encoding/json"
|
||||
|
||||
"gitlab.com/whom/bingobot/internal/docbuf"
|
||||
"gitlab.com/whom/bingobot/internal/logging"
|
||||
)
|
||||
|
||||
|
|
@ -24,32 +27,66 @@ import (
|
|||
const (
|
||||
BadEventTypeError = "bad event type"
|
||||
EventValidationFailedError = "event failed validation: "
|
||||
BadEventNoUID = "event data has no UID field"
|
||||
BadEventNoCreated = "event data has no created field"
|
||||
BadEventStoreFilename = "failed to open event store file"
|
||||
BadEventStreamInit = "failed to initialize event stream"
|
||||
BadEventCreate = "failed to create event"
|
||||
BadUserEventCreatedParse = "failed to parse created time"
|
||||
BadChallengeEvent = "failed to make Challenge Event"
|
||||
BadRestorationEvent = "failed to make Restoration Event"
|
||||
BadUserActiveEvent = "failed to make UserActive Event"
|
||||
BadEventObjMarshal = "error marshalling event"
|
||||
BadEventObjUnmarshal = "failed to unmarshal event map"
|
||||
BadEventUnmarshal = "failed to unmarshal event"
|
||||
BadEventMissingTypeKey = "event map missing type key"
|
||||
)
|
||||
|
||||
const (
|
||||
EventTypeMapKey = "type"
|
||||
)
|
||||
|
||||
var eventMutex sync.RWMutex
|
||||
var eventSubscriptionCache = [NumEventTypes][]chan Event{}
|
||||
var eventCache = []Event{}
|
||||
var eventStorageFileName string
|
||||
var eventStream docbuf.DocumentBuffer
|
||||
var maxEventsInMemory int
|
||||
|
||||
// TODO: Configurable
|
||||
var eventChannelBufferSize = 256
|
||||
|
||||
// TODO: pruned events go to DISK
|
||||
// ASSUMES eventCache is ordered by age (oldest first)
|
||||
func pruneEventCache() {
|
||||
eventMutex.Lock()
|
||||
defer eventMutex.Unlock()
|
||||
|
||||
oldCacheInvalid := false
|
||||
newCache := []Event{}
|
||||
for _, obj := range eventCache {
|
||||
if time.Since(obj.Time()).Hours() > 24*10 {
|
||||
oldCacheInvalid = true
|
||||
} else if oldCacheInvalid {
|
||||
newCache = append(newCache, obj)
|
||||
}
|
||||
// expect filename validations in config package
|
||||
func Init(eventMemCacheSize int, eventStoreFileName string) error {
|
||||
file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
return errors.Join(errors.New(BadEventStoreFilename), err)
|
||||
}
|
||||
|
||||
eventCache = newCache
|
||||
if eventStream, err = docbuf.NewDocumentBuffer(
|
||||
eventMemCacheSize,
|
||||
file,
|
||||
); err != nil {
|
||||
return errors.Join(errors.New(BadEventStreamInit), err)
|
||||
}
|
||||
|
||||
eventStorageFileName = eventStoreFileName
|
||||
maxEventsInMemory = eventMemCacheSize
|
||||
return nil
|
||||
}
|
||||
|
||||
// return no error. we are handling SIGINT
|
||||
func Teardown() {
|
||||
// riskily ignore all locking....
|
||||
// we are handling a termination signal after all
|
||||
i, e := eventStream.Close()
|
||||
if e != nil {
|
||||
logging.Warn("failed to close document buffer: %s", e.Error())
|
||||
logging.Warn("will attempt to truncate event store anyways")
|
||||
}
|
||||
|
||||
e = os.Truncate(eventStorageFileName, i)
|
||||
if e != nil {
|
||||
logging.Error("FAILED TO TRUNCATE EVENT STORE!!")
|
||||
logging.Error("You will likely have garbage data at the end of it")
|
||||
logging.Error("Attempt manual correction!")
|
||||
}
|
||||
}
|
||||
|
||||
type EventType int8
|
||||
|
|
@ -65,14 +102,123 @@ const (
|
|||
NumEventTypes
|
||||
)
|
||||
|
||||
// either returns a valid event type or NumEventTypes
|
||||
func EventTypeFromString(doc string) EventType {
|
||||
switch doc {
|
||||
case "Vote":
|
||||
return Vote
|
||||
case "Challenge":
|
||||
return Challenge
|
||||
case "Restoration":
|
||||
return Restoration
|
||||
case "UserActive":
|
||||
return UserActive
|
||||
default:
|
||||
// error case
|
||||
return NumEventTypes
|
||||
}
|
||||
}
|
||||
|
||||
func (et EventType) String() string {
|
||||
events := []string{
|
||||
"Vote",
|
||||
"Challenge",
|
||||
"Restoration",
|
||||
"UserActive",
|
||||
}
|
||||
|
||||
if et < 0 || et >= NumEventTypes {
|
||||
return ""
|
||||
}
|
||||
|
||||
return events[et]
|
||||
}
|
||||
|
||||
func (et EventType) Validate() error {
|
||||
if et < 0 || et >= NumEventTypes {
|
||||
return stringErrorType(BadEventTypeError)
|
||||
return errors.New(BadEventTypeError)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
/* Allocates a specific event of type T from event type instance.
|
||||
* output event not guaranteed to be valid.
|
||||
* Call Validate() yourself.
|
||||
*/
|
||||
func (et EventType) MakeEvent(data map[string]string) (Event, error) {
|
||||
if err := et.Validate(); err != nil {
|
||||
return nil, errors.Join(errors.New(BadEventCreate), err)
|
||||
}
|
||||
|
||||
MakeUserEvent := func(data map[string]string) (*UserEvent, error) {
|
||||
user, hasUser := data[UserEventUserKey]
|
||||
if !hasUser {
|
||||
return nil, errors.New(BadEventNoUID)
|
||||
}
|
||||
|
||||
created, hasCreated := data[UserEventCreatedKey]
|
||||
if !hasCreated {
|
||||
return nil, errors.New(BadEventNoCreated)
|
||||
}
|
||||
|
||||
createdTime, err := time.Parse(time.RFC3339, created)
|
||||
if err != nil {
|
||||
return nil, errors.Join(errors.New(BadUserEventCreatedParse), err)
|
||||
}
|
||||
|
||||
return &UserEvent{
|
||||
uid: user,
|
||||
created: createdTime,
|
||||
}, nil
|
||||
}
|
||||
|
||||
switch et {
|
||||
case Vote:
|
||||
return VoteEvent(data), nil
|
||||
case Challenge:
|
||||
e, err := MakeUserEvent(data)
|
||||
if err != nil {
|
||||
return nil, errors.Join(errors.New(BadChallengeEvent), err)
|
||||
}
|
||||
return ChallengeEvent{*e}, nil
|
||||
case Restoration:
|
||||
e, err := MakeUserEvent(data)
|
||||
if err != nil {
|
||||
return nil, errors.Join(errors.New(BadRestorationEvent), err)
|
||||
}
|
||||
return RestorationEvent{*e}, nil
|
||||
case UserActive:
|
||||
e, err := MakeUserEvent(data)
|
||||
if err != nil {
|
||||
return nil, errors.Join(errors.New(BadUserActiveEvent), err)
|
||||
}
|
||||
return UserActiveEvent{*e}, nil
|
||||
default:
|
||||
return nil, errors.New(BadEventTypeError)
|
||||
}
|
||||
}
|
||||
|
||||
/* adds a new subscriber channel to the event subscription cache
|
||||
* and returns the channel that it will publish notifications on
|
||||
*/
|
||||
func (et EventType) Subscribe() (chan Event, error) {
|
||||
if err := et.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch := make(chan Event, maxEventsInMemory)
|
||||
|
||||
eventMutex.Lock()
|
||||
defer eventMutex.Unlock()
|
||||
eventSubscriptionCache[et] = append(
|
||||
eventSubscriptionCache[et],
|
||||
ch,
|
||||
)
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
type Event interface {
|
||||
// gets EventType associated with event
|
||||
Type() EventType
|
||||
|
|
@ -88,80 +234,48 @@ type Event interface {
|
|||
Validate() error
|
||||
}
|
||||
|
||||
// TODO: Something better than this
|
||||
type stringErrorType string
|
||||
func EventToString(e Event) (string, error) {
|
||||
m := e.Data()
|
||||
m[EventTypeMapKey] = e.Type().String()
|
||||
buf, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return "", errors.Join(errors.New(BadEventObjMarshal), err)
|
||||
}
|
||||
|
||||
func (s stringErrorType) Error() string {
|
||||
return string(s)
|
||||
return string(buf), nil
|
||||
}
|
||||
|
||||
/* adds a new subscriber channel to the event subscription cache
|
||||
* and returns the channel that it will publish notifications on
|
||||
*
|
||||
* note: channel is prefilled with at most eventChannelBufferSize
|
||||
* historical events. Truncated if history exceeds event channel
|
||||
* buffer size.
|
||||
*/
|
||||
func (et EventType) SubscribeWithHistory() (chan Event, error) {
|
||||
if err := et.Validate(); err != nil {
|
||||
return nil, err
|
||||
func EventFromString(doc string) (Event, error) {
|
||||
var obj map[string]string
|
||||
if err := json.Unmarshal([]byte(doc), &obj); err != nil {
|
||||
return nil, errors.Join(errors.New(BadEventObjUnmarshal), err)
|
||||
}
|
||||
|
||||
ch := make(chan Event, eventChannelBufferSize)
|
||||
|
||||
eventMutex.Lock()
|
||||
eventSubscriptionCache[et] = append(
|
||||
eventSubscriptionCache[et],
|
||||
ch,
|
||||
)
|
||||
eventMutex.Unlock()
|
||||
|
||||
eventMutex.RLock()
|
||||
defer eventMutex.RUnlock()
|
||||
numEventsAdded := 0
|
||||
for _, ev := range slices.Backward(eventCache) {
|
||||
if numEventsAdded >= eventChannelBufferSize {
|
||||
break
|
||||
}
|
||||
|
||||
if ev.Type() == et {
|
||||
ch <- ev
|
||||
numEventsAdded += 1
|
||||
}
|
||||
et, ok := obj[EventTypeMapKey]
|
||||
if !ok {
|
||||
return nil, errors.New(BadEventMissingTypeKey)
|
||||
}
|
||||
t := EventTypeFromString(et)
|
||||
ev, err := t.MakeEvent(obj)
|
||||
if err != nil {
|
||||
return nil, errors.Join(errors.New(BadEventCreate), err)
|
||||
}
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
/* adds a new subscriber channel to the event subscription cache
|
||||
* and returns the channel that it will publish notifications on
|
||||
*/
|
||||
func (et EventType) Subscribe() (chan Event, error) {
|
||||
if err := et.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch := make(chan Event, eventChannelBufferSize)
|
||||
|
||||
eventMutex.Lock()
|
||||
defer eventMutex.Unlock()
|
||||
eventSubscriptionCache[et] = append(
|
||||
eventSubscriptionCache[et],
|
||||
ch,
|
||||
)
|
||||
|
||||
return ch, nil
|
||||
return ev, ev.Validate()
|
||||
}
|
||||
|
||||
func PublishEvent(e Event) error {
|
||||
if err := ValidateEvent(e); err != nil {
|
||||
return stringErrorType(
|
||||
EventValidationFailedError + err.Error(),
|
||||
)
|
||||
return errors.Join(errors.New(EventValidationFailedError), err)
|
||||
}
|
||||
|
||||
doc, err := EventToString(e)
|
||||
if err != nil {
|
||||
return errors.New(BadEventUnmarshal)
|
||||
}
|
||||
|
||||
eventMutex.Lock()
|
||||
eventCache = append(eventCache, e)
|
||||
eventStream.Push(doc)
|
||||
eventMutex.Unlock()
|
||||
eventMutex.RLock()
|
||||
defer eventMutex.RUnlock()
|
||||
|
|
@ -169,8 +283,8 @@ func PublishEvent(e Event) error {
|
|||
blocking := false
|
||||
|
||||
for _, c := range eventSubscriptionCache[e.Type()] {
|
||||
if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) {
|
||||
if len(c) == eventChannelBufferSize {
|
||||
if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) {
|
||||
if len(c) == maxEventsInMemory {
|
||||
logging.Warn(
|
||||
"PublishEvent() blocking -- event channel full",
|
||||
// log the event time to provide blockage timing information
|
||||
|
|
@ -205,6 +319,45 @@ func ValidateEvent(e Event) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
/* Takes a filter and applies it to each individual event
|
||||
* The filter is a function/closure that accepts one event
|
||||
* and returns true if ApplyToEvents should continue iterating
|
||||
*
|
||||
* If the filter returns false then ApplyToEvents halts and returns.
|
||||
*
|
||||
* (this calls docbuf.Apply() under the hood)
|
||||
*/
|
||||
func ApplyToEvents(
|
||||
f func(Event)bool,
|
||||
) error {
|
||||
// local variables enclosed by filter function filterWrap
|
||||
var err error
|
||||
var ev Event
|
||||
// wrap f() to be compatible with docbuf.Apply()
|
||||
filterWrap := func(doc string) bool {
|
||||
ev, err = EventFromString(doc)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return f(ev)
|
||||
}
|
||||
|
||||
eventMutex.RLock()
|
||||
defer eventMutex.RUnlock()
|
||||
|
||||
// cant reuse err or return val directly as err is set
|
||||
// by filter function if an error happens unmarshalling
|
||||
// an event. In this case apply might return nil
|
||||
err2 := eventStream.Apply(filterWrap)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
/* gets all events of type T in cache
|
||||
* that also share all field values in
|
||||
* map 'filters'
|
||||
|
|
@ -219,25 +372,26 @@ func GetMatchingEvents(
|
|||
return matches, err
|
||||
}
|
||||
|
||||
eventMutex.RLock()
|
||||
defer eventMutex.RUnlock()
|
||||
|
||||
Events:
|
||||
for _, e := range eventCache {
|
||||
filter := func(e Event) bool {
|
||||
ev, er := EventToString(e)
|
||||
fmt.Printf("Checking: %s (%e)\n", ev, er)
|
||||
if e.Type() != t {
|
||||
continue
|
||||
return true
|
||||
}
|
||||
for k, v := range filters {
|
||||
val, found := e.Data()[k]
|
||||
if !found || val != v {
|
||||
continue Events
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("Found Match")
|
||||
matches = append(matches, e)
|
||||
return true
|
||||
}
|
||||
|
||||
return matches, nil
|
||||
err := ApplyToEvents(filter)
|
||||
return matches, err
|
||||
}
|
||||
|
||||
type VoteEvent map[string]string
|
||||
|
|
@ -303,8 +457,7 @@ func (ve VoteEvent) Validate() error {
|
|||
VoteStatusKey,
|
||||
} {
|
||||
if _, found := ve[key]; !found {
|
||||
return stringErrorType(
|
||||
VoteMissingKeyError + key)
|
||||
return errors.New(VoteMissingKeyError + key)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -312,15 +465,15 @@ func (ve VoteEvent) Validate() error {
|
|||
if status != VoteStatusTimeout &&
|
||||
status != VoteStatusInProgress &&
|
||||
status != VoteStatusFinalized {
|
||||
return stringErrorType(VoteBadStatusError + status)
|
||||
return errors.New(VoteBadStatusError + status)
|
||||
}
|
||||
|
||||
result, hasResult := ve[VoteResultKey]
|
||||
if hasResult && status == VoteStatusInProgress {
|
||||
return stringErrorType(VoteNotFinishedError)
|
||||
return errors.New(VoteNotFinishedError)
|
||||
}
|
||||
if status != VoteStatusInProgress && !hasResult {
|
||||
return stringErrorType(VoteMissingResultError)
|
||||
return errors.New(VoteMissingResultError)
|
||||
}
|
||||
|
||||
if hasResult &&
|
||||
|
|
@ -329,7 +482,7 @@ func (ve VoteEvent) Validate() error {
|
|||
result != VoteResultTie &&
|
||||
result != VoteResultTimeout) {
|
||||
|
||||
return stringErrorType(VoteBadResultError + result)
|
||||
return errors.New(VoteBadResultError + result)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -353,7 +506,7 @@ func (ue UserEvent) Time() time.Time {
|
|||
func (ue UserEvent) Data() map[string]string {
|
||||
return map[string]string{
|
||||
UserEventUserKey: ue.uid,
|
||||
UserEventCreatedKey: ue.created.Local().String(),
|
||||
UserEventCreatedKey: ue.created.Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"gitlab.com/whom/bingobot/internal/docbuf"
|
||||
"gitlab.com/whom/bingobot/internal/logging"
|
||||
)
|
||||
|
||||
|
|
@ -17,12 +18,71 @@ const TestTok = "TEST_NAME"
|
|||
var loggingInitialized = false
|
||||
|
||||
func SetupTest(t *testing.T) {
|
||||
maxEventsInMemory = 271
|
||||
|
||||
var err error
|
||||
// have to set up logger
|
||||
if !loggingInitialized {
|
||||
logging.Init()
|
||||
loggingInitialized = true
|
||||
}
|
||||
|
||||
b := docbuf.NewReadWriteSeekString()
|
||||
eventStream, err = docbuf.NewDocumentBuffer(300, &b)
|
||||
if err != nil {
|
||||
t.Fatalf("error allocating buffer: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventMarshalUnmarshal(t *testing.T) {
|
||||
tm := time.Now()
|
||||
cev := ChallengeEvent{UserEvent{
|
||||
uid: TestTok,
|
||||
created: tm,
|
||||
}}
|
||||
|
||||
vev := VoteEvent(map[string]string{
|
||||
VoteActionKey: "a",
|
||||
VoteRequesterKey: "r",
|
||||
VoteCreatedKey: "c",
|
||||
VoteStatusKey: VoteStatusFinalized,
|
||||
VoteResultKey: VoteResultFail,
|
||||
})
|
||||
|
||||
cstr, err := EventToString(cev);
|
||||
if err != nil {
|
||||
t.Fatalf("error marshalling challenge: %e, %s", err, cstr)
|
||||
}
|
||||
|
||||
t.Logf("cstr: %s\n", cstr)
|
||||
|
||||
vstr, err := EventToString(vev);
|
||||
if err != nil {
|
||||
t.Fatalf("error marshalling vote: %e, %s", err, vstr)
|
||||
}
|
||||
|
||||
t.Logf("vstr: %s\n", vstr)
|
||||
|
||||
if ev, err := EventFromString(cstr); err != nil ||
|
||||
ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] ||
|
||||
ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] {
|
||||
t.Fatalf("error unmarshalling challenge: %e, %v!=%v", err, ev, cev)
|
||||
}
|
||||
|
||||
if ev, err := EventFromString(vstr); err != nil ||
|
||||
fmt.Sprint(ev) != fmt.Sprint(vev) {
|
||||
t.Fatalf("error unmarshalling vote: %e, %v!=%v", err, ev, vev)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPubSub(t *testing.T) {
|
||||
SetupTest(t)
|
||||
|
||||
c, e := UserActive.Subscribe()
|
||||
if e != nil {
|
||||
t.Errorf("Error subscribing to UserActive events: %e", e)
|
||||
}
|
||||
|
||||
old, _ := time.Parse(
|
||||
time.RFC3339,
|
||||
VeryOldVote,
|
||||
|
|
@ -47,40 +107,21 @@ func SetupTest(t *testing.T) {
|
|||
created: time.Now(),
|
||||
}})
|
||||
|
||||
if len(eventCache) != 272 {
|
||||
t.Errorf("Unexpected number of events in cache: %d",
|
||||
len(eventCache))
|
||||
}
|
||||
}
|
||||
|
||||
func CleanupTest() {
|
||||
eventSubscriptionCache = [NumEventTypes][]chan Event{}
|
||||
eventCache = []Event{}
|
||||
}
|
||||
|
||||
func TestPubSub(t *testing.T) {
|
||||
SetupTest(t)
|
||||
|
||||
c, e := UserActive.SubscribeWithHistory()
|
||||
if e != nil {
|
||||
t.Errorf("Error subscribing to UserActive events: %e", e)
|
||||
}
|
||||
|
||||
Loop:
|
||||
for i := 0; true; i++ {
|
||||
select {
|
||||
case e, ok := <-c:
|
||||
if !ok {
|
||||
t.Errorf("Subscription Channel Closed")
|
||||
t.Fatalf("Subscription Channel Closed")
|
||||
}
|
||||
if e.Type() != UserActive {
|
||||
t.Errorf("Non UserActive Event in UserActive subscription: %v", e.Type())
|
||||
t.Fatalf("Non UserActive Event in UserActive subscription: %v", e.Type())
|
||||
}
|
||||
default:
|
||||
if i == eventChannelBufferSize {
|
||||
if i == maxEventsInMemory {
|
||||
break Loop
|
||||
} else {
|
||||
t.Errorf("Unexpected number of events in channel: %d", i)
|
||||
t.Fatalf("Unexpected number of events in channel: %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -93,18 +134,40 @@ Loop:
|
|||
select {
|
||||
case e, ok := <-c:
|
||||
if !ok || e.Data()[UserEventUserKey] != "uniqueToken" {
|
||||
t.Errorf("didnt read correct event from channel: %v", e)
|
||||
t.Fatalf("didnt read correct event from channel: %v", e)
|
||||
}
|
||||
default:
|
||||
t.Errorf("New event not published to subscription!")
|
||||
t.Fatalf("New event not published to subscription!")
|
||||
}
|
||||
|
||||
CleanupTest()
|
||||
}
|
||||
|
||||
func TestFilterCache(t *testing.T) {
|
||||
SetupTest(t)
|
||||
|
||||
old, _ := time.Parse(
|
||||
time.RFC3339,
|
||||
VeryOldVote,
|
||||
)
|
||||
|
||||
for i := range 270 {
|
||||
if err := PublishEvent(UserActiveEvent{UserEvent{
|
||||
uid: fmt.Sprintf("%d", i),
|
||||
created: old,
|
||||
}}); err != nil {
|
||||
t.Errorf("Failed to add event: %e", err)
|
||||
}
|
||||
}
|
||||
|
||||
PublishEvent(UserActiveEvent{UserEvent{
|
||||
uid: fmt.Sprintf(TestTok),
|
||||
created: time.Now(),
|
||||
}})
|
||||
|
||||
PublishEvent(ChallengeEvent{UserEvent{
|
||||
uid: fmt.Sprintf(TestTok),
|
||||
created: time.Now(),
|
||||
}})
|
||||
|
||||
events, err := GetMatchingEvents(
|
||||
UserActive,
|
||||
map[string]string{
|
||||
|
|
@ -123,19 +186,6 @@ func TestFilterCache(t *testing.T) {
|
|||
if events[0].Type() != UserActive {
|
||||
t.Errorf("Got wrong event!: %+v", events[0])
|
||||
}
|
||||
|
||||
CleanupTest()
|
||||
}
|
||||
|
||||
func TestPruneCache(t *testing.T) {
|
||||
SetupTest(t)
|
||||
pruneEventCache()
|
||||
|
||||
if len(eventCache) != 2 {
|
||||
t.Errorf("Incorrect number of remaining events: %d", len(eventCache))
|
||||
}
|
||||
|
||||
CleanupTest()
|
||||
}
|
||||
|
||||
func TestVoteEventValidations(t *testing.T) {
|
||||
|
|
|
|||
18
main.go
18
main.go
|
|
@ -3,12 +3,11 @@ package main
|
|||
import (
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
"gitlab.com/whom/bingobot/internal/config"
|
||||
"gitlab.com/whom/bingobot/internal/discord"
|
||||
"gitlab.com/whom/bingobot/internal/logging"
|
||||
"gitlab.com/whom/bingobot/internal/state"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -16,8 +15,6 @@ var (
|
|||
)
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
var err error
|
||||
|
||||
err = config.Init()
|
||||
|
|
@ -27,6 +24,15 @@ func main() {
|
|||
}
|
||||
|
||||
logging.Init()
|
||||
flag.Parse()
|
||||
|
||||
if err := state.Init(
|
||||
config.Get().InMemoryEventCacheSize,
|
||||
config.Get().PersistentCacheStore,
|
||||
); err != nil {
|
||||
log.Fatalf("couldn't initialize state engine: %s", err.Error())
|
||||
}
|
||||
defer state.Teardown()
|
||||
|
||||
err = startBot()
|
||||
|
||||
|
|
@ -42,10 +48,6 @@ func startBot() error {
|
|||
return err
|
||||
}
|
||||
|
||||
sigch := make(chan os.Signal, 1)
|
||||
signal.Notify(sigch, os.Interrupt)
|
||||
<-sigch
|
||||
|
||||
logging.Info("shutting down gracefully", "type", "shutdown")
|
||||
discord.Close()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue