2024-11-06 14:41:49 -08:00
|
|
|
package state
|
|
|
|
|
|
|
|
|
|
/* STATE
|
|
|
|
|
* This package encapsulates various state information
|
|
|
|
|
* state is represented in various global singletons
|
|
|
|
|
* Additionally, this package offers a pub/sub interface
|
|
|
|
|
* for various aspects of state
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
import (
|
2024-11-08 14:53:12 -08:00
|
|
|
"fmt"
|
2024-11-06 14:41:49 -08:00
|
|
|
"slices"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
2024-11-08 14:53:12 -08:00
|
|
|
"gitlab.com/whom/bingobot/internal/logging"
|
2024-11-06 14:41:49 -08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
/* Event interface is meant to encapsulate a general interface
|
|
|
|
|
* extendable for any different action the bot takes or that a
|
|
|
|
|
* user takes.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
BadEventTypeError = "bad event type"
|
|
|
|
|
EventValidationFailedError = "event failed validation: "
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var eventMutex sync.RWMutex
|
|
|
|
|
var eventSubscriptionCache = [NumEventTypes][]chan Event{}
|
|
|
|
|
var eventCache = []Event{}
|
|
|
|
|
|
|
|
|
|
// TODO: Configurable
|
|
|
|
|
var eventChannelBufferSize = 256
|
|
|
|
|
|
|
|
|
|
// TODO: pruned events go to DISK
|
|
|
|
|
// ASSUMES eventCache is ordered by age (oldest first)
|
|
|
|
|
func pruneEventCache() {
|
|
|
|
|
eventMutex.Lock()
|
|
|
|
|
defer eventMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
oldCacheInvalid := false
|
|
|
|
|
newCache := []Event{}
|
|
|
|
|
for _, obj := range eventCache {
|
|
|
|
|
if time.Since(obj.Time()).Hours() > 24*10 {
|
|
|
|
|
oldCacheInvalid = true
|
|
|
|
|
} else if oldCacheInvalid {
|
|
|
|
|
newCache = append(newCache, obj)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eventCache = newCache
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type EventType int8
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
Vote EventType = iota
|
|
|
|
|
Challenge
|
|
|
|
|
Restoration
|
|
|
|
|
UserActive
|
|
|
|
|
// ...
|
|
|
|
|
|
|
|
|
|
// leave this last
|
|
|
|
|
NumEventTypes
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func (et EventType) Validate() error {
|
|
|
|
|
if et < 0 || et >= NumEventTypes {
|
|
|
|
|
return stringErrorType(BadEventTypeError)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Event interface {
|
|
|
|
|
// gets EventType associated with event
|
|
|
|
|
Type() EventType
|
|
|
|
|
|
|
|
|
|
// gets time event created
|
|
|
|
|
Time() time.Time
|
|
|
|
|
|
|
|
|
|
// gets internal metadata associated with event
|
|
|
|
|
// may be read only depending on event implementation
|
|
|
|
|
Data() map[string]string
|
|
|
|
|
|
|
|
|
|
// validates state of internal metadata per EventType
|
|
|
|
|
Validate() error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: Something better than this
|
|
|
|
|
type stringErrorType string
|
|
|
|
|
|
|
|
|
|
func (s stringErrorType) Error() string {
|
|
|
|
|
return string(s)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
|
|
|
* and returns the channel that it will publish notifications on
|
|
|
|
|
*
|
|
|
|
|
* note: channel is prefilled with at most eventChannelBufferSize
|
|
|
|
|
* historical events. Truncated if history exceeds event channel
|
|
|
|
|
* buffer size.
|
|
|
|
|
*/
|
|
|
|
|
func (et EventType) SubscribeWithHistory() (chan Event, error) {
|
|
|
|
|
if err := et.Validate(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ch := make(chan Event, eventChannelBufferSize)
|
|
|
|
|
|
|
|
|
|
eventMutex.Lock()
|
|
|
|
|
eventSubscriptionCache[et] = append(
|
|
|
|
|
eventSubscriptionCache[et],
|
|
|
|
|
ch,
|
|
|
|
|
)
|
|
|
|
|
eventMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
eventMutex.RLock()
|
|
|
|
|
defer eventMutex.RUnlock()
|
|
|
|
|
numEventsAdded := 0
|
|
|
|
|
for _, ev := range slices.Backward(eventCache) {
|
|
|
|
|
if numEventsAdded >= eventChannelBufferSize {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ev.Type() == et {
|
|
|
|
|
ch <- ev
|
|
|
|
|
numEventsAdded += 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ch, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
|
|
|
* and returns the channel that it will publish notifications on
|
|
|
|
|
*/
|
|
|
|
|
func (et EventType) Subscribe() (chan Event, error) {
|
|
|
|
|
if err := et.Validate(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ch := make(chan Event, eventChannelBufferSize)
|
|
|
|
|
|
|
|
|
|
eventMutex.Lock()
|
|
|
|
|
defer eventMutex.Unlock()
|
|
|
|
|
eventSubscriptionCache[et] = append(
|
|
|
|
|
eventSubscriptionCache[et],
|
|
|
|
|
ch,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return ch, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func PublishEvent(e Event) error {
|
|
|
|
|
if err := ValidateEvent(e); err != nil {
|
|
|
|
|
return stringErrorType(
|
|
|
|
|
EventValidationFailedError + err.Error(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eventMutex.Lock()
|
|
|
|
|
eventCache = append(eventCache, e)
|
|
|
|
|
eventMutex.Unlock()
|
|
|
|
|
eventMutex.RLock()
|
|
|
|
|
defer eventMutex.RUnlock()
|
2024-11-08 14:53:12 -08:00
|
|
|
|
|
|
|
|
blocking := false
|
|
|
|
|
|
2024-11-06 14:41:49 -08:00
|
|
|
for _, c := range eventSubscriptionCache[e.Type()] {
|
|
|
|
|
if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) {
|
|
|
|
|
if len(c) == eventChannelBufferSize {
|
2024-11-08 14:53:12 -08:00
|
|
|
logging.Warn(
|
|
|
|
|
"PublishEvent() blocking -- event channel full",
|
|
|
|
|
// log the event time to provide blockage timing information
|
|
|
|
|
// in the logs
|
|
|
|
|
"eventTime",
|
|
|
|
|
e.Time(),
|
|
|
|
|
)
|
|
|
|
|
blocking = true
|
2024-11-06 14:41:49 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
c <- e
|
2024-11-08 14:53:12 -08:00
|
|
|
|
|
|
|
|
if blocking {
|
|
|
|
|
logging.Info("PublishEvent() no longer blocking")
|
|
|
|
|
blocking = false
|
|
|
|
|
}
|
|
|
|
|
|
2024-11-06 14:41:49 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func ValidateEvent(e Event) error {
|
|
|
|
|
if err := e.Type().Validate(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := e.Validate(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* gets all events of type T in cache
|
|
|
|
|
* that also share all field values in
|
|
|
|
|
* map 'filters'
|
|
|
|
|
*/
|
|
|
|
|
func GetMatchingEvents(
|
|
|
|
|
t EventType,
|
|
|
|
|
filters map[string]string,
|
|
|
|
|
) ([]Event, error) {
|
|
|
|
|
matches := []Event{}
|
|
|
|
|
|
|
|
|
|
if err := t.Validate(); err != nil {
|
|
|
|
|
return matches, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eventMutex.RLock()
|
|
|
|
|
defer eventMutex.RUnlock()
|
|
|
|
|
|
|
|
|
|
Events:
|
|
|
|
|
for _, e := range eventCache {
|
|
|
|
|
if e.Type() != t {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
for k, v := range filters {
|
|
|
|
|
val, found := e.Data()[k]
|
|
|
|
|
if !found || val != v {
|
|
|
|
|
continue Events
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
matches = append(matches, e)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return matches, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type VoteEvent map[string]string
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
VoteMissingKeyError = "vote data not found: "
|
|
|
|
|
VoteCreatedKey = "created"
|
|
|
|
|
VoteRequesterKey = "requester"
|
|
|
|
|
VoteActionKey = "action"
|
|
|
|
|
|
|
|
|
|
VoteResultKey = "result"
|
|
|
|
|
VoteResultPass = "pass"
|
|
|
|
|
VoteResultFail = "fail"
|
|
|
|
|
VoteResultTie = "tie"
|
|
|
|
|
VoteResultTimeout = "timeout"
|
|
|
|
|
VoteBadResultError = "vote has invalid result: "
|
|
|
|
|
VoteNotFinishedError = "vote has result but isnt finished"
|
|
|
|
|
VoteMissingResultError = "vote finished but missing result"
|
|
|
|
|
|
|
|
|
|
VoteStatusKey = "status"
|
|
|
|
|
VoteStatusInProgress = "in_progress"
|
|
|
|
|
VoteStatusFinalized = "finalized"
|
|
|
|
|
VoteStatusTimeout = "timed_out"
|
|
|
|
|
VoteBadStatusError = "vote has invalid status: "
|
|
|
|
|
|
|
|
|
|
VeryOldVote = "1990-01-01T00:00:00Z"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func (ve VoteEvent) Type() EventType {
|
|
|
|
|
return Vote
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ve VoteEvent) Time() time.Time {
|
|
|
|
|
t, e := time.Parse(time.RFC3339, ve[VoteCreatedKey])
|
|
|
|
|
if e != nil {
|
|
|
|
|
// we have a corrupted event
|
|
|
|
|
// return old time so that this event gets
|
|
|
|
|
// pruned from cache
|
2024-11-08 14:53:12 -08:00
|
|
|
logging.Warn(
|
|
|
|
|
"pruning corrupted vote event",
|
|
|
|
|
"event",
|
|
|
|
|
fmt.Sprintf("%+v", ve),
|
|
|
|
|
)
|
2024-11-06 14:41:49 -08:00
|
|
|
tooOld, _ := time.Parse(
|
|
|
|
|
time.RFC3339,
|
|
|
|
|
VeryOldVote,
|
|
|
|
|
)
|
|
|
|
|
return tooOld
|
|
|
|
|
}
|
|
|
|
|
return t
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ve VoteEvent) Data() map[string]string {
|
|
|
|
|
return map[string]string(ve)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ve VoteEvent) Validate() error {
|
|
|
|
|
// make sure action, requester, and created are set
|
|
|
|
|
for _, key := range []string{
|
|
|
|
|
VoteActionKey,
|
|
|
|
|
VoteRequesterKey,
|
|
|
|
|
VoteCreatedKey,
|
|
|
|
|
VoteStatusKey,
|
|
|
|
|
} {
|
|
|
|
|
if _, found := ve[key]; !found {
|
|
|
|
|
return stringErrorType(
|
|
|
|
|
VoteMissingKeyError + key)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
status := ve[VoteStatusKey]
|
|
|
|
|
if status != VoteStatusTimeout &&
|
|
|
|
|
status != VoteStatusInProgress &&
|
|
|
|
|
status != VoteStatusFinalized {
|
|
|
|
|
return stringErrorType(VoteBadStatusError + status)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result, hasResult := ve[VoteResultKey]
|
|
|
|
|
if hasResult && status == VoteStatusInProgress {
|
|
|
|
|
return stringErrorType(VoteNotFinishedError)
|
|
|
|
|
}
|
|
|
|
|
if status != VoteStatusInProgress && !hasResult {
|
|
|
|
|
return stringErrorType(VoteMissingResultError)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if hasResult &&
|
|
|
|
|
(result != VoteResultPass &&
|
|
|
|
|
result != VoteResultFail &&
|
|
|
|
|
result != VoteResultTie &&
|
|
|
|
|
result != VoteResultTimeout) {
|
|
|
|
|
|
|
|
|
|
return stringErrorType(VoteBadResultError + result)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type UserEvent struct {
|
|
|
|
|
uid string
|
|
|
|
|
created time.Time
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
UserEventUserKey = "user"
|
|
|
|
|
UserEventCreatedKey = "created"
|
|
|
|
|
UserEventBadUserError = "event has bad user"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func (ue UserEvent) Time() time.Time {
|
|
|
|
|
return ue.created
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ue UserEvent) Data() map[string]string {
|
|
|
|
|
return map[string]string{
|
|
|
|
|
UserEventUserKey: ue.uid,
|
|
|
|
|
UserEventCreatedKey: ue.created.Local().String(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ue UserEvent) Validate() error {
|
2024-11-13 16:32:58 -08:00
|
|
|
// empty for now, we may do some validation later.
|
|
|
|
|
return nil
|
2024-11-06 14:41:49 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ChallengeEvent struct {
|
|
|
|
|
UserEvent
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ce ChallengeEvent) Type() EventType {
|
|
|
|
|
return Challenge
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewChallengeEvent(user string) ChallengeEvent {
|
|
|
|
|
return ChallengeEvent{UserEvent{
|
|
|
|
|
uid: user,
|
|
|
|
|
created: time.Now(),
|
|
|
|
|
}}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type RestorationEvent struct {
|
|
|
|
|
UserEvent
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (re RestorationEvent) Type() EventType {
|
|
|
|
|
return Restoration
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewRestorationEvent(user string) RestorationEvent {
|
|
|
|
|
return RestorationEvent{UserEvent{
|
|
|
|
|
uid: user,
|
|
|
|
|
created: time.Now(),
|
|
|
|
|
}}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type UserActiveEvent struct {
|
|
|
|
|
UserEvent
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ua UserActiveEvent) Type() EventType {
|
|
|
|
|
return UserActive
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewUserActiveEvent(user string) UserActiveEvent {
|
|
|
|
|
return UserActiveEvent{UserEvent{
|
|
|
|
|
uid: user,
|
|
|
|
|
created: time.Now(),
|
|
|
|
|
}}
|
|
|
|
|
}
|