This change introduces the UserActiveTimer, which tracks voice activity and emits UserActive events. UserActiveTimer is essentially a wrapper around a context with a deadline and its cancel function. When a user joins a voice channel, a UserActiveTimer is started. If the user stays in the voice channel for the amount of time defined in the config, the timer context's deadline trips and a UserActive event is emitted. A new timer is then started. If instead the user leaves the voice channel, the timer's context is cancelled. This change introduces two config values to manage this process: VoiceActivityThresholdSeconds defines the length of time a user is required to stay in the voice channel before a UserActive event is generated. VoiceActivityTimerSleepInterval defines how long the timer sleeps at any one time. After this interval, it wakes up to check whether its context has been cancelled. This change also modifies the state package's UserEvent validation to remove an import loop. We will now assume that the discord package has already validated any UIDs it passes along to the state package.
408 lines
8.1 KiB
Go
408 lines
8.1 KiB
Go
package state
|
|
|
|
/* STATE
|
|
* This package encapsulates various state information
|
|
* state is represented in various global singletons
|
|
* Additionally, this package offers a pub/sub interface
|
|
* for various aspects of state
|
|
*/
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitlab.com/whom/bingobot/internal/logging"
|
|
)
|
|
|
|
/* Event interface is meant to encapsulate a general interface
|
|
* extendable for any different action the bot takes or that a
|
|
* user takes.
|
|
*/
|
|
|
|
const (
	// BadEventTypeError is the message of the error returned by
	// EventType.Validate for a value outside the declared event types.
	BadEventTypeError = "bad event type"

	// EventValidationFailedError is the prefix PublishEvent puts in
	// front of the underlying validation error's message.
	EventValidationFailedError = "event failed validation: "
)
|
|
|
|
var eventMutex sync.RWMutex
|
|
var eventSubscriptionCache = [NumEventTypes][]chan Event{}
|
|
var eventCache = []Event{}
|
|
|
|
// TODO: Configurable
|
|
var eventChannelBufferSize = 256
|
|
|
|
// TODO: pruned events go to DISK
|
|
// ASSUMES eventCache is ordered by age (oldest first)
|
|
func pruneEventCache() {
|
|
eventMutex.Lock()
|
|
defer eventMutex.Unlock()
|
|
|
|
oldCacheInvalid := false
|
|
newCache := []Event{}
|
|
for _, obj := range eventCache {
|
|
if time.Since(obj.Time()).Hours() > 24*10 {
|
|
oldCacheInvalid = true
|
|
} else if oldCacheInvalid {
|
|
newCache = append(newCache, obj)
|
|
}
|
|
}
|
|
|
|
eventCache = newCache
|
|
}
|
|
|
|
// EventType enumerates the kinds of events this package can carry. Its
// values also index the per-type subscription cache.
type EventType int8

