package state

/* State module
 * This package encapsulates various state information.
 * State is represented in various global singletons.
 * Additionally, this package offers a pub/sub interface
 * for various aspects of state.
 */

import (
	"encoding/json"
	"errors"
	"os"
	"strconv"
	"sync"
	"time"

	"gitlab.com/whom/bingobot/internal/logging"
	"gitlab.com/whom/bingobot/pkg/docbuf"
)

/* The Event interface (declared further down) is meant to be a general
 * interface, extendable to any action that the bot or a user takes.
 */

// Error message strings used by this package. They are wrapped into error
// values with errors.New / errors.Join at the point of failure.
const (
	BadEventTypeError          = "bad event type"
	EventValidationFailedError = "event failed validation: "
	BadEventNoUID              = "event data has no UID field"
	BadEventNoCreated          = "event data has no created field"
	BadEventStoreFilename      = "failed to open event store file"
	BadEventStreamInit         = "failed to initialize event stream"
	BadEventCreate             = "failed to create event"
	BadUserEventCreatedParse   = "failed to parse created time"
	BadChallengeEvent          = "failed to make Challenge Event"
	BadRestorationEvent        = "failed to make Restoration Event"
	BadUserActiveEvent         = "failed to make UserActive Event"
	BadTestEvent               = "failed to make Test Event"
	BadEventObjMarshal         = "error marshalling event"
	BadEventObjUnmarshal       = "failed to unmarshal event map"
	BadEventUnmarshal          = "failed to unmarshal event"
	BadEventMissingTypeKey     = "event map missing type key"
	StartBeforeInitError       = "state machine not initialized"
	ReopenStoreFileError       = "failed to reopen store file"

	// EventTypeMapKey is the key under which an event's type name is stored
	// in its serialized map. TmpDocBufBackingFile is the name pattern of the
	// temporary file Start() uses while replaying events.
	EventTypeMapKey      = "type"
	TmpDocBufBackingFile = "bingobot_events_processing"
)

// eventMutex guards eventStream and eventSubscriptionCache.
var eventMutex sync.RWMutex

// eventSubscriptionCache holds the subscriber channels, indexed by EventType.
var eventSubscriptionCache = [NumEventTypes][]chan Event{}

// eventStorageFileName is the path of the on-disk event store (set by Init).
var eventStorageFileName string

// eventStream is the document buffer that events are pushed to.
var eventStream docbuf.DocumentBuffer

// maxEventsInMemory is the cache size handed to the document buffer in
// Start() and the buffer size of every subscriber channel.
var maxEventsInMemory int

// Init opens (creating if needed) the event store file and wraps it in a
// zero-cache document buffer so that Start() can replay its contents.
// It records eventStoreFileName and eventMemCacheSize for later use.
//
// Filename validation is expected to happen in the config package.
func Init(eventMemCacheSize int, eventStoreFileName string) error {
	file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return errors.Join(errors.New(BadEventStoreFilename), err)
	}

	if eventStream, err = docbuf.NewDocumentBuffer(
		0, // temporary nil-cache buffer to replay events from
		file,
	); err != nil {
		// nothing owns the file if the buffer was not created
		_ = file.Close()
		return errors.Join(errors.New(BadEventStreamInit), err)
	}

	eventStorageFileName = eventStoreFileName
	maxEventsInMemory = eventMemCacheSize
	return nil
}

