From 2515d396a0e4e72a61bd1620147288976debc448 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Thu, 5 Dec 2024 18:11:41 -0800 Subject: [PATCH] Event replay and cleanup This commit introduces the ability to reconfigure modules/event subscribers by replaying previously stored events at startup. This way modules/subscribers can always come up with the latest activity and data that was present at close at the last run of the program. This process also includes the disposal of unneeded events to minimize disk use over time. The changes include the following: 1. Event interface is extended with the Disposable() function unimplemented stubs for existing interface implementations 2. state.Init() is split into Init() and Start() Init() now initialized the file used by eventStream, and puts in place a 0 size memory cache as a temporary measure on top of the file. Start() reads Pop()s documents one by one from the temp nil-cache eventStream, it attempts to dispose of each event and if the event is not disposable it is pushed onto a second temporary no-memory-cache buffer which is then reverse ordered. The underlying file is truncated and reopened. Finally, the real eventStream is allocated, with in memory cache. All events in the second buffer are published (sent to subscribers as well as added to the new eventStream). 3. Updates to main() to support Init() vs Start() Signed-off-by: Ava Affine --- internal/state/events.go | 13 +++++++ internal/state/state.go | 84 +++++++++++++++++++++++++++++++++++++++- main.go | 7 +++- 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/internal/state/events.go b/internal/state/events.go index a24b91f..e6ac421 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -102,6 +102,12 @@ func (ve VoteEvent) Validate() error { return nil } +func (ve VoteEvent) Disposable() bool { + // will be implemented with democracy module + logging.Warn("unimplemented: VoteEvent::Disposable") + return false +} + type UserEvent struct { uid string created time.Time @@ -129,6 +135,13 @@ func (ue UserEvent) Validate() error { return nil } +func (ue UserEvent) Disposable() bool { + // when I make a module for challenges, restorations, and UserActives + // then I should implement this + logging.Warn("unimplemented: UserEvent::Disposable") + return false +} + type ChallengeEvent struct { UserEvent } diff --git a/internal/state/state.go b/internal/state/state.go index 4986991..deb0094 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -40,12 +40,18 @@ const ( BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" + StartBeforeInitError = "state machine not initialized" + ReopenStoreFileError = "failed to reopen store file" ) const ( EventTypeMapKey = "type" ) +const ( + TmpDocBufBackingFile = "bingobot_events_processing" +) + var eventMutex sync.RWMutex var eventSubscriptionCache = [NumEventTypes][]chan Event{} var eventStorageFileName string @@ -60,7 +66,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { } if eventStream, err = docbuf.NewDocumentBuffer( - eventMemCacheSize, + 0, // temporary nil-cache buffer to replay events from file, ); err != nil { return errors.Join(errors.New(BadEventStreamInit), err) @@ -71,6 +77,79 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return nil } +// replay events and begin state machine +func Start() error { + if eventStream == nil { + return errors.New(StartBeforeInitError) + } + + tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile) + if err != nil { + return err + } + + tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile) + if err != nil { + tmpBackFile.Close() + return err + } + + // filter out disposable events into a reverse ordered on disk stack + var doc string + for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + doc, err = eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + return err + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Discarding the following event for not unmarshaling: %s", doc) + continue + } + + if !ev.Disposable() { + tmpBuf.Push(doc) + } + } + + if err := os.Truncate(eventStorageFileName, 0); err != nil { + return errors.Join(errors.New(ReopenStoreFileError), err) + } + + file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return errors.Join(errors.New(BadEventStoreFilename), err) + } + + eventStream, err := docbuf.NewDocumentBuffer(maxEventsInMemory, file) + if err != nil { + // return error here will panic without truncating or writing to the file + return err + } + + // unravel tmp stack into properly ordered properly allocated buffer + for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if err := PublishEvent(ev); err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + } + } + + return nil +} + // return no error. we are handling SIGINT func Teardown() { // riskily ignore all locking.... @@ -232,6 +311,9 @@ type Event interface { // validates state of internal metadata per EventType Validate() error + + // returns true if event can be discarded + Disposable() bool } func EventToString(e Event) (string, error) { diff --git a/main.go b/main.go index 21111e9..6d2e17f 100644 --- a/main.go +++ b/main.go @@ -32,10 +32,15 @@ func main() { ); err != nil { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } - defer state.Teardown() + // TODO: start modules HERE and not elsewhere err = startBot() + if err := state.Start(); err != nil { + log.Fatal("failed to start state machine: %s", err.Error()) + } + defer state.Teardown() + if err != nil { log.Fatal(err) }