diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 93a2f9b..3093957 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -22,3 +22,8 @@ tests-config-pkg:
   stage: test
   script:
     - go test ./internal/config
+
+tests-docbuf-pkg:
+  stage: test
+  script:
+    - go test ./internal/docbuf
diff --git a/internal/config/config.go b/internal/config/config.go
index 7830a98..eee917d 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -35,6 +35,12 @@ type AppConfig struct {
        current recommended value is 1000ms. */
     VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"`
+
+    /* persistent state file store */
+    PersistentCacheStore string `yaml:"persistent_cache_store"`
+
+    /* number of internal state events to cache in memory */
+    InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"`
 }
 
 var config *AppConfig
@@ -102,4 +108,6 @@ func setDefaults() {
     viper.SetDefault("LogAddSource", true)
     viper.SetDefault("VoiceActivityThresholdSeconds", 600)
     viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000)
+    viper.SetDefault("PersistentCacheStore", "/tmp/bingobot")
+    viper.SetDefault("InMemoryEventCacheSize", 512)
 }
diff --git a/internal/docbuf/docbuf.go b/internal/docbuf/docbuf.go
new file mode 100644
index 0000000..cc0610e
--- /dev/null
+++ b/internal/docbuf/docbuf.go
@@ -0,0 +1,459 @@
+package docbuf
+
+import (
+    "encoding/base64"
+    "errors"
+    "io"
+)
+
+const (
+    DemoteFromEmptyCacheError = "Can't demote from empty cache"
+    PromoteToFullCacheError   = "Can't promote to full cache"
+    WriteEmptyDocumentError   = "Not writing empty document to store"
+    PartialWriteError         = "Couldn't complete document write"
+    BadReadRequestError       = "Can't read fewer than one document"
+    BadReadLengthError        = "Unexpected read length"
+    PopFromEmptyBufferError   = "Can't pop from empty buffer"
+    RemoveOverCapacityError   = "Not enough documents in buffer to remove"
+)
+
+type DocumentBuffer interface {
+    /* Push
+     * Adds a document to the buffer.
+     * If the cache is full, the oldest document is written to the
+     * backing writer.
+     */
+    Push(string) error
+    /* Pop
+     * removes a document from the cache and pulls a document back
+     * out of the backing ReaderWriter.
+     */
+    Pop() (string, error)
+    /* Remove
+     * removes the N newest documents.
+     * backfills the cache by pulling documents back out of the
+     * backing ReaderWriter.
+     */
+    Remove(int) ([]string, error)
+    /* Flush
+     * empties the cache by placing all cached documents into the
+     * backing ReaderWriter.
+     */
+    Flush() error
+    /* Peek
+     * views the first document in either the cache or the backing
+     * ReaderWriter
+     */
+    Peek() (string, error)
+    /* Read
+     * returns N documents from the cache and backing store without
+     * removing them from either data structure.
+     */
+    Read(int) ([]string, error)
+    /* Apply
+     * Takes a callable/closure/function that accepts a string (document)
+     * and returns a bool. Apply returns an error, or nil.
+     * Apply will begin applying this callable to each doc from most recent
+     * to least recent so long as the callable continues to return true.
+     * If the callable returns false, Apply ceases iteration and returns.
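+     * e.g. (a sketch; the predicate and cutoff are illustrative)
+     * collect the newest documents until a limit is reached:
+     *   kept := []string{}
+     *   err := buf.Apply(func(doc string) bool {
+     *       kept = append(kept, doc)
+     *       return len(kept) < 10
+     *   })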
+     */
+    Apply(func(string) bool) error
+    /* Close
+     * flushes and then prepares the backing store for closure
+     */
+    Close() (int64, error)
+    /* Cached
+     * returns the current length of the cache
+     */
+    Cached() int
+}
+
+type DocBuf struct {
+    cache     []string
+    store     io.ReadWriteSeeker
+    cacheSize int
+    endIndex  int64
+}
+
+// put an element from cache into store
+func (b *DocBuf) demote() error {
+    if len(b.cache) < 1 {
+        return errors.New(DemoteFromEmptyCacheError)
+    }
+
+    demoted := b.cache[len(b.cache)-1]
+    b.cache = b.cache[:len(b.cache)-1]
+    return b.writeToBackingStore(demoted)
+}
+
+/* bounds here refers to checking the capacity of the internal memory cache.
+ * Remove() includes a use case where a user might want to evict more events
+ * than the internal memory cache can possibly contain, in which case we use
+ * promote and turn off bounds checking in order to overpopulate the memory
+ * cache.
+ */
+func (b *DocBuf) promote(bounds bool) error {
+    if bounds && len(b.cache) >= b.cacheSize {
+        return errors.New(PromoteToFullCacheError)
+    }
+
+    doc, err := b.readDocumentsFromDisk(1, true, false)
+    if err != nil {
+        return err
+    }
+
+    if len(doc) > 0 {
+        b.cache = append(b.cache, doc[0])
+    }
+    return nil
+}
+
+// encode as base64 so as to avoid needing to escape things;
+// documents can then be separated by newlines
+func (b *DocBuf) writeToBackingStore(doc string) error {
+    if len(doc) == 0 {
+        return errors.New(WriteEmptyDocumentError)
+    }
+
+    str := base64.StdEncoding.EncodeToString([]byte(doc))
+
+    // seek to end index
+    c, err := b.store.Seek(b.endIndex, io.SeekStart)
+    if err != nil {
+        return err
+    }
+
+    // separate documents with a newline
+    if c > 0 {
+        str = "\n" + str
+    }
+
+    n, err := b.store.Write([]byte(str))
+    if err != nil || n < len(str) {
+        if n < len(str) {
+            return errors.Join(
+                errors.New(PartialWriteError),
+                err)
+        }
+        return err
+    }
+
+    b.endIndex += int64(n)
+
+    return nil
+}
+
+func (b *DocBuf) readDocumentsFromDisk(
+    count int,
+    truncate bool,
+    continues bool,
+) ([]string, error) {
+    docs := []string{}
+    cursor := int64(0)
+    var err error
+
+    if count < 1 {
+        return docs, errors.New(BadReadRequestError)
+    }
+
+    if continues {
+        cursor, err = b.store.Seek(0, io.SeekCurrent)
+        if b.endIndex == 0 || err != nil || cursor == 0 {
+            return docs, err
+        }
+
+    } else {
+        // catch store is empty
+        cursor, err = b.store.Seek(0, io.SeekEnd)
+        if b.endIndex == 0 || err != nil || cursor == 0 {
+            return docs, err
+        }
+
+        // self repair?
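+        // if endIndex has drifted past the store's actual end (e.g.
+        // the store was truncated out from under us), clamp it back
+        // so the cursor arithmetic below stays in bounds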
+        if b.endIndex > cursor {
+            b.endIndex = cursor
+        }
+        cursor = b.endIndex - 1
+    }
+
+    for len(docs) < count && cursor >= 0 {
+        doc := ""
+        char := make([]byte, 1)
+        char[0] = 0
+
+        // read bytes backwards from file
+        for cursor >= 0 {
+            // set cursor
+            if _, err := b.store.Seek(cursor, io.SeekStart); err != nil {
+                return docs, err
+            }
+
+            // read one byte
+            n, err := b.store.Read(char)
+            if err != nil {
+                return docs, err
+            }
+            if n != 1 {
+                return docs, errors.New(BadReadLengthError)
+            }
+
+            // break on newline
+            if char[0] == 10 {
+                break
+            }
+
+            cursor -= 1
+            doc = string(char) + doc
+        }
+
+        /* parse document and add to account (docs)
+         * each read will stop on the newline, so make sure
+         * we don't try to account for an empty document
+         */
+        if len(doc) > 0 && doc != "\n" {
+            str, err := base64.StdEncoding.DecodeString(doc)
+            if err != nil {
+                return docs, err
+            }
+            doc = string(str)
+            docs = append(docs, doc)
+        }
+
+        if len(docs) < count {
+            cursor -= 1
+        }
+    }
+
+    if truncate {
+        if cursor < 0 {
+            cursor = 0
+        }
+        b.endIndex = cursor
+    }
+    return docs, nil
+}
+
+/* WARNING: this constructor will promote items
+ * from storage, resulting in a modified backing store
+ * as well as a prefilled cache.
+ */
+func NewDocumentBuffer(
+    cacheSize int,
+    backingStore io.ReadWriteSeeker,
+) (DocumentBuffer, error) {
+    newBuf := DocBuf{
+        cache:     make([]string, 0, cacheSize),
+        store:     backingStore,
+        cacheSize: cacheSize,
+    }
+
+    c, err := backingStore.Seek(0, io.SeekEnd)
+    newBuf.endIndex = c
+    if err != nil {
+        return &newBuf, err
+    }
+
+    // prefill cache
+    for range cacheSize {
+        newBuf.promote(true)
+    }
+    return &newBuf, nil
+}
+
+/* Push
+ * Adds a document to the buffer.
+ * If the cache is full, the oldest document is written to the
+ * backing writer.
+ */
+func (b *DocBuf) Push(doc string) error {
+    if len(b.cache) >= b.cacheSize {
+        if err := b.demote(); err != nil {
+            return err
+        }
+    }
+
+    tc := b.cache
+    b.cache = append(make([]string, 0, b.cacheSize), doc)
+    b.cache = append(b.cache, tc...)
+    return nil
+}
+
+/* Pop
+ * removes a document from the cache and pulls a document back
+ * out of the backing ReaderWriter.
+ */
+func (b *DocBuf) Pop() (string, error) {
+    if len(b.cache) < 1 {
+        if err := b.promote(true); err != nil {
+            return "", err
+        }
+    }
+
+    if len(b.cache) < 1 {
+        return "", errors.New(PopFromEmptyBufferError)
+    }
+
+    candidate := b.cache[0]
+    b.cache = b.cache[1:]
+    e := b.promote(true)
+
+    return candidate, e
+}
+
+/* Remove
+ * removes the N newest documents in order of newest to oldest.
+ * backfills the cache by pulling documents back out of the backing
+ * ReaderWriter (again, newest first).
+ */
+func (b *DocBuf) Remove(count int) ([]string, error) {
+    delta := count - b.cacheSize
+    if delta > 0 {
+        for range delta {
+            if err := b.promote(false); err != nil {
+                return []string{}, err
+            }
+        }
+    }
+
+    if count > len(b.cache) {
+        tmp := b.cache
+        b.cache = make([]string, 0, b.cacheSize)
+        return tmp, errors.New(RemoveOverCapacityError)
+    }
+
+    candidates := b.cache[:count]
+    b.cache = b.cache[count:]
+
+    // refill cache
+    delta = b.cacheSize - len(b.cache)
+    if delta > 0 {
+        for range delta {
+            if err := b.promote(true); err != nil {
+                return []string{}, err
+            }
+        }
+    }
+
+    return candidates, nil
+}
+
+/* Flush
+ * empties the cache by placing all cached documents into the
+ * backing ReaderWriter.
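+ * Documents are demoted oldest-first, so the newest document
+ * ends up last in the store.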
+ */
+func (b *DocBuf) Flush() error {
+    for range len(b.cache) {
+        if err := b.demote(); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+/* Peek
+ * views the first document in either the cache or the backing
+ * ReaderWriter
+ */
+func (b *DocBuf) Peek() (string, error) {
+    if len(b.cache) > 0 {
+        return b.cache[0], nil
+    } else {
+        d, e := b.readDocumentsFromDisk(1, false, false)
+        if len(d) < 1 {
+            return "", e
+        }
+        return d[0], e
+    }
+}
+
+/* Read
+ * returns N documents from the cache and backing store without
+ * removing them from either data structure.
+ */
+func (b *DocBuf) Read(count int) ([]string, error) {
+    delta := count - len(b.cache)
+    candidates := b.cache[:min(count, len(b.cache))]
+
+    if delta > 0 {
+        fromStorage, err := b.readDocumentsFromDisk(delta, false, false)
+        if err != nil {
+            return candidates, err
+        }
+
+        candidates = append(candidates, fromStorage...)
+    }
+
+    return candidates, nil
+}
+
+/* Apply
+ * Takes a callable/closure/function that accepts a string (document)
+ * and returns a bool. Apply returns an error, or nil.
+ * Apply will begin applying this callable to each doc from most recent
+ * to least recent so long as the callable continues to return true.
+ * If the callable returns false, Apply ceases iteration and returns.
+ */
+func (b *DocBuf) Apply(f func(string) bool) error {
+    // iterate over internal cache applying function
+    for _, i := range b.cache {
+        if !f(i) {
+            return nil
+        }
+    }
+
+    // begin iterating with readDocumentsFromDisk
+    first := true
+    flag := true
+    for flag {
+        doc, err := b.readDocumentsFromDisk(1, false, !first)
+        if err != nil {
+            return err
+        }
+        if len(doc) == 0 {
+            return nil
+        } else if len(doc) != 1 {
+            return errors.New("improper read from buffer")
+        }
+
+        first = false
+        flag = f(doc[0])
+
+        if flag {
+            var c, d int64
+            var err error
+            // since continue is on we need to bring the cursor back one
+            if c, err = b.store.Seek(0, io.SeekCurrent); err != nil {
+                return err
+                // don't seek earlier than 0
+            } else if c < 1 {
+                break
+            }
+            if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil {
+                return err
+            }
+
+            if c == d {
+                return errors.New("Seek failure!")
+            }
+        }
+    }
+
+    return nil
+}
+
+/* Close
+ * flushes and then prepares the backing store for closure.
+ * returns the end index into the underlying stream...
+ * it is the caller's responsibility to truncate.
+ */
+func (b *DocBuf) Close() (int64, error) {
+    if err := b.Flush(); err != nil {
+        return b.endIndex, err
+    }
+
+    return b.endIndex, nil
+}
+
+/* Cached
+ * returns the current length of the cache
+ */
+func (b *DocBuf) Cached() int {
+    return len(b.cache)
+}
diff --git a/internal/docbuf/docbuf_test.go b/internal/docbuf/docbuf_test.go
new file mode 100644
index 0000000..48bf03e
--- /dev/null
+++ b/internal/docbuf/docbuf_test.go
@@ -0,0 +1,337 @@
+package docbuf
+
+import (
+    "fmt"
+    "testing"
+)
+
+func docu(ctr *int) string {
+    // we add a newline to try to trick the docbuf
+    // into thinking there is an extra doc here.
+    // but there isn't one.
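+    // (docbuf base64-encodes each document, so the embedded newline
+    // never reaches the store as a document separator)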
+    s := fmt.Sprintf("%d\n", *ctr)
+    (*ctr) += 1
+
+    return s
+}
+
+func TestPushPop(t *testing.T) {
+    docCtr := 1
+
+    backingStore := NewReadWriteSeekString()
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // does push add 1 to cache
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    // does push past cache demote first document
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || backingStore.Cursor() == 0 {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    if backingStore.Contents() != "MQo=" {
+        t.Fatalf("expected oldest doc to be in store, got %s",
+            backingStore.Contents())
+    }
+
+    // does pop promote document from cache
+    doc, err := buf.Pop()
+    if err != nil || doc != "4\n" {
+        t.Fatalf("did not get expected doc from cache: %s (%v)", doc, err)
+    }
+    if buf.Cached() != 3 {
+        t.Fatalf("doc buffer did not promote: %d", buf.Cached())
+    }
+
+    // does pop past empty throw the right error
+    doc, err = buf.Pop()
+    if err != nil || doc != "3\n" {
+        t.Fatalf("did not get expected doc from cache: %s (%v)", doc, err)
+    }
+    doc, err = buf.Pop()
+    if err != nil || doc != "2\n" {
+        t.Fatalf("did not get expected doc from cache: %s (%v)", doc, err)
+    }
+    doc, err = buf.Pop()
+    if err != nil || doc != "1\n" {
+        t.Logf("bs: %s", backingStore.Contents())
+        t.Fatalf("did not get expected doc from cache: %s (%v)", doc, err)
+    }
+    doc, err = buf.Pop()
+    if err == nil ||
+        doc != "" ||
+        err.Error() != "Can't pop from empty buffer" {
+        t.Fatalf("did not get expected doc from cache: %s", doc)
+    }
+}
+
+func TestRemove(t *testing.T) {
+    docCtr := 1
+    backingStore := NewReadWriteSeekString()
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // setup test data
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil ||
+        buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    // tests
+    docs, err := buf.Remove(2)
+    if err != nil ||
+        len(docs) != 2 ||
+        docs[0] != "4\n" ||
+        docs[1] != "3\n" ||
+        buf.Cached() != 2 {
+        t.Fatalf("error removing: %v", err)
+    }
+
+    for range 5 {
+        if err := buf.Push(docu(&docCtr)); err != nil {
+            t.Fatalf("error pushing: %v", err)
+        }
+    }
+
+    docs, err = buf.Remove(4)
+    if err != nil ||
+        len(docs) != 4 ||
+        docs[0] != "9\n" ||
+        docs[1] != "8\n" ||
+        docs[2] != "7\n" ||
+        docs[3] != "6\n" ||
+        buf.Cached() != 3 {
+        t.Fatalf("error removing: %v", err)
+    }
+
+    docs, err = buf.Remove(3)
+    if err != nil ||
+        len(docs) != 3 ||
+        docs[0] != "5\n" ||
+        docs[1] != "2\n" ||
+        docs[2] != "1\n" ||
+        buf.Cached() != 0 {
+        t.Fatalf("error removing: %v", err)
+    }
+}
+
+func TestFlush(t *testing.T) {
+    docCtr := 1
+    expectedDoc := "MQo=\nMgo=\nMwo=\nNAo="
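+    // expectedDoc is base64("1\n").."4\n" joined by newlines, oldest first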
+    backingStore := NewReadWriteSeekString()
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // setup test data
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil ||
+        buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    // test
+    if err := buf.Flush(); err != nil {
+        t.Fatalf("error flushing buffer: %v", err)
+    }
+
+    if backingStore.Contents() != expectedDoc {
+        t.Fatalf("did not get expected document: %s", backingStore.Contents())
+    }
+}
+
+func TestPeek(t *testing.T) {
+    docCtr := 1
+    backingStore := NewReadWriteSeekString()
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // setup test data
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    // test
+    if d, e := buf.Peek(); e != nil ||
+        d != "1\n" ||
+        buf.Cached() != 1 {
+        t.Fatalf("error peeking: %v", e)
+    }
+}
+
+func TestRead(t *testing.T) {
+    docCtr := 1
+    backingStore := NewReadWriteSeekString()
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // setup test data
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil ||
+        buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    // test
+    if docs, err := buf.Read(4); err != nil ||
+        len(docs) != 4 ||
+        docs[0] != "4\n" ||
+        docs[1] != "3\n" ||
+        docs[2] != "2\n" ||
+        docs[3] != "1\n" ||
+        buf.Cached() != 3 ||
+        backingStore.Contents() != "MQo=" {
+        t.Fatalf("error reading: %v", err)
+    }
+}
+
+func TestClose(t *testing.T) {
+    // do pushes and a remove, then Close; ensure no error.
+    // is the int return where I can truncate?
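+    // (it is: Close returns the end index, which state.Teardown feeds
+    // to os.Truncate)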
+    docCtr := 1
+    backingStore := NewReadWriteSeekString()
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // setup test data
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 {
+        t.Fatalf("error pushing: %v", err)
+    }
+    if err := buf.Push(docu(&docCtr)); err != nil ||
+        buf.Cached() != 3 || backingStore.Contents() != "MQo=" {
+        t.Fatalf("error pushing: %v", err)
+    }
+
+    for range 5 {
+        if err := buf.Push(docu(&docCtr)); err != nil {
+            t.Fatalf("error pushing: %v", err)
+        }
+    }
+
+    docs, err := buf.Remove(4)
+    if err != nil ||
+        len(docs) != 4 ||
+        buf.Cached() != 3 {
+        t.Fatalf("error removing: %v", err)
+    }
+
+    expectedDoc := "MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="
+    idx, err := buf.Close()
+    if err != nil ||
+        idx != 24 ||
+        buf.Cached() != 0 ||
+        backingStore.Contents() != expectedDoc {
+        t.Fatalf("error closing: %v", err)
+    }
+}
+
+func TestInitialize(t *testing.T) {
+    backingStore := NewReadWriteSeekString()
+    backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="))
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // test cached
+    if buf.Cached() != 3 {
+        t.Fatalf("expected 3 docs in cache")
+    }
+
+    // read all docs
+    docs, err := buf.Read(6)
+    if err != nil ||
+        len(docs) != 6 ||
+        docs[0] != "6\n" ||
+        docs[1] != "5\n" ||
+        docs[2] != "4\n" ||
+        docs[3] != "3\n" ||
+        docs[4] != "2\n" ||
+        docs[5] != "1\n" {
+        t.Fatalf("error reading: %v", err)
+    }
+}
+
+func TestApply(t *testing.T) {
+    backingStore := NewReadWriteSeekString()
+    backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="))
+    buf, e := NewDocumentBuffer(3, &backingStore)
+    if e != nil {
+        t.Fatalf("error making documentbuffer: %v", e)
+    }
+
+    // test cached
+    if buf.Cached() != 3 {
+        t.Fatalf("expected 3 docs in cache")
+    }
+
+    count := 0
+    if err := buf.Apply(func(doc string) bool {
+        count += 1
+        return true
+    }); err != nil || count != 6 {
+        t.Fatalf("error applying: %v", err)
+    }
+
+    count = 0
+    if err := buf.Apply(func(doc string) bool {
+        if doc == "2\n" {
+            return false
+        }
+        count += 1
+        return true
+    }); err != nil || count != 4 {
+        t.Fatalf("error applying: %v", err)
+    }
+}
diff --git a/internal/docbuf/read_write_seek_string.go b/internal/docbuf/read_write_seek_string.go
new file mode 100644
index 0000000..b52284e
--- /dev/null
+++ b/internal/docbuf/read_write_seek_string.go
@@ -0,0 +1,100 @@
+package docbuf
+
+import (
+    "errors"
+    "fmt"
+    "io"
+)
+
+/* WARNING:
+ * This code exists to assist with testing and mock-ups.
+ * It is not designed to any rigorous standard, and it offers no
+ * benefit over a static in-memory single-layer cache.
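+ * It exists so the docbuf tests can exercise an io.ReadWriteSeeker
+ * without touching the filesystem.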
+ */ + +type ReadWriteSeekString struct { + inner string + cursor int +} + +func NewReadWriteSeekString() ReadWriteSeekString{ + return ReadWriteSeekString{ + inner: "", + cursor: 0, + } +} + +func (s *ReadWriteSeekString) Read( + buf []byte, +) (int, error) { + i := 0 + for ; i < len(buf); i++ { + if len(s.inner) <= s.cursor { + return i, nil + } + buf[i] = s.inner[s.cursor] + s.cursor += 1 + } + return i, nil +} + +func (s *ReadWriteSeekString) Write( + buf []byte, +) (int, error) { + backfillDelta := s.cursor - (len(s.inner) - 1) + if backfillDelta > 0 { + for range backfillDelta { + s.inner += "\x00" + } + } + + tmpBuf := "" + if s.cursor > 0 { + tmpBuf += s.inner[:s.cursor] + } + tmpBuf += string(buf) + if s.cursor + len(buf) < (len(s.inner) - 1) { + tmpBuf += s.inner[s.cursor + len(buf):] + } + + s.inner = tmpBuf + s.cursor += len(buf) + return len(buf), nil +} + +func (s *ReadWriteSeekString) Seek( + offset int64, + whence int, +) (int64, error) { + var tmpCur int64 + tmpCur = 0 + + switch whence { + case io.SeekCurrent: + tmpCur = int64(s.cursor) + case io.SeekEnd: + tmpCur = int64(len(s.inner)) + case io.SeekStart: + tmpCur = int64(0) + default: + return int64(s.cursor), + errors.New("invalid whence value") + } + + tmpCur += offset + if tmpCur < 0 { + msg := fmt.Sprintf("seek index (%d) is negative", tmpCur) + return int64(s.cursor), errors.New(msg) + } + s.cursor = int(tmpCur) + return tmpCur, nil +} + +func (s *ReadWriteSeekString) Contents() string { + return s.inner +} + +func (s *ReadWriteSeekString) Cursor() int { + return s.cursor +} diff --git a/internal/docbuf/read_write_seek_string_test.go b/internal/docbuf/read_write_seek_string_test.go new file mode 100644 index 0000000..37d5842 --- /dev/null +++ b/internal/docbuf/read_write_seek_string_test.go @@ -0,0 +1,165 @@ +package docbuf + +import ( + "io" + "testing" +) + +func SetupTestingBuffer(t *testing.T) ReadWriteSeekString{ + str := NewReadWriteSeekString() + n, e := str.Write([]byte("test")) + if n != 4 || e != nil { + t.Fatalf("Failed to write to buffer: %e", e) + } + + return str +} + +// can it read +func TestRWSSRead(t *testing.T) { + b := make([]byte, 4) + buf := SetupTestingBuffer(t) + c, err := buf.Seek(0, io.SeekStart) + if err != nil || c != 0 { + t.Fatalf("seek failed: %e", err) + } + + n, err := buf.Read(b) + if n != 4 || err != nil || string(b) != "test" { + t.Fatalf("read failed: %e", err) + } + + m, err := buf.Seek(-3, io.SeekEnd) + if err != nil || m != 1 { + t.Fatalf("seek failed: %e", err) + } + + b = make([]byte, 4) + l, err := buf.Read(b) + if l != 3 || err != nil || string(b[:3]) != "est" { + t.Fatalf("read failed: %e", err) + } + + k, err := buf.Seek(0, io.SeekStart) + if k != 0 || err != nil { + t.Fatalf("seek failed: %e", err) + } + + b = make([]byte, 3) + j, err := buf.Read(b) + if j != 3 || err != nil || string(b) != "tes" { + t.Fatalf("read failed: %e", err) + } + + b = make([]byte, 1) + i, err := buf.Read(b) + if i != 1 || err != nil || string(b) != "t" { + t.Fatalf("read failed: %e", err) + } +} + +// can it write +func TestRWSSWrite(t *testing.T) { + buf := SetupTestingBuffer(t) + if buf.Contents() != "test" || buf.Cursor() != 4 { + t.Fatalf("write failed: %s", buf.Contents()) + } + + m, err := buf.Write([]byte("test2")) + if m != 5 || + err != nil || + buf.Contents() != "testtest2" || + buf.Cursor() != 9 { + t.Fatalf("write failed: %e", err) + } + + n, err := buf.Seek(2, io.SeekStart) + if n != 2 || err != nil { + t.Fatalf("seek failed: %e", err) + } + + o, err := buf.Write([]byte("one")) + 
+    if o != 3 ||
+        err != nil ||
+        buf.Contents() != "teoneest2" ||
+        buf.Cursor() != 5 {
+        t.Fatalf("write failed: %v", err)
+    }
+
+    p, err := buf.Seek(0, io.SeekEnd)
+    if p != 9 || err != nil {
+        t.Fatalf("seek (%d) failed: %v", p, err)
+    }
+
+    q, err := buf.Write([]byte("two"))
+    if q != 3 ||
+        err != nil ||
+        buf.Contents() != "teoneest2two" ||
+        buf.Cursor() != 12 {
+        t.Fatalf("write failed: %v", err)
+    }
+}
+
+// if it seeks can it read from new position
+// if it seeks can it write to new position
+func TestRWSSSeek(t *testing.T) {
+    buf := SetupTestingBuffer(t)
+
+    if n, err := buf.Seek(0, io.SeekStart);
+        n != 0 ||
+        err != nil {
+        t.Fatalf("seek failed: %v", err)
+    }
+
+    if n, err := buf.Seek(3, io.SeekStart);
+        n != 3 ||
+        err != nil {
+        t.Fatalf("seek failed: %v", err)
+    }
+
+    if n, err := buf.Seek(-1, io.SeekStart);
+        n != 3 ||
+        err == nil ||
+        err.Error() != "seek index (-1) is negative" {
+        t.Fatalf("seek should have failed but didn't: %v", err)
+    }
+
+    if n, err := buf.Seek(0, io.SeekCurrent);
+        n != 3 ||
+        err != nil {
+        t.Fatalf("seek failed: %v", err)
+    }
+
+    if n, err := buf.Seek(-2, io.SeekEnd);
+        n != 2 ||
+        err != nil {
+        t.Fatalf("seek failed: %v", err)
+    }
+
+    if n, err := buf.Seek(-1, io.SeekCurrent);
+        n != 1 ||
+        err != nil {
+        t.Fatalf("seek failed: %v", err)
+    }
+
+    if n, err := buf.Seek(-2, io.SeekCurrent);
+        n != 1 ||
+        err == nil ||
+        err.Error() != "seek index (-1) is negative" {
+        t.Fatalf("seek should have failed but didn't: %v", err)
+    }
+
+    if n, err := buf.Seek(-1, io.SeekEnd);
+        n != 3 ||
+        err != nil {
+        t.Fatalf("seek failed: %v", err)
+    }
+
+    if n, err := buf.Seek(-5, io.SeekEnd);
+        n != 3 ||
+        err == nil ||
+        err.Error() != "seek index (-1) is negative" {
+        t.Fatalf("seek should have failed but didn't: %v", err)
+    }
+}
diff --git a/internal/state/state.go b/internal/state/state.go
index caba329..1325546 100644
--- a/internal/state/state.go
+++ b/internal/state/state.go
@@ -9,10 +9,13 @@ package state
 
 import (
+    "encoding/json"
+    "errors"
     "fmt"
-    "slices"
+    "os"
     "sync"
     "time"
 
+    "gitlab.com/whom/bingobot/internal/docbuf"
     "gitlab.com/whom/bingobot/internal/logging"
 )
 
@@ -24,32 +27,66 @@ const (
     BadEventTypeError          = "bad event type"
     EventValidationFailedError = "event failed validation: "
+    BadEventNoUID              = "event data has no UID field"
+    BadEventNoCreated          = "event data has no created field"
+    BadEventStoreFilename      = "failed to open event store file"
+    BadEventStreamInit         = "failed to initialize event stream"
+    BadEventCreate             = "failed to create event"
+    BadUserEventCreatedParse   = "failed to parse created time"
+    BadChallengeEvent          = "failed to make Challenge Event"
+    BadRestorationEvent        = "failed to make Restoration Event"
+    BadUserActiveEvent         = "failed to make UserActive Event"
+    BadEventObjMarshal         = "error marshalling event"
+    BadEventObjUnmarshal       = "failed to unmarshal event map"
+    BadEventUnmarshal          = "failed to unmarshal event"
+    BadEventMissingTypeKey     = "event map missing type key"
+)
+
+const (
+    EventTypeMapKey = "type"
 )
 
 var eventMutex sync.RWMutex
 var eventSubscriptionCache = [NumEventTypes][]chan Event{}
-var eventCache = []Event{}
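+// the in-memory event slice becomes a docbuf-backed stream;
+// eventMutex still guards all access to it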
+var eventStorageFileName string
+var eventStream docbuf.DocumentBuffer
+var maxEventsInMemory int
 
-// TODO: Configurable
-var eventChannelBufferSize = 256
-
-// TODO: pruned events go to DISK
-// ASSUMES eventCache is ordered by age (oldest first)
-func pruneEventCache() {
-    eventMutex.Lock()
-    defer eventMutex.Unlock()
-
-    oldCacheInvalid := false
-    newCache := []Event{}
-    for _, obj := range eventCache {
-        if time.Since(obj.Time()).Hours() > 24*10 {
-            oldCacheInvalid = true
-        } else if oldCacheInvalid {
-            newCache = append(newCache, obj)
-        }
+// expect filename validations in config package
+func Init(eventMemCacheSize int, eventStoreFileName string) error {
+    file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644)
+    if err != nil {
+        return errors.Join(errors.New(BadEventStoreFilename), err)
     }
-    eventCache = newCache
+
+    if eventStream, err = docbuf.NewDocumentBuffer(
+        eventMemCacheSize,
+        file,
+    ); err != nil {
+        return errors.Join(errors.New(BadEventStreamInit), err)
+    }
+
+    eventStorageFileName = eventStoreFileName
+    maxEventsInMemory = eventMemCacheSize
+    return nil
+}
+
+// returns no error; we are handling SIGINT
+func Teardown() {
+    // riskily ignore all locking...
+    // we are handling a termination signal, after all
+    i, e := eventStream.Close()
+    if e != nil {
+        logging.Warn("failed to close document buffer: %s", e.Error())
+        logging.Warn("will attempt to truncate event store anyway")
+    }
+
+    e = os.Truncate(eventStorageFileName, i)
+    if e != nil {
+        logging.Error("FAILED TO TRUNCATE EVENT STORE!!")
+        logging.Error("You will likely have garbage data at the end of it")
+        logging.Error("Attempt manual correction!")
+    }
 }
 
 type EventType int8
 
@@ -65,14 +102,123 @@ const (
     NumEventTypes
 )
 
+// either returns a valid event type or NumEventTypes
+func EventTypeFromString(doc string) EventType {
+    switch doc {
+    case "Vote":
+        return Vote
+    case "Challenge":
+        return Challenge
+    case "Restoration":
+        return Restoration
+    case "UserActive":
+        return UserActive
+    default:
+        // error case
+        return NumEventTypes
+    }
+}
+
+func (et EventType) String() string {
+    events := []string{
+        "Vote",
+        "Challenge",
+        "Restoration",
+        "UserActive",
+    }
+
+    if et < 0 || et >= NumEventTypes {
+        return ""
+    }
+
+    return events[et]
+}
+
 func (et EventType) Validate() error {
     if et < 0 || et >= NumEventTypes {
-        return stringErrorType(BadEventTypeError)
+        return errors.New(BadEventTypeError)
     }
 
     return nil
 }
 
+/* Allocates a specific event of type T from the event type instance.
+ * The output event is not guaranteed to be valid:
+ * call Validate() yourself.
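+ * e.g. (a sketch; the literal values are illustrative):
+ *   ev, err := Challenge.MakeEvent(map[string]string{
+ *       UserEventUserKey:    "some-user-id",
+ *       UserEventCreatedKey: time.Now().Format(time.RFC3339),
+ *   })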
+ */ +func (et EventType) MakeEvent(data map[string]string) (Event, error) { + if err := et.Validate(); err != nil { + return nil, errors.Join(errors.New(BadEventCreate), err) + } + + MakeUserEvent := func(data map[string]string) (*UserEvent, error) { + user, hasUser := data[UserEventUserKey] + if !hasUser { + return nil, errors.New(BadEventNoUID) + } + + created, hasCreated := data[UserEventCreatedKey] + if !hasCreated { + return nil, errors.New(BadEventNoCreated) + } + + createdTime, err := time.Parse(time.RFC3339, created) + if err != nil { + return nil, errors.Join(errors.New(BadUserEventCreatedParse), err) + } + + return &UserEvent{ + uid: user, + created: createdTime, + }, nil + } + + switch et { + case Vote: + return VoteEvent(data), nil + case Challenge: + e, err := MakeUserEvent(data) + if err != nil { + return nil, errors.Join(errors.New(BadChallengeEvent), err) + } + return ChallengeEvent{*e}, nil + case Restoration: + e, err := MakeUserEvent(data) + if err != nil { + return nil, errors.Join(errors.New(BadRestorationEvent), err) + } + return RestorationEvent{*e}, nil + case UserActive: + e, err := MakeUserEvent(data) + if err != nil { + return nil, errors.Join(errors.New(BadUserActiveEvent), err) + } + return UserActiveEvent{*e}, nil + default: + return nil, errors.New(BadEventTypeError) + } +} + +/* adds a new subscriber channel to the event subscription cache + * and returns the channel that it will publish notifications on + */ +func (et EventType) Subscribe() (chan Event, error) { + if err := et.Validate(); err != nil { + return nil, err + } + + ch := make(chan Event, maxEventsInMemory) + + eventMutex.Lock() + defer eventMutex.Unlock() + eventSubscriptionCache[et] = append( + eventSubscriptionCache[et], + ch, + ) + + return ch, nil +} + type Event interface { // gets EventType associated with event Type() EventType @@ -88,80 +234,48 @@ type Event interface { Validate() error } -// TODO: Something better than this -type stringErrorType string +func EventToString(e Event) (string, error) { + m := e.Data() + m[EventTypeMapKey] = e.Type().String() + buf, err := json.Marshal(m) + if err != nil { + return "", errors.Join(errors.New(BadEventObjMarshal), err) + } -func (s stringErrorType) Error() string { - return string(s) + return string(buf), nil } -/* adds a new subscriber channel to the event subscription cache - * and returns the channel that it will publish notifications on - * - * note: channel is prefilled with at most eventChannelBufferSize - * historical events. Truncated if history exceeds event channel - * buffer size. 
- */ -func (et EventType) SubscribeWithHistory() (chan Event, error) { - if err := et.Validate(); err != nil { - return nil, err +func EventFromString(doc string) (Event, error) { + var obj map[string]string + if err := json.Unmarshal([]byte(doc), &obj); err != nil { + return nil, errors.Join(errors.New(BadEventObjUnmarshal), err) } - ch := make(chan Event, eventChannelBufferSize) - - eventMutex.Lock() - eventSubscriptionCache[et] = append( - eventSubscriptionCache[et], - ch, - ) - eventMutex.Unlock() - - eventMutex.RLock() - defer eventMutex.RUnlock() - numEventsAdded := 0 - for _, ev := range slices.Backward(eventCache) { - if numEventsAdded >= eventChannelBufferSize { - break - } - - if ev.Type() == et { - ch <- ev - numEventsAdded += 1 - } + et, ok := obj[EventTypeMapKey] + if !ok { + return nil, errors.New(BadEventMissingTypeKey) + } + t := EventTypeFromString(et) + ev, err := t.MakeEvent(obj) + if err != nil { + return nil, errors.Join(errors.New(BadEventCreate), err) } - return ch, nil -} - -/* adds a new subscriber channel to the event subscription cache - * and returns the channel that it will publish notifications on - */ -func (et EventType) Subscribe() (chan Event, error) { - if err := et.Validate(); err != nil { - return nil, err - } - - ch := make(chan Event, eventChannelBufferSize) - - eventMutex.Lock() - defer eventMutex.Unlock() - eventSubscriptionCache[et] = append( - eventSubscriptionCache[et], - ch, - ) - - return ch, nil + return ev, ev.Validate() } func PublishEvent(e Event) error { if err := ValidateEvent(e); err != nil { - return stringErrorType( - EventValidationFailedError + err.Error(), - ) + return errors.Join(errors.New(EventValidationFailedError), err) + } + + doc, err := EventToString(e) + if err != nil { + return errors.New(BadEventUnmarshal) } eventMutex.Lock() - eventCache = append(eventCache, e) + eventStream.Push(doc) eventMutex.Unlock() eventMutex.RLock() defer eventMutex.RUnlock() @@ -169,8 +283,8 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) { - if len(c) == eventChannelBufferSize { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { + if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", // log the event time to provide blockage timing information @@ -205,6 +319,45 @@ func ValidateEvent(e Event) error { return nil } +/* Takes a filter and applies it to each individual event + * The filter is a function/closure that accepts one event + * and returns true if ApplyToEvents should continue iterating + * + * If the filter returns false then ApplyToEvents halts and returns. + * + * (this calls docbuf.Apply() under the hood) + */ +func ApplyToEvents( + f func(Event)bool, +) error { + // local variables enclosed by filter function filterWrap + var err error + var ev Event + // wrap f() to be compatible with docbuf.Apply() + filterWrap := func(doc string) bool { + ev, err = EventFromString(doc) + if err != nil { + return false + } + return f(ev) + } + + eventMutex.RLock() + defer eventMutex.RUnlock() + + // cant reuse err or return val directly as err is set + // by filter function if an error happens unmarshalling + // an event. 
 
 /* gets all events of type T in cache
  * that also share all field values in
  * map 'filters'
@@ -219,25 +372,26 @@
         return matches, err
     }
 
-    eventMutex.RLock()
-    defer eventMutex.RUnlock()
-
-Events:
-    for _, e := range eventCache {
+    filter := func(e Event) bool {
         if e.Type() != t {
-            continue
+            return true
         }
 
         for k, v := range filters {
             val, found := e.Data()[k]
             if !found || val != v {
-                continue Events
+                return true
             }
         }
 
         matches = append(matches, e)
+        return true
     }
 
-    return matches, nil
+    err := ApplyToEvents(filter)
+    return matches, err
 }
 
 type VoteEvent map[string]string
@@ -303,8 +457,7 @@
         VoteStatusKey,
     } {
         if _, found := ve[key]; !found {
-            return stringErrorType(
-                VoteMissingKeyError + key)
+            return errors.New(VoteMissingKeyError + key)
         }
     }
 
@@ -312,15 +465,15 @@
     if status != VoteStatusTimeout &&
         status != VoteStatusInProgress &&
         status != VoteStatusFinalized {
-        return stringErrorType(VoteBadStatusError + status)
+        return errors.New(VoteBadStatusError + status)
     }
 
     result, hasResult := ve[VoteResultKey]
     if hasResult && status == VoteStatusInProgress {
-        return stringErrorType(VoteNotFinishedError)
+        return errors.New(VoteNotFinishedError)
    }
     if status != VoteStatusInProgress && !hasResult {
-        return stringErrorType(VoteMissingResultError)
+        return errors.New(VoteMissingResultError)
     }
 
     if hasResult &&
@@ -329,7 +482,7 @@
         result != VoteResultTie &&
         result != VoteResultTimeout) {
 
-        return stringErrorType(VoteBadResultError + result)
+        return errors.New(VoteBadResultError + result)
     }
 
     return nil
@@ -353,7 +506,7 @@ func (ue UserEvent) Time() time.Time {
 func (ue UserEvent) Data() map[string]string {
     return map[string]string{
         UserEventUserKey:    ue.uid,
-        UserEventCreatedKey: ue.created.Local().String(),
+        UserEventCreatedKey: ue.created.Format(time.RFC3339),
     }
 }
diff --git a/internal/state/state_test.go b/internal/state/state_test.go
index e775e74..e6d4073 100644
--- a/internal/state/state_test.go
+++ b/internal/state/state_test.go
@@ -5,6 +5,7 @@ import (
     "testing"
     "time"
 
+    "gitlab.com/whom/bingobot/internal/docbuf"
     "gitlab.com/whom/bingobot/internal/logging"
 )
 
@@ -17,12 +18,71 @@ const TestTok = "TEST_NAME"
 var loggingInitialized = false
 
 func SetupTest(t *testing.T) {
+    maxEventsInMemory = 271
+
+    var err error
     // have to set up logger
     if !loggingInitialized {
         logging.Init()
         loggingInitialized = true
     }
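+    // swap the file-backed store for the in-memory test double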
+    b := docbuf.NewReadWriteSeekString()
+    eventStream, err = docbuf.NewDocumentBuffer(300, &b)
+    if err != nil {
+        t.Fatalf("error allocating buffer: %v", err)
+    }
+}
+
+func TestEventMarshalUnmarshal(t *testing.T) {
+    tm := time.Now()
+    cev := ChallengeEvent{UserEvent{
+        uid:     TestTok,
+        created: tm,
+    }}
+
+    vev := VoteEvent(map[string]string{
+        VoteActionKey:    "a",
+        VoteRequesterKey: "r",
+        VoteCreatedKey:   "c",
+        VoteStatusKey:    VoteStatusFinalized,
+        VoteResultKey:    VoteResultFail,
+    })
+
+    cstr, err := EventToString(cev)
+    if err != nil {
+        t.Fatalf("error marshalling challenge: %v, %s", err, cstr)
+    }
+
+    t.Logf("cstr: %s\n", cstr)
+
+    vstr, err := EventToString(vev)
+    if err != nil {
+        t.Fatalf("error marshalling vote: %v, %s", err, vstr)
+    }
+
+    t.Logf("vstr: %s\n", vstr)
+
+    if ev, err := EventFromString(cstr); err != nil ||
+        ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] ||
+        ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] {
+        t.Fatalf("error unmarshalling challenge: %v, %v!=%v", err, ev, cev)
+    }
+
+    if ev, err := EventFromString(vstr); err != nil ||
+        fmt.Sprint(ev) != fmt.Sprint(vev) {
+        t.Fatalf("error unmarshalling vote: %v, %v!=%v", err, ev, vev)
+    }
+}
+
+func TestPubSub(t *testing.T) {
+    SetupTest(t)
+
+    c, e := UserActive.Subscribe()
+    if e != nil {
+        t.Errorf("Error subscribing to UserActive events: %v", e)
+    }
 
     old, _ := time.Parse(
         time.RFC3339,
         VeryOldVote,
     )
@@ -47,40 +107,21 @@
         created: time.Now(),
     }})
 
-    if len(eventCache) != 272 {
-        t.Errorf("Unexpected number of events in cache: %d",
-            len(eventCache))
-    }
-}
-
-func CleanupTest() {
-    eventSubscriptionCache = [NumEventTypes][]chan Event{}
-    eventCache = []Event{}
-}
-
-func TestPubSub(t *testing.T) {
-    SetupTest(t)
-
-    c, e := UserActive.SubscribeWithHistory()
-    if e != nil {
-        t.Errorf("Error subscribing to UserActive events: %e", e)
-    }
-
 Loop:
     for i := 0; true; i++ {
         select {
         case e, ok := <-c:
             if !ok {
-                t.Errorf("Subscription Channel Closed")
+                t.Fatalf("Subscription Channel Closed")
             }
 
             if e.Type() != UserActive {
-                t.Errorf("Non UserActive Event in UserActive subscription: %v", e.Type())
+                t.Fatalf("Non UserActive Event in UserActive subscription: %v", e.Type())
             }
         default:
-            if i == eventChannelBufferSize {
+            if i == maxEventsInMemory {
                 break Loop
             } else {
-                t.Errorf("Unexpected number of events in channel: %d", i)
+                t.Fatalf("Unexpected number of events in channel: %d", i)
             }
         }
     }
@@ -93,18 +134,40 @@
     select {
     case e, ok := <-c:
         if !ok || e.Data()[UserEventUserKey] != "uniqueToken" {
-            t.Errorf("didnt read correct event from channel: %v", e)
+            t.Fatalf("didn't read correct event from channel: %v", e)
         }
     default:
-        t.Errorf("New event not published to subscription!")
+        t.Fatalf("New event not published to subscription!")
     }
-
-    CleanupTest()
 }
 
 func TestFilterCache(t *testing.T) {
     SetupTest(t)
 
+    old, _ := time.Parse(
+        time.RFC3339,
+        VeryOldVote,
+    )
+
+    for i := range 270 {
+        if err := PublishEvent(UserActiveEvent{UserEvent{
+            uid:     fmt.Sprintf("%d", i),
+            created: old,
+        }}); err != nil {
+            t.Errorf("Failed to add event: %v", err)
+        }
+    }
+
+    PublishEvent(UserActiveEvent{UserEvent{
+        uid:     TestTok,
+        created: time.Now(),
+    }})
+
+    PublishEvent(ChallengeEvent{UserEvent{
+        uid:     TestTok,
+        created: time.Now(),
+    }})
+
     events, err := GetMatchingEvents(
         UserActive,
         map[string]string{
@@ -123,19 +186,6 @@
     if events[0].Type() != UserActive {
         t.Errorf("Got wrong event!: %+v", events[0])
     }
-
-    CleanupTest()
-}
-
-func TestPruneCache(t *testing.T) {
-    SetupTest(t)
-    pruneEventCache()
-
-    if len(eventCache) != 2 {
-        t.Errorf("Incorrect number of remaining events: %d", len(eventCache))
-    }
-
-    CleanupTest()
 }
 
 func TestVoteEventValidations(t *testing.T) {
diff --git a/main.go b/main.go
index 1c072cb..21111e9 100644
--- a/main.go
+++ b/main.go
@@ -3,12 +3,11 @@ package main
 import (
     "flag"
     "log"
-    "os"
-    "os/signal"
 
     "gitlab.com/whom/bingobot/internal/config"
     "gitlab.com/whom/bingobot/internal/discord"
     "gitlab.com/whom/bingobot/internal/logging"
+    "gitlab.com/whom/bingobot/internal/state"
 )
 
 var (
 )
 
 func main() {
-    flag.Parse()
-
     var err error
 
     err = config.Init()
@@ -27,6 +24,15 @@
     }
 
     logging.Init()
+    flag.Parse()
+
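+    // state owns the persistent event store; its Teardown truncates
+    // the store file to the index docbuf reports on Close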
+    if err := state.Init(
+        config.Get().InMemoryEventCacheSize,
+        config.Get().PersistentCacheStore,
+    ); err != nil {
+        log.Fatalf("couldn't initialize state engine: %s", err.Error())
+    }
+    defer state.Teardown()
 
     err = startBot()
@@ -42,10 +48,6 @@
         return err
     }
 
-    sigch := make(chan os.Signal, 1)
-    signal.Notify(sigch, os.Interrupt)
-    <-sigch
-
     logging.Info("shutting down gracefully", "type", "shutdown")
     discord.Close()