diff --git a/internal/state/events.go b/internal/state/events.go index a24b91f..e6ac421 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -102,6 +102,12 @@ func (ve VoteEvent) Validate() error { return nil } +func (ve VoteEvent) Disposable() bool { + // will be implemented with democracy module + logging.Warn("unimplemented: VoteEvent::Disposable") + return false +} + type UserEvent struct { uid string created time.Time @@ -129,6 +135,13 @@ func (ue UserEvent) Validate() error { return nil } +func (ue UserEvent) Disposable() bool { + // when I make a module for challenges, restorations, and UserActives + // then I should implement this + logging.Warn("unimplemented: UserEvent::Disposable") + return false +} + type ChallengeEvent struct { UserEvent } diff --git a/internal/state/state.go b/internal/state/state.go index 4986991..deb0094 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -40,12 +40,18 @@ const ( BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" + StartBeforeInitError = "state machine not initialized" + ReopenStoreFileError = "failed to reopen store file" ) const ( EventTypeMapKey = "type" ) +const ( + TmpDocBufBackingFile = "bingobot_events_processing" +) + var eventMutex sync.RWMutex var eventSubscriptionCache = [NumEventTypes][]chan Event{} var eventStorageFileName string @@ -60,7 +66,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { } if eventStream, err = docbuf.NewDocumentBuffer( - eventMemCacheSize, + 0, // temporary nil-cache buffer to replay events from file, ); err != nil { return errors.Join(errors.New(BadEventStreamInit), err) @@ -71,6 +77,79 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return nil } +// replay events and begin state machine +func Start() error { + if eventStream == nil { + return errors.New(StartBeforeInitError) + } + + tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile) + if err != nil { + return err + } + + tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile) + if err != nil { + tmpBackFile.Close() + return err + } + + // filter out disposable events into a reverse ordered on disk stack + var doc string + for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + doc, err = eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + return err + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Discarding the following event for not unmarshaling: %s", doc) + continue + } + + if !ev.Disposable() { + tmpBuf.Push(doc) + } + } + + if err := os.Truncate(eventStorageFileName, 0); err != nil { + return errors.Join(errors.New(ReopenStoreFileError), err) + } + + file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return errors.Join(errors.New(BadEventStoreFilename), err) + } + + eventStream, err := docbuf.NewDocumentBuffer(maxEventsInMemory, file) + if err != nil { + // return error here will panic without truncating or writing to the file + return err + } + + // unravel tmp stack into properly ordered properly allocated buffer + for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if err := PublishEvent(ev); err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + } + } + + return nil +} + // return no error. we are handling SIGINT func Teardown() { // riskily ignore all locking.... @@ -232,6 +311,9 @@ type Event interface { // validates state of internal metadata per EventType Validate() error + + // returns true if event can be discarded + Disposable() bool } func EventToString(e Event) (string, error) { diff --git a/main.go b/main.go index 21111e9..6d2e17f 100644 --- a/main.go +++ b/main.go @@ -32,10 +32,15 @@ func main() { ); err != nil { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } - defer state.Teardown() + // TODO: start modules HERE and not elsewhere err = startBot() + if err := state.Start(); err != nil { + log.Fatal("failed to start state machine: %s", err.Error()) + } + defer state.Teardown() + if err != nil { log.Fatal(err) }