bingobot/internal/state/state.go
Ava Affine fed49ba3cb refactor main, and fix bugs from initial manual run
Signed-off-by: Ava Affine <ava@sunnypup.io>
2025-01-13 15:45:07 -08:00

539 lines
12 KiB
Go

package state
/* State module
* This package encapsulates the bot's state information.
* State is represented by a handful of global singletons.
* Additionally, this package offers a pub/sub interface
* for various aspects of that state.
*/
import (
"encoding/json"
"errors"
"os"
"strconv"
"sync"
"time"
"gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/pkg/docbuf"
)
/* The Event interface is a general abstraction that can be
* extended to cover any action taken by the bot or by a
* user.
*/
// Message strings and fixed keys used throughout this file.
// The Bad*/...Error values are error message texts passed to errors.New.
const (
// returned by EventType.Validate and by MakeEvent for an out-of-range type
BadEventTypeError = "bad event type"
// prefix joined onto validation failures in PublishEvent
EventValidationFailedError = "event failed validation: "
// MakeEvent: the data map lacks the user or created entry of a user event
BadEventNoUID = "event data has no UID field"
BadEventNoCreated = "event data has no created field"
// Init/Start: the event store file could not be opened
BadEventStoreFilename = "failed to open event store file"
// Init: the document buffer over the store file could not be created
BadEventStreamInit = "failed to initialize event stream"
// MakeEvent/EventFromString: generic event construction failure
BadEventCreate = "failed to create event"
// MakeEvent: the created timestamp is not valid RFC3339
BadUserEventCreatedParse = "failed to parse created time"
// MakeEvent: per-type wrappers around the failures above
BadChallengeEvent = "failed to make Challenge Event"
BadRestorationEvent = "failed to make Restoration Event"
BadUserActiveEvent = "failed to make UserActive Event"
BadTestEvent = "failed to make Test Event"
// EventToString: json.Marshal of the event map failed
BadEventObjMarshal = "error marshalling event"
// EventFromString: json.Unmarshal of the document failed
BadEventObjUnmarshal = "failed to unmarshal event map"
// PublishEvent: the event could not be turned into a document
BadEventUnmarshal = "failed to unmarshal event"
// EventFromString: the decoded map has no EventTypeMapKey entry
BadEventMissingTypeKey = "event map missing type key"
// Start: called while eventStream is still nil
StartBeforeInitError = "state machine not initialized"
// Start: truncating the event store file failed
ReopenStoreFileError = "failed to reopen store file"
// map key under which an event's type name is serialized
EventTypeMapKey = "type"
// name pattern handed to os.CreateTemp for the replay scratch file in Start
TmpDocBufBackingFile = "bingobot_events_processing"
)
// eventMutex is taken by Subscribe, PublishEvent and ApplyToEvents around
// their use of eventStream and eventSubscriptionCache.
// Start and Teardown touch eventStream without holding it.
var eventMutex sync.RWMutex
// eventSubscriptionCache holds, per event type, the channels handed out by
// Subscribe; PublishEvent sends each event to every channel of its type.
var eventSubscriptionCache = [NumEventTypes][]chan Event{}
// eventStorageFileName is the event store path recorded by Init.
var eventStorageFileName string
// eventStream is the document buffer events are pushed to and replayed from.
// It is nil until Init succeeds and is replaced by Start.
var eventStream docbuf.DocumentBuffer
// maxEventsInMemory is the cache size recorded by Init. Start passes it to
// the document buffer and Subscribe uses it as the channel capacity.
var maxEventsInMemory int
// Init opens (creating it if needed) the event store file and wraps it in a
// zero-cache document buffer that Start later replays events from.
//
// eventMemCacheSize is recorded for later use by Start and Subscribe.
// eventStoreFileName is the path of the event store; filename validation is
// expected to have happened in the config package.
//
// On failure the package state is left untouched and the file handle opened
// here is closed again.
func Init(eventMemCacheSize int, eventStoreFileName string) error {
	file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return errors.Join(errors.New(BadEventStoreFilename), err)
	}

	// temporary nil-cache buffer to replay events from
	stream, err := docbuf.NewDocumentBuffer(0, file)
	if err != nil {
		// no buffer took ownership of the handle, so release it here
		file.Close()
		return errors.Join(errors.New(BadEventStreamInit), err)
	}

	// publish the new state only once every step has succeeded
	eventStream = stream
	eventStorageFileName = eventStoreFileName
	maxEventsInMemory = eventMemCacheSize
	return nil
}
// Start replays the stored events and begins the state machine.
//
// It runs in two passes:
//  1. Every document is popped off the buffer created by Init. Documents
//     that fail to decode are discarded with a warning, disposable events
//     are dropped, and the rest are pushed onto a temp-file backed buffer
//     (which therefore holds them in reverse order).
//  2. The event store is truncated and reopened with the configured cache
//     size, and the temp buffer is popped back out through PublishEvent,
//     restoring the original order.
//
// Returns an error if Init has not run, if any file or buffer cannot be
// created, or if popping from either buffer fails for a reason other than
// the buffer being empty.
//
// NOTE(review): the buffer created by Init (and the file handle it wraps) is
// replaced here without being closed — confirm whether docbuf needs a Close.
func Start() error {
	if eventStream == nil {
		return errors.New(StartBeforeInitError)
	}

	tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile)
	if err != nil {
		return err
	}
	// The scratch file is always closed on the way out. It is deleted unless
	// the store has been truncated and the replay did not finish: in that
	// window it is the only place the popped events were written to.
	removeTmp := true
	defer func() {
		tmpBackFile.Close()
		if removeTmp {
			os.Remove(tmpBackFile.Name())
		}
	}()

	tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile)
	if err != nil {
		return err
	}

	// filter out disposable events into a reverse ordered on disk stack
	for {
		doc, err := eventStream.Pop()
		if err != nil {
			if err.Error() == docbuf.PopFromEmptyBufferError {
				break
			}
			return err
		}
		if doc == "" {
			break
		}

		ev, err := EventFromString(doc)
		if err != nil {
			logging.Warn("Discarding the following event for not unmarshaling: %s", doc)
			continue
		}

		if !ev.Disposable() {
			// NOTE(review): if Push reports failures they are dropped here
			tmpBuf.Push(doc)
		}
	}

	if err := os.Truncate(eventStorageFileName, 0); err != nil {
		return errors.Join(errors.New(ReopenStoreFileError), err)
	}
	// the store is empty now; keep the scratch file until replay completes
	removeTmp = false

	file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return errors.Join(errors.New(BadEventStoreFilename), err)
	}

	eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file)
	if err != nil {
		// the store has already been truncated and nothing was written back
		file.Close()
		return err
	}

	// unravel tmp stack into properly ordered properly allocated buffer
	for {
		doc, err := tmpBuf.Pop()
		if err != nil {
			if err.Error() == docbuf.PopFromEmptyBufferError {
				break
			}
			// Retrying a failing Pop would spin forever; give up the same
			// way the first pass does and point at the scratch file.
			logging.Warn("Event replay aborted, temp file kept at: %s", tmpBackFile.Name())
			return err
		}
		if doc == "" {
			break
		}

		ev, err := EventFromString(doc)
		if err != nil {
			logging.Warn("Could not handle the following error: %s", err.Error())
			continue
		}

		// a single event failing to publish does not stop the replay
		if err := PublishEvent(ev); err != nil {
			logging.Warn("Could not handle the following error: %s", err.Error())
		}
	}

	removeTmp = true
	return nil
}
// Teardown closes the event stream and truncates the event store file to
// the length reported by the stream's Close.
//
// It returns no error because it runs while handling SIGINT; failures are
// only logged. It is a no-op (apart from logging) when Init never succeeded.
func Teardown() {
	logging.Warn("Tearing down state engine")

	// Same guard as Start: a termination signal can arrive before Init has
	// run, and calling Close on a nil buffer would panic.
	if eventStream == nil {
		logging.Warn("teardown: state engine not initialized, nothing to flush")
		return
	}

	// riskily ignore all locking....
	// we are handling a termination signal after all
	i, e := eventStream.Close()
	if e != nil {
		logging.Warn("failed to close document buffer: %s", e.Error())
		logging.Warn("will attempt to truncate event store anyways")
	} else {
		logging.Warn("teardown: events flushed")
	}

	// NOTE(review): i is used even when Close failed — confirm that docbuf
	// still reports a usable length in that case.
	e = os.Truncate(eventStorageFileName, i)
	if e != nil {
		logging.Error("FAILED TO TRUNCATE EVENT STORE!!")
		logging.Error("You will likely have garbage data at the end of it")
		logging.Error("Attempt manual correction!")
	}
}
// EventType identifies the kind of an event. Its values index
// eventSubscriptionCache and are converted to and from their names by
// String and EventTypeFromString.
type EventType int8
const (
Vote EventType = iota
Challenge
Restoration
UserActive
Test
ConfessionsChannelLink
// ...
// leave this last
// NumEventTypes is the count of valid types and doubles as the
// "invalid type" value returned by EventTypeFromString.
NumEventTypes
)
// EventTypeFromString maps an event type name to its EventType.
// Unknown names yield NumEventTypes, which is not a valid event type.
func EventTypeFromString(doc string) EventType {
	byName := map[string]EventType{
		"Vote":                   Vote,
		"Challenge":              Challenge,
		"Restoration":            Restoration,
		"UserActive":             UserActive,
		"Test":                   Test,
		"ConfessionsChannelLink": ConfessionsChannelLink,
	}

	if known, found := byName[doc]; found {
		return known
	}

	// error case
	return NumEventTypes
}
// String returns the name of the event type, or the empty string when the
// value is outside the range of valid event types.
func (et EventType) String() string {
	switch et {
	case Vote:
		return "Vote"
	case Challenge:
		return "Challenge"
	case Restoration:
		return "Restoration"
	case UserActive:
		return "UserActive"
	case Test:
		return "Test"
	case ConfessionsChannelLink:
		return "ConfessionsChannelLink"
	}
	return ""
}
// Validate reports an error unless the value lies in the range of declared
// event types (zero up to, but excluding, NumEventTypes).
func (et EventType) Validate() error {
	inRange := et >= 0 && et < NumEventTypes
	if inRange {
		return nil
	}
	return errors.New(BadEventTypeError)
}
/* MakeEvent allocates the concrete event belonging to this event type
 * and fills it from the given string map.
 * The returned event is not guaranteed to be valid.
 * Call Validate() yourself.
 */