// Start replays the stored events and begins the state machine.
//
// Every document is popped from the buffer created by Init. Documents that
// fail to parse and events reporting Disposable() are dropped; the rest are
// pushed onto a temporary on-disk stack. The store file is then truncated,
// reopened with the configured cache size, and the surviving events are
// re-published in their original order through PublishEvent.
//
// Returns an error if Init has not been called, or if any file or buffer
// operation fails. When Start fails after the temporary stack was created,
// the temporary file is left on disk so its contents can be inspected; on
// success it is removed.
//
// NOTE(review): logging.Warn is called here with printf-style verbs and in
// PublishEvent with key/value pairs -- confirm which convention the logging
// package actually expects.
func Start() error {
	if eventStream == nil {
		return errors.New(StartBeforeInitError)
	}

	tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile)
	if err != nil {
		return err
	}
	replayed := false
	defer func() {
		// best effort cleanup: os.CreateTemp leaves removal to the caller
		_ = tmpBackFile.Close()
		if replayed {
			_ = os.Remove(tmpBackFile.Name())
		}
	}()

	tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile)
	if err != nil {
		return err
	}

	// filter out disposable events into a reverse ordered on disk stack
	for {
		doc, err := eventStream.Pop()
		if err != nil {
			if err.Error() == docbuf.PopFromEmptyBufferError {
				break
			}
			return err
		}
		if doc == "" {
			break
		}

		ev, err := EventFromString(doc)
		if err != nil {
			logging.Warn("Discarding the following event for not unmarshaling: %s", doc)
			continue
		}

		if !ev.Disposable() {
			tmpBuf.Push(doc)
		}
	}

	if err := os.Truncate(eventStorageFileName, 0); err != nil {
		return errors.Join(errors.New(ReopenStoreFileError), err)
	}

	file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return errors.Join(errors.New(BadEventStoreFilename), err)
	}

	eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file)
	if err != nil {
		// return error here will panic without truncating or writing to the file
		_ = file.Close()
		return err
	}

	// unravel tmp stack into properly ordered properly allocated buffer
	for {
		doc, err := tmpBuf.Pop()
		if err != nil {
			if err.Error() == docbuf.PopFromEmptyBufferError {
				break
			}
			// Retrying here would spin forever on a persistent failure,
			// so give up and report it instead.
			return errors.Join(errors.New(BadEventStreamInit), err)
		}
		if doc == "" {
			break
		}

		ev, err := EventFromString(doc)
		if err != nil {
			logging.Warn("Could not handle the following error: %s", err.Error())
			continue
		}

		if err := PublishEvent(ev); err != nil {
			logging.Warn("Could not handle the following error: %s", err.Error())
		}
	}

	replayed = true
	return nil
}

// Teardown closes the event stream and truncates the event store file to
// the offset reported by Close. It returns no error because it is meant to
// run while handling SIGINT; failures are only logged. It does nothing if
// the event stream was never initialized.
func Teardown() {
	// riskily ignore all locking....
	// we are handling a termination signal after all
	if eventStream == nil {
		return
	}

	i, e := eventStream.Close()
	if e != nil {
		logging.Warn("failed to close document buffer: %s", e.Error())
		logging.Warn("will attempt to truncate event store anyways")
	}

	e = os.Truncate(eventStorageFileName, i)
	if e != nil {
		logging.Error("FAILED TO TRUNCATE EVENT STORE!!")
		logging.Error("You will likely have garbage data at the end of it")
		logging.Error("Attempt manual correction!")
	}
}

// EventType enumerates the kinds of events this package knows about.
type EventType int8

const (
	Vote EventType = iota
	Challenge
	Restoration
	UserActive
	Test
	ConfessionsChannelLink
	// ...

	// leave this last
	NumEventTypes
)

// EventTypeFromString maps an event type name to its EventType.
// It returns NumEventTypes (an invalid type) for unknown names.
func EventTypeFromString(doc string) EventType {
	switch doc {
	case "Vote":
		return Vote
	case "Challenge":
		return Challenge
	case "Restoration":
		return Restoration
	case "UserActive":
		return UserActive
	case "Test":
		return Test
	case "ConfessionsChannelLink":
		return ConfessionsChannelLink
	default:
		// error case
		return NumEventTypes
	}
}

// String returns the name of the event type, or "" if et is not one of
// the named event types.
func (et EventType) String() string {
	switch et {
	case Vote:
		return "Vote"
	case Challenge:
		return "Challenge"
	case Restoration:
		return "Restoration"
	case UserActive:
		return "UserActive"
	case Test:
		return "Test"
	case ConfessionsChannelLink:
		return "ConfessionsChannelLink"
	default:
		return ""
	}
}

// Validate returns an error unless et lies in [0, NumEventTypes).
func (et EventType) Validate() error {
	if et < 0 || et >= NumEventTypes {
		return errors.New(BadEventTypeError)
	}

	return nil
}