// The declared event types, numbered consecutively from zero.
const (
	Vote = EventType(iota)
	Challenge
	Restoration
	UserActive
	// ...

	// leave this last
	// NumEventTypes is the count of event types declared above it; it is
	// not itself a valid event type.
	NumEventTypes
)
|
|
|
|
func (et EventType) Validate() error {
|
|
if et < 0 || et >= NumEventTypes {
|
|
return stringErrorType(BadEventTypeError)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type Event interface {
|
|
// gets EventType associated with event
|
|
Type() EventType
|
|
|
|
// gets time event created
|
|
Time() time.Time
|
|
|
|
// gets internal metadata associated with event
|
|
// may be read only depending on event implementation
|
|
Data() map[string]string
|
|
|
|
// validates state of internal metadata per EventType
|
|
Validate() error
|
|
}
|
|
|
|
// TODO: Something better than this
//
// stringErrorType is a plain string that satisfies the error interface,
// used to turn this package's error message constants into error values.
type stringErrorType string

// Error returns the receiver's text unchanged.
func (s stringErrorType) Error() string {
	msg := string(s)
	return msg
}
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
* and returns the channel that it will publish notifications on
|
|
*
|
|
* note: channel is prefilled with at most eventChannelBufferSize
|
|
* historical events. Truncated if history exceeds event channel
|
|
* buffer size.
|
|
*/
|
|
func (et EventType) SubscribeWithHistory() (chan Event, error) {
|
|
if err := et.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch := make(chan Event, eventChannelBufferSize)
|
|
|
|
eventMutex.Lock()
|
|
eventSubscriptionCache[et] = append(
|
|
eventSubscriptionCache[et],
|
|
ch,
|
|
)
|
|
eventMutex.Unlock()
|
|
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
numEventsAdded := 0
|
|
for _, ev := range slices.Backward(eventCache) {
|
|
if numEventsAdded >= eventChannelBufferSize {
|
|
break
|
|
}
|
|
|
|
if ev.Type() == et {
|
|
ch <- ev
|
|
numEventsAdded += 1
|
|
}
|
|
}
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
* and returns the channel that it will publish notifications on
|
|
*/
|
|
func (et EventType) Subscribe() (chan Event, error) {
|
|
if err := et.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch := make(chan Event, eventChannelBufferSize)
|
|
|
|
eventMutex.Lock()
|
|
defer eventMutex.Unlock()
|
|
eventSubscriptionCache[et] = append(
|
|
eventSubscriptionCache[et],
|
|
ch,
|
|
)
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func PublishEvent(e Event) error {
|
|
if err := ValidateEvent(e); err != nil {
|
|
return stringErrorType(
|
|
EventValidationFailedError + err.Error(),
|
|
)
|
|
}
|
|
|
|
eventMutex.Lock()
|
|
eventCache = append(eventCache, e)
|
|
eventMutex.Unlock()
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
|
|
blocking := false
|
|
|
|
for _, c := range eventSubscriptionCache[e.Type()] {
|
|
if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) {
|
|
if len(c) == eventChannelBufferSize {
|
|
logging.Warn(
|
|
"PublishEvent() blocking -- event channel full",
|
|
// log the event time to provide blockage timing information
|
|
// in the logs
|
|
"eventTime",
|
|
e.Time(),
|
|
)
|
|
blocking = true
|
|
}
|
|
}
|
|
c <- e
|
|
|
|
if blocking {
|
|
logging.Info("PublishEvent() no longer blocking")
|
|
blocking = false
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ValidateEvent(e Event) error {
|
|
if err := e.Type().Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := e.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* gets all events of type T in cache
|
|
* that also share all field values in
|
|
* map 'filters'
|
|
*/
|
|
func GetMatchingEvents(
|
|
t EventType,
|
|
filters map[string]string,
|
|
) ([]Event, error) {
|
|
matches := []Event{}
|
|
|
|
if err := t.Validate(); err != nil {
|
|
return matches, err
|
|
}
|
|
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
|
|
Events:
|
|
for _, e := range eventCache {
|
|
if e.Type() != t {
|
|
continue
|
|
}
|
|
for k, v := range filters {
|
|
val, found := e.Data()[k]
|
|
if !found || val != v {
|
|
continue Events
|
|
}
|
|
}
|
|
|
|
matches = append(matches, e)
|
|
}
|
|
|
|
return matches, nil
|
|
}
|
|
|
|
// VoteEvent is a vote represented directly as its string metadata; the
// keys and values it is expected to hold are the constants below.
type VoteEvent map[string]string

// Keys that must be present on every vote, and the error prefix used
// when one of them is missing.
const (
	VoteMissingKeyError = "vote data not found: "
	VoteCreatedKey      = "created"
	VoteRequesterKey    = "requester"
	VoteActionKey       = "action"
)

// The result key, its accepted values, and result-related errors.
const (
	VoteResultKey          = "result"
	VoteResultPass         = "pass"
	VoteResultFail         = "fail"
	VoteResultTie          = "tie"
	VoteResultTimeout      = "timeout"
	VoteBadResultError     = "vote has invalid result: "
	VoteNotFinishedError   = "vote has result but isnt finished"
	VoteMissingResultError = "vote finished but missing result"
)

// The status key, its accepted values, and the bad-status error prefix.
const (
	VoteStatusKey        = "status"
	VoteStatusInProgress = "in_progress"
	VoteStatusFinalized  = "finalized"
	VoteStatusTimeout    = "timed_out"
	VoteBadStatusError   = "vote has invalid status: "
)

// VeryOldVote is the RFC 3339 timestamp reported for a vote whose
// creation time cannot be parsed.
const VeryOldVote = "1990-01-01T00:00:00Z"
|
|
|
|
func (ve VoteEvent) Type() EventType {
|
|
return Vote
|
|
}
|
|
|
|
func (ve VoteEvent) Time() time.Time {
|
|
t, e := time.Parse(time.RFC3339, ve[VoteCreatedKey])
|
|
if e != nil {
|
|
// we have a corrupted event
|
|
// return old time so that this event gets
|
|
// pruned from cache
|
|
logging.Warn(
|
|
"pruning corrupted vote event",
|
|
"event",
|
|
fmt.Sprintf("%+v", ve),
|
|
)
|
|
tooOld, _ := time.Parse(
|
|
time.RFC3339,
|
|
VeryOldVote,
|
|
)
|
|
return tooOld
|
|
}
|
|
return t
|
|
}
|
|
|
|
func (ve VoteEvent) Data() map[string]string {
|
|
return map[string]string(ve)
|
|
}
|
|
|
|
func (ve VoteEvent) Validate() error {
|
|
// make sure action, requester, and created are set
|
|
for _, key := range []string{
|
|
VoteActionKey,
|
|
VoteRequesterKey,
|
|
VoteCreatedKey,
|
|
VoteStatusKey,
|
|
} {
|
|
if _, found := ve[key]; !found {
|
|
return stringErrorType(
|
|
VoteMissingKeyError + key)
|
|
}
|
|
}
|
|
|
|
status := ve[VoteStatusKey]
|
|
if status != VoteStatusTimeout &&
|
|
status != VoteStatusInProgress &&
|
|
status != VoteStatusFinalized {
|
|
return stringErrorType(VoteBadStatusError + status)
|
|
}
|
|
|
|
result, hasResult := ve[VoteResultKey]
|
|
if hasResult && status == VoteStatusInProgress {
|
|
return stringErrorType(VoteNotFinishedError)
|
|
}
|
|
if status != VoteStatusInProgress && !hasResult {
|
|
return stringErrorType(VoteMissingResultError)
|
|
}
|
|
|
|
if hasResult &&
|
|
(result != VoteResultPass &&
|
|
result != VoteResultFail &&
|
|
result != VoteResultTie &&
|
|
result != VoteResultTimeout) {
|
|
|
|
return stringErrorType(VoteBadResultError + result)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UserEvent holds what every user-centred event has in common: the id of
// the user concerned and the moment the event was created. It is embedded
// by the concrete user event types, which supply their own Type method.
type UserEvent struct {
	uid     string
	created time.Time
}

const (
	// UserEventUserKey is the Data() key holding the user id.
	UserEventUserKey = "user"
	// UserEventCreatedKey is the Data() key holding the creation time.
	UserEventCreatedKey = "created"
	// UserEventBadUserError is the message for an event with a bad user.
	UserEventBadUserError = "event has bad user"
)

// Time returns the creation time stored on the event.
func (ue UserEvent) Time() time.Time { return ue.created }

// Data builds a new two-entry map on every call: the user id, and the
// creation time converted to local time and rendered with Time.String.
func (ue UserEvent) Data() map[string]string {
	data := make(map[string]string, 2)
	data[UserEventUserKey] = ue.uid
	data[UserEventCreatedKey] = ue.created.Local().String()
	return data
}

// Validate always returns nil.
// empty for now, we may do some validation later.
func (ue UserEvent) Validate() error { return nil }
|
|
|
|
type ChallengeEvent struct {
|
|
UserEvent
|
|
}
|
|
|
|
func (ce ChallengeEvent) Type() EventType {
|
|
return Challenge
|
|
}
|
|
|
|
func NewChallengeEvent(user string) ChallengeEvent {
|
|
return ChallengeEvent{UserEvent{
|
|
uid: user,
|
|
created: time.Now(),
|
|
}}
|
|
}
|
|
|
|
type RestorationEvent struct {
|
|
UserEvent
|
|
}
|
|
|
|
func (re RestorationEvent) Type() EventType {
|
|
return Restoration
|
|
}
|
|
|
|
func NewRestorationEvent(user string) RestorationEvent {
|
|
return RestorationEvent{UserEvent{
|
|
uid: user,
|
|
created: time.Now(),
|
|
}}
|
|
}
|
|
|
|
type UserActiveEvent struct {
|
|
UserEvent
|
|
}
|
|
|
|
func (ua UserActiveEvent) Type() EventType {
|
|
return UserActive
|
|
}
|
|
|
|
func NewUserActiveEvent(user string) UserActiveEvent {
|
|
return UserActiveEvent{UserEvent{
|
|
uid: user,
|
|
created: time.Now(),
|
|
}}
|
|
}
|