diff --git a/internal/activity/activity.go b/internal/activity/activity.go deleted file mode 100644 index 9430085..0000000 --- a/internal/activity/activity.go +++ /dev/null @@ -1,106 +0,0 @@ -package activity - -import ( - "errors" - "sync" - "time" - - "gitlab.com/whom/bingobot/internal/config" - "gitlab.com/whom/bingobot/internal/state" - "gitlab.com/whom/bingobot/internal/logging" -) - -/* Activity module - * This module sits and processes incoming - */ - -const ( - ActivityModuleStartFail = "failed to start activity module" -) - -var currentUserActivity map[string][]state.UserActiveEvent -var userActivityLock sync.RWMutex - -func Start() error { - ch, err := state.UserActive.Subscribe() - if err != nil { - return errors.Join( - errors.New(ActivityModuleStartFail), - err, - ) - } - - // process incoming events loop - go func() { - for { - ev := <- ch - emap := ev.Data() - user := emap[state.UserEventUserKey] - etime := ev.Time() - delta := time.Since(etime).Hours() / float64(24) - - if delta <= float64(config.Get().UserEventLifespanDays) { - new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} - userActivityLock.Lock() - current, found := currentUserActivity[user] - if found { - new = append(new, current...) - } - userActivityLock.Unlock() - } else { - logging.Warn("recieved expired useractive event") - } - } - }() - - // process expired events loop - go func() { - for { - delta := time.Hour * 24 - delta *= time.Duration(config.Get().UserEventLifespanDays) - tcur := time.Now().Add(delta) - - // get next soonest expiration - userActivityLock.RLock() - for _, evs := range currentUserActivity { - for _, ev := range evs { - hrs := time.Duration(24 * config.Get().UserEventLifespanDays) - t := ev.Time().Add(time.Hour * hrs) - if t.Before(tcur) { - tcur = t - } - } - } - userActivityLock.RUnlock() - time.Sleep(tcur.Sub(time.Now())) - - userActivityLock.Lock() - for k, v := range currentUserActivity { - new := []state.UserActiveEvent{} - for _, ev := range v { - if !ev.Disposable() { - new = append(new, ev) - } - } - currentUserActivity[k] = new - } - userActivityLock.Unlock() - } - }() - - return nil -} - -func GetActivitySnapshot() map[string][]state.UserActiveEvent { - userActivityLock.RLock() - defer userActivityLock.RUnlock() - - snapshot := make(map[string][]state.UserActiveEvent) - for k, v := range currentUserActivity { - newSl := make([]state.UserActiveEvent, len(v)) - copy(newSl, v) - snapshot[k] = newSl - } - - return snapshot -} diff --git a/internal/config/config.go b/internal/config/config.go index f32ae9a..eee917d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,34 +17,30 @@ type AppConfig struct { LogCompression bool `yaml:"log_compression"` LogAddSource bool `yaml:"log_add_source"` - /* how long (in seconds) a user needs to be in vc in order to generate a - * UserActive event - */ + /* + how long (in seconds) a user needs to be in vc in order to generate a + UserActive event + */ VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"` - /* how long (in milliseconds) a voice activity timer sleeps at a time between - * context cancellation checks. - * a higher value means the function sleeps longer which could be - * useful for some reason in the future - * a higher value also means that the timer could take longer to cancel. - * current recommended value is 1000ms. - */ + /* + how long (in milliseconds) a voice activity timer sleeps at a time between + context cancellation checks. + + a higher value means the function sleeps longer which could be + useful for some reason in the future + + a higher value also means that the timer could take longer to cancel. + + current recommended value is 1000ms. + */ VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` /* persistent state file store */ PersistentCacheStore string `yaml:"persistent_cache_store"` /* number of internal state events to cache in memory */ - InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"` - - /* number of days a useractive event is valid for - * increasing this will have a bell curve effect on - * voting weights - */ - UserEventLifespanDays int `yaml:"user_event_lifespan_days"` - - /* listen address for local http Server */ - LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"` + InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` } var config *AppConfig @@ -114,6 +110,4 @@ func setDefaults() { viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") viper.SetDefault("InMemoryEventCacheSize", 512) - viper.SetDefault("UserEventLifespanDays", 10) - viper.SetDefault("LocalWebEndpointListen", ":8080") } diff --git a/internal/state/events.go b/internal/state/events.go index c14ee77..a24b91f 100644 --- a/internal/state/events.go +++ b/internal/state/events.go @@ -2,10 +2,9 @@ package state import ( "errors" - "fmt" "time" - - "gitlab.com/whom/bingobot/internal/config" + "fmt" + "gitlab.com/whom/bingobot/internal/logging" ) @@ -103,12 +102,6 @@ func (ve VoteEvent) Validate() error { return nil } -func (ve VoteEvent) Disposable() bool { - // will be implemented with democracy module - logging.Warn("unimplemented: VoteEvent::Disposable") - return false -} - type UserEvent struct { uid string created time.Time @@ -136,13 +129,6 @@ func (ue UserEvent) Validate() error { return nil } -func (ue UserEvent) Disposable() bool { - // when I make a module for challenges, restorations, and UserActives - // then I should implement this - logging.Warn("unimplemented: UserEvent::Disposable") - return false -} - type ChallengeEvent struct { UserEvent } @@ -181,48 +167,9 @@ func (ua UserActiveEvent) Type() EventType { return UserActive } -func (ua UserActiveEvent) Disposable() bool { - return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays) -} - func NewUserActiveEvent(user string) UserActiveEvent { return UserActiveEvent{UserEvent{ uid: user, created: time.Now(), }} } - -const ( - TestEventDisposeKey = "dispose" - TestEventIDKey = "id" -) - -type TestEvent struct { - Dispose bool - ID int -} - -func (te TestEvent) Type() EventType { - return Test -} - -func (te TestEvent) Time() time.Time { - return time.Now() -} - -func (te TestEvent) Data() map[string]string { - m := map[string]string{} - if te.Dispose { - m[TestEventDisposeKey] = "t" - } - m[TestEventIDKey] = fmt.Sprintf("%d", te.ID) - return m -} - -func (te TestEvent) Validate() error { - return nil -} - -func (te TestEvent) Disposable() bool { - return te.Dispose -} diff --git a/internal/state/state.go b/internal/state/state.go index f4a83fd..4986991 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,6 +1,6 @@ package state -/* State module +/* STATE * This package encapsulates various state information * state is represented in various global singletons * Additionally, this package offers a pub/sub interface @@ -8,15 +8,15 @@ package state */ import ( - "encoding/json" - "errors" - "os" - "strconv" + "fmt" "sync" "time" + "os" + "errors" + "encoding/json" - "gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/pkg/docbuf" + "gitlab.com/whom/bingobot/internal/logging" ) /* Event interface is meant to encapsulate a general interface @@ -36,17 +36,14 @@ const ( BadChallengeEvent = "failed to make Challenge Event" BadRestorationEvent = "failed to make Restoration Event" BadUserActiveEvent = "failed to make UserActive Event" - BadTestEvent = "failed to make Test Event" BadEventObjMarshal = "error marshalling event" BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" - StartBeforeInitError = "state machine not initialized" - ReopenStoreFileError = "failed to reopen store file" +) +const ( EventTypeMapKey = "type" - - TmpDocBufBackingFile = "bingobot_events_processing" ) var eventMutex sync.RWMutex @@ -63,7 +60,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { } if eventStream, err = docbuf.NewDocumentBuffer( - 0, // temporary nil-cache buffer to replay events from + eventMemCacheSize, file, ); err != nil { return errors.Join(errors.New(BadEventStreamInit), err) @@ -74,89 +71,6 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return nil } -// replay events and begin state machine -func Start() error { - if eventStream == nil { - return errors.New(StartBeforeInitError) - } - - tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile) - if err != nil { - return err - } - - tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile) - if err != nil { - tmpBackFile.Close() - return err - } - - // filter out disposable events into a reverse ordered on disk stack - var doc string - for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { - doc, err = eventStream.Pop() - if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { - return err - } - - if doc == "" || - (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { - break - } - - ev, err := EventFromString(doc) - if err != nil { - logging.Warn("Discarding the following event for not unmarshaling: %s", doc) - continue - } - - if !ev.Disposable() { - tmpBuf.Push(doc) - } - } - - if err := os.Truncate(eventStorageFileName, 0); err != nil { - return errors.Join(errors.New(ReopenStoreFileError), err) - } - - file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644) - if err != nil { - return errors.Join(errors.New(BadEventStoreFilename), err) - } - - eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file) - if err != nil { - // return error here will panic without truncating or writing to the file - return err - } - - // unravel tmp stack into properly ordered properly allocated buffer - for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { - doc, err := tmpBuf.Pop() - if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { - logging.Warn("Could not handle the following error: %s", err.Error()) - continue - } - - if doc == "" || - (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { - break - } - - ev, err := EventFromString(doc) - if err != nil { - logging.Warn("Could not handle the following error: %s", err.Error()) - continue - } - - if err := PublishEvent(ev); err != nil { - logging.Warn("Could not handle the following error: %s", err.Error()) - } - } - - return nil -} - // return no error. we are handling SIGINT func Teardown() { // riskily ignore all locking.... @@ -182,7 +96,6 @@ const ( Challenge Restoration UserActive - Test // ... // leave this last @@ -200,8 +113,6 @@ func EventTypeFromString(doc string) EventType { return Restoration case "UserActive": return UserActive - case "Test": - return Test default: // error case return NumEventTypes @@ -214,7 +125,6 @@ func (et EventType) String() string { "Challenge", "Restoration", "UserActive", - "Test", } if et < 0 || et >= NumEventTypes { @@ -284,22 +194,6 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { return nil, errors.Join(errors.New(BadUserActiveEvent), err) } return UserActiveEvent{*e}, nil - case Test: - disp := false - if v, ok := data[TestEventDisposeKey]; ok && v == "t" { - disp = true - } - id := -1 - if v, ok := data[TestEventIDKey]; ok { - var err error - if id, err = strconv.Atoi(v); err != nil { - return nil, errors.Join(errors.New(BadTestEvent), err) - } - } - return TestEvent{ - Dispose: disp, - ID: id, - }, nil default: return nil, errors.New(BadEventTypeError) } @@ -338,9 +232,6 @@ type Event interface { // validates state of internal metadata per EventType Validate() error - - // returns true if event can be discarded - Disposable() bool } func EventToString(e Event) (string, error) { @@ -392,7 +283,7 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", @@ -482,6 +373,8 @@ func GetMatchingEvents( } filter := func(e Event) bool { + ev, er := EventToString(e) + fmt.Printf("Checking: %s (%e)\n", ev, er) if e.Type() != t { return true } @@ -492,6 +385,7 @@ func GetMatchingEvents( } } + fmt.Println("Found Match") matches = append(matches, e) return true } diff --git a/internal/state/state_test.go b/internal/state/state_test.go index 877b724..20800f3 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -2,12 +2,11 @@ package state import ( "fmt" - "os" "testing" "time" - "gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/pkg/docbuf" + "gitlab.com/whom/bingobot/internal/logging" ) /* WARNING: @@ -55,11 +54,15 @@ func TestEventMarshalUnmarshal(t *testing.T) { t.Fatalf("error marshalling challenge: %e, %s", err, cstr) } + t.Logf("cstr: %s\n", cstr) + vstr, err := EventToString(vev); if err != nil { t.Fatalf("error marshalling vote: %e, %s", err, vstr) } + t.Logf("vstr: %s\n", vstr) + if ev, err := EventFromString(cstr); err != nil || ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { @@ -267,81 +270,3 @@ func TestVoteEventValidations(t *testing.T) { t.Errorf("Unexpected or no error: %e", err) } } - -func TestEventReplay(t *testing.T) { - tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore") - if err != nil { - t.Fatalf("failed setting up tempfile: %s", err.Error()) - } - - if err := tmpTestFile.Close(); err != nil { - t.Fatalf("failed to close initial handle to tempfile: %s", err.Error()) - } - - if err := Init(2, tmpTestFile.Name()); err != nil { - t.Fatalf("failed initializing state machine: %s", err.Error()) - } - - if err := PublishEvent(TestEvent{ - ID: 1, - Dispose: true, - }); err != nil { - t.Fatalf("failed to publish event 1: %s", err.Error()) - } - - if err := PublishEvent(TestEvent{ - ID: 2, - Dispose: false, - }); err != nil { - t.Fatalf("failed to publish event 2: %s", err.Error()) - } - - if err := PublishEvent(TestEvent{ - ID: 3, - Dispose: true, - }); err != nil { - t.Fatalf("failed to publish event 3: %s", err.Error()) - } - - if err := PublishEvent(TestEvent{ - ID: 4, - Dispose: false, - }); err != nil { - t.Fatalf("failed to publish event 4: %s", err.Error()) - } - - sub, err := Test.Subscribe() - if err := Start(); err != nil { - t.Fatalf("failed to start state machine: %s", err.Error()) - } - - select { - case ev1 := <- sub: - if ev1.Type() != Test { - t.Fatalf("wrong kind of event somehow: %+v", ev1) - } - if ev1.(TestEvent).ID != 2 { - t.Fatalf("misordered event: %+v", ev1) - } - default: - t.Fatal("no events available from replay") - } - - select { - case ev2 := <- sub: - if ev2.Type() != Test { - t.Fatalf("wrong kind of event somehow: %+v", ev2) - } - if ev2.(TestEvent).ID != 4 { - t.Fatalf("misordered event: %+v", ev2) - } - default: - t.Fatal("only one event made it out") - } - - select { - case <- sub: - t.Fatalf("superfluous events left in subscription") - default: - } -} diff --git a/internal/web/web.go b/internal/web/web.go deleted file mode 100644 index 38051e6..0000000 --- a/internal/web/web.go +++ /dev/null @@ -1,35 +0,0 @@ -package web - -import ( - "fmt" - "net/http" - - "gitlab.com/whom/bingobot/internal/activity" - "gitlab.com/whom/bingobot/internal/config" - "gitlab.com/whom/bingobot/internal/logging" -) - -func Start() error { - frag := config.Get().LocalWebEndpointListen - http.HandleFunc("/", HandleHttpRequest) - go logging.Error(http.ListenAndServe(frag, nil).Error()) - - return nil -} - -func HandleHttpRequest(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "") - fmt.Fprintf(w, "

User Activity Metrics

") - fmt.Fprintf(w, "

number of the last %d days a user has been active

", - config.Get().UserEventLifespanDays) - - fmt.Fprintf(w, "") - fmt.Fprintf(w, "") - for k, v := range activity.GetActivitySnapshot() { - fmt.Fprintf(w, "", - k, len(v)) - } - fmt.Fprintf(w, "
UserDays
%s%d
") - - fmt.Fprintf(w, "") -} diff --git a/main.go b/main.go index 0528e1c..21111e9 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,10 @@ import ( "flag" "log" - "gitlab.com/whom/bingobot/internal/activity" "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/state" - "gitlab.com/whom/bingobot/internal/web" ) var ( @@ -34,23 +32,11 @@ func main() { ); err != nil { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } - - if err := activity.Start(); err != nil { - // TODO: handle gracefully and continue? - log.Fatalf("failed to start activity module: %s", err.Error()) - } - - if err := web.Start(); err != nil { - log.Fatalf("failed to start local web server: %s", err.Error()) - } - - // start this LAST - if err := state.Start(); err != nil { - log.Fatalf("failed to start state machine: %s", err.Error()) - } defer state.Teardown() - if err := startBot(); err != nil { + err = startBot() + + if err != nil { log.Fatal(err) } } diff --git a/pkg/docbuf/docbuf.go b/pkg/docbuf/docbuf.go index cd3f633..cc0610e 100644 --- a/pkg/docbuf/docbuf.go +++ b/pkg/docbuf/docbuf.go @@ -263,11 +263,6 @@ func NewDocumentBuffer( * If cache is full oldest document is written to backing writer. */ func (b *DocBuf) Push(doc string) error { - if b.cacheSize == 0 { - b.writeToBackingStore(doc) - return nil - } - if len(b.cache) >= b.cacheSize { if err := b.demote(); err != nil { return err @@ -285,14 +280,6 @@ func (b *DocBuf) Push(doc string) error { * out of the backing ReaderWriter. */ func (b *DocBuf) Pop() (string, error) { - if b.cacheSize == 0 { - d, e := b.readDocumentsFromDisk(1, true, false) - if len(d) > 0 { - return d[0], e - } - return "", e - } - if len(b.cache) < 1 { if err := b.promote(true); err != nil { return "", err