Internal state tracking: This commit introduces the concept of internal events. The thought here is that bingobot will track its current state not by variables set from incoming discord event data but rather from a cache of internal events that represent digested and simplified observations made and actions taken by internal code. Currently the events that are defined include the following: - UserActive Events: These events are intended to be generated when a user is noticed to have been sitting in a voice chat for more than 10 minutes, however they can be generated in response to anything that signifies that a user has been active - Challenge Events: These events signify that a user has been challenged. They are intended to act as a primitive form of restraint from using bot features. It is intended that these are made only as a response to a successful vote. - Restore Events: These events signify that a user has been restored to non-challenged status. It is intended that this only happens in response to a successful vote. - Vote Events: These events serve as the primary means of consensus building, and represent the state of a single active ongoing or finished vote. Currently the events are cached for 10 days and then discarded, but future work will add an on disk document storage for old events should any functionality desire it. This internal event cache is intended to be the authoritative source of truth for information about user permissions, vote results, and any additional future functionality that relies on shared state. It is accessed through a GetMatchingEvents function that accepts both an event type and a map of optional metadata filters. See state_test.go for examples of this functionality. In addition to the cache search function there is also a Publisher/Subscriber setting elaborated on below. 
Pub/Sub system: This commit also introduces a Pub/Sub system for other internal packages and feature modules to receive live events and continually process updates from our code as opposed to just handling discord events. - PublishEvent() In this specific implementation anyone may become a Publisher by calling PublishEvent() on any structure that implements the Event interface. This provides for modules being able to define their own event types (so long as they also extend the central enum of EventType in state.go). - SubscribeWithHistory() This returns a channel to the caller that returns copies of any event matching the input event type. This channel will be prepopulated with up to 256 most recent events already in the cache. Future work will make this number configurable. Each event published will be sent down all relevant channels at publish time. - Subscribe() This returns a channel to the caller, as SubscribeWithHistory also does, with the sole caveat that there are no historical "old" cached events prepopulating the channel. Finally, this commit also contains tests. Signed-off-by: Ava Affine <ava@sunnypup.io>
396 lines
7.9 KiB
Go
396 lines
7.9 KiB
Go
package state
|
|
|
|
/* STATE
 * This package encapsulates various state information.
 * State is represented in various global singletons.
 * Additionally, this package offers a pub/sub interface
 * for various aspects of state.
 */
|
|
|
|
import (
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bwmarrin/discordgo"
|
|
)
|
|
|
|
/* Event interface is meant to encapsulate a general interface
|
|
* extendable for any different action the bot takes or that a
|
|
* user takes.
|
|
*/
|
|
|
|
// Error message fragments produced by this package.
const (
	// BadEventTypeError is the message returned by EventType.Validate
	// for a value outside the defined enum range.
	BadEventTypeError = "bad event type"

	// EventValidationFailedError is the prefix PublishEvent puts in front
	// of the underlying validation error text.
	EventValidationFailedError = "event failed validation: "
)
|
|
|
|
var DiscordSession *discordgo.Session
|
|
var eventMutex sync.RWMutex
|
|
var eventSubscriptionCache = [NumEventTypes][]chan Event{}
|
|
var eventCache = []Event{}
|
|
|
|
// TODO: Configurable
|
|
var eventChannelBufferSize = 256
|
|
|
|
// TODO: pruned events go to DISK
|
|
// ASSUMES eventCache is ordered by age (oldest first)
|
|
func pruneEventCache() {
|
|
eventMutex.Lock()
|
|
defer eventMutex.Unlock()
|
|
|
|
oldCacheInvalid := false
|
|
newCache := []Event{}
|
|
for _, obj := range eventCache {
|
|
if time.Since(obj.Time()).Hours() > 24*10 {
|
|
oldCacheInvalid = true
|
|
} else if oldCacheInvalid {
|
|
newCache = append(newCache, obj)
|
|
}
|
|
}
|
|
|
|
eventCache = newCache
|
|
}
|
|
|
|
// EventType enumerates the kinds of internal events. Values are also used
// as indexes into per-type tables sized by NumEventTypes.
type EventType int8

