package state /* STATE * This package encapsulates various state information * state is represented in various global singletons * Additionally, this package offers a pub/sub interface * for various aspects of state */ import ( "fmt" "slices" "sync" "time" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" ) /* Event interface is meant to encapsulate a general interface * extendable for any different action the bot takes or that a * user takes. */ const ( BadEventTypeError = "bad event type" EventValidationFailedError = "event failed validation: " ) var eventMutex sync.RWMutex var eventSubscriptionCache = [NumEventTypes][]chan Event{} var eventCache = []Event{} // TODO: Configurable var eventChannelBufferSize = 256 // TODO: pruned events go to DISK // ASSUMES eventCache is ordered by age (oldest first) func pruneEventCache() { eventMutex.Lock() defer eventMutex.Unlock() oldCacheInvalid := false newCache := []Event{} for _, obj := range eventCache { if time.Since(obj.Time()).Hours() > 24*10 { oldCacheInvalid = true } else if oldCacheInvalid { newCache = append(newCache, obj) } } eventCache = newCache } type EventType int8 const ( Vote EventType = iota Challenge Restoration UserActive // ... // leave this last NumEventTypes ) func (et EventType) Validate() error { if et < 0 || et >= NumEventTypes { return stringErrorType(BadEventTypeError) } return nil } type Event interface { // gets EventType associated with event Type() EventType // gets time event created Time() time.Time // gets internal metadata associated with event // may be read only depending on event implementation Data() map[string]string // validates state of internal metadata per EventType Validate() error } // TODO: Something better than this type stringErrorType string func (s stringErrorType) Error() string { return string(s) } /* adds a new subscriber channel to the event subscription cache * and returns the channel that it will publish notifications on * * note: channel is prefilled with at most eventChannelBufferSize * historical events. Truncated if history exceeds event channel * buffer size. */ func (et EventType) SubscribeWithHistory() (chan Event, error) { if err := et.Validate(); err != nil { return nil, err } ch := make(chan Event, eventChannelBufferSize) eventMutex.Lock() eventSubscriptionCache[et] = append( eventSubscriptionCache[et], ch, ) eventMutex.Unlock() eventMutex.RLock() defer eventMutex.RUnlock() numEventsAdded := 0 for _, ev := range slices.Backward(eventCache) { if numEventsAdded >= eventChannelBufferSize { break } if ev.Type() == et { ch <- ev numEventsAdded += 1 } } return ch, nil } /* adds a new subscriber channel to the event subscription cache * and returns the channel that it will publish notifications on */ func (et EventType) Subscribe() (chan Event, error) { if err := et.Validate(); err != nil { return nil, err } ch := make(chan Event, eventChannelBufferSize) eventMutex.Lock() defer eventMutex.Unlock() eventSubscriptionCache[et] = append( eventSubscriptionCache[et], ch, ) return ch, nil } func PublishEvent(e Event) error { if err := ValidateEvent(e); err != nil { return stringErrorType( EventValidationFailedError + err.Error(), ) } eventMutex.Lock() eventCache = append(eventCache, e) eventMutex.Unlock() eventMutex.RLock() defer eventMutex.RUnlock() blocking := false for _, c := range eventSubscriptionCache[e.Type()] { if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) { if len(c) == eventChannelBufferSize { logging.Warn( "PublishEvent() blocking -- event channel full", // log the event time to provide blockage timing information // in the logs "eventTime", e.Time(), ) blocking = true } } c <- e if blocking { logging.Info("PublishEvent() no longer blocking") blocking = false } } return nil } func ValidateEvent(e Event) error { if err := e.Type().Validate(); err != nil { return err } if err := e.Validate(); err != nil { return err } return nil } /* gets all events of type T in cache * that also share all field values in * map 'filters' */ func GetMatchingEvents( t EventType, filters map[string]string, ) ([]Event, error) { matches := []Event{} if err := t.Validate(); err != nil { return matches, err } eventMutex.RLock() defer eventMutex.RUnlock() Events: for _, e := range eventCache { if e.Type() != t { continue } for k, v := range filters { val, found := e.Data()[k] if !found || val != v { continue Events } } matches = append(matches, e) } return matches, nil } type VoteEvent map[string]string const ( VoteMissingKeyError = "vote data not found: " VoteCreatedKey = "created" VoteRequesterKey = "requester" VoteActionKey = "action" VoteResultKey = "result" VoteResultPass = "pass" VoteResultFail = "fail" VoteResultTie = "tie" VoteResultTimeout = "timeout" VoteBadResultError = "vote has invalid result: " VoteNotFinishedError = "vote has result but isnt finished" VoteMissingResultError = "vote finished but missing result" VoteStatusKey = "status" VoteStatusInProgress = "in_progress" VoteStatusFinalized = "finalized" VoteStatusTimeout = "timed_out" VoteBadStatusError = "vote has invalid status: " VeryOldVote = "1990-01-01T00:00:00Z" ) func (ve VoteEvent) Type() EventType { return Vote } func (ve VoteEvent) Time() time.Time { t, e := time.Parse(time.RFC3339, ve[VoteCreatedKey]) if e != nil { // we have a corrupted event // return old time so that this event gets // pruned from cache logging.Warn( "pruning corrupted vote event", "event", fmt.Sprintf("%+v", ve), ) tooOld, _ := time.Parse( time.RFC3339, VeryOldVote, ) return tooOld } return t } func (ve VoteEvent) Data() map[string]string { return map[string]string(ve) } func (ve VoteEvent) Validate() error { // make sure action, requester, and created are set for _, key := range []string{ VoteActionKey, VoteRequesterKey, VoteCreatedKey, VoteStatusKey, } { if _, found := ve[key]; !found { return stringErrorType( VoteMissingKeyError + key) } } status := ve[VoteStatusKey] if status != VoteStatusTimeout && status != VoteStatusInProgress && status != VoteStatusFinalized { return stringErrorType(VoteBadStatusError + status) } result, hasResult := ve[VoteResultKey] if hasResult && status == VoteStatusInProgress { return stringErrorType(VoteNotFinishedError) } if status != VoteStatusInProgress && !hasResult { return stringErrorType(VoteMissingResultError) } if hasResult && (result != VoteResultPass && result != VoteResultFail && result != VoteResultTie && result != VoteResultTimeout) { return stringErrorType(VoteBadResultError + result) } return nil } type UserEvent struct { uid string created time.Time } const ( UserEventUserKey = "user" UserEventCreatedKey = "created" UserEventBadUserError = "event has bad user" ) func (ue UserEvent) Time() time.Time { return ue.created } func (ue UserEvent) Data() map[string]string { return map[string]string{ UserEventUserKey: ue.uid, UserEventCreatedKey: ue.created.Local().String(), } } func (ue UserEvent) Validate() error { if discord.Connected() { _, err := discord.User(ue.uid) return err } else { // I would love to know how to actually fail here // and still have unit testable code. logging.Error( "can't validate UserEvent: nil discord session", "event", fmt.Sprintf("%+v", ue), ) return nil } } type ChallengeEvent struct { UserEvent } func (ce ChallengeEvent) Type() EventType { return Challenge } func NewChallengeEvent(user string) ChallengeEvent { return ChallengeEvent{UserEvent{ uid: user, created: time.Now(), }} } type RestorationEvent struct { UserEvent } func (re RestorationEvent) Type() EventType { return Restoration } func NewRestorationEvent(user string) RestorationEvent { return RestorationEvent{UserEvent{ uid: user, created: time.Now(), }} } type UserActiveEvent struct { UserEvent } func (ua UserActiveEvent) Type() EventType { return UserActive } func NewUserActiveEvent(user string) UserActiveEvent { return UserActiveEvent{UserEvent{ uid: user, created: time.Now(), }} }