0. tests for event replay Tests are now included for the event replay features. This includes many fixes on top of the last commit as well as some tweaks to config module values. 1. activity module An activity module is created to track the useractive events and provide a counter for them. It also encapsulates logic to discard old useractive events. 2. web module A web module is created. This module serves a static webpage showing runtime information. Currently it only shows a snapshot of the user activity data. It is my intention that it eventually also shows an audit log, known users and channels, uptime, and more. Future work will also be needed in order to use HTML templating so that it doesn't look so... basic. Live updates to the information may also be desired. Signed-off-by: Ava Affine <ava@sunnypup.io>
502 lines
12 KiB
Go
502 lines
12 KiB
Go
package state
|
|
|
|
/* State module
|
|
* This package encapsulates various state information
|
|
* state is represented in various global singletons
|
|
* Additionally, this package offers a pub/sub interface
|
|
* for various aspects of state
|
|
*/
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitlab.com/whom/bingobot/internal/logging"
|
|
"gitlab.com/whom/bingobot/pkg/docbuf"
|
|
)
|
|
|
|
/* Event interface is meant to encapsulate a general interface
|
|
* extendable for any different action the bot takes or that a
|
|
* user takes.
|
|
*/
|
|
|
|
const (
|
|
BadEventTypeError = "bad event type"
|
|
EventValidationFailedError = "event failed validation: "
|
|
BadEventNoUID = "event data has no UID field"
|
|
BadEventNoCreated = "event data has no created field"
|
|
BadEventStoreFilename = "failed to open event store file"
|
|
BadEventStreamInit = "failed to initialize event stream"
|
|
BadEventCreate = "failed to create event"
|
|
BadUserEventCreatedParse = "failed to parse created time"
|
|
BadChallengeEvent = "failed to make Challenge Event"
|
|
BadRestorationEvent = "failed to make Restoration Event"
|
|
BadUserActiveEvent = "failed to make UserActive Event"
|
|
BadTestEvent = "failed to make Test Event"
|
|
BadEventObjMarshal = "error marshalling event"
|
|
BadEventObjUnmarshal = "failed to unmarshal event map"
|
|
BadEventUnmarshal = "failed to unmarshal event"
|
|
BadEventMissingTypeKey = "event map missing type key"
|
|
StartBeforeInitError = "state machine not initialized"
|
|
ReopenStoreFileError = "failed to reopen store file"
|
|
|
|
EventTypeMapKey = "type"
|
|
|
|
TmpDocBufBackingFile = "bingobot_events_processing"
|
|
)
|
|
|
|
var eventMutex sync.RWMutex
|
|
var eventSubscriptionCache = [NumEventTypes][]chan Event{}
|
|
var eventStorageFileName string
|
|
var eventStream docbuf.DocumentBuffer
|
|
var maxEventsInMemory int
|
|
|
|
// expect filename validations in config package
|
|
func Init(eventMemCacheSize int, eventStoreFileName string) error {
|
|
file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644)
|
|
if err != nil {
|
|
return errors.Join(errors.New(BadEventStoreFilename), err)
|
|
}
|
|
|
|
if eventStream, err = docbuf.NewDocumentBuffer(
|
|
0, // temporary nil-cache buffer to replay events from
|
|
file,
|
|
); err != nil {
|
|
return errors.Join(errors.New(BadEventStreamInit), err)
|
|
}
|
|
|
|
eventStorageFileName = eventStoreFileName
|
|
maxEventsInMemory = eventMemCacheSize
|
|
return nil
|
|
}
|
|
|
|
// replay events and begin state machine
|
|
func Start() error {
|
|
if eventStream == nil {
|
|
return errors.New(StartBeforeInitError)
|
|
}
|
|
|
|
tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile)
|
|
if err != nil {
|
|
tmpBackFile.Close()
|
|
return err
|
|
}
|
|
|
|
// filter out disposable events into a reverse ordered on disk stack
|
|
var doc string
|
|
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
|
|
doc, err = eventStream.Pop()
|
|
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
|
|
return err
|
|
}
|
|
|
|
if doc == "" ||
|
|
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
|
|
break
|
|
}
|
|
|
|
ev, err := EventFromString(doc)
|
|
if err != nil {
|
|
logging.Warn("Discarding the following event for not unmarshaling: %s", doc)
|
|
continue
|
|
}
|
|
|
|
if !ev.Disposable() {
|
|
tmpBuf.Push(doc)
|
|
}
|
|
}
|
|
|
|
if err := os.Truncate(eventStorageFileName, 0); err != nil {
|
|
return errors.Join(errors.New(ReopenStoreFileError), err)
|
|
}
|
|
|
|
file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644)
|
|
if err != nil {
|
|
return errors.Join(errors.New(BadEventStoreFilename), err)
|
|
}
|
|
|
|
eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file)
|
|
if err != nil {
|
|
// return error here will panic without truncating or writing to the file
|
|
return err
|
|
}
|
|
|
|
// unravel tmp stack into properly ordered properly allocated buffer
|
|
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
|
|
doc, err := tmpBuf.Pop()
|
|
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
|
|
logging.Warn("Could not handle the following error: %s", err.Error())
|
|
continue
|
|
}
|
|
|
|
if doc == "" ||
|
|
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
|
|
break
|
|
}
|
|
|
|
ev, err := EventFromString(doc)
|
|
if err != nil {
|
|
logging.Warn("Could not handle the following error: %s", err.Error())
|
|
continue
|
|
}
|
|
|
|
if err := PublishEvent(ev); err != nil {
|
|
logging.Warn("Could not handle the following error: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// return no error. we are handling SIGINT
|
|
func Teardown() {
|
|
// riskily ignore all locking....
|
|
// we are handling a termination signal after all
|
|
i, e := eventStream.Close()
|
|
if e != nil {
|
|
logging.Warn("failed to close document buffer: %s", e.Error())
|
|
logging.Warn("will attempt to truncate event store anyways")
|
|
}
|
|
|
|
e = os.Truncate(eventStorageFileName, i)
|
|
if e != nil {
|
|
logging.Error("FAILED TO TRUNCATE EVENT STORE!!")
|
|
logging.Error("You will likely have garbage data at the end of it")
|
|
logging.Error("Attempt manual correction!")
|
|
}
|
|
}
|
|
|
|
type EventType int8
|
|
|
|
const (
|
|
Vote EventType = iota
|
|
Challenge
|
|
Restoration
|
|
UserActive
|
|
Test
|
|
// ...
|
|
|
|
// leave this last
|
|
NumEventTypes
|
|
)
|
|
|
|
// either returns a valid event type or NumEventTypes
|
|
func EventTypeFromString(doc string) EventType {
|
|
switch doc {
|
|
case "Vote":
|
|
return Vote
|
|
case "Challenge":
|
|
return Challenge
|
|
case "Restoration":
|
|
return Restoration
|
|
case "UserActive":
|
|
return UserActive
|
|
case "Test":
|
|
return Test
|
|
default:
|
|
// error case
|
|
return NumEventTypes
|
|
}
|
|
}
|
|
|
|
func (et EventType) String() string {
|
|
events := []string{
|
|
"Vote",
|
|
"Challenge",
|
|
"Restoration",
|
|
"UserActive",
|
|
"Test",
|
|
}
|
|
|
|
if et < 0 || et >= NumEventTypes {
|
|
return ""
|
|
}
|
|
|
|
return events[et]
|
|
}
|
|
|
|
func (et EventType) Validate() error {
|
|
if et < 0 || et >= NumEventTypes {
|
|
return errors.New(BadEventTypeError)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* Allocates a specific event of type T from event type instance.
|
|
* output event not guaranteed to be valid.
|
|
* Call Validate() yourself.
|
|
*/
|
|
func (et EventType) MakeEvent(data map[string]string) (Event, error) {
|
|
if err := et.Validate(); err != nil {
|
|
return nil, errors.Join(errors.New(BadEventCreate), err)
|
|
}
|
|
|
|
MakeUserEvent := func(data map[string]string) (*UserEvent, error) {
|
|
user, hasUser := data[UserEventUserKey]
|
|
if !hasUser {
|
|
return nil, errors.New(BadEventNoUID)
|
|
}
|
|
|
|
created, hasCreated := data[UserEventCreatedKey]
|
|
if !hasCreated {
|
|
return nil, errors.New(BadEventNoCreated)
|
|
}
|
|
|
|
createdTime, err := time.Parse(time.RFC3339, created)
|
|
if err != nil {
|
|
return nil, errors.Join(errors.New(BadUserEventCreatedParse), err)
|
|
}
|
|
|
|
return &UserEvent{
|
|
uid: user,
|
|
created: createdTime,
|
|
}, nil
|
|
}
|
|
|
|
switch et {
|
|
case Vote:
|
|
return VoteEvent(data), nil
|
|
case Challenge:
|
|
e, err := MakeUserEvent(data)
|
|
if err != nil {
|
|
return nil, errors.Join(errors.New(BadChallengeEvent), err)
|
|
}
|
|
return ChallengeEvent{*e}, nil
|
|
case Restoration:
|
|
e, err := MakeUserEvent(data)
|
|
if err != nil {
|
|
return nil, errors.Join(errors.New(BadRestorationEvent), err)
|
|
}
|
|
return RestorationEvent{*e}, nil
|
|
case UserActive:
|
|
e, err := MakeUserEvent(data)
|
|
if err != nil {
|
|
return nil, errors.Join(errors.New(BadUserActiveEvent), err)
|
|
}
|
|
return UserActiveEvent{*e}, nil
|
|
case Test:
|
|
disp := false
|
|
if v, ok := data[TestEventDisposeKey]; ok && v == "t" {
|
|
disp = true
|
|
}
|
|
id := -1
|
|
if v, ok := data[TestEventIDKey]; ok {
|
|
var err error
|
|
if id, err = strconv.Atoi(v); err != nil {
|
|
return nil, errors.Join(errors.New(BadTestEvent), err)
|
|
}
|
|
}
|
|
return TestEvent{
|
|
Dispose: disp,
|
|
ID: id,
|
|
}, nil
|
|
default:
|
|
return nil, errors.New(BadEventTypeError)
|
|
}
|
|
}
|
|
|
|
/* adds a new subscriber channel to the event subscription cache
|
|
* and returns the channel that it will publish notifications on
|
|
*/
|
|
func (et EventType) Subscribe() (chan Event, error) {
|
|
if err := et.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch := make(chan Event, maxEventsInMemory)
|
|
|
|
eventMutex.Lock()
|
|
defer eventMutex.Unlock()
|
|
eventSubscriptionCache[et] = append(
|
|
eventSubscriptionCache[et],
|
|
ch,
|
|
)
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
type Event interface {
|
|
// gets EventType associated with event
|
|
Type() EventType
|
|
|
|
// gets time event created
|
|
Time() time.Time
|
|
|
|
// gets internal metadata associated with event
|
|
// may be read only depending on event implementation
|
|
Data() map[string]string
|
|
|
|
// validates state of internal metadata per EventType
|
|
Validate() error
|
|
|
|
// returns true if event can be discarded
|
|
Disposable() bool
|
|
}
|
|
|
|
func EventToString(e Event) (string, error) {
|
|
m := e.Data()
|
|
m[EventTypeMapKey] = e.Type().String()
|
|
buf, err := json.Marshal(m)
|
|
if err != nil {
|
|
return "", errors.Join(errors.New(BadEventObjMarshal), err)
|
|
}
|
|
|
|
return string(buf), nil
|
|
}
|
|
|
|
func EventFromString(doc string) (Event, error) {
|
|
var obj map[string]string
|
|
if err := json.Unmarshal([]byte(doc), &obj); err != nil {
|
|
return nil, errors.Join(errors.New(BadEventObjUnmarshal), err)
|
|
}
|
|
|
|
et, ok := obj[EventTypeMapKey]
|
|
if !ok {
|
|
return nil, errors.New(BadEventMissingTypeKey)
|
|
}
|
|
t := EventTypeFromString(et)
|
|
ev, err := t.MakeEvent(obj)
|
|
if err != nil {
|
|
return nil, errors.Join(errors.New(BadEventCreate), err)
|
|
}
|
|
|
|
return ev, ev.Validate()
|
|
}
|
|
|
|
func PublishEvent(e Event) error {
|
|
if err := ValidateEvent(e); err != nil {
|
|
return errors.Join(errors.New(EventValidationFailedError), err)
|
|
}
|
|
|
|
doc, err := EventToString(e)
|
|
if err != nil {
|
|
return errors.New(BadEventUnmarshal)
|
|
}
|
|
|
|
eventMutex.Lock()
|
|
eventStream.Push(doc)
|
|
eventMutex.Unlock()
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
|
|
blocking := false
|
|
|
|
for _, c := range eventSubscriptionCache[e.Type()] {
|
|
if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) {
|
|
if len(c) == maxEventsInMemory {
|
|
logging.Warn(
|
|
"PublishEvent() blocking -- event channel full",
|
|
// log the event time to provide blockage timing information
|
|
// in the logs
|
|
"eventTime",
|
|
e.Time(),
|
|
)
|
|
blocking = true
|
|
}
|
|
}
|
|
c <- e
|
|
|
|
if blocking {
|
|
logging.Info("PublishEvent() no longer blocking")
|
|
blocking = false
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ValidateEvent(e Event) error {
|
|
if err := e.Type().Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := e.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* Takes a filter and applies it to each individual event
|
|
* The filter is a function/closure that accepts one event
|
|
* and returns true if ApplyToEvents should continue iterating
|
|
*
|
|
* If the filter returns false then ApplyToEvents halts and returns.
|
|
*
|
|
* (this calls docbuf.Apply() under the hood)
|
|
*/
|
|
func ApplyToEvents(
|
|
f func(Event)bool,
|
|
) error {
|
|
// local variables enclosed by filter function filterWrap
|
|
var err error
|
|
var ev Event
|
|
// wrap f() to be compatible with docbuf.Apply()
|
|
filterWrap := func(doc string) bool {
|
|
ev, err = EventFromString(doc)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return f(ev)
|
|
}
|
|
|
|
eventMutex.RLock()
|
|
defer eventMutex.RUnlock()
|
|
|
|
// cant reuse err or return val directly as err is set
|
|
// by filter function if an error happens unmarshalling
|
|
// an event. In this case apply might return nil
|
|
err2 := eventStream.Apply(filterWrap)
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/* gets all events of type T in cache
|
|
* that also share all field values in
|
|
* map 'filters'
|
|
*/
|
|
func GetMatchingEvents(
|
|
t EventType,
|
|
filters map[string]string,
|
|
) ([]Event, error) {
|
|
matches := []Event{}
|
|
|
|
if err := t.Validate(); err != nil {
|
|
return matches, err
|
|
}
|
|
|
|
filter := func(e Event) bool {
|
|
if e.Type() != t {
|
|
return true
|
|
}
|
|
for k, v := range filters {
|
|
val, found := e.Data()[k]
|
|
if !found || val != v {
|
|
return true
|
|
}
|
|
}
|
|
|
|
matches = append(matches, e)
|
|
return true
|
|
}
|
|
|
|
err := ApplyToEvents(filter)
|
|
return matches, err
|
|
}
|
|
|