Refactor state module to leverage DocumentBuffer

This commit refactors the state module to remove the eventCache and
introduce an eventStream instead. The eventStream is not a static array
but a DocumentBuffer, and thus many functions had to be altered. This
commit is best reviewed side by side with a before/after view instead of
just looking at the diff.

An Init() function is added to initialize the DocumentBuffer with config
values.

Facilities are added to both Event and EventType to allow for parsing
them to and from string documents.

a MakeEvent() function is added to create events of proper subtype based
on a general data map input.

ApplyToEvents() is added, which wraps around DocumentBuffer's Apply() method
to alter the filter input type with one that Marshals and Unmarshals events.

GetMatchingEvents() is now a proof of concept for DocumentBuffer's Apply()

PruneCache() is now no longer needed.

Signed-off-by: Ava Affine <ava@sunnypup.io>
This commit is contained in:
Ava Apples Affine 2024-11-30 00:31:08 -08:00
parent 609c50ff7d
commit 0a29b35f4b
2 changed files with 321 additions and 138 deletions

View file

@ -9,10 +9,13 @@ package state
import ( import (
"fmt" "fmt"
"slices"
"sync" "sync"
"time" "time"
"os"
"errors"
"encoding/json"
"gitlab.com/whom/bingobot/internal/docbuf"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
) )
@ -24,32 +27,46 @@ import (
const ( const (
BadEventTypeError = "bad event type" BadEventTypeError = "bad event type"
EventValidationFailedError = "event failed validation: " EventValidationFailedError = "event failed validation: "
BadEventNoUID = "event data has no UID field"
BadEventNoCreated = "event data has no created field"
BadEventStoreFilename = "failed to open event store file"
BadEventStreamInit = "failed to initialize event stream"
BadEventCreate = "failed to create event"
BadUserEventCreatedParse = "failed to parse created time"
BadChallengeEvent = "failed to make Challenge Event"
BadRestorationEvent = "failed to make Restoration Event"
BadUserActiveEvent = "failed to make UserActive Event"
BadEventObjMarshal = "error marshalling event"
BadEventObjUnmarshal = "failed to unmarshal event map"
BadEventUnmarshal = "failed to unmarshal event"
BadEventMissingTypeKey = "event map missing type key"
)
const (
EventTypeMapKey = "type"
) )
var eventMutex sync.RWMutex var eventMutex sync.RWMutex
var eventSubscriptionCache = [NumEventTypes][]chan Event{} var eventSubscriptionCache = [NumEventTypes][]chan Event{}
var eventCache = []Event{} var eventStream docbuf.DocumentBuffer
var maxEventsInMemory int
// TODO: Configurable // expect filename validations in config package
var eventChannelBufferSize = 256 func Init(eventMemCacheSize int, eventStoreFileName string) error {
file, err := os.OpenFile(eventStoreFileName, os.O_CREATE|os.O_RDWR, 0644)
// TODO: pruned events go to DISK if err != nil {
// ASSUMES eventCache is ordered by age (oldest first) return errors.Join(errors.New(BadEventStoreFilename), err)
func pruneEventCache() {
eventMutex.Lock()
defer eventMutex.Unlock()
oldCacheInvalid := false
newCache := []Event{}
for _, obj := range eventCache {
if time.Since(obj.Time()).Hours() > 24*10 {
oldCacheInvalid = true
} else if oldCacheInvalid {
newCache = append(newCache, obj)
}
} }
eventCache = newCache if eventStream, err = docbuf.NewDocumentBuffer(
eventMemCacheSize,
file,
); err != nil {
return errors.Join(errors.New(BadEventStreamInit), err)
}
maxEventsInMemory = eventMemCacheSize
return nil
} }
type EventType int8 type EventType int8
@ -65,14 +82,123 @@ const (
NumEventTypes NumEventTypes
) )
// either returns a valid event type or NumEventTypes
func EventTypeFromString(doc string) EventType {
switch doc {
case "Vote":
return Vote
case "Challenge":
return Challenge
case "Restoration":
return Restoration
case "UserActive":
return UserActive
default:
// error case
return NumEventTypes
}
}
func (et EventType) String() string {
events := []string{
"Vote",
"Challenge",
"Restoration",
"UserActive",
}
if et < 0 || et >= NumEventTypes {
return ""
}
return events[et]
}
func (et EventType) Validate() error { func (et EventType) Validate() error {
if et < 0 || et >= NumEventTypes { if et < 0 || et >= NumEventTypes {
return stringErrorType(BadEventTypeError) return errors.New(BadEventTypeError)
} }
return nil return nil
} }
/* Allocates a specific event of type T from event type instance.
* output event not guaranteed to be valid.
* Call Validate() yourself.
*/
func (et EventType) MakeEvent(data map[string]string) (Event, error) {
if err := et.Validate(); err != nil {
return nil, errors.Join(errors.New(BadEventCreate), err)
}
MakeUserEvent := func(data map[string]string) (*UserEvent, error) {
user, hasUser := data[UserEventUserKey]
if !hasUser {
return nil, errors.New(BadEventNoUID)
}
created, hasCreated := data[UserEventCreatedKey]
if !hasCreated {
return nil, errors.New(BadEventNoCreated)
}
createdTime, err := time.Parse(time.RFC3339, created)
if err != nil {
return nil, errors.Join(errors.New(BadUserEventCreatedParse), err)
}
return &UserEvent{
uid: user,
created: createdTime,
}, nil
}
switch et {
case Vote:
return VoteEvent(data), nil
case Challenge:
e, err := MakeUserEvent(data)
if err != nil {
return nil, errors.Join(errors.New(BadChallengeEvent), err)
}
return ChallengeEvent{*e}, nil
case Restoration:
e, err := MakeUserEvent(data)
if err != nil {
return nil, errors.Join(errors.New(BadRestorationEvent), err)
}
return RestorationEvent{*e}, nil
case UserActive:
e, err := MakeUserEvent(data)
if err != nil {
return nil, errors.Join(errors.New(BadUserActiveEvent), err)
}
return UserActiveEvent{*e}, nil
default:
return nil, errors.New(BadEventTypeError)
}
}
/* adds a new subscriber channel to the event subscription cache
* and returns the channel that it will publish notifications on
*/
func (et EventType) Subscribe() (chan Event, error) {
if err := et.Validate(); err != nil {
return nil, err
}
ch := make(chan Event, maxEventsInMemory)
eventMutex.Lock()
defer eventMutex.Unlock()
eventSubscriptionCache[et] = append(
eventSubscriptionCache[et],
ch,
)
return ch, nil
}
type Event interface { type Event interface {
// gets EventType associated with event // gets EventType associated with event
Type() EventType Type() EventType
@ -88,80 +214,48 @@ type Event interface {
Validate() error Validate() error
} }
// TODO: Something better than this func EventToString(e Event) (string, error) {
type stringErrorType string m := e.Data()
m[EventTypeMapKey] = e.Type().String()
func (s stringErrorType) Error() string { buf, err := json.Marshal(m)
return string(s) if err != nil {
return "", errors.Join(errors.New(BadEventObjMarshal), err)
} }
/* adds a new subscriber channel to the event subscription cache return string(buf), nil
* and returns the channel that it will publish notifications on
*
* note: channel is prefilled with at most eventChannelBufferSize
* historical events. Truncated if history exceeds event channel
* buffer size.
*/
func (et EventType) SubscribeWithHistory() (chan Event, error) {
if err := et.Validate(); err != nil {
return nil, err
} }
ch := make(chan Event, eventChannelBufferSize) func EventFromString(doc string) (Event, error) {
var obj map[string]string
eventMutex.Lock() if err := json.Unmarshal([]byte(doc), &obj); err != nil {
eventSubscriptionCache[et] = append( return nil, errors.Join(errors.New(BadEventObjUnmarshal), err)
eventSubscriptionCache[et],
ch,
)
eventMutex.Unlock()
eventMutex.RLock()
defer eventMutex.RUnlock()
numEventsAdded := 0
for _, ev := range slices.Backward(eventCache) {
if numEventsAdded >= eventChannelBufferSize {
break
} }
if ev.Type() == et { et, ok := obj[EventTypeMapKey]
ch <- ev if !ok {
numEventsAdded += 1 return nil, errors.New(BadEventMissingTypeKey)
} }
t := EventTypeFromString(et)
ev, err := t.MakeEvent(obj)
if err != nil {
return nil, errors.Join(errors.New(BadEventCreate), err)
} }
return ch, nil return ev, ev.Validate()
}
/* adds a new subscriber channel to the event subscription cache
* and returns the channel that it will publish notifications on
*/
func (et EventType) Subscribe() (chan Event, error) {
if err := et.Validate(); err != nil {
return nil, err
}
ch := make(chan Event, eventChannelBufferSize)
eventMutex.Lock()
defer eventMutex.Unlock()
eventSubscriptionCache[et] = append(
eventSubscriptionCache[et],
ch,
)
return ch, nil
} }
func PublishEvent(e Event) error { func PublishEvent(e Event) error {
if err := ValidateEvent(e); err != nil { if err := ValidateEvent(e); err != nil {
return stringErrorType( return errors.Join(errors.New(EventValidationFailedError), err)
EventValidationFailedError + err.Error(), }
)
doc, err := EventToString(e)
if err != nil {
return errors.New(BadEventUnmarshal)
} }
eventMutex.Lock() eventMutex.Lock()
eventCache = append(eventCache, e) eventStream.Push(doc)
eventMutex.Unlock() eventMutex.Unlock()
eventMutex.RLock() eventMutex.RLock()
defer eventMutex.RUnlock() defer eventMutex.RUnlock()
@ -169,8 +263,8 @@ func PublishEvent(e Event) error {
blocking := false blocking := false
for _, c := range eventSubscriptionCache[e.Type()] { for _, c := range eventSubscriptionCache[e.Type()] {
if float32(len(c)) > (float32(eventChannelBufferSize) * 0.25) { if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) {
if len(c) == eventChannelBufferSize { if len(c) == maxEventsInMemory {
logging.Warn( logging.Warn(
"PublishEvent() blocking -- event channel full", "PublishEvent() blocking -- event channel full",
// log the event time to provide blockage timing information // log the event time to provide blockage timing information
@ -205,6 +299,45 @@ func ValidateEvent(e Event) error {
return nil return nil
} }
/* Takes a filter and applies it to each individual event
* The filter is a function/closure that accepts one event
* and returns true if ApplyToEvents should continue iterating
*
* If the filter returns false then ApplyToEvents halts and returns.
*
* (this calls docbuf.Apply() under the hood)
*/
func ApplyToEvents(
f func(Event)bool,
) error {
// local variables enclosed by filter function filterWrap
var err error
var ev Event
// wrap f() to be compatible with docbuf.Apply()
filterWrap := func(doc string) bool {
ev, err = EventFromString(doc)
if err != nil {
return false
}
return f(ev)
}
eventMutex.RLock()
defer eventMutex.RUnlock()
// cant reuse err or return val directly as err is set
// by filter function if an error happens unmarshalling
// an event. In this case apply might return nil
err2 := eventStream.Apply(filterWrap)
if err2 != nil {
return err2
}
if err != nil {
return err
}
return nil
}
/* gets all events of type T in cache /* gets all events of type T in cache
* that also share all field values in * that also share all field values in
* map 'filters' * map 'filters'
@ -219,25 +352,26 @@ func GetMatchingEvents(
return matches, err return matches, err
} }
eventMutex.RLock() filter := func(e Event) bool {
defer eventMutex.RUnlock() ev, er := EventToString(e)
fmt.Printf("Checking: %s (%e)\n", ev, er)
Events:
for _, e := range eventCache {
if e.Type() != t { if e.Type() != t {
continue return true
} }
for k, v := range filters { for k, v := range filters {
val, found := e.Data()[k] val, found := e.Data()[k]
if !found || val != v { if !found || val != v {
continue Events return true
} }
} }
fmt.Println("Found Match")
matches = append(matches, e) matches = append(matches, e)
return true
} }
return matches, nil err := ApplyToEvents(filter)
return matches, err
} }
type VoteEvent map[string]string type VoteEvent map[string]string
@ -303,8 +437,7 @@ func (ve VoteEvent) Validate() error {
VoteStatusKey, VoteStatusKey,
} { } {
if _, found := ve[key]; !found { if _, found := ve[key]; !found {
return stringErrorType( return errors.New(VoteMissingKeyError + key)
VoteMissingKeyError + key)
} }
} }
@ -312,15 +445,15 @@ func (ve VoteEvent) Validate() error {
if status != VoteStatusTimeout && if status != VoteStatusTimeout &&
status != VoteStatusInProgress && status != VoteStatusInProgress &&
status != VoteStatusFinalized { status != VoteStatusFinalized {
return stringErrorType(VoteBadStatusError + status) return errors.New(VoteBadStatusError + status)
} }
result, hasResult := ve[VoteResultKey] result, hasResult := ve[VoteResultKey]
if hasResult && status == VoteStatusInProgress { if hasResult && status == VoteStatusInProgress {
return stringErrorType(VoteNotFinishedError) return errors.New(VoteNotFinishedError)
} }
if status != VoteStatusInProgress && !hasResult { if status != VoteStatusInProgress && !hasResult {
return stringErrorType(VoteMissingResultError) return errors.New(VoteMissingResultError)
} }
if hasResult && if hasResult &&
@ -329,7 +462,7 @@ func (ve VoteEvent) Validate() error {
result != VoteResultTie && result != VoteResultTie &&
result != VoteResultTimeout) { result != VoteResultTimeout) {
return stringErrorType(VoteBadResultError + result) return errors.New(VoteBadResultError + result)
} }
return nil return nil
@ -353,7 +486,7 @@ func (ue UserEvent) Time() time.Time {
func (ue UserEvent) Data() map[string]string { func (ue UserEvent) Data() map[string]string {
return map[string]string{ return map[string]string{
UserEventUserKey: ue.uid, UserEventUserKey: ue.uid,
UserEventCreatedKey: ue.created.Local().String(), UserEventCreatedKey: ue.created.Format(time.RFC3339),
} }
} }

View file

@ -5,6 +5,7 @@ import (
"testing" "testing"
"time" "time"
"gitlab.com/whom/bingobot/internal/docbuf"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
) )
@ -17,12 +18,71 @@ const TestTok = "TEST_NAME"
var loggingInitialized = false var loggingInitialized = false
func SetupTest(t *testing.T) { func SetupTest(t *testing.T) {
maxEventsInMemory = 271
var err error
// have to set up logger // have to set up logger
if !loggingInitialized { if !loggingInitialized {
logging.Init() logging.Init()
loggingInitialized = true loggingInitialized = true
} }
b := docbuf.NewReadWriteSeekString()
eventStream, err = docbuf.NewDocumentBuffer(300, &b)
if err != nil {
t.Fatalf("error allocating buffer: %e", err)
}
}
func TestEventMarshalUnmarshal(t *testing.T) {
tm := time.Now()
cev := ChallengeEvent{UserEvent{
uid: TestTok,
created: tm,
}}
vev := VoteEvent(map[string]string{
VoteActionKey: "a",
VoteRequesterKey: "r",
VoteCreatedKey: "c",
VoteStatusKey: VoteStatusFinalized,
VoteResultKey: VoteResultFail,
})
cstr, err := EventToString(cev);
if err != nil {
t.Fatalf("error marshalling challenge: %e, %s", err, cstr)
}
t.Logf("cstr: %s\n", cstr)
vstr, err := EventToString(vev);
if err != nil {
t.Fatalf("error marshalling vote: %e, %s", err, vstr)
}
t.Logf("vstr: %s\n", vstr)
if ev, err := EventFromString(cstr); err != nil ||
ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] ||
ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] {
t.Fatalf("error unmarshalling challenge: %e, %v!=%v", err, ev, cev)
}
if ev, err := EventFromString(vstr); err != nil ||
fmt.Sprint(ev) != fmt.Sprint(vev) {
t.Fatalf("error unmarshalling vote: %e, %v!=%v", err, ev, vev)
}
}
func TestPubSub(t *testing.T) {
SetupTest(t)
c, e := UserActive.Subscribe()
if e != nil {
t.Errorf("Error subscribing to UserActive events: %e", e)
}
old, _ := time.Parse( old, _ := time.Parse(
time.RFC3339, time.RFC3339,
VeryOldVote, VeryOldVote,
@ -47,40 +107,21 @@ func SetupTest(t *testing.T) {
created: time.Now(), created: time.Now(),
}}) }})
if len(eventCache) != 272 {
t.Errorf("Unexpected number of events in cache: %d",
len(eventCache))
}
}
func CleanupTest() {
eventSubscriptionCache = [NumEventTypes][]chan Event{}
eventCache = []Event{}
}
func TestPubSub(t *testing.T) {
SetupTest(t)
c, e := UserActive.SubscribeWithHistory()
if e != nil {
t.Errorf("Error subscribing to UserActive events: %e", e)
}
Loop: Loop:
for i := 0; true; i++ { for i := 0; true; i++ {
select { select {
case e, ok := <-c: case e, ok := <-c:
if !ok { if !ok {
t.Errorf("Subscription Channel Closed") t.Fatalf("Subscription Channel Closed")
} }
if e.Type() != UserActive { if e.Type() != UserActive {
t.Errorf("Non UserActive Event in UserActive subscription: %v", e.Type()) t.Fatalf("Non UserActive Event in UserActive subscription: %v", e.Type())
} }
default: default:
if i == eventChannelBufferSize { if i == maxEventsInMemory {
break Loop break Loop
} else { } else {
t.Errorf("Unexpected number of events in channel: %d", i) t.Fatalf("Unexpected number of events in channel: %d", i)
} }
} }
} }
@ -93,18 +134,40 @@ Loop:
select { select {
case e, ok := <-c: case e, ok := <-c:
if !ok || e.Data()[UserEventUserKey] != "uniqueToken" { if !ok || e.Data()[UserEventUserKey] != "uniqueToken" {
t.Errorf("didnt read correct event from channel: %v", e) t.Fatalf("didnt read correct event from channel: %v", e)
} }
default: default:
t.Errorf("New event not published to subscription!") t.Fatalf("New event not published to subscription!")
} }
CleanupTest()
} }
func TestFilterCache(t *testing.T) { func TestFilterCache(t *testing.T) {
SetupTest(t) SetupTest(t)
old, _ := time.Parse(
time.RFC3339,
VeryOldVote,
)
for i := range 270 {
if err := PublishEvent(UserActiveEvent{UserEvent{
uid: fmt.Sprintf("%d", i),
created: old,
}}); err != nil {
t.Errorf("Failed to add event: %e", err)
}
}
PublishEvent(UserActiveEvent{UserEvent{
uid: fmt.Sprintf(TestTok),
created: time.Now(),
}})
PublishEvent(ChallengeEvent{UserEvent{
uid: fmt.Sprintf(TestTok),
created: time.Now(),
}})
events, err := GetMatchingEvents( events, err := GetMatchingEvents(
UserActive, UserActive,
map[string]string{ map[string]string{
@ -123,19 +186,6 @@ func TestFilterCache(t *testing.T) {
if events[0].Type() != UserActive { if events[0].Type() != UserActive {
t.Errorf("Got wrong event!: %+v", events[0]) t.Errorf("Got wrong event!: %+v", events[0])
} }
CleanupTest()
}
func TestPruneCache(t *testing.T) {
SetupTest(t)
pruneEventCache()
if len(eventCache) != 2 {
t.Errorf("Incorrect number of remaining events: %d", len(eventCache))
}
CleanupTest()
} }
func TestVoteEventValidations(t *testing.T) { func TestVoteEventValidations(t *testing.T) {