Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
e7d229c217 Early functionality modules
0. tests for event replay
   Tests are now included for the event replay features. This includes many fixes on top of
   the last commit as well as some tweaks to config module values.
1. activity module
   An activity module is created to track the useractive events and provide a counter for them.
   It also encapsulates logic to discard old useractive events.
2. web module
   A web module is created. This module serves a static webpage showing runtime information.
   Currently it only shows a snapshot of the user activity data. It is my intention that it
   eventually also shows an audit log, known users and channels, uptime, and more. Future work
   will also be needed in order to use HTML templating so that it doesn't look so... basic.
   Live updates to the information may also be desired.

Signed-off-by: Ava Affine <ava@sunnypup.io>
2025-01-07 16:43:55 -08:00
2515d396a0 Event replay and cleanup
This commit introduces the ability to reconfigure modules/event subscribers by
replaying previously stored events at startup. This way modules/subscribers can
always come up with the latest activity and data that was present at close at the
last run of the program. This process also includes the disposal of unneeded events
to minimize disk use over time. The changes include the following:

1. Event interface is extended with the Disposable() function
   unimplemented stubs for existing interface implementations
2. state.Init() is split into Init() and Start()
   Init() now initialized the file used by eventStream, and puts in place a 0 size
   memory cache as a temporary measure on top of the file.
   Start() reads Pop()s documents one by one from the temp nil-cache eventStream,
   it attempts to dispose of each event and if the event is not disposable it is
   pushed onto a second temporary no-memory-cache buffer which is then reverse ordered.
   The underlying file is truncated and reopened.
   Finally, the real eventStream is allocated, with in memory cache. All events in the
   second buffer are published (sent to subscribers as well as added to the new eventStream).
3. Updates to main() to support Init() vs Start()

Signed-off-by: Ava Affine <ava@sunnypup.io>
2024-12-06 13:00:50 -08:00
8 changed files with 447 additions and 39 deletions

View file

@ -0,0 +1,106 @@
package activity
import (
"errors"
"sync"
"time"
"gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/state"
"gitlab.com/whom/bingobot/internal/logging"
)
/* Activity module
* This module sits and processes incoming
*/
const (
ActivityModuleStartFail = "failed to start activity module"
)
var currentUserActivity map[string][]state.UserActiveEvent
var userActivityLock sync.RWMutex
func Start() error {
ch, err := state.UserActive.Subscribe()
if err != nil {
return errors.Join(
errors.New(ActivityModuleStartFail),
err,
)
}
// process incoming events loop
go func() {
for {
ev := <- ch
emap := ev.Data()
user := emap[state.UserEventUserKey]
etime := ev.Time()
delta := time.Since(etime).Hours() / float64(24)
if delta <= float64(config.Get().UserEventLifespanDays) {
new := []state.UserActiveEvent{ev.(state.UserActiveEvent)}
userActivityLock.Lock()
current, found := currentUserActivity[user]
if found {
new = append(new, current...)
}
userActivityLock.Unlock()
} else {
logging.Warn("recieved expired useractive event")
}
}
}()
// process expired events loop
go func() {
for {
delta := time.Hour * 24
delta *= time.Duration(config.Get().UserEventLifespanDays)
tcur := time.Now().Add(delta)
// get next soonest expiration
userActivityLock.RLock()
for _, evs := range currentUserActivity {
for _, ev := range evs {
hrs := time.Duration(24 * config.Get().UserEventLifespanDays)
t := ev.Time().Add(time.Hour * hrs)
if t.Before(tcur) {
tcur = t
}
}
}
userActivityLock.RUnlock()
time.Sleep(tcur.Sub(time.Now()))
userActivityLock.Lock()
for k, v := range currentUserActivity {
new := []state.UserActiveEvent{}
for _, ev := range v {
if !ev.Disposable() {
new = append(new, ev)
}
}
currentUserActivity[k] = new
}
userActivityLock.Unlock()
}
}()
return nil
}
func GetActivitySnapshot() map[string][]state.UserActiveEvent {
userActivityLock.RLock()
defer userActivityLock.RUnlock()
snapshot := make(map[string][]state.UserActiveEvent)
for k, v := range currentUserActivity {
newSl := make([]state.UserActiveEvent, len(v))
copy(newSl, v)
snapshot[k] = newSl
}
return snapshot
}