// MakeEvent allocates an event of type et from the given key/value data.
// It fails if et is invalid or if data lacks a field the event type needs
// (or the field cannot be parsed).
//
// The returned event is not guaranteed to be valid.
// Call Validate() yourself.
func (et EventType) MakeEvent(data map[string]string) (Event, error) {
	if err := et.Validate(); err != nil {
		return nil, errors.Join(errors.New(BadEventCreate), err)
	}

	// builds the UserEvent shared by Challenge, Restoration and UserActive
	makeUserEvent := func(data map[string]string) (*UserEvent, error) {
		user, hasUser := data[UserEventUserKey]
		if !hasUser {
			return nil, errors.New(BadEventNoUID)
		}

		created, hasCreated := data[UserEventCreatedKey]
		if !hasCreated {
			return nil, errors.New(BadEventNoCreated)
		}

		createdTime, err := time.Parse(time.RFC3339, created)
		if err != nil {
			return nil, errors.Join(errors.New(BadUserEventCreatedParse), err)
		}

		return &UserEvent{
			uid:     user,
			created: createdTime,
		}, nil
	}

	switch et {
	case Vote:
		return VoteEvent(data), nil

	case Challenge:
		e, err := makeUserEvent(data)
		if err != nil {
			return nil, errors.Join(errors.New(BadChallengeEvent), err)
		}
		return ChallengeEvent{*e}, nil

	case Restoration:
		e, err := makeUserEvent(data)
		if err != nil {
			return nil, errors.Join(errors.New(BadRestorationEvent), err)
		}
		return RestorationEvent{*e}, nil

	case UserActive:
		e, err := makeUserEvent(data)
		if err != nil {
			return nil, errors.Join(errors.New(BadUserActiveEvent), err)
		}
		return UserActiveEvent{*e}, nil

	case Test:
		// only the literal "t" marks a test event as disposable
		disp := false
		if v, ok := data[TestEventDisposeKey]; ok && v == "t" {
			disp = true
		}

		// the ID is optional and defaults to -1
		id := -1
		if v, ok := data[TestEventIDKey]; ok {
			var err error
			if id, err = strconv.Atoi(v); err != nil {
				return nil, errors.Join(errors.New(BadTestEvent), err)
			}
		}

		return TestEvent{
			Dispose: disp,
			ID:      id,
		}, nil

	case ConfessionsChannelLink:
		gid, ok := data[ConfessionsLinkEventGuildKey]
		if !ok {
			return nil, errors.New(BadConfessionsLinkEventError)
		}

		cid, ok := data[ConfessionsLinkEventChannelKey]
		if !ok {
			return nil, errors.New(BadConfessionsLinkEventError)
		}

		ti, ok := data[ConfessionsLinkEventCreatedKey]
		if !ok {
			return nil, errors.New(BadConfessionsLinkEventError)
		}

		t, err := time.Parse(time.RFC3339, ti)
		if err != nil {
			return nil, errors.Join(errors.New(BadConfessionsLinkEventError), err)
		}

		return ConfessionsChannelLinkEvent{
			GuildID:   gid,
			ChannelID: cid,
			Created:   t,
		}, nil

	default:
		return nil, errors.New(BadEventTypeError)
	}
}

// Subscribe adds a new subscriber channel for et to the event subscription
// cache and returns the channel that notifications will be published on.
// The channel is buffered with maxEventsInMemory slots.
func (et EventType) Subscribe() (chan Event, error) {
	if err := et.Validate(); err != nil {
		return nil, err
	}

	ch := make(chan Event, maxEventsInMemory)

	eventMutex.Lock()
	defer eventMutex.Unlock()
	eventSubscriptionCache[et] = append(
		eventSubscriptionCache[et],
		ch,
	)

	return ch, nil
}

// Event is the common interface of everything that can be published
// through this package.
type Event interface {
	// gets EventType associated with event
	Type() EventType

	// gets time event created
	Time() time.Time

	// gets internal metadata associated with event
	// may be read only depending on event implementation
	Data() map[string]string

	// validates state of internal metadata per EventType
	Validate() error

	// returns true if event can be discarded
	Disposable() bool
}

// EventToString serializes an event to a JSON object made of the event's
// Data() plus its type name under EventTypeMapKey.
func EventToString(e Event) (string, error) {
	// Work on a copy: Data() may hand back the event's own map (or a nil
	// map), and neither should be written to here.
	data := e.Data()
	m := make(map[string]string, len(data)+1)
	for k, v := range data {
		m[k] = v
	}
	m[EventTypeMapKey] = e.Type().String()

	buf, err := json.Marshal(m)
	if err != nil {
		return "", errors.Join(errors.New(BadEventObjMarshal), err)
	}

	return string(buf), nil
}

