From 2515d396a0e4e72a61bd1620147288976debc448 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Thu, 5 Dec 2024 18:11:41 -0800 Subject: [PATCH 1/9] Event replay and cleanup This commit introduces the ability to reconfigure modules/event subscribers by replaying previously stored events at startup. This way modules/subscribers can always come up with the latest activity and data that was present at close at the last run of the program. This process also includes the disposal of unneeded events to minimize disk use over time. The changes include the following: 1. Event interface is extended with the Disposable() function unimplemented stubs for existing interface implementations 2. state.Init() is split into Init() and Start() Init() now initialized the file used by eventStream, and puts in place a 0 size memory cache as a temporary measure on top of the file. Start() reads Pop()s documents one by one from the temp nil-cache eventStream, it attempts to dispose of each event and if the event is not disposable it is pushed onto a second temporary no-memory-cache buffer which is then reverse ordered. The underlying file is truncated and reopened. Finally, the real eventStream is allocated, with in memory cache. All events in the second buffer are published (sent to subscribers as well as added to the new eventStream). 3. Updates to main() to support Init() vs Start() Signed-off-by: Ava Affine --- internal/state/events.go | 13 +++++++ internal/state/state.go | 84 +++++++++++++++++++++++++++++++++++++++- main.go | 7 +++- 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/internal/state/events.go b/internal/state/events.go index a24b91f..e6ac421 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -102,6 +102,12 @@ func (ve VoteEvent) Validate() error { return nil } +func (ve VoteEvent) Disposable() bool { + // will be implemented with democracy module + logging.Warn("unimplemented: VoteEvent::Disposable") + return false +} + type UserEvent struct { uid string created time.Time @@ -129,6 +135,13 @@ func (ue UserEvent) Validate() error { return nil } +func (ue UserEvent) Disposable() bool { + // when I make a module for challenges, restorations, and UserActives + // then I should implement this + logging.Warn("unimplemented: UserEvent::Disposable") + return false +} + type ChallengeEvent struct { UserEvent } diff --git a/internal/state/state.go b/internal/state/state.go index 4986991..deb0094 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -40,12 +40,18 @@ const ( BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" + StartBeforeInitError = "state machine not initialized" + ReopenStoreFileError = "failed to reopen store file" ) const ( EventTypeMapKey = "type" ) +const ( + TmpDocBufBackingFile = "bingobot_events_processing" +) + var eventMutex sync.RWMutex var eventSubscriptionCache = [NumEventTypes][]chan Event{} var eventStorageFileName string @@ -60,7 +66,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { } if eventStream, err = docbuf.NewDocumentBuffer( - eventMemCacheSize, + 0, // temporary nil-cache buffer to replay events from file, ); err != nil { return errors.Join(errors.New(BadEventStreamInit), err) @@ -71,6 +77,79 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return nil } +// replay events and begin state machine +func Start() error { + if eventStream == nil { + return errors.New(StartBeforeInitError) + } + + tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile) + if err != nil { + return err + } + + tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile) + if err != nil { + tmpBackFile.Close() + return err + } + + // filter out disposable events into a reverse ordered on disk stack + var doc string + for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + doc, err = eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + return err + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Discarding the following event for not unmarshaling: %s", doc) + continue + } + + if !ev.Disposable() { + tmpBuf.Push(doc) + } + } + + if err := os.Truncate(eventStorageFileName, 0); err != nil { + return errors.Join(errors.New(ReopenStoreFileError), err) + } + + file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return errors.Join(errors.New(BadEventStoreFilename), err) + } + + eventStream, err := docbuf.NewDocumentBuffer(maxEventsInMemory, file) + if err != nil { + // return error here will panic without truncating or writing to the file + return err + } + + // unravel tmp stack into properly ordered properly allocated buffer + for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if err := PublishEvent(ev); err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + } + } + + return nil +} + // return no error. we are handling SIGINT func Teardown() { // riskily ignore all locking.... @@ -232,6 +311,9 @@ type Event interface { // validates state of internal metadata per EventType Validate() error + + // returns true if event can be discarded + Disposable() bool } func EventToString(e Event) (string, error) { diff --git a/main.go b/main.go index 21111e9..6d2e17f 100644 --- a/main.go +++ b/main.go @@ -32,10 +32,15 @@ func main() { ); err != nil { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } - defer state.Teardown() + // TODO: start modules HERE and not elsewhere err = startBot() + if err := state.Start(); err != nil { + log.Fatal("failed to start state machine: %s", err.Error()) + } + defer state.Teardown() + if err != nil { log.Fatal(err) } From e7d229c217fb19fccd0d6943f270d905e0fe947a Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Tue, 31 Dec 2024 11:11:49 -0800 Subject: [PATCH 2/9] Early functionality modules 0. tests for event replay Tests are now included for the event replay features. This includes many fixes on top of the last commit as well as some tweaks to config module values. 1. activity module An activity module is created to track the useractive events and provide a counter for them. It also encapsulates logic to discard old useractive events. 2. web module A web module is created. This module serves a static webpage showing runtime information. Currently it only shows a snapshot of the user activity data. It is my intention that it eventually also shows an audit log, known users and channels, uptime, and more. Future work will also be needed in order to use HTML templating so that it doesn't look so... basic. Live updates to the information may also be desired. Signed-off-by: Ava Affine --- internal/activity/activity.go | 106 ++++++++++++++++++++++++++++++++++ internal/config/config.go | 38 +++++++----- internal/state/events.go | 44 +++++++++++++- internal/state/state.go | 60 +++++++++++++------ internal/state/state_test.go | 85 +++++++++++++++++++++++++-- internal/web/web.go | 35 +++++++++++ main.go | 17 ++++-- pkg/docbuf/docbuf.go | 13 +++++ 8 files changed, 353 insertions(+), 45 deletions(-) create mode 100644 internal/activity/activity.go create mode 100644 internal/web/web.go diff --git a/internal/activity/activity.go b/internal/activity/activity.go new file mode 100644 index 0000000..9430085 --- /dev/null +++ b/internal/activity/activity.go @@ -0,0 +1,106 @@ +package activity + +import ( + "errors" + "sync" + "time" + + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/state" + "gitlab.com/whom/bingobot/internal/logging" +) + +/* Activity module + * This module sits and processes incoming + */ + +const ( + ActivityModuleStartFail = "failed to start activity module" +) + +var currentUserActivity map[string][]state.UserActiveEvent +var userActivityLock sync.RWMutex + +func Start() error { + ch, err := state.UserActive.Subscribe() + if err != nil { + return errors.Join( + errors.New(ActivityModuleStartFail), + err, + ) + } + + // process incoming events loop + go func() { + for { + ev := <- ch + emap := ev.Data() + user := emap[state.UserEventUserKey] + etime := ev.Time() + delta := time.Since(etime).Hours() / float64(24) + + if delta <= float64(config.Get().UserEventLifespanDays) { + new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} + userActivityLock.Lock() + current, found := currentUserActivity[user] + if found { + new = append(new, current...) + } + userActivityLock.Unlock() + } else { + logging.Warn("recieved expired useractive event") + } + } + }() + + // process expired events loop + go func() { + for { + delta := time.Hour * 24 + delta *= time.Duration(config.Get().UserEventLifespanDays) + tcur := time.Now().Add(delta) + + // get next soonest expiration + userActivityLock.RLock() + for _, evs := range currentUserActivity { + for _, ev := range evs { + hrs := time.Duration(24 * config.Get().UserEventLifespanDays) + t := ev.Time().Add(time.Hour * hrs) + if t.Before(tcur) { + tcur = t + } + } + } + userActivityLock.RUnlock() + time.Sleep(tcur.Sub(time.Now())) + + userActivityLock.Lock() + for k, v := range currentUserActivity { + new := []state.UserActiveEvent{} + for _, ev := range v { + if !ev.Disposable() { + new = append(new, ev) + } + } + currentUserActivity[k] = new + } + userActivityLock.Unlock() + } + }() + + return nil +} + +func GetActivitySnapshot() map[string][]state.UserActiveEvent { + userActivityLock.RLock() + defer userActivityLock.RUnlock() + + snapshot := make(map[string][]state.UserActiveEvent) + for k, v := range currentUserActivity { + newSl := make([]state.UserActiveEvent, len(v)) + copy(newSl, v) + snapshot[k] = newSl + } + + return snapshot +} diff --git a/internal/config/config.go b/internal/config/config.go index eee917d..f32ae9a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,30 +17,34 @@ type AppConfig struct { LogCompression bool `yaml:"log_compression"` LogAddSource bool `yaml:"log_add_source"` - /* - how long (in seconds) a user needs to be in vc in order to generate a - UserActive event - */ + /* how long (in seconds) a user needs to be in vc in order to generate a + * UserActive event + */ VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"` - /* - how long (in milliseconds) a voice activity timer sleeps at a time between - context cancellation checks. - - a higher value means the function sleeps longer which could be - useful for some reason in the future - - a higher value also means that the timer could take longer to cancel. - - current recommended value is 1000ms. - */ + /* how long (in milliseconds) a voice activity timer sleeps at a time between + * context cancellation checks. + * a higher value means the function sleeps longer which could be + * useful for some reason in the future + * a higher value also means that the timer could take longer to cancel. + * current recommended value is 1000ms. + */ VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` /* persistent state file store */ PersistentCacheStore string `yaml:"persistent_cache_store"` /* number of internal state events to cache in memory */ - InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` + InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"` + + /* number of days a useractive event is valid for + * increasing this will have a bell curve effect on + * voting weights + */ + UserEventLifespanDays int `yaml:"user_event_lifespan_days"` + + /* listen address for local http Server */ + LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"` } var config *AppConfig @@ -110,4 +114,6 @@ func setDefaults() { viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") viper.SetDefault("InMemoryEventCacheSize", 512) + viper.SetDefault("UserEventLifespanDays", 10) + viper.SetDefault("LocalWebEndpointListen", ":8080") } diff --git a/internal/state/events.go b/internal/state/events.go index e6ac421..c14ee77 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -2,9 +2,10 @@ package state import ( "errors" - "time" "fmt" - + "time" + + "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/logging" ) @@ -180,9 +181,48 @@ func (ua UserActiveEvent) Type() EventType { return UserActive } +func (ua UserActiveEvent) Disposable() bool { + return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays) +} + func NewUserActiveEvent(user string) UserActiveEvent { return UserActiveEvent{UserEvent{ uid: user, created: time.Now(), }} } + +const ( + TestEventDisposeKey = "dispose" + TestEventIDKey = "id" +) + +type TestEvent struct { + Dispose bool + ID int +} + +func (te TestEvent) Type() EventType { + return Test +} + +func (te TestEvent) Time() time.Time { + return time.Now() +} + +func (te TestEvent) Data() map[string]string { + m := map[string]string{} + if te.Dispose { + m[TestEventDisposeKey] = "t" + } + m[TestEventIDKey] = fmt.Sprintf("%d", te.ID) + return m +} + +func (te TestEvent) Validate() error { + return nil +} + +func (te TestEvent) Disposable() bool { + return te.Dispose +} diff --git a/internal/state/state.go b/internal/state/state.go index deb0094..f4a83fd 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,6 +1,6 @@ package state -/* STATE +/* State module * This package encapsulates various state information * state is represented in various global singletons * Additionally, this package offers a pub/sub interface @@ -8,15 +8,15 @@ package state */ import ( - "fmt" + "encoding/json" + "errors" + "os" + "strconv" "sync" "time" - "os" - "errors" - "encoding/json" - "gitlab.com/whom/bingobot/pkg/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* Event interface is meant to encapsulate a general interface @@ -36,19 +36,16 @@ const ( BadChallengeEvent = "failed to make Challenge Event" BadRestorationEvent = "failed to make Restoration Event" BadUserActiveEvent = "failed to make UserActive Event" + BadTestEvent = "failed to make Test Event" BadEventObjMarshal = "error marshalling event" BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" StartBeforeInitError = "state machine not initialized" ReopenStoreFileError = "failed to reopen store file" -) -const ( EventTypeMapKey = "type" -) -const ( TmpDocBufBackingFile = "bingobot_events_processing" ) @@ -96,12 +93,17 @@ func Start() error { // filter out disposable events into a reverse ordered on disk stack var doc string - for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { doc, err = eventStream.Pop() if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { return err } + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + ev, err := EventFromString(doc) if err != nil { logging.Warn("Discarding the following event for not unmarshaling: %s", doc) @@ -122,20 +124,25 @@ func Start() error { return errors.Join(errors.New(BadEventStoreFilename), err) } - eventStream, err := docbuf.NewDocumentBuffer(maxEventsInMemory, file) + eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file) if err != nil { // return error here will panic without truncating or writing to the file return err } // unravel tmp stack into properly ordered properly allocated buffer - for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { - doc, err := eventStream.Pop() + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := tmpBuf.Pop() if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { logging.Warn("Could not handle the following error: %s", err.Error()) continue } + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + ev, err := EventFromString(doc) if err != nil { logging.Warn("Could not handle the following error: %s", err.Error()) @@ -175,6 +182,7 @@ const ( Challenge Restoration UserActive + Test // ... // leave this last @@ -192,6 +200,8 @@ func EventTypeFromString(doc string) EventType { return Restoration case "UserActive": return UserActive + case "Test": + return Test default: // error case return NumEventTypes @@ -204,6 +214,7 @@ func (et EventType) String() string { "Challenge", "Restoration", "UserActive", + "Test", } if et < 0 || et >= NumEventTypes { @@ -273,6 +284,22 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { return nil, errors.Join(errors.New(BadUserActiveEvent), err) } return UserActiveEvent{*e}, nil + case Test: + disp := false + if v, ok := data[TestEventDisposeKey]; ok && v == "t" { + disp = true + } + id := -1 + if v, ok := data[TestEventIDKey]; ok { + var err error + if id, err = strconv.Atoi(v); err != nil { + return nil, errors.Join(errors.New(BadTestEvent), err) + } + } + return TestEvent{ + Dispose: disp, + ID: id, + }, nil default: return nil, errors.New(BadEventTypeError) } @@ -365,7 +392,7 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) { if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", @@ -455,8 +482,6 @@ func GetMatchingEvents( } filter := func(e Event) bool { - ev, er := EventToString(e) - fmt.Printf("Checking: %s (%e)\n", ev, er) if e.Type() != t { return true } @@ -467,7 +492,6 @@ func GetMatchingEvents( } } - fmt.Println("Found Match") matches = append(matches, e) return true } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index 20800f3..877b724 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -2,11 +2,12 @@ package state import ( "fmt" + "os" "testing" "time" - "gitlab.com/whom/bingobot/pkg/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* WARNING: @@ -54,15 +55,11 @@ func TestEventMarshalUnmarshal(t *testing.T) { t.Fatalf("error marshalling challenge: %e, %s", err, cstr) } - t.Logf("cstr: %s\n", cstr) - vstr, err := EventToString(vev); if err != nil { t.Fatalf("error marshalling vote: %e, %s", err, vstr) } - t.Logf("vstr: %s\n", vstr) - if ev, err := EventFromString(cstr); err != nil || ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { @@ -270,3 +267,81 @@ func TestVoteEventValidations(t *testing.T) { t.Errorf("Unexpected or no error: %e", err) } } + +func TestEventReplay(t *testing.T) { + tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore") + if err != nil { + t.Fatalf("failed setting up tempfile: %s", err.Error()) + } + + if err := tmpTestFile.Close(); err != nil { + t.Fatalf("failed to close initial handle to tempfile: %s", err.Error()) + } + + if err := Init(2, tmpTestFile.Name()); err != nil { + t.Fatalf("failed initializing state machine: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 1, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 1: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 2, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 2: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 3, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 3: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 4, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 4: %s", err.Error()) + } + + sub, err := Test.Subscribe() + if err := Start(); err != nil { + t.Fatalf("failed to start state machine: %s", err.Error()) + } + + select { + case ev1 := <- sub: + if ev1.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev1) + } + if ev1.(TestEvent).ID != 2 { + t.Fatalf("misordered event: %+v", ev1) + } + default: + t.Fatal("no events available from replay") + } + + select { + case ev2 := <- sub: + if ev2.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev2) + } + if ev2.(TestEvent).ID != 4 { + t.Fatalf("misordered event: %+v", ev2) + } + default: + t.Fatal("only one event made it out") + } + + select { + case <- sub: + t.Fatalf("superfluous events left in subscription") + default: + } +} diff --git a/internal/web/web.go b/internal/web/web.go new file mode 100644 index 0000000..38051e6 --- /dev/null +++ b/internal/web/web.go @@ -0,0 +1,35 @@ +package web + +import ( + "fmt" + "net/http" + + "gitlab.com/whom/bingobot/internal/activity" + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/logging" +) + +func Start() error { + frag := config.Get().LocalWebEndpointListen + http.HandleFunc("/", HandleHttpRequest) + go logging.Error(http.ListenAndServe(frag, nil).Error()) + + return nil +} + +func HandleHttpRequest(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "") + fmt.Fprintf(w, "

User Activity Metrics

") + fmt.Fprintf(w, "

number of the last %d days a user has been active

", + config.Get().UserEventLifespanDays) + + fmt.Fprintf(w, "") + fmt.Fprintf(w, "") + for k, v := range activity.GetActivitySnapshot() { + fmt.Fprintf(w, "", + k, len(v)) + } + fmt.Fprintf(w, "
UserDays
%s%d
") + + fmt.Fprintf(w, "") +} diff --git a/main.go b/main.go index 6d2e17f..0528e1c 100644 --- a/main.go +++ b/main.go @@ -4,10 +4,12 @@ import ( "flag" "log" + "gitlab.com/whom/bingobot/internal/activity" "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/state" + "gitlab.com/whom/bingobot/internal/web" ) var ( @@ -33,15 +35,22 @@ func main() { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } - // TODO: start modules HERE and not elsewhere - err = startBot() + if err := activity.Start(); err != nil { + // TODO: handle gracefully and continue? + log.Fatalf("failed to start activity module: %s", err.Error()) + } + if err := web.Start(); err != nil { + log.Fatalf("failed to start local web server: %s", err.Error()) + } + + // start this LAST if err := state.Start(); err != nil { - log.Fatal("failed to start state machine: %s", err.Error()) + log.Fatalf("failed to start state machine: %s", err.Error()) } defer state.Teardown() - if err != nil { + if err := startBot(); err != nil { log.Fatal(err) } } diff --git a/pkg/docbuf/docbuf.go b/pkg/docbuf/docbuf.go index cc0610e..cd3f633 100644 --- a/pkg/docbuf/docbuf.go +++ b/pkg/docbuf/docbuf.go @@ -263,6 +263,11 @@ func NewDocumentBuffer( * If cache is full oldest document is written to backing writer. */ func (b *DocBuf) Push(doc string) error { + if b.cacheSize == 0 { + b.writeToBackingStore(doc) + return nil + } + if len(b.cache) >= b.cacheSize { if err := b.demote(); err != nil { return err @@ -280,6 +285,14 @@ func (b *DocBuf) Push(doc string) error { * out of the backing ReaderWriter. */ func (b *DocBuf) Pop() (string, error) { + if b.cacheSize == 0 { + d, e := b.readDocumentsFromDisk(1, true, false) + if len(d) > 0 { + return d[0], e + } + return "", e + } + if len(b.cache) < 1 { if err := b.promote(true); err != nil { return "", err From 97bf66c191f33d54315506a8771e781b88c66399 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Wed, 8 Jan 2025 00:47:49 +0000 Subject: [PATCH 3/9] Event replay and cleanup --- internal/activity/activity.go | 106 +++++++++++++++++++++++++++ internal/config/config.go | 38 +++++----- internal/state/events.go | 57 ++++++++++++++- internal/state/state.go | 132 ++++++++++++++++++++++++++++++---- internal/state/state_test.go | 85 ++++++++++++++++++++-- internal/web/web.go | 35 +++++++++ main.go | 20 +++++- pkg/docbuf/docbuf.go | 13 ++++ 8 files changed, 447 insertions(+), 39 deletions(-) create mode 100644 internal/activity/activity.go create mode 100644 internal/web/web.go diff --git a/internal/activity/activity.go b/internal/activity/activity.go new file mode 100644 index 0000000..9430085 --- /dev/null +++ b/internal/activity/activity.go @@ -0,0 +1,106 @@ +package activity + +import ( + "errors" + "sync" + "time" + + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/state" + "gitlab.com/whom/bingobot/internal/logging" +) + +/* Activity module + * This module sits and processes incoming + */ + +const ( + ActivityModuleStartFail = "failed to start activity module" +) + +var currentUserActivity map[string][]state.UserActiveEvent +var userActivityLock sync.RWMutex + +func Start() error { + ch, err := state.UserActive.Subscribe() + if err != nil { + return errors.Join( + errors.New(ActivityModuleStartFail), + err, + ) + } + + // process incoming events loop + go func() { + for { + ev := <- ch + emap := ev.Data() + user := emap[state.UserEventUserKey] + etime := ev.Time() + delta := time.Since(etime).Hours() / float64(24) + + if delta <= float64(config.Get().UserEventLifespanDays) { + new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} + userActivityLock.Lock() + current, found := currentUserActivity[user] + if found { + new = append(new, current...) + } + userActivityLock.Unlock() + } else { + logging.Warn("recieved expired useractive event") + } + } + }() + + // process expired events loop + go func() { + for { + delta := time.Hour * 24 + delta *= time.Duration(config.Get().UserEventLifespanDays) + tcur := time.Now().Add(delta) + + // get next soonest expiration + userActivityLock.RLock() + for _, evs := range currentUserActivity { + for _, ev := range evs { + hrs := time.Duration(24 * config.Get().UserEventLifespanDays) + t := ev.Time().Add(time.Hour * hrs) + if t.Before(tcur) { + tcur = t + } + } + } + userActivityLock.RUnlock() + time.Sleep(tcur.Sub(time.Now())) + + userActivityLock.Lock() + for k, v := range currentUserActivity { + new := []state.UserActiveEvent{} + for _, ev := range v { + if !ev.Disposable() { + new = append(new, ev) + } + } + currentUserActivity[k] = new + } + userActivityLock.Unlock() + } + }() + + return nil +} + +func GetActivitySnapshot() map[string][]state.UserActiveEvent { + userActivityLock.RLock() + defer userActivityLock.RUnlock() + + snapshot := make(map[string][]state.UserActiveEvent) + for k, v := range currentUserActivity { + newSl := make([]state.UserActiveEvent, len(v)) + copy(newSl, v) + snapshot[k] = newSl + } + + return snapshot +} diff --git a/internal/config/config.go b/internal/config/config.go index eee917d..f32ae9a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,30 +17,34 @@ type AppConfig struct { LogCompression bool `yaml:"log_compression"` LogAddSource bool `yaml:"log_add_source"` - /* - how long (in seconds) a user needs to be in vc in order to generate a - UserActive event - */ + /* how long (in seconds) a user needs to be in vc in order to generate a + * UserActive event + */ VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"` - /* - how long (in milliseconds) a voice activity timer sleeps at a time between - context cancellation checks. - - a higher value means the function sleeps longer which could be - useful for some reason in the future - - a higher value also means that the timer could take longer to cancel. - - current recommended value is 1000ms. - */ + /* how long (in milliseconds) a voice activity timer sleeps at a time between + * context cancellation checks. + * a higher value means the function sleeps longer which could be + * useful for some reason in the future + * a higher value also means that the timer could take longer to cancel. + * current recommended value is 1000ms. + */ VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` /* persistent state file store */ PersistentCacheStore string `yaml:"persistent_cache_store"` /* number of internal state events to cache in memory */ - InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` + InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"` + + /* number of days a useractive event is valid for + * increasing this will have a bell curve effect on + * voting weights + */ + UserEventLifespanDays int `yaml:"user_event_lifespan_days"` + + /* listen address for local http Server */ + LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"` } var config *AppConfig @@ -110,4 +114,6 @@ func setDefaults() { viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") viper.SetDefault("InMemoryEventCacheSize", 512) + viper.SetDefault("UserEventLifespanDays", 10) + viper.SetDefault("LocalWebEndpointListen", ":8080") } diff --git a/internal/state/events.go b/internal/state/events.go index a24b91f..c14ee77 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -2,9 +2,10 @@ package state import ( "errors" - "time" "fmt" - + "time" + + "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/logging" ) @@ -102,6 +103,12 @@ func (ve VoteEvent) Validate() error { return nil } +func (ve VoteEvent) Disposable() bool { + // will be implemented with democracy module + logging.Warn("unimplemented: VoteEvent::Disposable") + return false +} + type UserEvent struct { uid string created time.Time @@ -129,6 +136,13 @@ func (ue UserEvent) Validate() error { return nil } +func (ue UserEvent) Disposable() bool { + // when I make a module for challenges, restorations, and UserActives + // then I should implement this + logging.Warn("unimplemented: UserEvent::Disposable") + return false +} + type ChallengeEvent struct { UserEvent } @@ -167,9 +181,48 @@ func (ua UserActiveEvent) Type() EventType { return UserActive } +func (ua UserActiveEvent) Disposable() bool { + return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays) +} + func NewUserActiveEvent(user string) UserActiveEvent { return UserActiveEvent{UserEvent{ uid: user, created: time.Now(), }} } + +const ( + TestEventDisposeKey = "dispose" + TestEventIDKey = "id" +) + +type TestEvent struct { + Dispose bool + ID int +} + +func (te TestEvent) Type() EventType { + return Test +} + +func (te TestEvent) Time() time.Time { + return time.Now() +} + +func (te TestEvent) Data() map[string]string { + m := map[string]string{} + if te.Dispose { + m[TestEventDisposeKey] = "t" + } + m[TestEventIDKey] = fmt.Sprintf("%d", te.ID) + return m +} + +func (te TestEvent) Validate() error { + return nil +} + +func (te TestEvent) Disposable() bool { + return te.Dispose +} diff --git a/internal/state/state.go b/internal/state/state.go index 4986991..f4a83fd 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,6 +1,6 @@ package state -/* STATE +/* State module * This package encapsulates various state information * state is represented in various global singletons * Additionally, this package offers a pub/sub interface @@ -8,15 +8,15 @@ package state */ import ( - "fmt" + "encoding/json" + "errors" + "os" + "strconv" "sync" "time" - "os" - "errors" - "encoding/json" - "gitlab.com/whom/bingobot/pkg/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* Event interface is meant to encapsulate a general interface @@ -36,14 +36,17 @@ const ( BadChallengeEvent = "failed to make Challenge Event" BadRestorationEvent = "failed to make Restoration Event" BadUserActiveEvent = "failed to make UserActive Event" + BadTestEvent = "failed to make Test Event" BadEventObjMarshal = "error marshalling event" BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" -) + StartBeforeInitError = "state machine not initialized" + ReopenStoreFileError = "failed to reopen store file" -const ( EventTypeMapKey = "type" + + TmpDocBufBackingFile = "bingobot_events_processing" ) var eventMutex sync.RWMutex @@ -60,7 +63,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { } if eventStream, err = docbuf.NewDocumentBuffer( - eventMemCacheSize, + 0, // temporary nil-cache buffer to replay events from file, ); err != nil { return errors.Join(errors.New(BadEventStreamInit), err) @@ -71,6 +74,89 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return nil } +// replay events and begin state machine +func Start() error { + if eventStream == nil { + return errors.New(StartBeforeInitError) + } + + tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile) + if err != nil { + return err + } + + tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile) + if err != nil { + tmpBackFile.Close() + return err + } + + // filter out disposable events into a reverse ordered on disk stack + var doc string + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { + doc, err = eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + return err + } + + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Discarding the following event for not unmarshaling: %s", doc) + continue + } + + if !ev.Disposable() { + tmpBuf.Push(doc) + } + } + + if err := os.Truncate(eventStorageFileName, 0); err != nil { + return errors.Join(errors.New(ReopenStoreFileError), err) + } + + file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return errors.Join(errors.New(BadEventStoreFilename), err) + } + + eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file) + if err != nil { + // return error here will panic without truncating or writing to the file + return err + } + + // unravel tmp stack into properly ordered properly allocated buffer + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := tmpBuf.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if err := PublishEvent(ev); err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + } + } + + return nil +} + // return no error. we are handling SIGINT func Teardown() { // riskily ignore all locking.... @@ -96,6 +182,7 @@ const ( Challenge Restoration UserActive + Test // ... // leave this last @@ -113,6 +200,8 @@ func EventTypeFromString(doc string) EventType { return Restoration case "UserActive": return UserActive + case "Test": + return Test default: // error case return NumEventTypes @@ -125,6 +214,7 @@ func (et EventType) String() string { "Challenge", "Restoration", "UserActive", + "Test", } if et < 0 || et >= NumEventTypes { @@ -194,6 +284,22 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { return nil, errors.Join(errors.New(BadUserActiveEvent), err) } return UserActiveEvent{*e}, nil + case Test: + disp := false + if v, ok := data[TestEventDisposeKey]; ok && v == "t" { + disp = true + } + id := -1 + if v, ok := data[TestEventIDKey]; ok { + var err error + if id, err = strconv.Atoi(v); err != nil { + return nil, errors.Join(errors.New(BadTestEvent), err) + } + } + return TestEvent{ + Dispose: disp, + ID: id, + }, nil default: return nil, errors.New(BadEventTypeError) } @@ -232,6 +338,9 @@ type Event interface { // validates state of internal metadata per EventType Validate() error + + // returns true if event can be discarded + Disposable() bool } func EventToString(e Event) (string, error) { @@ -283,7 +392,7 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) { if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", @@ -373,8 +482,6 @@ func GetMatchingEvents( } filter := func(e Event) bool { - ev, er := EventToString(e) - fmt.Printf("Checking: %s (%e)\n", ev, er) if e.Type() != t { return true } @@ -385,7 +492,6 @@ func GetMatchingEvents( } } - fmt.Println("Found Match") matches = append(matches, e) return true } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index 20800f3..877b724 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -2,11 +2,12 @@ package state import ( "fmt" + "os" "testing" "time" - "gitlab.com/whom/bingobot/pkg/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* WARNING: @@ -54,15 +55,11 @@ func TestEventMarshalUnmarshal(t *testing.T) { t.Fatalf("error marshalling challenge: %e, %s", err, cstr) } - t.Logf("cstr: %s\n", cstr) - vstr, err := EventToString(vev); if err != nil { t.Fatalf("error marshalling vote: %e, %s", err, vstr) } - t.Logf("vstr: %s\n", vstr) - if ev, err := EventFromString(cstr); err != nil || ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { @@ -270,3 +267,81 @@ func TestVoteEventValidations(t *testing.T) { t.Errorf("Unexpected or no error: %e", err) } } + +func TestEventReplay(t *testing.T) { + tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore") + if err != nil { + t.Fatalf("failed setting up tempfile: %s", err.Error()) + } + + if err := tmpTestFile.Close(); err != nil { + t.Fatalf("failed to close initial handle to tempfile: %s", err.Error()) + } + + if err := Init(2, tmpTestFile.Name()); err != nil { + t.Fatalf("failed initializing state machine: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 1, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 1: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 2, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 2: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 3, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 3: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 4, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 4: %s", err.Error()) + } + + sub, err := Test.Subscribe() + if err := Start(); err != nil { + t.Fatalf("failed to start state machine: %s", err.Error()) + } + + select { + case ev1 := <- sub: + if ev1.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev1) + } + if ev1.(TestEvent).ID != 2 { + t.Fatalf("misordered event: %+v", ev1) + } + default: + t.Fatal("no events available from replay") + } + + select { + case ev2 := <- sub: + if ev2.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev2) + } + if ev2.(TestEvent).ID != 4 { + t.Fatalf("misordered event: %+v", ev2) + } + default: + t.Fatal("only one event made it out") + } + + select { + case <- sub: + t.Fatalf("superfluous events left in subscription") + default: + } +} diff --git a/internal/web/web.go b/internal/web/web.go new file mode 100644 index 0000000..38051e6 --- /dev/null +++ b/internal/web/web.go @@ -0,0 +1,35 @@ +package web + +import ( + "fmt" + "net/http" + + "gitlab.com/whom/bingobot/internal/activity" + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/logging" +) + +func Start() error { + frag := config.Get().LocalWebEndpointListen + http.HandleFunc("/", HandleHttpRequest) + go logging.Error(http.ListenAndServe(frag, nil).Error()) + + return nil +} + +func HandleHttpRequest(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "") + fmt.Fprintf(w, "

User Activity Metrics

") + fmt.Fprintf(w, "

number of the last %d days a user has been active

", + config.Get().UserEventLifespanDays) + + fmt.Fprintf(w, "") + fmt.Fprintf(w, "") + for k, v := range activity.GetActivitySnapshot() { + fmt.Fprintf(w, "", + k, len(v)) + } + fmt.Fprintf(w, "
UserDays
%s%d
") + + fmt.Fprintf(w, "") +} diff --git a/main.go b/main.go index 21111e9..0528e1c 100644 --- a/main.go +++ b/main.go @@ -4,10 +4,12 @@ import ( "flag" "log" + "gitlab.com/whom/bingobot/internal/activity" "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/state" + "gitlab.com/whom/bingobot/internal/web" ) var ( @@ -32,11 +34,23 @@ func main() { ); err != nil { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } + + if err := activity.Start(); err != nil { + // TODO: handle gracefully and continue? + log.Fatalf("failed to start activity module: %s", err.Error()) + } + + if err := web.Start(); err != nil { + log.Fatalf("failed to start local web server: %s", err.Error()) + } + + // start this LAST + if err := state.Start(); err != nil { + log.Fatalf("failed to start state machine: %s", err.Error()) + } defer state.Teardown() - err = startBot() - - if err != nil { + if err := startBot(); err != nil { log.Fatal(err) } } diff --git a/pkg/docbuf/docbuf.go b/pkg/docbuf/docbuf.go index cc0610e..cd3f633 100644 --- a/pkg/docbuf/docbuf.go +++ b/pkg/docbuf/docbuf.go @@ -263,6 +263,11 @@ func NewDocumentBuffer( * If cache is full oldest document is written to backing writer. */ func (b *DocBuf) Push(doc string) error { + if b.cacheSize == 0 { + b.writeToBackingStore(doc) + return nil + } + if len(b.cache) >= b.cacheSize { if err := b.demote(); err != nil { return err @@ -280,6 +285,14 @@ func (b *DocBuf) Push(doc string) error { * out of the backing ReaderWriter. */ func (b *DocBuf) Pop() (string, error) { + if b.cacheSize == 0 { + d, e := b.readDocumentsFromDisk(1, true, false) + if len(d) > 0 { + return d[0], e + } + return "", e + } + if len(b.cache) < 1 { if err := b.promote(true); err != nil { return "", err From 430c0afaa6edee94393948a0f3014669566f5e3c Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Wed, 8 Jan 2025 13:40:11 -0800 Subject: [PATCH 4/9] Confessions module This commit adds a confessions feature that allows users to mark a "confessional" channel and also to post anonymously to it. The changes that this comprises of are as follows: - New discord "slash" commands for both marking a confessional and posting to it - a bunch of stuff in the discord module to register and deregister "slash" commands - New event type to track marked confessionals - confession module that processes new confession channel links and also posts confessions to corresponding confessionals Not included in this commit: - a way to cleanup obsolete or reconfigured confession channel links - access control for the confessional slash commands Signed-off-by: Ava Affine --- internal/confession/confession.go | 58 ++++++++++++++++ internal/discord/commands.go | 112 ++++++++++++++++++++++++++++++ internal/discord/discord.go | 3 + internal/discord/handlers.go | 1 + internal/state/events.go | 44 ++++++++++++ internal/state/state.go | 35 ++++++++++ 6 files changed, 253 insertions(+) create mode 100644 internal/confession/confession.go create mode 100644 internal/discord/commands.go diff --git a/internal/confession/confession.go b/internal/confession/confession.go new file mode 100644 index 0000000..2681d31 --- /dev/null +++ b/internal/confession/confession.go @@ -0,0 +1,58 @@ +package confession + +import ( + "errors" + "sync" + + "github.com/bwmarrin/discordgo" + "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" +) + +/* Activity module + * This module posts anonymous confessions according to a linked channel map + */ + +const ( + ActivityModuleStartFail = "failed to start activity module" +) + +var ( + // guild ID to channel ID + linkLock sync.RWMutex + confessionChannelLinks map[string]state.ConfessionsChannelLinkEvent +) + +func Start() error { + ch, err := state.ConfessionsChannelLink.Subscribe() + if err != nil { + return errors.Join( + errors.New(ActivityModuleStartFail), + err, + ) + } + + // process incoming events loop + go func() { + for { + ev := <- ch + e := ev.(state.ConfessionsChannelLinkEvent) + linkLock.Lock() + confessionChannelLinks[e.GuildID] = e + linkLock.Unlock() + } + }() + + return nil +} + +func MakeConfession(s *discordgo.Session, guildID string, content string) { + linkLock.RLock() + link, ok := confessionChannelLinks[guildID] + linkLock.RUnlock() + if !ok { + logging.Error("Failed to send confession in guild %s: no link exists in map", guildID) + return + } + s.ChannelMessageSend(link.ChannelID, content) +} diff --git a/internal/discord/commands.go b/internal/discord/commands.go new file mode 100644 index 0000000..7a13ed0 --- /dev/null +++ b/internal/discord/commands.go @@ -0,0 +1,112 @@ +package discord + +import ( + "time" + + "github.com/bwmarrin/discordgo" + + "gitlab.com/whom/bingobot/internal/confession" + "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" +) + +var ( + // map of guildID to registeredCommands + registeredCommands map[string][]*discordgo.ApplicationCommand + + // all commands + commandList = []*discordgo.ApplicationCommand{ + // TODO: Limit usage somehow? + // maybe delete this and use the vote module instead + &discordgo.ApplicationCommand{ + Name: "confessional", + Description: "mark a channel as a designated confessional for a guild", + }, + + &discordgo.ApplicationCommand{ + Name: "confess", + Description: "anonymously post a confession in configured channel", + Options: []*discordgo.ApplicationCommandOption{ + &discordgo.ApplicationCommandOption{ + Name: "confession", + Description: "A confession to be posted anonymously", + Required: true, + }, + }, + }, + } + + commandHandlers = map[string]func( + s *discordgo.Session, + i *discordgo.InteractionCreate, + ) { + "confessional": func(s *discordgo.Session, i *discordgo.InteractionCreate) { + state.PublishEvent(state.ConfessionsChannelLinkEvent{ + GuildID: i.GuildID, + ChannelID: i.ChannelID, + Created: time.Now(), + }) + }, + + // handle a confession + "confess": func(s *discordgo.Session, i *discordgo.InteractionCreate) { + for _, v := range i.ApplicationCommandData().Options { + if v.Name == "confession" { + confession.MakeConfession(s, i.GuildID, v.StringValue()) + } + } + }, + } +) + +func handleCommand(s *discordgo.Session, e *discordgo.InteractionCreate) { + name := e.ApplicationCommandData().Name + // TODO: audit log + if h, ok := commandHandlers[name]; ok { + h(s, e) + } else { + logging.Debug("no handler for command: %s", name) + } +} + +func registerCommands(s *discordgo.Session) { + for _, guild := range s.State.Guilds { + cmds, err := s.ApplicationCommandBulkOverwrite( + s.State.Application.ID, + guild.ID, + commandList, + ) + + if err != nil { + logging.Error( + "Failed to register commands for guild %s: %s", + guild.ID, err.Error(), + ) + } else { + logging.Info("Registered commands for guild %s", guild.ID) + registeredCommands[guild.ID] = cmds + } + } +} + +func deregisterCommands(s *discordgo.Session) { + for guild, commands := range registeredCommands { + for _, cmd := range commands { + if err := s.ApplicationCommandDelete( + s.State.Application.ID, + guild, + cmd.ID, + ); err != nil { + logging.Error( + "Failed to delete %s command (id: %s) from guild %s", + cmd.Name, cmd.ID, guild, + ) + } else { + logging.Info( + "Deregistered command %s (id: %s) from guild %s", + cmd.Name, cmd.ID, guild, + ) + } + } + } +} diff --git a/internal/discord/discord.go b/internal/discord/discord.go index 896ba1a..6c81929 100644 --- a/internal/discord/discord.go +++ b/internal/discord/discord.go @@ -36,10 +36,13 @@ func Connect(token string) error { return fmt.Errorf("failed to open discord session: %s", err) } + registerCommands(session.s) + return nil } func Close() { + deregisterCommands(session.s) err := session.s.Close() if err != nil { diff --git a/internal/discord/handlers.go b/internal/discord/handlers.go index c01d441..03b5434 100644 --- a/internal/discord/handlers.go +++ b/internal/discord/handlers.go @@ -9,6 +9,7 @@ func addHandlers() { session.s.AddHandler(handleConnect) session.s.AddHandler(handleDisconnect) session.s.AddHandler(handleVoiceStateUpdate) + session.s.AddHandler(handleCommand) // handles InteractionCreate } func handleConnect(s *discordgo.Session, e *discordgo.Connect) { diff --git a/internal/state/events.go b/internal/state/events.go index c14ee77..285db51 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -226,3 +226,47 @@ func (te TestEvent) Validate() error { func (te TestEvent) Disposable() bool { return te.Dispose } + +const ( + ConfessionsLinkEventGuildKey = "guild_id" + ConfessionsLinkEventChannelKey = "channel_id" + ConfessionsLinkEventCreatedKey = "created" + ConfessionsLinkEventObsoleteKey = "obsolete" + + BadConfessionsLinkEventError = "link event doesnt have required fields" +) + +type ConfessionsChannelLinkEvent struct { + GuildID string + ChannelID string + Created time.Time +} + +func (e ConfessionsChannelLinkEvent) Type() EventType { + return ConfessionsChannelLink +} + +func (e ConfessionsChannelLinkEvent) Time() time.Time { + return e.Time() +} + +func (e ConfessionsChannelLinkEvent) Data() map[string]string { + return map[string]string{ + ConfessionsLinkEventGuildKey: e.GuildID, + ConfessionsLinkEventChannelKey: e.ChannelID, + ConfessionsLinkEventCreatedKey: e.Created.Format(time.RFC3339), + } +} + +func (e ConfessionsChannelLinkEvent) Validate() error { + if len(e.ChannelID) > 1 || len(e.GuildID) > 1 { + return errors.New(BadConfessionsLinkEventError) + } + + return nil +} + +func (e ConfessionsChannelLinkEvent) Disposable() bool { + // TODO + return false +} diff --git a/internal/state/state.go b/internal/state/state.go index f4a83fd..9b94ec2 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -183,6 +183,7 @@ const ( Restoration UserActive Test + ConfessionsChannelLink // ... // leave this last @@ -202,6 +203,8 @@ func EventTypeFromString(doc string) EventType { return UserActive case "Test": return Test + case "ConfessionsChannelLink": + return ConfessionsChannelLink default: // error case return NumEventTypes @@ -215,6 +218,7 @@ func (et EventType) String() string { "Restoration", "UserActive", "Test", + "ConfessionsChannelLink", } if et < 0 || et >= NumEventTypes { @@ -266,24 +270,28 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { switch et { case Vote: return VoteEvent(data), nil + case Challenge: e, err := MakeUserEvent(data) if err != nil { return nil, errors.Join(errors.New(BadChallengeEvent), err) } return ChallengeEvent{*e}, nil + case Restoration: e, err := MakeUserEvent(data) if err != nil { return nil, errors.Join(errors.New(BadRestorationEvent), err) } return RestorationEvent{*e}, nil + case UserActive: e, err := MakeUserEvent(data) if err != nil { return nil, errors.Join(errors.New(BadUserActiveEvent), err) } return UserActiveEvent{*e}, nil + case Test: disp := false if v, ok := data[TestEventDisposeKey]; ok && v == "t" { @@ -300,6 +308,33 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { Dispose: disp, ID: id, }, nil + + case ConfessionsChannelLink: + gid, ok := data[ConfessionsLinkEventGuildKey] + if !ok { + return nil, errors.New(BadConfessionsLinkEventError) + } + + cid, ok := data[ConfessionsLinkEventChannelKey] + if !ok { + return nil, errors.New(BadConfessionsLinkEventError) + } + + ti, ok := data[ConfessionsLinkEventCreatedKey] + if !ok { + return nil, errors.New(BadConfessionsLinkEventError) + } + t, err := time.Parse(time.RFC3339, ti) + if err != nil { + return nil, errors.Join(errors.New(BadConfessionsLinkEventError), err) + } + + return ConfessionsChannelLinkEvent{ + GuildID: gid, + ChannelID: cid, + Created: t, + }, nil + default: return nil, errors.New(BadEventTypeError) } From 359ff427e31a1f008a52e524d96e6e5079d663ef Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Wed, 8 Jan 2025 15:27:27 -0800 Subject: [PATCH 5/9] fix bug in activity timers Signed-off-by: Ava Affine --- internal/discord/activity.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/discord/activity.go b/internal/discord/activity.go index afa6c8d..62329f3 100644 --- a/internal/discord/activity.go +++ b/internal/discord/activity.go @@ -128,7 +128,7 @@ func stopActivityTimer(uid string) { timerMutex.Lock() defer timerMutex.Unlock() - if _, ok := activityTimers[uid]; !ok { + if _, ok := activityTimers[uid]; ok { activityTimers[uid].Cancel() delete(activityTimers, uid) } From 55f9725af10f1d8d3bfcf4b2045913d603e5581d Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Wed, 8 Jan 2025 15:28:24 -0800 Subject: [PATCH 6/9] fix nil map dereference in discord commands, and add option type to slash command option Signed-off-by: Ava Affine --- internal/discord/commands.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/discord/commands.go b/internal/discord/commands.go index 7a13ed0..4499a85 100644 --- a/internal/discord/commands.go +++ b/internal/discord/commands.go @@ -12,7 +12,7 @@ import ( var ( // map of guildID to registeredCommands - registeredCommands map[string][]*discordgo.ApplicationCommand + registeredCommands = make(map[string][]*discordgo.ApplicationCommand) // all commands commandList = []*discordgo.ApplicationCommand{ @@ -28,6 +28,7 @@ var ( Description: "anonymously post a confession in configured channel", Options: []*discordgo.ApplicationCommandOption{ &discordgo.ApplicationCommandOption{ + Type: discordgo.ApplicationCommandOptionString, Name: "confession", Description: "A confession to be posted anonymously", Required: true, @@ -79,11 +80,11 @@ func registerCommands(s *discordgo.Session) { if err != nil { logging.Error( - "Failed to register commands for guild %s: %s", - guild.ID, err.Error(), + "Failed to register commands for guild", + guild.ID, ": ", err.Error(), ) } else { - logging.Info("Registered commands for guild %s", guild.ID) + logging.Info("Registered commands for guild ", guild.ID) registeredCommands[guild.ID] = cmds } } @@ -98,13 +99,13 @@ func deregisterCommands(s *discordgo.Session) { cmd.ID, ); err != nil { logging.Error( - "Failed to delete %s command (id: %s) from guild %s", - cmd.Name, cmd.ID, guild, + "Failed to delete", cmd.Name, "command (id:", cmd.ID, + ") from guild ", guild, ) } else { logging.Info( - "Deregistered command %s (id: %s) from guild %s", - cmd.Name, cmd.ID, guild, + "Deregistered command ", cmd.Name,"(id:", cmd.ID, + ") from guild ", guild, ) } } From fed49ba3cb1b78792e28200a8f529b7537e7fce7 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Mon, 13 Jan 2025 15:45:07 -0800 Subject: [PATCH 7/9] refactor main, and fix bugs from initial manual run Signed-off-by: Ava Affine --- internal/confession/confession.go | 6 ++-- internal/discord/commands.go | 42 +++++++++++++++++---------- internal/state/events.go | 2 +- internal/state/state.go | 2 ++ internal/web/web.go | 1 + main.go | 48 ++++++++++++++++++++----------- 6 files changed, 66 insertions(+), 35 deletions(-) diff --git a/internal/confession/confession.go b/internal/confession/confession.go index 2681d31..81babdb 100644 --- a/internal/confession/confession.go +++ b/internal/confession/confession.go @@ -2,6 +2,7 @@ package confession import ( "errors" + "fmt" "sync" "github.com/bwmarrin/discordgo" @@ -20,7 +21,7 @@ const ( var ( // guild ID to channel ID linkLock sync.RWMutex - confessionChannelLinks map[string]state.ConfessionsChannelLinkEvent + confessionChannelLinks = make(map[string]state.ConfessionsChannelLinkEvent) ) func Start() error { @@ -36,6 +37,7 @@ func Start() error { go func() { for { ev := <- ch + logging.Info("recieved new confessional channel link") e := ev.(state.ConfessionsChannelLinkEvent) linkLock.Lock() confessionChannelLinks[e.GuildID] = e @@ -51,7 +53,7 @@ func MakeConfession(s *discordgo.Session, guildID string, content string) { link, ok := confessionChannelLinks[guildID] linkLock.RUnlock() if !ok { - logging.Error("Failed to send confession in guild %s: no link exists in map", guildID) + logging.Error(fmt.Sprintf("Failed to send confession in guild %s: no link exists in map", guildID)) return } s.ChannelMessageSend(link.ChannelID, content) diff --git a/internal/discord/commands.go b/internal/discord/commands.go index 4499a85..b194bc6 100644 --- a/internal/discord/commands.go +++ b/internal/discord/commands.go @@ -1,6 +1,7 @@ package discord import ( + "fmt" "time" "github.com/bwmarrin/discordgo" @@ -42,11 +43,18 @@ var ( i *discordgo.InteractionCreate, ) { "confessional": func(s *discordgo.Session, i *discordgo.InteractionCreate) { - state.PublishEvent(state.ConfessionsChannelLinkEvent{ + if err := state.PublishEvent(state.ConfessionsChannelLinkEvent{ GuildID: i.GuildID, ChannelID: i.ChannelID, Created: time.Now(), - }) + }); err != nil { + logging.Error(fmt.Sprintf( + "failed to publish confession channel link: %s", + err.Error(), + )) + } else { + logging.Info("published confession channel link") + } }, // handle a confession @@ -68,6 +76,10 @@ func handleCommand(s *discordgo.Session, e *discordgo.InteractionCreate) { } else { logging.Debug("no handler for command: %s", name) } + + s.InteractionRespond(e.Interaction, &discordgo.InteractionResponse{ + // TODO: Some silent ACK + }) } func registerCommands(s *discordgo.Session) { @@ -79,12 +91,12 @@ func registerCommands(s *discordgo.Session) { ) if err != nil { - logging.Error( - "Failed to register commands for guild", - guild.ID, ": ", err.Error(), - ) + logging.Error(fmt.Sprintf( + "Failed to register commands for guild %s: %s", + guild.ID, err.Error(), + )) } else { - logging.Info("Registered commands for guild ", guild.ID) + logging.Info(fmt.Sprintf("Registered commands for guild %s", guild.ID)) registeredCommands[guild.ID] = cmds } } @@ -98,15 +110,15 @@ func deregisterCommands(s *discordgo.Session) { guild, cmd.ID, ); err != nil { - logging.Error( - "Failed to delete", cmd.Name, "command (id:", cmd.ID, - ") from guild ", guild, - ) + logging.Error(fmt.Sprintf( + "Failed to delete %s command (id: %s) from guild %s", + cmd.Name, cmd.ID, guild, + )) } else { - logging.Info( - "Deregistered command ", cmd.Name,"(id:", cmd.ID, - ") from guild ", guild, - ) + logging.Info(fmt.Sprintf( + "Deregistered command %s (id: %s) from guild %s", + cmd.Name, cmd.ID, guild, + )) } } } diff --git a/internal/state/events.go b/internal/state/events.go index 285db51..44ad0f4 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -259,7 +259,7 @@ func (e ConfessionsChannelLinkEvent) Data() map[string]string { } func (e ConfessionsChannelLinkEvent) Validate() error { - if len(e.ChannelID) > 1 || len(e.GuildID) > 1 { + if len(e.ChannelID) <= 1 || len(e.GuildID) <= 1 { return errors.New(BadConfessionsLinkEventError) } diff --git a/internal/state/state.go b/internal/state/state.go index 9b94ec2..cc439f5 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -159,6 +159,7 @@ func Start() error { // return no error. we are handling SIGINT func Teardown() { + logging.Warn("Tearing down state engine") // riskily ignore all locking.... // we are handling a termination signal after all i, e := eventStream.Close() @@ -167,6 +168,7 @@ func Teardown() { logging.Warn("will attempt to truncate event store anyways") } + logging.Warn("teardown: events flushed") e = os.Truncate(eventStorageFileName, i) if e != nil { logging.Error("FAILED TO TRUNCATE EVENT STORE!!") diff --git a/internal/web/web.go b/internal/web/web.go index 38051e6..fbe65eb 100644 --- a/internal/web/web.go +++ b/internal/web/web.go @@ -14,6 +14,7 @@ func Start() error { http.HandleFunc("/", HandleHttpRequest) go logging.Error(http.ListenAndServe(frag, nil).Error()) + logging.Debug("Web handlers started") return nil } diff --git a/main.go b/main.go index 0528e1c..a91fdb4 100644 --- a/main.go +++ b/main.go @@ -3,8 +3,11 @@ package main import ( "flag" "log" + "os" + "os/signal" "gitlab.com/whom/bingobot/internal/activity" + "gitlab.com/whom/bingobot/internal/confession" "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" @@ -28,6 +31,7 @@ func main() { logging.Init() flag.Parse() + logging.Info("startup: initializing state engine") if err := state.Init( config.Get().InMemoryEventCacheSize, config.Get().PersistentCacheStore, @@ -35,35 +39,45 @@ func main() { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } + logging.Info("startup: starting activity module") if err := activity.Start(); err != nil { // TODO: handle gracefully and continue? log.Fatalf("failed to start activity module: %s", err.Error()) } - if err := web.Start(); err != nil { - log.Fatalf("failed to start local web server: %s", err.Error()) + logging.Info("startup: starting confession module") + if err := confession.Start(); err != nil { + // TODO: handle gracefully and continue? + log.Fatalf("failed to start confession module: %s", err.Error()) } + logging.Info("startup: starting web module") + go func() { + if err := web.Start(); err != nil { + log.Fatalf("failed to start local web server: %s", err.Error()) + } + }() + // start this LAST + logging.Info("startup: starting state engine") if err := state.Start(); err != nil { log.Fatalf("failed to start state machine: %s", err.Error()) } - defer state.Teardown() - if err := startBot(); err != nil { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func(){ + for _ = range c { + state.Teardown() + discord.Close() + os.Exit(1) + } + }() + + logging.Info("startup: connecting to discord") + if err := discord.Connect(*token); err != nil { log.Fatal(err) } -} - -func startBot() error { - err := discord.Connect(*token) - - if err != nil { - return err - } - - logging.Info("shutting down gracefully", "type", "shutdown") - discord.Close() - - return nil + + for {} } From 6480f27946a3de4077e5daa67f6feb2b06901da8 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Mon, 13 Jan 2025 17:07:37 -0800 Subject: [PATCH 8/9] fixes for web endpoint Signed-off-by: Ava Affine --- internal/activity/activity.go | 10 ++++++++-- internal/confession/confession.go | 2 +- internal/discord/activity.go | 6 +++--- main.go | 2 -- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/activity/activity.go b/internal/activity/activity.go index 9430085..488de9d 100644 --- a/internal/activity/activity.go +++ b/internal/activity/activity.go @@ -2,12 +2,13 @@ package activity import ( "errors" + "fmt" "sync" "time" "gitlab.com/whom/bingobot/internal/config" - "gitlab.com/whom/bingobot/internal/state" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" ) /* Activity module @@ -18,7 +19,7 @@ const ( ActivityModuleStartFail = "failed to start activity module" ) -var currentUserActivity map[string][]state.UserActiveEvent +var currentUserActivity = make(map[string][]state.UserActiveEvent) var userActivityLock sync.RWMutex func Start() error { @@ -38,6 +39,10 @@ func Start() error { user := emap[state.UserEventUserKey] etime := ev.Time() delta := time.Since(etime).Hours() / float64(24) + logging.Debug(fmt.Sprintf( + "processing UserActive event for %s", user, + )) + if delta <= float64(config.Get().UserEventLifespanDays) { new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} @@ -46,6 +51,7 @@ func Start() error { if found { new = append(new, current...) } + currentUserActivity[user] = new userActivityLock.Unlock() } else { logging.Warn("recieved expired useractive event") diff --git a/internal/confession/confession.go b/internal/confession/confession.go index 81babdb..d76aa38 100644 --- a/internal/confession/confession.go +++ b/internal/confession/confession.go @@ -37,7 +37,7 @@ func Start() error { go func() { for { ev := <- ch - logging.Info("recieved new confessional channel link") + logging.Debug("recieved new confessional channel link") e := ev.(state.ConfessionsChannelLinkEvent) linkLock.Lock() confessionChannelLinks[e.GuildID] = e diff --git a/internal/discord/activity.go b/internal/discord/activity.go index 62329f3..002f3b4 100644 --- a/internal/discord/activity.go +++ b/internal/discord/activity.go @@ -42,7 +42,7 @@ func NewActivityTimer(uid string) *UserActivityTimer { Start() initializes the timer and calls run() */ func (t *UserActivityTimer) Start(ctx context.Context) { - logging.Info("starting voiceActivityTimer", "uid", t.UID) + logging.Debug("starting voiceActivityTimer", "uid", t.UID) t.sleepDuration = time.Millisecond * time.Duration(config.Get().VoiceActivityTimerSleepIntervalMillis) activityTimerDuration := time.Second * time.Duration(config.Get().VoiceActivityThresholdSeconds) @@ -89,7 +89,7 @@ func (t *UserActivityTimer) run() { } // the timer's context has been cancelled or deadline expired. - logging.Info("voiceActivityTimer stopping", "uid", t.UID, "reason", context.Cause(t.ctx)) + logging.Debug("voiceActivityTimer stopping", "uid", t.UID, "reason", context.Cause(t.ctx)) if context.Cause(t.ctx) == ErrTimerExpired { /* @@ -147,5 +147,5 @@ func emitUserActiveEvent(uid string) { return } - logging.Info("published UserActiveEvent", "uid", uid) + logging.Debug("published UserActiveEvent", "uid", uid) } diff --git a/main.go b/main.go index a91fdb4..96e2e36 100644 --- a/main.go +++ b/main.go @@ -41,13 +41,11 @@ func main() { logging.Info("startup: starting activity module") if err := activity.Start(); err != nil { - // TODO: handle gracefully and continue? log.Fatalf("failed to start activity module: %s", err.Error()) } logging.Info("startup: starting confession module") if err := confession.Start(); err != nil { - // TODO: handle gracefully and continue? log.Fatalf("failed to start confession module: %s", err.Error()) } From 4ccfd8172ae116e038a4f248861198b886f0aa33 Mon Sep 17 00:00:00 2001 From: Ava Affine Date: Mon, 17 Feb 2025 20:24:09 -0800 Subject: [PATCH 9/9] Respond with ephemeral message to slash command usage Signed-off-by: Ava Affine --- internal/discord/commands.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/discord/commands.go b/internal/discord/commands.go index b194bc6..c774dc3 100644 --- a/internal/discord/commands.go +++ b/internal/discord/commands.go @@ -78,7 +78,11 @@ func handleCommand(s *discordgo.Session, e *discordgo.InteractionCreate) { } s.InteractionRespond(e.Interaction, &discordgo.InteractionResponse{ - // TODO: Some silent ACK + Type: discordgo.InteractionResponseChannelMessageWithSource, + Data: &discordgo.InteractionResponseData{ + Flags: discordgo.MessageFlagsEphemeral, + Content: "Your cauldron bubbles...", + }, }) }