From 1ef8ff042fb8864234a5bedd1a41897eb8f8e6b1 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Mon, 25 Nov 2024 18:07:26 -0800 Subject: [PATCH 1/5] implement DocumentBuffer for persistence of runtime data In order to provide persistence of runtime state across the application the documentbuffer provides a simple cache interface that balances cached internal state events between an in memory cache and an on disk storage. From a feature development perspective the DocumentBuffer provides a simple cache interface: - Push() and Pop() individual items - Remove() and Read() bulk items - Peek() at the most recent item - Flush() items in memory to the disk as well as some control features: - Close(), which calls flush and then returns index of the last byte of useful data in the backing store. - Cached(), which returns the number of items cached in memory. Underneath the hood, documentbuffer balances the cache (memory) and store (disk) by "promoting" the most recent documents in store to cache and by "demoting" the least recent documents in cache to store. Thus, the cache is always ordered by most recent, and so is the store. Documentbuffer takes any implementation of readwriteseeker as an interface for a backing store. Theoretically this means that documentbuffer can leverage more than just a standard os.File. Possible implementations could include transactions over the network or device drivers for long term cold storage devices. In fact, documentbuffer comes with an in memory test implementation of the readwriteseeker interface called ReadWriteSeekString. This emulates the functions of Read(), Write(), and Seek() to operate on an internal string buffer. This facility is only provided for testing and mock up purposes. One note about Close(): Since the documentbuffer has no way of truncating the underlying store an edge case can present itself that necessitates Close() and specifically Close()'s return type. 
If the documentbuffer has Remove()ed or promote()ed more bytes of data from store than it will subsequently Flush() or demote() to disk, one or more bytes of junk data may be left over from the overwriting of the previously Remove()/promote()ed data. In this case, or more specifically in all cases, Close() will return the last usable index of the underlying store. It is up to the caller to then truncate it. Regrettably there is no reasonable truncate interface that applies polymorphically to any underlying data stream. Thus the quirk around Close() remains a design challenge. This commit provides comprehensive unit tests for both DocumentBuffer and ReadWriteSeekString. Not implemented in this commit is the whole design for runtime data persistence. It is intended that internal modules for bingobot that provide functionality directly to the user leverage the event pub/sub system as their sole authoritative source for state management. As long as these modules provide the same output for the same input sequence of events consistently, then all parts of the application will recover from error status or crashes by re-ingesting the on disk storage of events provided by the documentbuffer. This way the application will always come back up with minimal data loss and potentially the exact same state as before it went down. 
Signed-off-by: Ava Affine --- .gitlab-ci.yml | 5 + internal/docbuf/docbuf.go | 387 ++++++++++++++++++ internal/docbuf/docbuf_test.go | 304 ++++++++++++++ internal/docbuf/read_write_seek_string.go | 100 +++++ .../docbuf/read_write_seek_string_test.go | 165 ++++++++ 5 files changed, 961 insertions(+) create mode 100644 internal/docbuf/docbuf.go create mode 100644 internal/docbuf/docbuf_test.go create mode 100644 internal/docbuf/read_write_seek_string.go create mode 100644 internal/docbuf/read_write_seek_string_test.go diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 93a2f9b..3093957 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -22,3 +22,8 @@ tests-config-pkg: stage: test script: - go test ./internal/config + +tests-docbuf-pkg: + stage: test + script: + - go test ./internal/docbuf diff --git a/internal/docbuf/docbuf.go b/internal/docbuf/docbuf.go new file mode 100644 index 0000000..89b8187 --- /dev/null +++ b/internal/docbuf/docbuf.go @@ -0,0 +1,387 @@ +package docbuf + +import ( + "encoding/base64" + "errors" + "io" +) + +const ( + DemoteFromEmptyCacheError = "Can't demote from empty cache" + PromoteToFullCacheError = "Can't promote to full cache" + WriteEmptyDocumentError = "Not writing empty document to store" + PartialWriteError = "Couldnt complete document write" + BadReadRequestError = "Can't read less than one documents" + BadReadLengthError = "Unexpected read length" + PopFromEmptyBufferError = "Can't pop from empty buffer" + RemoveOverCapacityError = "Not enough documents in buffer to remove" +) + +type DocumentBuffer interface { + /* Push + * Add a document to the buffer. + * If cache is full oldest document is written to backing writer. + */ + Push(string) error + /* Pop + * removes a document from the cache and pulls a document back + * out of the backing ReaderWriter. + */ + Pop() (string, error) + /* Remove + * removes N documents according to arg. + * backfills cache by pulling documents back out of backing + * ReaderWriter. 
+ */ + Remove(int) ([]string, error) + /* Flush + * empties cache by placing all cached documnent into backing + * ReaderWriter. + */ + Flush() error + /* Peek + * views first document in either cache or backing ReaderWriter + */ + Peek() (string, error) + /* Read + * returns N documents from the cache and backing store without + * removing them from either data structure. + */ + Read(int) ([]string, error) + /* Close + * flushes and then prepares backing store for closure + */ + Close() (int64, error) + /* Cached + * returns current length of cache + */ + Cached() int +} + +type DocBuf struct { + cache []string + store io.ReadWriteSeeker + cacheSize int + endIndex int64 +} + +// put an element from cache into store +func (b *DocBuf) demote() error { + if len(b.cache) < 1 { + return errors.New(DemoteFromEmptyCacheError) + } + + demoted := b.cache[len(b.cache)-1] + b.cache = b.cache[:len(b.cache)-1] + return b.writeToBackingStore(demoted) +} + +/* bounds here refers to checking the capacity of the internal memory cache + * Remove() includes a use case where a user might want to evict more events + * than the internal memory cache can possibly contain, in which case we use + * promote and turn off bounds checking in order to overpopulate memory cache + */ +func (b *DocBuf) promote(bounds bool) error { + if bounds && len(b.cache) >= b.cacheSize { + return errors.New(PromoteToFullCacheError) + } + + doc, err := b.readDocumentsFromDisk(1, true) + if err != nil { + return err + } + + if len(doc) > 0 { + b.cache = append(b.cache, doc[0]) + } + return nil +} + +// encode as base64 so as to avoid needing to escape things +// then documents can be separated by newlines +func (b *DocBuf) writeToBackingStore(doc string) error { + if len(doc) == 0 { + return errors.New(WriteEmptyDocumentError) + } + + str := base64.StdEncoding.EncodeToString([]byte(doc)) + + // seek to end index + c, err := b.store.Seek(int64(b.endIndex), io.SeekStart); + if err != nil { + return err + } + + // 
separate documents with a newline + if c > 0 { + str = "\n" + str + } + + n, err := b.store.Write([]byte(str)) + if err != nil || n < len(doc) { + if n < len(doc) { + return errors.Join( + errors.New(PartialWriteError), + err) + } + return err + } + + b.endIndex += int64(n) + + return nil +} + +func (b *DocBuf) readDocumentsFromDisk( + count int, + truncate bool, +) ([]string, error) { + docs := []string{} + cursor := int64(0) + + if count < 1 { + return docs, errors.New(BadReadRequestError) + } + + // catch store is empty + cursor, err := b.store.Seek(0, io.SeekEnd) + if b.endIndex == 0 || err != nil || cursor == 0 { + return docs, err + } + + // self repair? + if b.endIndex > cursor { + b.endIndex = cursor + } + + cursor = b.endIndex - 1 + + for len(docs) < count && cursor >= 0 { + doc := "" + char := make([]byte, 1) + char[0] = 0 + + // read bytes backwards from file + for cursor >= 0 { + // set cursor + if _, err := b.store.Seek(cursor, io.SeekStart); err != nil { + return docs, err + } + + // read one byte + n, err := b.store.Read(char) + if err != nil { + return docs, err + } + if n != 1 { + return docs, errors.New(BadReadLengthError) + } + + // break on newline + if char[0] == 10 { + break + } + + cursor -= 1 + doc = string(char) + doc + } + + /* parse document and add to account (docs) + * each read will stop on the newline so make sure + * we dont try to account for an empty document + */ + if (doc != "\n") { + str, err := base64.StdEncoding.DecodeString(doc) + if err != nil { + return docs, err + } + doc = string(str) + docs = append(docs, doc) + } + + if len(docs) < count { + cursor -= 1 + } + } + + if truncate { + if cursor < 0 { + cursor = 0 + } + b.endIndex = cursor + } + return docs, nil +} + + +/* WARNING: this constructor will promote items + * from storage, resulting in a modified backing + * as well as a prefilled cache. 
+ */ +func NewDocumentBuffer( + cacheSize int, + backingStore io.ReadWriteSeeker, +) (DocumentBuffer, error) { + newBuf := DocBuf { + cache: make([]string, 0, cacheSize), + store: backingStore, + cacheSize: cacheSize, + } + + c, err := backingStore.Seek(0, io.SeekEnd) + newBuf.endIndex = c + if err != nil { + return &newBuf, err + } + + // prefill cache + for range cacheSize { + newBuf.promote(true) + } + return &newBuf, nil +} + +/* Push + * Add a document to the buffer. + * If cache is full oldest document is written to backing writer. + */ +func (b *DocBuf) Push(doc string) error { + if len(b.cache) >= b.cacheSize { + if err := b.demote(); err != nil { + return err + } + } + + tc := b.cache + b.cache = append(make([]string, 0, b.cacheSize), doc) + b.cache = append(b.cache, tc...) + return nil +} + +/* Pop + * removes a document from the cache and pulls a document back + * out of the backing ReaderWriter. + */ +func (b *DocBuf) Pop() (string, error) { + if len(b.cache) < 1 { + if err := b.promote(true); err != nil { + return "", err + } + } + + if len(b.cache) < 1 { + return "", errors.New(PopFromEmptyBufferError) + } + + candidate := b.cache[0] + b.cache = b.cache[1:] + e := b.promote(true) + + return candidate, e +} + +/* Remove + * removes N newest documents in order of newest to oldest. + * backfills cache by pulling documents back out of backing + * ReaderWriter (again, newest first). 
+ */ +func (b *DocBuf) Remove(count int) ([]string, error) { + delta := count - b.cacheSize + if delta > 0 { + for range delta { + if err := b.promote(false); err != nil { + return []string{}, err + } + } + } + + if count > len(b.cache) { + tmp := b.cache + b.cache = make([]string, 0, b.cacheSize) + return tmp, errors.New(RemoveOverCapacityError) + } + + candidates := b.cache[:count] + b.cache = b.cache[count:] + + // refill cache + delta = b.cacheSize - len(b.cache) + if delta > 0 { + for range delta { + if err := b.promote(true); err != nil { + return []string{}, err + } + } + } + + return candidates, nil +} + +/* Flush + * empties cache by placing all cached documnent into backing + * ReaderWriter. + */ +func (b *DocBuf) Flush() error { + for range len(b.cache) { + if err := b.demote(); err != nil { + return err + } + } + + return nil +} + +/* Peek + * views first document in either cache or backing ReaderWriter + */ +func (b *DocBuf) Peek() (string, error) { + if len(b.cache) > 0 { + return b.cache[0], nil + } else { + d, e := b.readDocumentsFromDisk(1, false) + if len(d) < 1 { + return "", e + } + return d[0], e + } +} + +/* Read + * returns N documents from the cache and backing store without + * removing them from either data structure. + */ +func (b *DocBuf) Read(count int) ([]string, error) { + delta := count - len(b.cache) + candidates := b.cache[:count - (delta)] + + if delta > 0 { + fromStorage, err := b.readDocumentsFromDisk(delta, false) + if err != nil { + return candidates, err + } + + candidates = append(candidates, fromStorage...) + } + + return candidates, nil +} + +/* Close + * flushes and then prepares backing store for closure. + * returns the end index into the underlying stream... + * it is the callers responsibility to truncate. 
+ */ +func (b *DocBuf) Close() (int64, error) { + if err := b.Flush(); err != nil { + return b.endIndex, err + } + + return b.endIndex, nil +} + +/* Cached + * returns current length of cache + */ + func (b *DocBuf) Cached() int { + return len(b.cache) + } diff --git a/internal/docbuf/docbuf_test.go b/internal/docbuf/docbuf_test.go new file mode 100644 index 0000000..0408070 --- /dev/null +++ b/internal/docbuf/docbuf_test.go @@ -0,0 +1,304 @@ +package docbuf + +import ( + "testing" + "fmt" +) + +func docu(ctr *int) string { + // we add a new line to try to trick the docbuf + // into thinking there is an extra doc here. + // but there isnt one. + s := fmt.Sprintf("%d\n", *ctr) + (*ctr) += 1 + + return s +} + +func TestPushPop(t *testing.T) { + docCtr := 1 + + backingStore := NewReadWriteSeekString() + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // does push add 1 to cache + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 { + t.Fatalf("error pushing: %e", err) + } + + // does push past cache demote first document + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || backingStore.Cursor() == 0 { + t.Fatalf("error pushing: %e", err) + } + + if backingStore.Contents() != "MQo=" { + t.Fatalf("expected oldest doc to be in store, got %s", + backingStore.Contents()) + } + + // does pop promote document from cache + doc, err := buf.Pop() + if err != nil || doc != "4\n" { + t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err) + } + if buf.Cached() != 3 { + t.Fatalf("doc buffer did not promote: %d", buf.Cached()) + } + + // does pop past empty throw the right error + doc, err = buf.Pop() + if err != nil || doc != "3\n" { + t.Fatalf("did not get expected 
doc from cache: %s (%e)", doc, err) + } + doc, err = buf.Pop() + if err != nil || doc != "2\n" { + t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err) + } + doc, err = buf.Pop() + if err != nil || doc != "1\n" { + t.Logf("bs: %s", backingStore.Contents()) + t.Fatalf("did not get expected doc from cache: %s (%e)", doc, err) + } + doc, err = buf.Pop() + if err == nil || + doc != "" || + err.Error() != "Can't pop from empty buffer" { + t.Fatalf("did not get expected doc from cache: %s", doc) + } +} + +func TestRemove(t *testing.T) { + docCtr := 1 + backingStore := NewReadWriteSeekString() + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // setup test data + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || + buf.Cached() != 3 || backingStore.Contents() != "MQo=" { + t.Fatalf("error pushing: %e", err) + } + + // tests + docs, err := buf.Remove(2) + if err != nil || + len(docs) != 2 || + docs[0] != "4\n" || + docs[1] != "3\n" || + buf.Cached() != 2 { + t.Fatalf("error removing: %e", err) + } + + for range 5 { + if err := buf.Push(docu(&docCtr)); err != nil { + t.Fatalf("error pushing: %e", err) + } + } + + docs, err = buf.Remove(4) + if err != nil || + len(docs) != 4 || + docs[0] != "9\n" || + docs[1] != "8\n" || + docs[2] != "7\n" || + docs[3] != "6\n" || + buf.Cached() != 3 { + t.Fatalf("error removing: %e", err) + } + + docs, err = buf.Remove(3) + if err != nil || + len(docs) != 3 || + docs[0] != "5\n" || + docs[1] != "2\n" || + docs[2] != "1\n" || + buf.Cached() != 0 { + t.Fatalf("error removing: %e", err) + } +} + +func TestFlush(t *testing.T) { + 
docCtr := 1 + expectedDoc := "MQo=\nMgo=\nMwo=\nNAo=" + backingStore := NewReadWriteSeekString() + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // setup test data + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || + buf.Cached() != 3 || backingStore.Contents() == "Nao=\n" { + t.Fatalf("error pushing: %e", err) + } + + // test + if err := buf.Flush(); err != nil { + t.Fatalf("error flushing buffer: %e", err) + } + + if backingStore.Contents() != expectedDoc { + t.Fatalf("did not get expected document: %s", backingStore.Contents()) + } +} + +func TestPeek(t *testing.T) { + docCtr := 1 + backingStore := NewReadWriteSeekString() + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // setup test data + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 { + t.Fatalf("error pushing: %e", err) + } + + // test + if d, e := buf.Peek(); e != nil || + d != "1\n" || + buf.Cached() != 1 { + t.Fatalf("error peeking: %e", e) + } +} + +func TestRead(t *testing.T) { + docCtr := 1 + backingStore := NewReadWriteSeekString() + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // setup test data + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 { + t.Fatalf("error pushing: %e", err) + } + if err 
:= buf.Push(docu(&docCtr)); err != nil || + buf.Cached() != 3 || backingStore.Contents() != "MQo=" { + t.Fatalf("error pushing: %e", err) + } + + // test + if docs, err := buf.Read(4); err != nil || + len(docs) != 4 || + docs[0] != "4\n" || + docs[1] != "3\n" || + docs[2] != "2\n" || + docs[3] != "1\n" || + buf.Cached() != 3 || + backingStore.Contents() != "MQo=" { + t.Fatalf("error reading: %e", err) + } +} + +func TestClose(t *testing.T) { + // do pushes and a remove then Close. assure no error + // is the int return where I can truncate? + docCtr := 1 + backingStore := NewReadWriteSeekString() + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // setup test data + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 1 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 2 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || buf.Cached() != 3 { + t.Fatalf("error pushing: %e", err) + } + if err := buf.Push(docu(&docCtr)); err != nil || + buf.Cached() != 3 || backingStore.Contents() != "MQo=" { + t.Fatalf("error pushing: %e", err) + } + + for range 5 { + if err := buf.Push(docu(&docCtr)); err != nil { + t.Fatalf("error pushing: %e", err) + } + } + + docs, err := buf.Remove(4) + if err != nil || + len(docs) != 4 || + buf.Cached() != 3 { + t.Fatalf("error removing: %e", err) + } + + expectedDoc := "MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo=" + idx, err := buf.Close() + if err != nil || + idx != 24 || + buf.Cached() != 0 || + backingStore.Contents() != expectedDoc { + t.Fatalf("error closing: %e", err) + } +} + +func TestInitialize(t *testing.T) { + backingStore := NewReadWriteSeekString() + backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo=")) + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // test cached + if 
buf.Cached() != 3 { + t.Fatalf("expected 3 docs in cache") + } + + // read all docs + docs, err := buf.Read(6) + if err != nil || + len(docs) != 6 || + docs[0] != "6\n" || + docs[1] != "5\n" || + docs[2] != "4\n" || + docs[3] != "3\n" || + docs[4] != "2\n" || + docs[5] != "1\n" { + t.Fatalf("error reading: %e", err) + } +} diff --git a/internal/docbuf/read_write_seek_string.go b/internal/docbuf/read_write_seek_string.go new file mode 100644 index 0000000..b52284e --- /dev/null +++ b/internal/docbuf/read_write_seek_string.go @@ -0,0 +1,100 @@ +package docbuf + +import ( + "errors" + "io" + "fmt" +) + +/* WARNING: + * This code is meant to assist with testing and mock ups + * It is not only not designed to any rigorous standards, + * but additionally does not offer any benefit over a static + * in memory single layer cache. + */ + +type ReadWriteSeekString struct { + inner string + cursor int +} + +func NewReadWriteSeekString() ReadWriteSeekString{ + return ReadWriteSeekString{ + inner: "", + cursor: 0, + } +} + +func (s *ReadWriteSeekString) Read( + buf []byte, +) (int, error) { + i := 0 + for ; i < len(buf); i++ { + if len(s.inner) <= s.cursor { + return i, nil + } + buf[i] = s.inner[s.cursor] + s.cursor += 1 + } + return i, nil +} + +func (s *ReadWriteSeekString) Write( + buf []byte, +) (int, error) { + backfillDelta := s.cursor - (len(s.inner) - 1) + if backfillDelta > 0 { + for range backfillDelta { + s.inner += "\x00" + } + } + + tmpBuf := "" + if s.cursor > 0 { + tmpBuf += s.inner[:s.cursor] + } + tmpBuf += string(buf) + if s.cursor + len(buf) < (len(s.inner) - 1) { + tmpBuf += s.inner[s.cursor + len(buf):] + } + + s.inner = tmpBuf + s.cursor += len(buf) + return len(buf), nil +} + +func (s *ReadWriteSeekString) Seek( + offset int64, + whence int, +) (int64, error) { + var tmpCur int64 + tmpCur = 0 + + switch whence { + case io.SeekCurrent: + tmpCur = int64(s.cursor) + case io.SeekEnd: + tmpCur = int64(len(s.inner)) + case io.SeekStart: + tmpCur = int64(0) + 
default: + return int64(s.cursor), + errors.New("invalid whence value") + } + + tmpCur += offset + if tmpCur < 0 { + msg := fmt.Sprintf("seek index (%d) is negative", tmpCur) + return int64(s.cursor), errors.New(msg) + } + s.cursor = int(tmpCur) + return tmpCur, nil +} + +func (s *ReadWriteSeekString) Contents() string { + return s.inner +} + +func (s *ReadWriteSeekString) Cursor() int { + return s.cursor +} diff --git a/internal/docbuf/read_write_seek_string_test.go b/internal/docbuf/read_write_seek_string_test.go new file mode 100644 index 0000000..37d5842 --- /dev/null +++ b/internal/docbuf/read_write_seek_string_test.go @@ -0,0 +1,165 @@ +package docbuf + +import ( + "io" + "testing" +) + +func SetupTestingBuffer(t *testing.T) ReadWriteSeekString{ + str := NewReadWriteSeekString() + n, e := str.Write([]byte("test")) + if n != 4 || e != nil { + t.Fatalf("Failed to write to buffer: %e", e) + } + + return str +} + +// can it read +func TestRWSSRead(t *testing.T) { + b := make([]byte, 4) + buf := SetupTestingBuffer(t) + c, err := buf.Seek(0, io.SeekStart) + if err != nil || c != 0 { + t.Fatalf("seek failed: %e", err) + } + + n, err := buf.Read(b) + if n != 4 || err != nil || string(b) != "test" { + t.Fatalf("read failed: %e", err) + } + + m, err := buf.Seek(-3, io.SeekEnd) + if err != nil || m != 1 { + t.Fatalf("seek failed: %e", err) + } + + b = make([]byte, 4) + l, err := buf.Read(b) + if l != 3 || err != nil || string(b[:3]) != "est" { + t.Fatalf("read failed: %e", err) + } + + k, err := buf.Seek(0, io.SeekStart) + if k != 0 || err != nil { + t.Fatalf("seek failed: %e", err) + } + + b = make([]byte, 3) + j, err := buf.Read(b) + if j != 3 || err != nil || string(b) != "tes" { + t.Fatalf("read failed: %e", err) + } + + b = make([]byte, 1) + i, err := buf.Read(b) + if i != 1 || err != nil || string(b) != "t" { + t.Fatalf("read failed: %e", err) + } +} + +// can it write +func TestRWSSWrite(t *testing.T) { + buf := SetupTestingBuffer(t) + if buf.Contents() != "test" 
|| buf.Cursor() != 4 { + t.Fatalf("write failed: %s", buf.Contents()) + } + + m, err := buf.Write([]byte("test2")) + if m != 5 || + err != nil || + buf.Contents() != "testtest2" || + buf.Cursor() != 9 { + t.Fatalf("write failed: %e", err) + } + + n, err := buf.Seek(2, io.SeekStart) + if n != 2 || err != nil { + t.Fatalf("seek failed: %e", err) + } + + o, err := buf.Write([]byte("one")) + if o != 3 || + err != nil || + buf.Contents() != "teoneest2" || + buf.Cursor() != 5 { + t.Fatalf("write failed: %e", err) + } + + p, err := buf.Seek(0, io.SeekEnd) + if p != 9 || err != nil { + t.Fatalf("seek (%d) failed: %e", p, err) + } + + q, err := buf.Write([]byte("two")) + if q != 3 || + err != nil || + buf.Contents() != "teoneest2two" || + buf.Cursor() != 12 { + t.Fatalf("write failed: %e", err) + } +} + +// if it seeks can it read from new position +// if it seeks can it write to new position +func TestRWSSSeek(t *testing.T) { + buf := SetupTestingBuffer(t) + + if n, err := buf.Seek(0, io.SeekStart); + n != 0 || + err != nil { + t.Fatalf("seek failed: %e", err) + } + + if n, err := buf.Seek(3, io.SeekStart); + n != 3 || + err != nil { + t.Fatalf("seek failed: %e", err) + } + + if n, err := buf.Seek(-1, io.SeekStart); + n != 3 || + err == nil || + err.Error() != "seek index (-1) is negative" { + t.Fatalf("seek should have failed but didnt: %e", err) + } + + + if n, err := buf.Seek(0, io.SeekCurrent); + n != 3 || + err != nil { + t.Fatalf("seek failed: %e", err) + } + + if n, err := buf.Seek(-2, io.SeekEnd); + n != 2 || + err != nil { + t.Fatalf("seek failed: %e", err) + } + + if n, err := buf.Seek(-1, io.SeekCurrent); + n != 1 || + err != nil { + t.Fatalf("seek failed: %e", err) + } + + if n, err := buf.Seek(-2, io.SeekCurrent); + n != 1 || + err == nil || + err.Error() != "seek index (-1) is negative" { + t.Fatalf("seek should have failed but didnt: %e", err) + } + + if n, err := buf.Seek(-1, io.SeekEnd); + n != 3 || + err != nil { + t.Fatalf("seek failed: %e", err) + } + + 
if n, err := buf.Seek(-5, io.SeekEnd); + n != 3 || + err == nil || + err.Error() != "seek index (-1) is negative" { + t.Fatalf("seek should have failed but didnt: %e", err) + } +} From 609c50ff7d0c3f57435b17ba978c389091e7998c Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Tue, 26 Nov 2024 19:18:02 -0800 Subject: [PATCH 2/5] Extend DocumentBuffer with apply interface The existing DocumentBuffer interface provided no facility to iterate over events in order without caching them all in memory and pushing/popping them. This was deemed significantly unoptimal and so Apply() was born. When Apply() is called, DocumentBuffer will recieve a filter callable (functor, lambda, or closure) and will call it on all documents in order throughout the entire cache and store, without caching stored events or modifying the cache either. The filter Apply() uses will recieve one document and return true or false: true to continue iterating and false to stop iterating. Signed-off-by: Ava Affine --- internal/docbuf/docbuf.go | 102 ++++++++++++++++++++++++++++----- internal/docbuf/docbuf_test.go | 33 +++++++++++ 2 files changed, 120 insertions(+), 15 deletions(-) diff --git a/internal/docbuf/docbuf.go b/internal/docbuf/docbuf.go index 89b8187..cc0610e 100644 --- a/internal/docbuf/docbuf.go +++ b/internal/docbuf/docbuf.go @@ -48,6 +48,14 @@ type DocumentBuffer interface { * removing them from either data structure. */ Read(int) ([]string, error) + /* Apply + * Takes a callable/closure/function that accepts a string (document) + * and returns a bool. Apply returns an error, or nil. + * Apply will begin applying this callable to each doc from most recent + * to least recent so long as the callable continues to return true. + * If the callable returns false, Apply ceases iteration and returns. 
+ */ + Apply(func(string)bool) error /* Close * flushes and then prepares backing store for closure */ @@ -86,7 +94,7 @@ func (b *DocBuf) promote(bounds bool) error { return errors.New(PromoteToFullCacheError) } - doc, err := b.readDocumentsFromDisk(1, true) + doc, err := b.readDocumentsFromDisk(1, true, false) if err != nil { return err } @@ -135,26 +143,35 @@ func (b *DocBuf) writeToBackingStore(doc string) error { func (b *DocBuf) readDocumentsFromDisk( count int, truncate bool, + continues bool, ) ([]string, error) { docs := []string{} cursor := int64(0) + var err error if count < 1 { return docs, errors.New(BadReadRequestError) } - // catch store is empty - cursor, err := b.store.Seek(0, io.SeekEnd) - if b.endIndex == 0 || err != nil || cursor == 0 { - return docs, err - } - - // self repair? - if b.endIndex > cursor { - b.endIndex = cursor - } + if continues { + cursor, err = b.store.Seek(0, io.SeekCurrent) + if b.endIndex == 0 || err != nil || cursor == 0 { + return docs, err + } - cursor = b.endIndex - 1 + } else { + // catch store is empty + cursor, err = b.store.Seek(0, io.SeekEnd) + if b.endIndex == 0 || err != nil || cursor == 0 { + return docs, err + } + + // self repair? 
+ if b.endIndex > cursor { + b.endIndex = cursor + } + cursor = b.endIndex - 1 + } for len(docs) < count && cursor >= 0 { doc := "" @@ -190,7 +207,7 @@ func (b *DocBuf) readDocumentsFromDisk( * each read will stop on the newline so make sure * we dont try to account for an empty document */ - if (doc != "\n") { + if len(doc) > 0 && doc != "\n" { str, err := base64.StdEncoding.DecodeString(doc) if err != nil { return docs, err @@ -338,7 +355,7 @@ func (b *DocBuf) Peek() (string, error) { if len(b.cache) > 0 { return b.cache[0], nil } else { - d, e := b.readDocumentsFromDisk(1, false) + d, e := b.readDocumentsFromDisk(1, false, false) if len(d) < 1 { return "", e } @@ -355,7 +372,7 @@ func (b *DocBuf) Read(count int) ([]string, error) { candidates := b.cache[:count - (delta)] if delta > 0 { - fromStorage, err := b.readDocumentsFromDisk(delta, false) + fromStorage, err := b.readDocumentsFromDisk(delta, false, false) if err != nil { return candidates, err } @@ -366,6 +383,61 @@ func (b *DocBuf) Read(count int) ([]string, error) { return candidates, nil } +/* Apply + * Takes a callable/closure/function that accepts a string (document) + * and returns a bool. Apply returns an error, or nil. + * Apply will begin applying this callable to each doc from most recent + * to least recent so long as the callable continues to return true. + * If the callable returns false, Apply ceases iteration and returns. 
+ */ +func (b *DocBuf) Apply(f func(string)bool) error { + // iterate over internal cache applying function + for _, i := range b.cache { + if !f(i) { + return nil + } + } + + // begin iterating with readDocumentsFromDisk + first := true + flag := true + for flag { + doc, err := b.readDocumentsFromDisk(1, false, !first) + if err != nil { + return err + } + if len(doc) == 0 { + return nil + } else if len(doc) != 1 { + return errors.New("improper read from buffer") + } + + first = false + flag = f(doc[0]) + + if flag { + var c, d int64 + var err error + // since continue is on we need to bring the cursor back one + if c, err = b.store.Seek(0, io.SeekCurrent); err != nil { + return err + // dont seek earlier than 0 + } else if c < 1 { + break + } + if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil { + return err + } + + if c == d { + return errors.New("Seek failure!") + } + } + } + + return nil +} + /* Close * flushes and then prepares backing store for closure. * returns the end index into the underlying stream... 
diff --git a/internal/docbuf/docbuf_test.go b/internal/docbuf/docbuf_test.go index 0408070..48bf03e 100644 --- a/internal/docbuf/docbuf_test.go +++ b/internal/docbuf/docbuf_test.go @@ -302,3 +302,36 @@ func TestInitialize(t *testing.T) { t.Fatalf("error reading: %e", err) } } + +func TestApply(t *testing.T) { + backingStore := NewReadWriteSeekString() + backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo=")) + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // test cached + if buf.Cached() != 3 { + t.Fatalf("expected 3 docs in cache") + } + + count := 0 + if err := buf.Apply(func(doc string) bool { + count += 1 + return true + }); err != nil || count != 6 { + t.Fatalf("error applying: %e", err) + } + + count = 0 + if err := buf.Apply(func(doc string) bool { + if doc == "2\n" { + return false + } + count += 1 + return true + }); err != nil || count != 4 { + t.Fatalf("error applying: %e", err) + } +} From 0a29b35f4b6116a6602e14abd0665817a9ccbdb7 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Sat, 30 Nov 2024 00:31:08 -0800 Subject: [PATCH 3/5] Refactor state module to leverage DocumentBuffer This commit refactors the state module to remove the eventCache and introduce an eventStream instead. The eventStream is not a static array but a DocumentBuffer, and thus many functions had to be altered. This commit is best reviewed side by side with a before/after view instead of just looking at the diff. An Init() function is added to initialize the DocumentBuffer with config values. Facilities are added to both Event and EventType to allow for parsing them to and from string documents. a MakeEvent() function is added to create events of proper subtype based on a general data map input. ApplyToEvents() is added, which wraps around DocumentBuffer's Apply() method to alter the filter input type with one that Marshals and Unmarshals events. 
GetMatchingEvents() is now a proof of concept for DocumentBuffer's Apply() PruneCache() is now no longer needed. Signed-off-by: Ava Affine --- internal/state/state.go | 329 ++++++++++++++++++++++++----------- internal/state/state_test.go | 130 +++++++++----- 2 files changed, 321 insertions(+), 138 deletions(-) diff --git a/internal/state/state.go b/internal/state/state.go index caba329..ab8469b 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -9,10 +9,13 @@ package state import ( "fmt" - "slices" "sync" "time" + "os" + "errors" + "encoding/json" + "gitlab.com/whom/bingobot/internal/docbuf" "gitlab.com/whom/bingobot/internal/logging" ) @@ -24,32 +27,46 @@ import ( const ( BadEventTypeError = "bad event type" EventValidationFailedError = "event failed validation: " + BadEventNoUID = "event data has no UID field" + BadEventNoCreated = "event data has no created field" + BadEventStoreFilename = "failed to open event store file" + BadEventStreamInit = "failed to initialize event stream" + BadEventCreate = "failed to create event" + BadUserEventCreatedParse = "failed to parse created time" + BadChallengeEvent = "failed to make Challenge Event" + BadRestorationEvent = "failed to make Restoration Event" + BadUserActiveEvent = "failed to make UserActive Event" + BadEventObjMarshal = "error marshalling event" + BadEventObjUnmarshal = "failed to unmarshal event map" + BadEventUnmarshal = "failed to unmarshal event" + BadEventMissingTypeKey = "event map missing type key" +) + +const ( + EventTypeMapKey = "type" ) var eventMutex sync.RWMutex var eventSubscriptionCache = [NumEventTypes][]chan Event{} -var eventCache = []Event{} +var eventStream docbuf.DocumentBuffer +var maxEventsInMemory int -// TODO: Configurable -var eventChannelBufferSize = 256 - -// TODO: pruned events go to DISK -// ASSUMES eventCache is ordered by age (oldest first) -func pruneEventCache() { - eventMutex.Lock() - defer eventMutex.Unlock() - - oldCacheInvalid := false - newCache := 
[]Event{} - for _, obj := range eventCache { - if time.Since(obj.Time()).Hours() > 24*10 { - oldCacheInvalid = true - } else if oldCacheInvalid { - newCache = append(newCache, obj) - } +// expect filename validations in config package +func Init(eventMemCacheSize int, eventStoreFileName string) error { + file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return errors.Join(errors.New(BadEventStoreFilename), err) } - eventCache = newCache + if eventStream, err = docbuf.NewDocumentBuffer( + eventMemCacheSize, + file, + ); err != nil { + return errors.Join(errors.New(BadEventStreamInit), err) + } + + maxEventsInMemory = eventMemCacheSize + return nil } type EventType int8 @@ -65,14 +82,123 @@ const ( NumEventTypes ) +// either returns a valid event type or NumEventTypes +func EventTypeFromString(doc string) EventType { + switch doc { + case "Vote": + return Vote + case "Challenge": + return Challenge + case "Restoration": + return Restoration + case "UserActive": + return UserActive + default: + // error case + return NumEventTypes + } +} + +func (et EventType) String() string { + events := []string{ + "Vote", + "Challenge", + "Restoration", + "UserActive", + } + + if et < 0 || et >= NumEventTypes { + return "" + } + + return events[et] +} + func (et EventType) Validate() error { if et < 0 || et >= NumEventTypes { - return stringErrorType(BadEventTypeError) + return errors.New(BadEventTypeError) } return nil } +/* Allocates a specific event of type T from event type instance. + * output event not guaranteed to be valid. + * Call Validate() yourself. 
+ */ +func (et EventType) MakeEvent(data map[string]string) (Event, error) { + if err := et.Validate(); err != nil { + return nil, errors.Join(errors.New(BadEventCreate), err) + } + + MakeUserEvent := func(data map[string]string) (*UserEvent, error) { + user, hasUser := data[UserEventUserKey] + if !hasUser { + return nil, errors.New(BadEventNoUID) + } + + created, hasCreated := data[UserEventCreatedKey] + if !hasCreated { + return nil, errors.New(BadEventNoCreated) + } + + createdTime, err := time.Parse(time.RFC3339, created) + if err != nil { + return nil, errors.Join(errors.New(BadUserEventCreatedParse), err) + } + + return &UserEvent{ + uid: user, + created: createdTime, + }, nil + } + + switch et { + case Vote: + return VoteEvent(data), nil + case Challenge: + e, err := MakeUserEvent(data) + if err != nil { + return nil, errors.Join(errors.New(BadChallengeEvent), err) + } + return ChallengeEvent{*e}, nil + case Restoration: + e, err := MakeUserEvent(data) + if err != nil { + return nil, errors.Join(errors.New(BadRestorationEvent), err) + } + return RestorationEvent{*e}, nil + case UserActive: + e, err := MakeUserEvent(data) + if err != nil { + return nil, errors.Join(errors.New(BadUserActiveEvent), err) + } + return UserActiveEvent{*e}, nil + default: + return nil, errors.New(BadEventTypeError) + } +} + +/* adds a new subscriber channel to the event subscription cache + * and returns the channel that it will publish notifications on + */ +func (et EventType) Subscribe() (chan Event, error) { + if err := et.Validate(); err != nil { + return nil, err + } + + ch := make(chan Event, maxEventsInMemory) + + eventMutex.Lock() + defer eventMutex.Unlock() + eventSubscriptionCache[et] = append( + eventSubscriptionCache[et], + ch, + ) + + return ch, nil +} + type Event interface { // gets EventType associated with event Type() EventType @@ -88,80 +214,48 @@ type Event interface { Validate() error } -// TODO: Something better than this -type stringErrorType string +func 
EventToString(e Event) (string, error) { + m := e.Data() + m[EventTypeMapKey] = e.Type().String() + buf, err := json.Marshal(m) + if err != nil { + return "", errors.Join(errors.New(BadEventObjMarshal), err) + } -func (s stringErrorType) Error() string { - return string(s) + return string(buf), nil } -/* adds a new subscriber channel to the event subscription cache - * and returns the channel that it will publish notifications on - * - * note: channel is prefilled with at most eventChannelBufferSize - * historical events. Truncated if history exceeds event channel - * buffer size. - */ -func (et EventType) SubscribeWithHistory() (chan Event, error) { - if err := et.Validate(); err != nil { - return nil, err +func EventFromString(doc string) (Event, error) { + var obj map[string]string + if err := json.Unmarshal([]byte(doc), &obj); err != nil { + return nil, errors.Join(errors.New(BadEventObjUnmarshal), err) } - ch := make(chan Event, eventChannelBufferSize) - - eventMutex.Lock() - eventSubscriptionCache[et] = append( - eventSubscriptionCache[et], - ch, - ) - eventMutex.Unlock() - - eventMutex.RLock() - defer eventMutex.RUnlock() - numEventsAdded := 0 - for _, ev := range slices.Backward(eventCache) { - if numEventsAdded >= eventChannelBufferSize { - break - } - - if ev.Type() == et { - ch <- ev - numEventsAdded += 1 - } + et, ok := obj[EventTypeMapKey] + if !ok { + return nil, errors.New(BadEventMissingTypeKey) + } + t := EventTypeFromString(et) + ev, err := t.MakeEvent(obj) + if err != nil { + return nil, errors.Join(errors.New(BadEventCreate), err) } - return ch, nil -} - -/* adds a new subscriber channel to the event subscription cache - * and returns the channel that it will publish notifications on - */ -func (et EventType) Subscribe() (chan Event, error) { - if err := et.Validate(); err != nil { - return nil, err - } - - ch := make(chan Event, eventChannelBufferSize) - - eventMutex.Lock() - defer eventMutex.Unlock() - eventSubscriptionCache[et] = append( - 
eventSubscriptionCache[et], - ch, - ) - - return ch, nil + return ev, ev.Validate() } func PublishEvent(e Event) error { if err := ValidateEvent(e); err != nil { - return stringErrorType( - EventValidationFailedError + err.Error(), - ) + return errors.Join(errors.New(EventValidationFailedError), err) + } + + doc, err := EventToString(e) + if err != nil { + return errors.New(BadEventUnmarshal) } eventMutex.Lock() - eventCache = append(eventCache, e) + eventStream.Push(doc) eventMutex.Unlock() eventMutex.RLock() defer eventMutex.RUnlock() @@ -169,8 +263,8 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) { - if len(c) == eventChannelBufferSize { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { + if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", // log the event time to provide blockage timing information @@ -205,6 +299,45 @@ func ValidateEvent(e Event) error { return nil } +/* Takes a filter and applies it to each individual event + * The filter is a function/closure that accepts one event + * and returns true if ApplyToEvents should continue iterating + * + * If the filter returns false then ApplyToEvents halts and returns. + * + * (this calls docbuf.Apply() under the hood) + */ +func ApplyToEvents( + f func(Event)bool, +) error { + // local variables enclosed by filter function filterWrap + var err error + var ev Event + // wrap f() to be compatible with docbuf.Apply() + filterWrap := func(doc string) bool { + ev, err = EventFromString(doc) + if err != nil { + return false + } + return f(ev) + } + + eventMutex.RLock() + defer eventMutex.RUnlock() + + // cant reuse err or return val directly as err is set + // by filter function if an error happens unmarshalling + // an event. 
In this case apply might return nil + err2 := eventStream.Apply(filterWrap) + if err2 != nil { + return err2 + } + if err != nil { + return err + } + return nil +} + /* gets all events of type T in cache * that also share all field values in * map 'filters' @@ -219,25 +352,26 @@ func GetMatchingEvents( return matches, err } - eventMutex.RLock() - defer eventMutex.RUnlock() - -Events: - for _, e := range eventCache { + filter := func(e Event) bool { + ev, er := EventToString(e) + fmt.Printf("Checking: %s (%e)\n", ev, er) if e.Type() != t { - continue + return true } for k, v := range filters { val, found := e.Data()[k] if !found || val != v { - continue Events + return true } } + fmt.Println("Found Match") matches = append(matches, e) + return true } - return matches, nil + err := ApplyToEvents(filter) + return matches, err } type VoteEvent map[string]string @@ -303,8 +437,7 @@ func (ve VoteEvent) Validate() error { VoteStatusKey, } { if _, found := ve[key]; !found { - return stringErrorType( - VoteMissingKeyError + key) + return errors.New(VoteMissingKeyError + key) } } @@ -312,15 +445,15 @@ func (ve VoteEvent) Validate() error { if status != VoteStatusTimeout && status != VoteStatusInProgress && status != VoteStatusFinalized { - return stringErrorType(VoteBadStatusError + status) + return errors.New(VoteBadStatusError + status) } result, hasResult := ve[VoteResultKey] if hasResult && status == VoteStatusInProgress { - return stringErrorType(VoteNotFinishedError) + return errors.New(VoteNotFinishedError) } if status != VoteStatusInProgress && !hasResult { - return stringErrorType(VoteMissingResultError) + return errors.New(VoteMissingResultError) } if hasResult && @@ -329,7 +462,7 @@ func (ve VoteEvent) Validate() error { result != VoteResultTie && result != VoteResultTimeout) { - return stringErrorType(VoteBadResultError + result) + return errors.New(VoteBadResultError + result) } return nil @@ -353,7 +486,7 @@ func (ue UserEvent) Time() time.Time { func (ue 
UserEvent) Data() map[string]string { return map[string]string{ UserEventUserKey: ue.uid, - UserEventCreatedKey: ue.created.Local().String(), + UserEventCreatedKey: ue.created.Format(time.RFC3339), } } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index e775e74..e6d4073 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "gitlab.com/whom/bingobot/internal/docbuf" "gitlab.com/whom/bingobot/internal/logging" ) @@ -17,12 +18,71 @@ const TestTok = "TEST_NAME" var loggingInitialized = false func SetupTest(t *testing.T) { + maxEventsInMemory = 271 + + var err error // have to set up logger if !loggingInitialized { logging.Init() loggingInitialized = true } + b := docbuf.NewReadWriteSeekString() + eventStream, err = docbuf.NewDocumentBuffer(300, &b) + if err != nil { + t.Fatalf("error allocating buffer: %e", err) + } +} + +func TestEventMarshalUnmarshal(t *testing.T) { + tm := time.Now() + cev := ChallengeEvent{UserEvent{ + uid: TestTok, + created: tm, + }} + + vev := VoteEvent(map[string]string{ + VoteActionKey: "a", + VoteRequesterKey: "r", + VoteCreatedKey: "c", + VoteStatusKey: VoteStatusFinalized, + VoteResultKey: VoteResultFail, + }) + + cstr, err := EventToString(cev); + if err != nil { + t.Fatalf("error marshalling challenge: %e, %s", err, cstr) + } + + t.Logf("cstr: %s\n", cstr) + + vstr, err := EventToString(vev); + if err != nil { + t.Fatalf("error marshalling vote: %e, %s", err, vstr) + } + + t.Logf("vstr: %s\n", vstr) + + if ev, err := EventFromString(cstr); err != nil || + ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || + ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { + t.Fatalf("error unmarshalling challenge: %e, %v!=%v", err, ev, cev) + } + + if ev, err := EventFromString(vstr); err != nil || + fmt.Sprint(ev) != fmt.Sprint(vev) { + t.Fatalf("error unmarshalling vote: %e, %v!=%v", err, ev, vev) + } +} + +func TestPubSub(t 
*testing.T) { + SetupTest(t) + + c, e := UserActive.Subscribe() + if e != nil { + t.Errorf("Error subscribing to UserActive events: %e", e) + } + old, _ := time.Parse( time.RFC3339, VeryOldVote, @@ -47,40 +107,21 @@ func SetupTest(t *testing.T) { created: time.Now(), }}) - if len(eventCache) != 272 { - t.Errorf("Unexpected number of events in cache: %d", - len(eventCache)) - } -} - -func CleanupTest() { - eventSubscriptionCache = [NumEventTypes][]chan Event{} - eventCache = []Event{} -} - -func TestPubSub(t *testing.T) { - SetupTest(t) - - c, e := UserActive.SubscribeWithHistory() - if e != nil { - t.Errorf("Error subscribing to UserActive events: %e", e) - } - Loop: for i := 0; true; i++ { select { case e, ok := <-c: if !ok { - t.Errorf("Subscription Channel Closed") + t.Fatalf("Subscription Channel Closed") } if e.Type() != UserActive { - t.Errorf("Non UserActive Event in UserActive subscription: %v", e.Type()) + t.Fatalf("Non UserActive Event in UserActive subscription: %v", e.Type()) } default: - if i == eventChannelBufferSize { + if i == maxEventsInMemory { break Loop } else { - t.Errorf("Unexpected number of events in channel: %d", i) + t.Fatalf("Unexpected number of events in channel: %d", i) } } } @@ -93,18 +134,40 @@ Loop: select { case e, ok := <-c: if !ok || e.Data()[UserEventUserKey] != "uniqueToken" { - t.Errorf("didnt read correct event from channel: %v", e) + t.Fatalf("didnt read correct event from channel: %v", e) } default: - t.Errorf("New event not published to subscription!") + t.Fatalf("New event not published to subscription!") } - - CleanupTest() } func TestFilterCache(t *testing.T) { SetupTest(t) + old, _ := time.Parse( + time.RFC3339, + VeryOldVote, + ) + + for i := range 270 { + if err := PublishEvent(UserActiveEvent{UserEvent{ + uid: fmt.Sprintf("%d", i), + created: old, + }}); err != nil { + t.Errorf("Failed to add event: %e", err) + } + } + + PublishEvent(UserActiveEvent{UserEvent{ + uid: fmt.Sprintf(TestTok), + created: time.Now(), + 
}}) + + PublishEvent(ChallengeEvent{UserEvent{ + uid: fmt.Sprintf(TestTok), + created: time.Now(), + }}) + events, err := GetMatchingEvents( UserActive, map[string]string{ @@ -123,19 +186,6 @@ func TestFilterCache(t *testing.T) { if events[0].Type() != UserActive { t.Errorf("Got wrong event!: %+v", events[0]) } - - CleanupTest() -} - -func TestPruneCache(t *testing.T) { - SetupTest(t) - pruneEventCache() - - if len(eventCache) != 2 { - t.Errorf("Incorrect number of remaining events: %d", len(eventCache)) - } - - CleanupTest() } func TestVoteEventValidations(t *testing.T) { From 9d3ca54ad41ea0e2b9f2dc5fae9dff82d95ba52e Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Sat, 30 Nov 2024 00:31:25 -0800 Subject: [PATCH 4/5] initialize state package in main.go This commit adds config values to the config package for state configuration. This commit also adds a call to state.Init() to the main function. Signed-off-by: Ava Affine --- internal/config/config.go | 8 ++++++++ main.go | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/internal/config/config.go b/internal/config/config.go index 00e1983..01e1e40 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -33,6 +33,12 @@ type AppConfig struct { current recommended value is 1000ms. 
*/ VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` + + /* persistent state file store */ + PersistentCacheStore string `yaml:"persistent_cache_store"` + + /* number of internal state events to cache in memory */ + InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` } var config *AppConfig @@ -87,4 +93,6 @@ func setDefaults() { viper.SetDefault("LogAddSource", true) viper.SetDefault("VoiceActivityThresholdSeconds", 600) viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) + viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") + viper.SetDefault("InMemoryEventCacheSize", 512) } diff --git a/main.go b/main.go index 1c072cb..ea3952c 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" ) var ( @@ -28,6 +29,13 @@ func main() { logging.Init() + if err := state.Init( + config.Get().InMemoryEventCacheSize, + config.Get().PersistentCacheStore, + ); err != nil { + log.Fatalf("couldn't initialize state engine: %s", err.Error()) + } + err = startBot() if err != nil { From d818e8c15864f226740475e6b3076e79817c08ab Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Sat, 30 Nov 2024 10:49:16 -0800 Subject: [PATCH 5/5] Handle interrupts and panics to mitigate data loss This commit introduces a deferred call to a state teardown function. This is needed to properly flush data to the backing documentbuffer as well as to truncate its underlying store. In doing so we make sure data loss from process termination is minimal to none.
Whenever a panic happens or a signal is thrown, the call to Teardown() is made Signed-off-by: Ava Affine --- internal/state/state.go | 20 ++++++++++++++++++++ main.go | 10 ++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/internal/state/state.go b/internal/state/state.go index ab8469b..1325546 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -48,6 +48,7 @@ const ( var eventMutex sync.RWMutex var eventSubscriptionCache = [NumEventTypes][]chan Event{} +var eventStorageFileName string var eventStream docbuf.DocumentBuffer var maxEventsInMemory int @@ -65,10 +66,29 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return errors.Join(errors.New(BadEventStreamInit), err) } + eventStorageFileName = eventStoreFileName maxEventsInMemory = eventMemCacheSize return nil } +// return no error. we are handling SIGINT +func Teardown() { + // riskily ignore all locking.... + // we are handling a termination signal after all + i, e := eventStream.Close() + if e != nil { + logging.Warn("failed to close document buffer: %s", e.Error()) + logging.Warn("will attempt to truncate event store anyways") + } + + e = os.Truncate(eventStorageFileName, i) + if e != nil { + logging.Error("FAILED TO TRUNCATE EVENT STORE!!") + logging.Error("You will likely have garbage data at the end of it") + logging.Error("Attempt manual correction!") + } +} + type EventType int8 const ( diff --git a/main.go b/main.go index ea3952c..21111e9 100644 --- a/main.go +++ b/main.go @@ -3,8 +3,6 @@ package main import ( "flag" "log" - "os" - "os/signal" "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" @@ -17,8 +15,6 @@ var ( ) func main() { - flag.Parse() - var err error err = config.Init() @@ -28,6 +24,7 @@ func main() { } logging.Init() + flag.Parse() if err := state.Init( config.Get().InMemoryEventCacheSize, config.Get().PersistentCacheStore, ); err != nil { log.Fatalf("couldn't initialize state engine: %s",
err.Error()) } + defer state.Teardown() err = startBot() @@ -50,10 +48,6 @@ func startBot() error { return err } - sigch := make(chan os.Signal, 1) - signal.Notify(sigch, os.Interrupt) - <-sigch - logging.Info("shutting down gracefully", "type", "shutdown") discord.Close()