View file

@ -17,30 +17,34 @@ type AppConfig struct {
LogCompression bool `yaml:"log_compression"` LogCompression bool `yaml:"log_compression"`
LogAddSource bool `yaml:"log_add_source"` LogAddSource bool `yaml:"log_add_source"`
/* /* how long (in seconds) a user needs to be in vc in order to generate a
how long (in seconds) a user needs to be in vc in order to generate a * UserActive event
UserActive event */
*/
VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"` VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"`
/* /* how long (in milliseconds) a voice activity timer sleeps at a time between
how long (in milliseconds) a voice activity timer sleeps at a time between * context cancellation checks.
context cancellation checks. * a higher value means the function sleeps longer which could be
* useful for some reason in the future
a higher value means the function sleeps longer which could be * a higher value also means that the timer could take longer to cancel.
useful for some reason in the future * current recommended value is 1000ms.
*/
a higher value also means that the timer could take longer to cancel.
current recommended value is 1000ms.
*/
VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"`
/* persistent state file store */ /* persistent state file store */
PersistentCacheStore string `yaml:"persistent_cache_store"` PersistentCacheStore string `yaml:"persistent_cache_store"`
/* number of internal state events to cache in memory */ /* number of internal state events to cache in memory */
InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"`
/* number of days a useractive event is valid for
* increasing this will have a bell curve effect on
* voting weights
*/
UserEventLifespanDays int `yaml:"user_event_lifespan_days"`
/* listen address for local http Server */
LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"`
} }
var config *AppConfig var config *AppConfig
@ -110,4 +114,6 @@ func setDefaults() {
viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000)
viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") viper.SetDefault("PersistentCacheStore", "/tmp/bingobot")
viper.SetDefault("InMemoryEventCacheSize", 512) viper.SetDefault("InMemoryEventCacheSize", 512)
viper.SetDefault("UserEventLifespanDays", 10)
viper.SetDefault("LocalWebEndpointListen", ":8080")
} }

View file

@ -2,9 +2,10 @@ package state
import ( import (
"errors" "errors"
"time"
"fmt" "fmt"
"time"
"gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
) )
@ -102,6 +103,12 @@ func (ve VoteEvent) Validate() error {
return nil return nil
} }
func (ve VoteEvent) Disposable() bool {
// will be implemented with democracy module
logging.Warn("unimplemented: VoteEvent::Disposable")
return false
}
type UserEvent struct { type UserEvent struct {
uid string uid string
created time.Time created time.Time
@ -129,6 +136,13 @@ func (ue UserEvent) Validate() error {
return nil return nil
} }
func (ue UserEvent) Disposable() bool {
// when I make a module for challenges, restorations, and UserActives
// then I should implement this
logging.Warn("unimplemented: UserEvent::Disposable")
return false
}
type ChallengeEvent struct { type ChallengeEvent struct {
UserEvent UserEvent
} }
@ -167,9 +181,48 @@ func (ua UserActiveEvent) Type() EventType {
return UserActive return UserActive
} }
func (ua UserActiveEvent) Disposable() bool {
return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays)
}
func NewUserActiveEvent(user string) UserActiveEvent { func NewUserActiveEvent(user string) UserActiveEvent {
return UserActiveEvent{UserEvent{ return UserActiveEvent{UserEvent{
uid: user, uid: user,
created: time.Now(), created: time.Now(),
}} }}
} }
const (
TestEventDisposeKey = "dispose"
TestEventIDKey = "id"
)
type TestEvent struct {
Dispose bool
ID int
}
func (te TestEvent) Type() EventType {
return Test
}
func (te TestEvent) Time() time.Time {
return time.Now()
}
func (te TestEvent) Data() map[string]string {
m := map[string]string{}
if te.Dispose {
m[TestEventDisposeKey] = "t"
}
m[TestEventIDKey] = fmt.Sprintf("%d", te.ID)
return m
}
func (te TestEvent) Validate() error {
return nil
}
func (te TestEvent) Disposable() bool {
return te.Dispose
}

View file