func (et EventType) MakeEvent(data map[string]string) (Event, error) {
	if err := et.Validate(); err != nil {
		return nil, errors.Join(errors.New(BadEventCreate), err)
	}

	// userEvent builds the UserEvent wrapped by the Challenge, Restoration
	// and UserActive events. Every error it reports is joined behind the
	// given per-type failure message.
	userEvent := func(failure string) (*UserEvent, error) {
		uid, ok := data[UserEventUserKey]
		if !ok {
			return nil, errors.Join(errors.New(failure), errors.New(BadEventNoUID))
		}

		stamp, ok := data[UserEventCreatedKey]
		if !ok {
			return nil, errors.Join(errors.New(failure), errors.New(BadEventNoCreated))
		}

		when, err := time.Parse(time.RFC3339, stamp)
		if err != nil {
			return nil, errors.Join(
				errors.New(failure),
				errors.Join(errors.New(BadUserEventCreatedParse), err),
			)
		}

		return &UserEvent{uid: uid, created: when}, nil
	}

	switch et {
	case Vote:
		return VoteEvent(data), nil

	case Challenge:
		ue, err := userEvent(BadChallengeEvent)
		if err != nil {
			return nil, err
		}
		return ChallengeEvent{*ue}, nil

	case Restoration:
		ue, err := userEvent(BadRestorationEvent)
		if err != nil {
			return nil, err
		}
		return RestorationEvent{*ue}, nil

	case UserActive:
		ue, err := userEvent(BadUserActiveEvent)
		if err != nil {
			return nil, err
		}
		return UserActiveEvent{*ue}, nil

	case Test:
		// the ID stays at -1 when the map carries none
		id := -1
		if raw, ok := data[TestEventIDKey]; ok {
			parsed, err := strconv.Atoi(raw)
			if err != nil {
				return nil, errors.Join(errors.New(BadTestEvent), err)
			}
			id = parsed
		}
		return TestEvent{
			// a missing key reads as "" and therefore as not disposable
			Dispose: data[TestEventDisposeKey] == "t",
			ID:      id,
		}, nil

	case ConfessionsChannelLink:
		gid, hasGuild := data[ConfessionsLinkEventGuildKey]
		cid, hasChannel := data[ConfessionsLinkEventChannelKey]
		stamp, hasCreated := data[ConfessionsLinkEventCreatedKey]
		if !hasGuild || !hasChannel || !hasCreated {
			return nil, errors.New(BadConfessionsLinkEventError)
		}

		when, err := time.Parse(time.RFC3339, stamp)
		if err != nil {
			return nil, errors.Join(errors.New(BadConfessionsLinkEventError), err)
		}

		return ConfessionsChannelLinkEvent{
			GuildID:   gid,
			ChannelID: cid,
			Created:   when,
		}, nil
	}

	return nil, errors.New(BadEventTypeError)
}
/* Subscribe registers a new subscriber channel for this event type
 * in the event subscription cache and returns that channel.
 * Events of this type are delivered on it by PublishEvent.
 * The channel is buffered with maxEventsInMemory slots.
 */