// EventFromString parses a JSON document produced by EventToString back
// into an Event. The second return value is non-nil if the document cannot
// be unmarshaled, lacks the type key, cannot be built into an event, or if
// the built event fails its own Validate() (in that last case the event is
// still returned alongside the error).
func EventFromString(doc string) (Event, error) {
	var obj map[string]string
	if err := json.Unmarshal([]byte(doc), &obj); err != nil {
		return nil, errors.Join(errors.New(BadEventObjUnmarshal), err)
	}

	et, ok := obj[EventTypeMapKey]
	if !ok {
		return nil, errors.New(BadEventMissingTypeKey)
	}

	t := EventTypeFromString(et)
	ev, err := t.MakeEvent(obj)
	if err != nil {
		return nil, errors.Join(errors.New(BadEventCreate), err)
	}

	return ev, ev.Validate()
}

// PublishEvent validates e, pushes its serialized form onto the event
// stream and then sends e to every channel subscribed to its type.
//
// Sends are blocking: if a subscriber channel is full, PublishEvent logs a
// warning and waits for room. The read lock on eventMutex is held while
// sending.
func PublishEvent(e Event) error {
	if err := ValidateEvent(e); err != nil {
		return errors.Join(errors.New(EventValidationFailedError), err)
	}

	doc, err := EventToString(e)
	if err != nil {
		// keep the underlying cause instead of dropping it
		return errors.Join(errors.New(BadEventUnmarshal), err)
	}

	eventMutex.Lock()
	eventStream.Push(doc)
	eventMutex.Unlock()

	eventMutex.RLock()
	defer eventMutex.RUnlock()

	for _, c := range eventSubscriptionCache[e.Type()] {
		// A send only blocks on a buffered channel that is completely
		// full; with a zero-sized buffer no warning is emitted.
		blocking := maxEventsInMemory > 0 && len(c) == maxEventsInMemory
		if blocking {
			logging.Warn(
				"PublishEvent() blocking -- event channel full",
				// log the event time to provide blockage timing information
				// in the logs
				"eventTime",
				e.Time(),
			)
		}

		c <- e

		if blocking {
			logging.Info("PublishEvent() no longer blocking")
		}
	}

	return nil
}

// ValidateEvent checks both the event's type and the event itself.
func ValidateEvent(e Event) error {
	if err := e.Type().Validate(); err != nil {
		return err
	}

	return e.Validate()
}

/* Takes a filter and applies it to each individual event.
 * The filter is a function/closure that accepts one event
 * and returns true if ApplyToEvents should continue iterating.
 *
 * If the filter returns false then ApplyToEvents halts and returns.
 * Iteration also halts, and the parse error is returned, if a stored
 * document cannot be turned back into an event.
 *
 * (this calls docbuf.Apply() under the hood)
 */
func ApplyToEvents(
	f func(Event) bool,
) error {
	// set by filterWrap when a document fails to parse; kept separate from
	// Apply()'s own error because Apply() may still return nil in that case
	var parseErr error

	// wrap f() to be compatible with docbuf.Apply()
	filterWrap := func(doc string) bool {
		ev, err := EventFromString(doc)
		if err != nil {
			parseErr = err
			return false
		}

		return f(ev)
	}

	eventMutex.RLock()
	defer eventMutex.RUnlock()

	if err := eventStream.Apply(filterWrap); err != nil {
		return err
	}

	return parseErr
}

/* Gets all events of type t that ApplyToEvents visits
 * and that also share all field values in map 'filters'.
 * The returned slice is never nil.
 */
func GetMatchingEvents(
	t EventType,
	filters map[string]string,
) ([]Event, error) {
	matches := []Event{}

	if err := t.Validate(); err != nil {
		return matches, err
	}

	filter := func(e Event) bool {
		if e.Type() != t {
			return true
		}

		// fetch the metadata once rather than once per filter key
		data := e.Data()
		for k, v := range filters {
			val, found := data[k]
			if !found || val != v {
				return true
			}
		}

		matches = append(matches, e)
		return true
	}

	err := ApplyToEvents(filter)
	return matches, err
}