@ -1,6 +1,6 @@
package state package state
/* STATE /* State module
* This package encapsulates various state information * This package encapsulates various state information
* state is represented in various global singletons * state is represented in various global singletons
* Additionally, this package offers a pub/sub interface * Additionally, this package offers a pub/sub interface
@ -8,15 +8,15 @@ package state
*/ */
import ( import (
"fmt" "encoding/json"
"errors"
"os"
"strconv"
"sync" "sync"
"time" "time"
"os"
"errors"
"encoding/json"
"gitlab.com/whom/bingobot/pkg/docbuf"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/pkg/docbuf"
) )
/* Event interface is meant to encapsulate a general interface /* Event interface is meant to encapsulate a general interface
@ -36,14 +36,17 @@ const (
BadChallengeEvent = "failed to make Challenge Event" BadChallengeEvent = "failed to make Challenge Event"
BadRestorationEvent = "failed to make Restoration Event" BadRestorationEvent = "failed to make Restoration Event"
BadUserActiveEvent = "failed to make UserActive Event" BadUserActiveEvent = "failed to make UserActive Event"
BadTestEvent = "failed to make Test Event"
BadEventObjMarshal = "error marshalling event" BadEventObjMarshal = "error marshalling event"
BadEventObjUnmarshal = "failed to unmarshal event map" BadEventObjUnmarshal = "failed to unmarshal event map"
BadEventUnmarshal = "failed to unmarshal event" BadEventUnmarshal = "failed to unmarshal event"
BadEventMissingTypeKey = "event map missing type key" BadEventMissingTypeKey = "event map missing type key"
) StartBeforeInitError = "state machine not initialized"
ReopenStoreFileError = "failed to reopen store file"
const (
EventTypeMapKey = "type" EventTypeMapKey = "type"
TmpDocBufBackingFile = "bingobot_events_processing"
) )
var eventMutex sync.RWMutex var eventMutex sync.RWMutex
@ -60,7 +63,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error {
} }
if eventStream, err = docbuf.NewDocumentBuffer( if eventStream, err = docbuf.NewDocumentBuffer(
eventMemCacheSize, 0, // temporary nil-cache buffer to replay events from
file, file,
); err != nil { ); err != nil {
return errors.Join(errors.New(BadEventStreamInit), err) return errors.Join(errors.New(BadEventStreamInit), err)
@ -71,6 +74,89 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error {
return nil return nil
} }
// replay events and begin state machine
func Start() error {
if eventStream == nil {
return errors.New(StartBeforeInitError)
}
tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile)
if err != nil {
return err
}
tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile)
if err != nil {
tmpBackFile.Close()
return err
}
// filter out disposable events into a reverse ordered on disk stack
var doc string
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
doc, err = eventStream.Pop()
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
return err
}
if doc == "" ||
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
break
}
ev, err := EventFromString(doc)
if err != nil {
logging.Warn("Discarding the following event for not unmarshaling: %s", doc)
continue
}
if !ev.Disposable() {
tmpBuf.Push(doc)
}
}
if err := os.Truncate(eventStorageFileName, 0); err != nil {
return errors.Join(errors.New(ReopenStoreFileError), err)
}
file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return errors.Join(errors.New(BadEventStoreFilename), err)
}
eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file)
if err != nil {
// return error here will panic without truncating or writing to the file
return err
}
// unravel tmp stack into properly ordered properly allocated buffer
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
doc, err := tmpBuf.Pop()
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
logging.Warn("Could not handle the following error: %s", err.Error())
continue
}
if doc == "" ||
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
break
}
ev, err := EventFromString(doc)
if err != nil {
logging.Warn("Could not handle the following error: %s", err.Error())
continue
}
if err := PublishEvent(ev); err != nil {
logging.Warn("Could not handle the following error: %s", err.Error())
}
}
return nil
}
// return no error. we are handling SIGINT // return no error. we are handling SIGINT
func Teardown() { func Teardown() {
// riskily ignore all locking.... // riskily ignore all locking....
@ -96,6 +182,7 @@ const (
Challenge Challenge
Restoration Restoration
UserActive UserActive
Test
// ... // ...
// leave this last // leave this last
@ -113,6 +200,8 @@ func EventTypeFromString(doc string) EventType {
return Restoration return Restoration
case "UserActive": case "UserActive":
return UserActive return UserActive
case "Test":
return Test
default: default:
// error case // error case
return NumEventTypes return NumEventTypes
@ -125,6 +214,7 @@ func (et EventType) String() string {
"Challenge", "Challenge",
"Restoration", "Restoration",
"UserActive", "UserActive",
"Test",
} }
if et < 0 || et >= NumEventTypes { if et < 0 || et >= NumEventTypes {
@ -194,6 +284,22 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) {
return nil, errors.Join(errors.New(BadUserActiveEvent), err) return nil, errors.Join(errors.New(BadUserActiveEvent), err)
} }
return UserActiveEvent{*e}, nil return UserActiveEvent{*e}, nil
case Test:
disp := false
if v, ok := data[TestEventDisposeKey]; ok && v == "t" {
disp = true
}
id := -1
if v, ok := data[TestEventIDKey]; ok {
var err error
if id, err = strconv.Atoi(v); err != nil {
return nil, errors.Join(errors.New(BadTestEvent), err)
}
}
return TestEvent{
Dispose: disp,
ID: id,
}, nil
default: default:
return nil, errors.New(BadEventTypeError) return nil, errors.New(BadEventTypeError)
} }
@ -232,6 +338,9 @@ type Event interface {
// validates state of internal metadata per EventType // validates state of internal metadata per EventType
Validate() error Validate() error
// returns true if event can be discarded
Disposable() bool
} }
func EventToString(e Event) (string, error) { func EventToString(e Event) (string, error) {
@ -283,7 +392,7 @@ func PublishEvent(e Event) error {
blocking := false blocking := false
for _, c := range eventSubscriptionCache[e.Type()] { for _, c := range eventSubscriptionCache[e.Type()] {
if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) {
if len(c) == maxEventsInMemory { if len(c) == maxEventsInMemory {
logging.Warn( logging.Warn(
"PublishEvent() blocking -- event channel full", "PublishEvent() blocking -- event channel full",
@ -373,8 +482,6 @@ func GetMatchingEvents(
} }
filter := func(e Event) bool { filter := func(e Event) bool {
ev, er := EventToString(e)
fmt.Printf("Checking: %s (%e)\n", ev, er)
if e.Type() != t { if e.Type() != t {
return true return true
} }
@ -385,7 +492,6 @@ func GetMatchingEvents(
} }
} }
fmt.Println("Found Match")
matches = append(matches, e) matches = append(matches, e)
return true return true
} }

View file

@ -2,11 +2,12 @@ package state
import ( import (
"fmt" "fmt"
"os"
"testing" "testing"
"time" "time"
"gitlab.com/whom/bingobot/pkg/docbuf"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/pkg/docbuf"
) )
/* WARNING: /* WARNING:
@ -54,15 +55,11 @@ func TestEventMarshalUnmarshal(t *testing.T) {
t.Fatalf("error marshalling challenge: %e, %s", err, cstr) t.Fatalf("error marshalling challenge: %e, %s", err, cstr)
} }
t.Logf("cstr: %s\n", cstr)
vstr, err := EventToString(vev); vstr, err := EventToString(vev);
if err != nil { if err != nil {
t.Fatalf("error marshalling vote: %e, %s", err, vstr) t.Fatalf("error marshalling vote: %e, %s", err, vstr)
} }
t.Logf("vstr: %s\n", vstr)
if ev, err := EventFromString(cstr); err != nil || if ev, err := EventFromString(cstr); err != nil ||
ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] ||
ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] {
@ -270,3 +267,81 @@ func TestVoteEventValidations(t *testing.T) {
t.Errorf("Unexpected or no error: %e", err) t.Errorf("Unexpected or no error: %e", err)
} }
} }
func TestEventReplay(t *testing.T) {
tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore")
if err != nil {
t.Fatalf("failed setting up tempfile: %s", err.Error())
}
if err := tmpTestFile.Close(); err != nil {
t.Fatalf("failed to close initial handle to tempfile: %s", err.Error())
}
if err := Init(2, tmpTestFile.Name()); err != nil {
t.Fatalf("failed initializing state machine: %s", err.Error())
}
if err := PublishEvent(TestEvent{
ID: 1,
Dispose: true,
}); err != nil {
t.Fatalf("failed to publish event 1: %s", err.Error())
}
if err := PublishEvent(TestEvent{
ID: 2,
Dispose: false,
}); err != nil {
t.Fatalf("failed to publish event 2: %s", err.Error())
}
if err := PublishEvent(TestEvent{
ID: 3,
Dispose: true,
}); err != nil {
t.Fatalf("failed to publish event 3: %s", err.Error())
}
if err := PublishEvent(TestEvent{
ID: 4,
Dispose: false,
}); err != nil {
t.Fatalf("failed to publish event 4: %s", err.Error())
}
sub, err := Test.Subscribe()
if err := Start(); err != nil {
t.Fatalf("failed to start state machine: %s", err.Error())
}
select {
case ev1 := <- sub:
if ev1.Type() != Test {
t.Fatalf("wrong kind of event somehow: %+v", ev1)
}
if ev1.(TestEvent).ID != 2 {
t.Fatalf("misordered event: %+v", ev1)
}
default:
t.Fatal("no events available from replay")
}
select {
case ev2 := <- sub:
if ev2.Type() != Test {
t.Fatalf("wrong kind of event somehow: %+v", ev2)
}
if ev2.(TestEvent).ID != 4 {
t.Fatalf("misordered event: %+v", ev2)
}
default:
t.Fatal("only one event made it out")
}
select {
case <- sub:
t.Fatalf("superfluous events left in subscription")
default:
}
}

35
internal/web/web.go Normal file
View file

@ -0,0 +1,35 @@
package web
import (
"fmt"
"net/http"
"gitlab.com/whom/bingobot/internal/activity"
"gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/logging"
)
func Start() error {
frag := config.Get().LocalWebEndpointListen
http.HandleFunc("/", HandleHttpRequest)
go logging.Error(http.ListenAndServe(frag, nil).Error())
return nil
}
func HandleHttpRequest(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "<html>")
fmt.Fprintf(w, "<h1> User Activity Metrics </h1>")
fmt.Fprintf(w, "<p> number of the last %d days a user has been active </p>",
config.Get().UserEventLifespanDays)
fmt.Fprintf(w, "<table>")
fmt.Fprintf(w, "<tr><th>User</th><th>Days</th></tr>")
for k, v := range activity.GetActivitySnapshot() {
fmt.Fprintf(w, "<tr><td>%s</td><td>%d</td></tr>",
k, len(v))
}
fmt.Fprintf(w, "</table>")
fmt.Fprintf(w, "</html>")
}

20
main.go
View file

@ -4,10 +4,12 @@ import (
"flag" "flag"
"log" "log"
"gitlab.com/whom/bingobot/internal/activity"
"gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/discord"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/internal/state" "gitlab.com/whom/bingobot/internal/state"
"gitlab.com/whom/bingobot/internal/web"
) )
var ( var (
@ -32,11 +34,23 @@ func main() {
); err != nil { ); err != nil {
log.Fatalf("couldn't initialize state engine: %s", err.Error()) log.Fatalf("couldn't initialize state engine: %s", err.Error())
} }
if err := activity.Start(); err != nil {
// TODO: handle gracefully and continue?
log.Fatalf("failed to start activity module: %s", err.Error())
}
if err := web.Start(); err != nil {
log.Fatalf("failed to start local web server: %s", err.Error())
}
// start this LAST
if err := state.Start(); err != nil {
log.Fatalf("failed to start state machine: %s", err.Error())
}
defer state.Teardown() defer state.Teardown()
err = startBot() if err := startBot(); err != nil {
if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }

View file

@ -263,6 +263,11 @@ func NewDocumentBuffer(
* If cache is full oldest document is written to backing writer. * If cache is full oldest document is written to backing writer.
*/ */
func (b *DocBuf) Push(doc string) error { func (b *DocBuf) Push(doc string) error {
if b.cacheSize == 0 {
b.writeToBackingStore(doc)
return nil
}
if len(b.cache) >= b.cacheSize { if len(b.cache) >= b.cacheSize {
if err := b.demote(); err != nil { if err := b.demote(); err != nil {
return err return err
@ -280,6 +285,14 @@ func (b *DocBuf) Push(doc string) error {
* out of the backing ReaderWriter. * out of the backing ReaderWriter.
*/ */
func (b *DocBuf) Pop() (string, error) { func (b *DocBuf) Pop() (string, error) {
if b.cacheSize == 0 {
d, e := b.readDocumentsFromDisk(1, true, false)
if len(d) > 0 {
return d[0], e
}
return "", e
}
if len(b.cache) < 1 { if len(b.cache) < 1 {
if err := b.promote(true); err != nil { if err := b.promote(true); err != nil {
return "", err return "", err