func (et EventType) Subscribe() (chan Event, error) {
	err := et.Validate()
	if err != nil {
		return nil, err
	}

	subscription := make(chan Event, maxEventsInMemory)

	eventMutex.Lock()
	subscribers := append(eventSubscriptionCache[et], subscription)
	eventSubscriptionCache[et] = subscribers
	eventMutex.Unlock()

	return subscription, nil
}
// Event is the interface every event handled by this package implements.
// Events are serialized with EventToString, rebuilt with EventFromString
// and distributed to subscribers by PublishEvent.
type Event interface {
// gets EventType associated with event
Type() EventType
// gets time event created
Time() time.Time
// gets internal metadata associated with event
// may be read only depending on event implementation
Data() map[string]string
// validates state of internal metadata per EventType
Validate() error
// returns true if event can be discarded
// (Start drops disposable events while replaying the event store)
Disposable() bool
}
// EventToString serializes an event to a JSON object holding the event's
// data entries plus its type name under EventTypeMapKey.
//
// The map returned by e.Data() is copied before the type key is added, so
// the event's own data is never modified and a nil data map is accepted.
//
// Returns BadEventObjMarshal joined with the underlying error when JSON
// encoding fails.
func EventToString(e Event) (string, error) {
	data := e.Data()

	// Data() may hand out the event's internal map (or nil); write the type
	// key into a private copy instead.
	m := make(map[string]string, len(data)+1)
	for k, v := range data {
		m[k] = v
	}
	m[EventTypeMapKey] = e.Type().String()

	buf, err := json.Marshal(m)
	if err != nil {
		return "", errors.Join(errors.New(BadEventObjMarshal), err)
	}
	return string(buf), nil
}
// EventFromString rebuilds an event from its JSON document form.
// The document must decode to a string map containing EventTypeMapKey;
// that entry selects the event type whose MakeEvent constructs the event
// from the whole map. The event is returned together with the result of
// its own Validate call.
func EventFromString(doc string) (Event, error) {
	var fields map[string]string
	err := json.Unmarshal([]byte(doc), &fields)
	if err != nil {
		return nil, errors.Join(errors.New(BadEventObjUnmarshal), err)
	}

	typeName, found := fields[EventTypeMapKey]
	if !found {
		return nil, errors.New(BadEventMissingTypeKey)
	}

	// an unknown type name becomes NumEventTypes, which MakeEvent rejects
	event, err := EventTypeFromString(typeName).MakeEvent(fields)
	if err != nil {
		return nil, errors.Join(errors.New(BadEventCreate), err)
	}

	return event, event.Validate()
}
// PublishEvent validates e, pushes its serialized form onto the event
// stream and then sends e to every channel subscribed to its type.
//
// Sending blocks while a subscriber channel is full; a warning is logged
// before such a send and an info message once it has gone through.
//
// Returns EventValidationFailedError joined with the cause when validation
// fails, and BadEventUnmarshal joined with the cause when the event cannot
// be serialized.
func PublishEvent(e Event) error {
	if err := ValidateEvent(e); err != nil {
		return errors.Join(errors.New(EventValidationFailedError), err)
	}

	doc, err := EventToString(e)
	if err != nil {
		// keep the underlying cause, like every other error path in this file
		return errors.Join(errors.New(BadEventUnmarshal), err)
	}

	eventMutex.Lock()
	eventStream.Push(doc)
	eventMutex.Unlock()

	// the read lock is held for the whole delivery, including blocked sends
	eventMutex.RLock()
	defer eventMutex.RUnlock()

	for _, c := range eventSubscriptionCache[e.Type()] {
		blocking := false
		if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) {
			if len(c) == maxEventsInMemory {
				logging.Warn(
					"PublishEvent() blocking -- event channel full",
					// log the event time to provide blockage timing information
					// in the logs
					"eventTime",
					e.Time(),
				)
				blocking = true
			}
		}

		c <- e

		if blocking {
			logging.Info("PublishEvent() no longer blocking")
		}
	}

	return nil
}
// ValidateEvent checks an event in two steps: first that its type is a
// valid EventType, then the event's own Validate. The first failure is
// returned as is.
func ValidateEvent(e Event) error {
	typeErr := e.Type().Validate()
	if typeErr != nil {
		return typeErr
	}
	return e.Validate()
}
/* ApplyToEvents runs a filter over the events in the event stream.
 * The filter is a function/closure that receives one event at a
 * time and returns true if ApplyToEvents should keep iterating.
 *
 * If the filter returns false then ApplyToEvents halts and returns.
 * A document that cannot be turned into an event also halts the
 * iteration, and that error is returned.
 *
 * (this calls docbuf.Apply() under the hood)
 */