const (
	// Vote is the type reported by VoteEvent.
	Vote EventType = iota
	// Challenge is the type reported by ChallengeEvent.
	Challenge
	// Restoration is the type reported by RestorationEvent.
	Restoration
	// UserActive is the type reported by UserActiveEvent.
	UserActive
	// ...

	// NumEventTypes is the count of defined event types.
	// leave this last
	NumEventTypes
)
|
|
|
|
func (et EventType) Validate() error {
|
|
if et < 0 || et >= NumEventTypes {
|
|
return stringErrorType(BadEventTypeError)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type Event interface {
|
|
// gets EventType associated with event
|
|
Type() EventType
|
|
|
|
// gets time event created
|
|
Time() time.Time
|
|
|
|
// gets internal metadata associated with event
|
|
// may be read only depending on event implementation
|
|
Data() map[string]string
|
|
|
|
// validates state of internal metadata per EventType
|
|
Validate() error
|
|
}
|
|
|
|
// stringErrorType is a string that doubles as an error value.
// TODO: Something better than this
type stringErrorType string

// Error returns the underlying string unchanged.
func (s stringErrorType) Error() string {
	msg := string(s)
	return msg
}
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
* and returns the channel that it will publish notifications on
|
|
*
|
|
* note: channel is prefilled with at most eventChannelBufferSize
|
|
* historical events. Truncated if history exceeds event channel
|
|
* buffer size.
|
|
*/
|
|
func (et EventType) SubscribeWithHistory() (chan Event, error) {
|
|
if err := et.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch := make(chan Event, eventChannelBufferSize)
|
|
|
|
eventMutex.Lock()
|
|
eventSubscriptionCache[et] = append(
|
|
eventSubscriptionCache[et],
|
|
ch,
|
|
)
|
|
eventMutex.Unlock()
|
|
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
numEventsAdded := 0
|
|
for _, ev := range slices.Backward(eventCache) {
|
|
if numEventsAdded >= eventChannelBufferSize {
|
|
break
|
|
}
|
|
|
|
if ev.Type() == et {
|
|
ch <- ev
|
|
numEventsAdded += 1
|
|
}
|
|
}
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
* and returns the channel that it will publish notifications on
|
|
*/
|
|
func (et EventType) Subscribe() (chan Event, error) {
|
|
if err := et.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch := make(chan Event, eventChannelBufferSize)
|
|
|
|
eventMutex.Lock()
|
|
defer eventMutex.Unlock()
|
|
eventSubscriptionCache[et] = append(
|
|
eventSubscriptionCache[et],
|
|
ch,
|
|
)
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func PublishEvent(e Event) error {
|
|
if err := ValidateEvent(e); err != nil {
|
|
return stringErrorType(
|
|
EventValidationFailedError + err.Error(),
|
|
)
|
|
}
|
|
|
|
eventMutex.Lock()
|
|
eventCache = append(eventCache, e)
|
|
eventMutex.Unlock()
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
for _, c := range eventSubscriptionCache[e.Type()] {
|
|
if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) {
|
|
if len(c) == eventChannelBufferSize {
|
|
// TODO: log that this publish is blocking
|
|
// on a full channel
|
|
}
|
|
}
|
|
c <- e
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ValidateEvent(e Event) error {
|
|
if err := e.Type().Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := e.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* gets all events of type T in cache
|
|
* that also share all field values in
|
|
* map 'filters'
|
|
*/
|
|
func GetMatchingEvents(
|
|
t EventType,
|
|
filters map[string]string,
|
|
) ([]Event, error) {
|
|
matches := []Event{}
|
|
|
|
if err := t.Validate(); err != nil {
|
|
return matches, err
|
|
}
|
|
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
|
|
Events:
|
|
for _, e := range eventCache {
|
|
if e.Type() != t {
|
|
continue
|
|
}
|
|
for k, v := range filters {
|
|
val, found := e.Data()[k]
|
|
if !found || val != v {
|
|
continue Events
|
|
}
|
|
}
|
|
|
|
matches = append(matches, e)
|
|
}
|
|
|
|
return matches, nil
|
|
}
|
|
|
|
// VoteEvent is an event whose metadata map is the event itself. The keys
// and values it is validated against are the constants below.
type VoteEvent map[string]string

const (
	// Keys that VoteEvent.Validate requires on every vote, and the error
	// prefix used when one is absent.
	VoteMissingKeyError = "vote data not found: "
	VoteCreatedKey      = "created"
	VoteRequesterKey    = "requester"
	VoteActionKey       = "action"

	// Result key, its accepted values, and result-related error messages.
	VoteResultKey          = "result"
	VoteResultPass         = "pass"
	VoteResultFail         = "fail"
	VoteResultTie          = "tie"
	VoteResultTimeout      = "timeout"
	VoteBadResultError     = "vote has invalid result: "
	VoteNotFinishedError   = "vote has result but isnt finished"
	VoteMissingResultError = "vote finished but missing result"

	// Status key, its accepted values, and the bad-status error prefix.
	VoteStatusKey        = "status"
	VoteStatusInProgress = "in_progress"
	VoteStatusFinalized  = "finalized"
	VoteStatusTimeout    = "timed_out"
	VoteBadStatusError   = "vote has invalid status: "

	// VeryOldVote is the RFC 3339 timestamp VoteEvent.Time falls back to
	// when the created field cannot be parsed.
	VeryOldVote = "1990-01-01T00:00:00Z"
)
|
|
|
|
func (ve VoteEvent) Type() EventType {
|
|
return Vote
|
|
}
|
|
|
|
func (ve VoteEvent) Time() time.Time {
|
|
t, e := time.Parse(time.RFC3339, ve[VoteCreatedKey])
|
|
if e != nil {
|
|
// we have a corrupted event
|
|
// TODO: log first
|
|
// return old time so that this event gets
|
|
// pruned from cache
|
|
tooOld, _ := time.Parse(
|
|
time.RFC3339,
|
|
VeryOldVote,
|
|
)
|
|
return tooOld
|
|
}
|
|
return t
|
|
}
|
|
|
|
func (ve VoteEvent) Data() map[string]string {
|
|
return map[string]string(ve)
|
|
}
|
|
|
|
func (ve VoteEvent) Validate() error {
|
|
// make sure action, requester, and created are set
|
|
for _, key := range []string{
|
|
VoteActionKey,
|
|
VoteRequesterKey,
|
|
VoteCreatedKey,
|
|
VoteStatusKey,
|
|
} {
|
|
if _, found := ve[key]; !found {
|
|
return stringErrorType(
|
|
VoteMissingKeyError + key)
|
|
}
|
|
}
|
|
|
|
status := ve[VoteStatusKey]
|
|
if status != VoteStatusTimeout &&
|
|
status != VoteStatusInProgress &&
|
|
status != VoteStatusFinalized {
|
|
return stringErrorType(VoteBadStatusError + status)
|
|
}
|
|
|
|
result, hasResult := ve[VoteResultKey]
|
|
if hasResult && status == VoteStatusInProgress {
|
|
return stringErrorType(VoteNotFinishedError)
|
|
}
|
|
if status != VoteStatusInProgress && !hasResult {
|
|
return stringErrorType(VoteMissingResultError)
|
|
}
|
|
|
|
if hasResult &&
|
|
(result != VoteResultPass &&
|
|
result != VoteResultFail &&
|
|
result != VoteResultTie &&
|
|
result != VoteResultTimeout) {
|
|
|
|
return stringErrorType(VoteBadResultError + result)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UserEvent carries the fields shared by the per-user event types that
// embed it: the user id and the creation time.
type UserEvent struct {
	uid     string
	created time.Time
}

const (
	// UserEventUserKey is the Data() key holding the user id.
	UserEventUserKey = "user"
	// UserEventCreatedKey is the Data() key holding the creation time.
	UserEventCreatedKey = "created"
	// UserEventBadUserError is the message for an event with a bad user.
	UserEventBadUserError = "event has bad user"
)

// Time returns the stored creation time.
func (ue UserEvent) Time() time.Time {
	return ue.created
}

// Data builds a new two-entry map on every call: the user id, and the
// creation time converted to local time and rendered with String().
func (ue UserEvent) Data() map[string]string {
	data := make(map[string]string, 2)
	data[UserEventUserKey] = ue.uid
	data[UserEventCreatedKey] = ue.created.Local().String()
	return data
}
|
|
|
|
func (ue UserEvent) Validate() error {
|
|
if DiscordSession != nil {
|
|
_, err := DiscordSession.User(ue.uid)
|
|
return err
|
|
} else {
|
|
// TODO: Log validation failure
|
|
// I would love to know how to actually fail here
|
|
// and still have unit testable code.
|
|
return nil
|
|
}
|
|
}
|
|
|
|
type ChallengeEvent struct {
|
|
UserEvent
|
|
}
|
|
|
|
func (ce ChallengeEvent) Type() EventType {
|
|
return Challenge
|
|
}
|
|
|
|
func NewChallengeEvent(user string) ChallengeEvent {
|
|
return ChallengeEvent{UserEvent{
|
|
uid: user,
|
|
created: time.Now(),
|
|
}}
|
|
}
|
|
|
|
type RestorationEvent struct {
|
|
UserEvent
|
|
}
|
|
|
|
func (re RestorationEvent) Type() EventType {
|
|
return Restoration
|
|
}
|
|
|
|
func NewRestorationEvent(user string) RestorationEvent {
|
|
return RestorationEvent{UserEvent{
|
|
uid: user,
|
|
created: time.Now(),
|
|
}}
|
|
}
|
|
|
|
type UserActiveEvent struct {
|
|
UserEvent
|
|
}
|
|
|
|
func (ua UserActiveEvent) Type() EventType {
|
|
return UserActive
|
|
}
|
|
|
|
func NewUserActiveEvent(user string) UserActiveEvent {
|
|
return UserActiveEvent{UserEvent{
|
|
uid: user,
|
|
created: time.Now(),
|
|
}}
|
|
}
|