bingobot/internal/activity/activity.go

113 lines
2.4 KiB
Go
Raw Permalink Normal View History

2025-01-08 00:47:49 +00:00
package activity
import (
"errors"
"fmt"
2025-01-08 00:47:49 +00:00
"sync"
"time"
"gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/internal/state"
2025-01-08 00:47:49 +00:00
)
/* Activity module
* This module sits and processes incoming
*/
const (
ActivityModuleStartFail = "failed to start activity module"
)
var currentUserActivity = make(map[string][]state.UserActiveEvent)
2025-01-08 00:47:49 +00:00
var userActivityLock sync.RWMutex
func Start() error {
ch, err := state.UserActive.Subscribe()
if err != nil {
return errors.Join(
errors.New(ActivityModuleStartFail),
err,
)
}
// process incoming events loop
go func() {
for {
ev := <- ch
emap := ev.Data()
user := emap[state.UserEventUserKey]
etime := ev.Time()
delta := time.Since(etime).Hours() / float64(24)
logging.Debug(fmt.Sprintf(
"processing UserActive event for %s", user,
))
2025-01-08 00:47:49 +00:00
if delta <= float64(config.Get().UserEventLifespanDays) {
new := []state.UserActiveEvent{ev.(state.UserActiveEvent)}
userActivityLock.Lock()
current, found := currentUserActivity[user]
if found {
new = append(new, current...)
}
currentUserActivity[user] = new
2025-01-08 00:47:49 +00:00
userActivityLock.Unlock()
} else {
logging.Warn("recieved expired useractive event")
}
}
}()
// process expired events loop
go func() {
for {
delta := time.Hour * 24
delta *= time.Duration(config.Get().UserEventLifespanDays)
tcur := time.Now().Add(delta)
// get next soonest expiration
userActivityLock.RLock()
for _, evs := range currentUserActivity {
for _, ev := range evs {
hrs := time.Duration(24 * config.Get().UserEventLifespanDays)
t := ev.Time().Add(time.Hour * hrs)
if t.Before(tcur) {
tcur = t
}
}
}
userActivityLock.RUnlock()
time.Sleep(tcur.Sub(time.Now()))
userActivityLock.Lock()
for k, v := range currentUserActivity {
new := []state.UserActiveEvent{}
for _, ev := range v {
if !ev.Disposable() {
new = append(new, ev)
}
}
currentUserActivity[k] = new
}
userActivityLock.Unlock()
}
}()
return nil
}
func GetActivitySnapshot() map[string][]state.UserActiveEvent {
userActivityLock.RLock()
defer userActivityLock.RUnlock()
snapshot := make(map[string][]state.UserActiveEvent)
for k, v := range currentUserActivity {
newSl := make([]state.UserActiveEvent, len(v))
copy(newSl, v)
snapshot[k] = newSl
}
return snapshot
}