func ApplyToEvents(
	f func(Event) bool,
) error {
	// written by the adapter below, read after Apply returns
	var decodeErr error

	// adapt f() to the func(string) bool shape docbuf.Apply() expects
	visit := func(doc string) bool {
		event, err := EventFromString(doc)
		decodeErr = err
		if err != nil {
			return false
		}
		return f(event)
	}

	eventMutex.RLock()
	defer eventMutex.RUnlock()

	// Apply's own error takes precedence. A decoding failure is tracked
	// separately because Apply might return nil even though the adapter
	// stopped the iteration on a bad document.
	if err := eventStream.Apply(visit); err != nil {
		return err
	}
	return decodeErr
}
/* GetMatchingEvents collects every event of type t found by
 * ApplyToEvents whose data contains all key/value pairs
 * given in 'filters'.
 * The returned slice is never nil. When an error is returned it
 * holds whatever matched before the error occurred.
 */
func GetMatchingEvents(
	t EventType,
	filters map[string]string,
) ([]Event, error) {
	matches := []Event{}

	if err := t.Validate(); err != nil {
		return matches, err
	}

	// satisfies reports whether the event carries every filter pair
	satisfies := func(e Event) bool {
		for key, want := range filters {
			if got, ok := e.Data()[key]; !ok || got != want {
				return false
			}
		}
		return true
	}

	// always return true: a non-matching event must not stop the scan
	err := ApplyToEvents(func(e Event) bool {
		if e.Type() == t && satisfies(e) {
			matches = append(matches, e)
		}
		return true
	})

	return matches, err
}