Event replay and cleanup

This commit is contained in:
Ava Apples Affine 2025-01-08 00:47:49 +00:00 committed by piper pentagram
parent 2560410820
commit 97bf66c191
8 changed files with 447 additions and 39 deletions

View file

@ -1,6 +1,6 @@
package state
/* STATE
/* State module
* This package encapsulates various state information
* state is represented in various global singletons
* Additionally, this package offers a pub/sub interface
@ -8,15 +8,15 @@ package state
*/
import (
"fmt"
"encoding/json"
"errors"
"os"
"strconv"
"sync"
"time"
"os"
"errors"
"encoding/json"
"gitlab.com/whom/bingobot/pkg/docbuf"
"gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/pkg/docbuf"
)
/* Event interface is meant to encapsulate a general interface
@ -36,14 +36,17 @@ const (
BadChallengeEvent = "failed to make Challenge Event"
BadRestorationEvent = "failed to make Restoration Event"
BadUserActiveEvent = "failed to make UserActive Event"
BadTestEvent = "failed to make Test Event"
BadEventObjMarshal = "error marshalling event"
BadEventObjUnmarshal = "failed to unmarshal event map"
BadEventUnmarshal = "failed to unmarshal event"
BadEventMissingTypeKey = "event map missing type key"
)
StartBeforeInitError = "state machine not initialized"
ReopenStoreFileError = "failed to reopen store file"
const (
EventTypeMapKey = "type"
TmpDocBufBackingFile = "bingobot_events_processing"
)
var eventMutex sync.RWMutex
@ -60,7 +63,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error {
}
if eventStream, err = docbuf.NewDocumentBuffer(
eventMemCacheSize,
0, // temporary nil-cache buffer to replay events from
file,
); err != nil {
return errors.Join(errors.New(BadEventStreamInit), err)
@ -71,6 +74,89 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error {
return nil
}
// replay events and begin state machine
func Start() error {
if eventStream == nil {
return errors.New(StartBeforeInitError)
}
tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile)
if err != nil {
return err
}
tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile)
if err != nil {
tmpBackFile.Close()
return err
}
// filter out disposable events into a reverse ordered on disk stack
var doc string
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
doc, err = eventStream.Pop()
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
return err
}
if doc == "" ||
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
break
}
ev, err := EventFromString(doc)
if err != nil {
logging.Warn("Discarding the following event for not unmarshaling: %s", doc)
continue
}
if !ev.Disposable() {
tmpBuf.Push(doc)
}
}
if err := os.Truncate(eventStorageFileName, 0); err != nil {
return errors.Join(errors.New(ReopenStoreFileError), err)
}
file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return errors.Join(errors.New(BadEventStoreFilename), err)
}
eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file)
if err != nil {
// return error here will panic without truncating or writing to the file
return err
}
// unravel tmp stack into properly ordered properly allocated buffer
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
doc, err := tmpBuf.Pop()
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
logging.Warn("Could not handle the following error: %s", err.Error())
continue
}
if doc == "" ||
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
break
}
ev, err := EventFromString(doc)
if err != nil {
logging.Warn("Could not handle the following error: %s", err.Error())
continue
}
if err := PublishEvent(ev); err != nil {
logging.Warn("Could not handle the following error: %s", err.Error())
}
}
return nil
}
// return no error. we are handling SIGINT
func Teardown() {
// riskily ignore all locking....
@ -96,6 +182,7 @@ const (
Challenge
Restoration
UserActive
Test
// ...
// leave this last
@ -113,6 +200,8 @@ func EventTypeFromString(doc string) EventType {
return Restoration
case "UserActive":
return UserActive
case "Test":
return Test
default:
// error case
return NumEventTypes
@ -125,6 +214,7 @@ func (et EventType) String() string {
"Challenge",
"Restoration",
"UserActive",
"Test",
}
if et < 0 || et >= NumEventTypes {
@ -194,6 +284,22 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) {
return nil, errors.Join(errors.New(BadUserActiveEvent), err)
}
return UserActiveEvent{*e}, nil
case Test:
disp := false
if v, ok := data[TestEventDisposeKey]; ok && v == "t" {
disp = true
}
id := -1
if v, ok := data[TestEventIDKey]; ok {
var err error
if id, err = strconv.Atoi(v); err != nil {
return nil, errors.Join(errors.New(BadTestEvent), err)
}
}
return TestEvent{
Dispose: disp,
ID: id,
}, nil
default:
return nil, errors.New(BadEventTypeError)
}
@ -232,6 +338,9 @@ type Event interface {
// validates state of internal metadata per EventType
Validate() error
// returns true if event can be discarded
Disposable() bool
}
func EventToString(e Event) (string, error) {
@ -283,7 +392,7 @@ func PublishEvent(e Event) error {
blocking := false
for _, c := range eventSubscriptionCache[e.Type()] {
if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) {
if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) {
if len(c) == maxEventsInMemory {
logging.Warn(
"PublishEvent() blocking -- event channel full",
@ -373,8 +482,6 @@ func GetMatchingEvents(
}
filter := func(e Event) bool {
ev, er := EventToString(e)
fmt.Printf("Checking: %s (%e)\n", ev, er)
if e.Type() != t {
return true
}
@ -385,7 +492,6 @@ func GetMatchingEvents(
}
}
fmt.Println("Found Match")
matches = append(matches, e)
return true
}