diff --git a/internal/activity/activity.go b/internal/activity/activity.go new file mode 100644 index 0000000..9430085 --- /dev/null +++ b/internal/activity/activity.go @@ -0,0 +1,106 @@ +package activity + +import ( + "errors" + "sync" + "time" + + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/state" + "gitlab.com/whom/bingobot/internal/logging" +) + +/* Activity module + * This module sits and processes incoming + */ + +const ( + ActivityModuleStartFail = "failed to start activity module" +) + +var currentUserActivity map[string][]state.UserActiveEvent +var userActivityLock sync.RWMutex + +func Start() error { + ch, err := state.UserActive.Subscribe() + if err != nil { + return errors.Join( + errors.New(ActivityModuleStartFail), + err, + ) + } + + // process incoming events loop + go func() { + for { + ev := <- ch + emap := ev.Data() + user := emap[state.UserEventUserKey] + etime := ev.Time() + delta := time.Since(etime).Hours() / float64(24) + + if delta <= float64(config.Get().UserEventLifespanDays) { + new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} + userActivityLock.Lock() + current, found := currentUserActivity[user] + if found { + new = append(new, current...) + } + userActivityLock.Unlock() + } else { + logging.Warn("recieved expired useractive event") + } + } + }() + + // process expired events loop + go func() { + for { + delta := time.Hour * 24 + delta *= time.Duration(config.Get().UserEventLifespanDays) + tcur := time.Now().Add(delta) + + // get next soonest expiration + userActivityLock.RLock() + for _, evs := range currentUserActivity { + for _, ev := range evs { + hrs := time.Duration(24 * config.Get().UserEventLifespanDays) + t := ev.Time().Add(time.Hour * hrs) + if t.Before(tcur) { + tcur = t + } + } + } + userActivityLock.RUnlock() + time.Sleep(tcur.Sub(time.Now())) + + userActivityLock.Lock() + for k, v := range currentUserActivity { + new := []state.UserActiveEvent{} + for _, ev := range v { + if !ev.Disposable() { + new = append(new, ev) + } + } + currentUserActivity[k] = new + } + userActivityLock.Unlock() + } + }() + + return nil +} + +func GetActivitySnapshot() map[string][]state.UserActiveEvent { + userActivityLock.RLock() + defer userActivityLock.RUnlock() + + snapshot := make(map[string][]state.UserActiveEvent) + for k, v := range currentUserActivity { + newSl := make([]state.UserActiveEvent, len(v)) + copy(newSl, v) + snapshot[k] = newSl + } + + return snapshot +} diff --git a/internal/config/config.go b/internal/config/config.go index eee917d..f32ae9a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,30 +17,34 @@ type AppConfig struct { LogCompression bool `yaml:"log_compression"` LogAddSource bool `yaml:"log_add_source"` - /* - how long (in seconds) a user needs to be in vc in order to generate a - UserActive event - */ + /* how long (in seconds) a user needs to be in vc in order to generate a + * UserActive event + */ VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"` - /* - how long (in milliseconds) a voice activity timer sleeps at a time between - context cancellation checks. - - a higher value means the function sleeps longer which could be - useful for some reason in the future - - a higher value also means that the timer could take longer to cancel. - - current recommended value is 1000ms. - */ + /* how long (in milliseconds) a voice activity timer sleeps at a time between + * context cancellation checks. + * a higher value means the function sleeps longer which could be + * useful for some reason in the future + * a higher value also means that the timer could take longer to cancel. + * current recommended value is 1000ms. + */ VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` /* persistent state file store */ PersistentCacheStore string `yaml:"persistent_cache_store"` /* number of internal state events to cache in memory */ - InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` + InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"` + + /* number of days a useractive event is valid for + * increasing this will have a bell curve effect on + * voting weights + */ + UserEventLifespanDays int `yaml:"user_event_lifespan_days"` + + /* listen address for local http Server */ + LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"` } var config *AppConfig @@ -110,4 +114,6 @@ func setDefaults() { viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") viper.SetDefault("InMemoryEventCacheSize", 512) + viper.SetDefault("UserEventLifespanDays", 10) + viper.SetDefault("LocalWebEndpointListen", ":8080") } diff --git a/internal/state/events.go b/internal/state/events.go index e6ac421..c14ee77 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -2,9 +2,10 @@ package state import ( "errors" - "time" "fmt" - + "time" + + "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/logging" ) @@ -180,9 +181,48 @@ func (ua UserActiveEvent) Type() EventType { return UserActive } +func (ua UserActiveEvent) Disposable() bool { + return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays) +} + func NewUserActiveEvent(user string) UserActiveEvent { return UserActiveEvent{UserEvent{ uid: user, created: time.Now(), }} } + +const ( + TestEventDisposeKey = "dispose" + TestEventIDKey = "id" +) + +type TestEvent struct { + Dispose bool + ID int +} + +func (te TestEvent) Type() EventType { + return Test +} + +func (te TestEvent) Time() time.Time { + return time.Now() +} + +func (te TestEvent) Data() map[string]string { + m := map[string]string{} + if te.Dispose { + m[TestEventDisposeKey] = "t" + } + m[TestEventIDKey] = fmt.Sprintf("%d", te.ID) + return m +} + +func (te TestEvent) Validate() error { + return nil +} + +func (te TestEvent) Disposable() bool { + return te.Dispose +} diff --git a/internal/state/state.go b/internal/state/state.go index deb0094..f4a83fd 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,6 +1,6 @@ package state -/* STATE +/* State module * This package encapsulates various state information * state is represented in various global singletons * Additionally, this package offers a pub/sub interface @@ -8,15 +8,15 @@ package state */ import ( - "fmt" + "encoding/json" + "errors" + "os" + "strconv" "sync" "time" - "os" - "errors" - "encoding/json" - "gitlab.com/whom/bingobot/pkg/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* Event interface is meant to encapsulate a general interface @@ -36,19 +36,16 @@ const ( BadChallengeEvent = "failed to make Challenge Event" BadRestorationEvent = "failed to make Restoration Event" BadUserActiveEvent = "failed to make UserActive Event" + BadTestEvent = "failed to make Test Event" BadEventObjMarshal = "error marshalling event" BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" StartBeforeInitError = "state machine not initialized" ReopenStoreFileError = "failed to reopen store file" -) -const ( EventTypeMapKey = "type" -) -const ( TmpDocBufBackingFile = "bingobot_events_processing" ) @@ -96,12 +93,17 @@ func Start() error { // filter out disposable events into a reverse ordered on disk stack var doc string - for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { doc, err = eventStream.Pop() if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { return err } + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + ev, err := EventFromString(doc) if err != nil { logging.Warn("Discarding the following event for not unmarshaling: %s", doc) @@ -122,20 +124,25 @@ func Start() error { return errors.Join(errors.New(BadEventStoreFilename), err) } - eventStream, err := docbuf.NewDocumentBuffer(maxEventsInMemory, file) + eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file) if err != nil { // return error here will panic without truncating or writing to the file return err } // unravel tmp stack into properly ordered properly allocated buffer - for err != nil && err.Error() != docbuf.PopFromEmptyBufferError { - doc, err := eventStream.Pop() + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := tmpBuf.Pop() if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { logging.Warn("Could not handle the following error: %s", err.Error()) continue } + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + ev, err := EventFromString(doc) if err != nil { logging.Warn("Could not handle the following error: %s", err.Error()) @@ -175,6 +182,7 @@ const ( Challenge Restoration UserActive + Test // ... // leave this last @@ -192,6 +200,8 @@ func EventTypeFromString(doc string) EventType { return Restoration case "UserActive": return UserActive + case "Test": + return Test default: // error case return NumEventTypes @@ -204,6 +214,7 @@ func (et EventType) String() string { "Challenge", "Restoration", "UserActive", + "Test", } if et < 0 || et >= NumEventTypes { @@ -273,6 +284,22 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { return nil, errors.Join(errors.New(BadUserActiveEvent), err) } return UserActiveEvent{*e}, nil + case Test: + disp := false + if v, ok := data[TestEventDisposeKey]; ok && v == "t" { + disp = true + } + id := -1 + if v, ok := data[TestEventIDKey]; ok { + var err error + if id, err = strconv.Atoi(v); err != nil { + return nil, errors.Join(errors.New(BadTestEvent), err) + } + } + return TestEvent{ + Dispose: disp, + ID: id, + }, nil default: return nil, errors.New(BadEventTypeError) } @@ -365,7 +392,7 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) { if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", @@ -455,8 +482,6 @@ func GetMatchingEvents( } filter := func(e Event) bool { - ev, er := EventToString(e) - fmt.Printf("Checking: %s (%e)\n", ev, er) if e.Type() != t { return true } @@ -467,7 +492,6 @@ func GetMatchingEvents( } } - fmt.Println("Found Match") matches = append(matches, e) return true } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index 20800f3..877b724 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -2,11 +2,12 @@ package state import ( "fmt" + "os" "testing" "time" - "gitlab.com/whom/bingobot/pkg/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* WARNING: @@ -54,15 +55,11 @@ func TestEventMarshalUnmarshal(t *testing.T) { t.Fatalf("error marshalling challenge: %e, %s", err, cstr) } - t.Logf("cstr: %s\n", cstr) - vstr, err := EventToString(vev); if err != nil { t.Fatalf("error marshalling vote: %e, %s", err, vstr) } - t.Logf("vstr: %s\n", vstr) - if ev, err := EventFromString(cstr); err != nil || ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { @@ -270,3 +267,81 @@ func TestVoteEventValidations(t *testing.T) { t.Errorf("Unexpected or no error: %e", err) } } + +func TestEventReplay(t *testing.T) { + tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore") + if err != nil { + t.Fatalf("failed setting up tempfile: %s", err.Error()) + } + + if err := tmpTestFile.Close(); err != nil { + t.Fatalf("failed to close initial handle to tempfile: %s", err.Error()) + } + + if err := Init(2, tmpTestFile.Name()); err != nil { + t.Fatalf("failed initializing state machine: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 1, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 1: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 2, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 2: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 3, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 3: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 4, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 4: %s", err.Error()) + } + + sub, err := Test.Subscribe() + if err := Start(); err != nil { + t.Fatalf("failed to start state machine: %s", err.Error()) + } + + select { + case ev1 := <- sub: + if ev1.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev1) + } + if ev1.(TestEvent).ID != 2 { + t.Fatalf("misordered event: %+v", ev1) + } + default: + t.Fatal("no events available from replay") + } + + select { + case ev2 := <- sub: + if ev2.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev2) + } + if ev2.(TestEvent).ID != 4 { + t.Fatalf("misordered event: %+v", ev2) + } + default: + t.Fatal("only one event made it out") + } + + select { + case <- sub: + t.Fatalf("superfluous events left in subscription") + default: + } +} diff --git a/internal/web/web.go b/internal/web/web.go new file mode 100644 index 0000000..38051e6 --- /dev/null +++ b/internal/web/web.go @@ -0,0 +1,35 @@ +package web + +import ( + "fmt" + "net/http" + + "gitlab.com/whom/bingobot/internal/activity" + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/logging" +) + +func Start() error { + frag := config.Get().LocalWebEndpointListen + http.HandleFunc("/", HandleHttpRequest) + go logging.Error(http.ListenAndServe(frag, nil).Error()) + + return nil +} + +func HandleHttpRequest(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "") + fmt.Fprintf(w, "
number of the last %d days a user has been active
", + config.Get().UserEventLifespanDays) + + fmt.Fprintf(w, "| User | Days |
|---|---|
| %s | %d |