Compare commits
2 commits
ava-patche
...
ava-event-
| Author | SHA1 | Date | |
|---|---|---|---|
| e7d229c217 | |||
| 2515d396a0 |
8 changed files with 447 additions and 39 deletions
106
internal/activity/activity.go
Normal file
106
internal/activity/activity.go
Normal file
|
|
@ -0,0 +1,106 @@
|
||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitlab.com/whom/bingobot/internal/config"
|
||||||
|
"gitlab.com/whom/bingobot/internal/state"
|
||||||
|
"gitlab.com/whom/bingobot/internal/logging"
|
||||||
|
)
|
||||||
|
|
||||||
|
/* Activity module
|
||||||
|
* This module sits and processes incoming
|
||||||
|
*/
|
||||||
|
|
||||||
|
const (
|
||||||
|
ActivityModuleStartFail = "failed to start activity module"
|
||||||
|
)
|
||||||
|
|
||||||
|
var currentUserActivity map[string][]state.UserActiveEvent
|
||||||
|
var userActivityLock sync.RWMutex
|
||||||
|
|
||||||
|
func Start() error {
|
||||||
|
ch, err := state.UserActive.Subscribe()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Join(
|
||||||
|
errors.New(ActivityModuleStartFail),
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// process incoming events loop
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
ev := <- ch
|
||||||
|
emap := ev.Data()
|
||||||
|
user := emap[state.UserEventUserKey]
|
||||||
|
etime := ev.Time()
|
||||||
|
delta := time.Since(etime).Hours() / float64(24)
|
||||||
|
|
||||||
|
if delta <= float64(config.Get().UserEventLifespanDays) {
|
||||||
|
new := []state.UserActiveEvent{ev.(state.UserActiveEvent)}
|
||||||
|
userActivityLock.Lock()
|
||||||
|
current, found := currentUserActivity[user]
|
||||||
|
if found {
|
||||||
|
new = append(new, current...)
|
||||||
|
}
|
||||||
|
userActivityLock.Unlock()
|
||||||
|
} else {
|
||||||
|
logging.Warn("recieved expired useractive event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// process expired events loop
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
delta := time.Hour * 24
|
||||||
|
delta *= time.Duration(config.Get().UserEventLifespanDays)
|
||||||
|
tcur := time.Now().Add(delta)
|
||||||
|
|
||||||
|
// get next soonest expiration
|
||||||
|
userActivityLock.RLock()
|
||||||
|
for _, evs := range currentUserActivity {
|
||||||
|
for _, ev := range evs {
|
||||||
|
hrs := time.Duration(24 * config.Get().UserEventLifespanDays)
|
||||||
|
t := ev.Time().Add(time.Hour * hrs)
|
||||||
|
if t.Before(tcur) {
|
||||||
|
tcur = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
userActivityLock.RUnlock()
|
||||||
|
time.Sleep(tcur.Sub(time.Now()))
|
||||||
|
|
||||||
|
userActivityLock.Lock()
|
||||||
|
for k, v := range currentUserActivity {
|
||||||
|
new := []state.UserActiveEvent{}
|
||||||
|
for _, ev := range v {
|
||||||
|
if !ev.Disposable() {
|
||||||
|
new = append(new, ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
currentUserActivity[k] = new
|
||||||
|
}
|
||||||
|
userActivityLock.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetActivitySnapshot() map[string][]state.UserActiveEvent {
|
||||||
|
userActivityLock.RLock()
|
||||||
|
defer userActivityLock.RUnlock()
|
||||||
|
|
||||||
|
snapshot := make(map[string][]state.UserActiveEvent)
|
||||||
|
for k, v := range currentUserActivity {
|
||||||
|
newSl := make([]state.UserActiveEvent, len(v))
|
||||||
|
copy(newSl, v)
|
||||||
|
snapshot[k] = newSl
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshot
|
||||||
|
}
|
||||||
|
|
@ -17,30 +17,34 @@ type AppConfig struct {
|
||||||
LogCompression bool `yaml:"log_compression"`
|
LogCompression bool `yaml:"log_compression"`
|
||||||
LogAddSource bool `yaml:"log_add_source"`
|
LogAddSource bool `yaml:"log_add_source"`
|
||||||
|
|
||||||
/*
|
/* how long (in seconds) a user needs to be in vc in order to generate a
|
||||||
how long (in seconds) a user needs to be in vc in order to generate a
|
* UserActive event
|
||||||
UserActive event
|
*/
|
||||||
*/
|
|
||||||
VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"`
|
VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"`
|
||||||
|
|
||||||
/*
|
/* how long (in milliseconds) a voice activity timer sleeps at a time between
|
||||||
how long (in milliseconds) a voice activity timer sleeps at a time between
|
* context cancellation checks.
|
||||||
context cancellation checks.
|
* a higher value means the function sleeps longer which could be
|
||||||
|
* useful for some reason in the future
|
||||||
a higher value means the function sleeps longer which could be
|
* a higher value also means that the timer could take longer to cancel.
|
||||||
useful for some reason in the future
|
* current recommended value is 1000ms.
|
||||||
|
*/
|
||||||
a higher value also means that the timer could take longer to cancel.
|
|
||||||
|
|
||||||
current recommended value is 1000ms.
|
|
||||||
*/
|
|
||||||
VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"`
|
VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"`
|
||||||
|
|
||||||
/* persistent state file store */
|
/* persistent state file store */
|
||||||
PersistentCacheStore string `yaml:"persistent_cache_store"`
|
PersistentCacheStore string `yaml:"persistent_cache_store"`
|
||||||
|
|
||||||
/* number of internal state events to cache in memory */
|
/* number of internal state events to cache in memory */
|
||||||
InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"`
|
InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"`
|
||||||
|
|
||||||
|
/* number of days a useractive event is valid for
|
||||||
|
* increasing this will have a bell curve effect on
|
||||||
|
* voting weights
|
||||||
|
*/
|
||||||
|
UserEventLifespanDays int `yaml:"user_event_lifespan_days"`
|
||||||
|
|
||||||
|
/* listen address for local http Server */
|
||||||
|
LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var config *AppConfig
|
var config *AppConfig
|
||||||
|
|
@ -110,4 +114,6 @@ func setDefaults() {
|
||||||
viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000)
|
viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000)
|
||||||
viper.SetDefault("PersistentCacheStore", "/tmp/bingobot")
|
viper.SetDefault("PersistentCacheStore", "/tmp/bingobot")
|
||||||
viper.SetDefault("InMemoryEventCacheSize", 512)
|
viper.SetDefault("InMemoryEventCacheSize", 512)
|
||||||
|
viper.SetDefault("UserEventLifespanDays", 10)
|
||||||
|
viper.SetDefault("LocalWebEndpointListen", ":8080")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,10 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitlab.com/whom/bingobot/internal/config"
|
||||||
"gitlab.com/whom/bingobot/internal/logging"
|
"gitlab.com/whom/bingobot/internal/logging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -102,6 +103,12 @@ func (ve VoteEvent) Validate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ve VoteEvent) Disposable() bool {
|
||||||
|
// will be implemented with democracy module
|
||||||
|
logging.Warn("unimplemented: VoteEvent::Disposable")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type UserEvent struct {
|
type UserEvent struct {
|
||||||
uid string
|
uid string
|
||||||
created time.Time
|
created time.Time
|
||||||
|
|
@ -129,6 +136,13 @@ func (ue UserEvent) Validate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ue UserEvent) Disposable() bool {
|
||||||
|
// when I make a module for challenges, restorations, and UserActives
|
||||||
|
// then I should implement this
|
||||||
|
logging.Warn("unimplemented: UserEvent::Disposable")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type ChallengeEvent struct {
|
type ChallengeEvent struct {
|
||||||
UserEvent
|
UserEvent
|
||||||
}
|
}
|
||||||
|
|
@ -167,9 +181,48 @@ func (ua UserActiveEvent) Type() EventType {
|
||||||
return UserActive
|
return UserActive
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ua UserActiveEvent) Disposable() bool {
|
||||||
|
return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays)
|
||||||
|
}
|
||||||
|
|
||||||
func NewUserActiveEvent(user string) UserActiveEvent {
|
func NewUserActiveEvent(user string) UserActiveEvent {
|
||||||
return UserActiveEvent{UserEvent{
|
return UserActiveEvent{UserEvent{
|
||||||
uid: user,
|
uid: user,
|
||||||
created: time.Now(),
|
created: time.Now(),
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
TestEventDisposeKey = "dispose"
|
||||||
|
TestEventIDKey = "id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TestEvent struct {
|
||||||
|
Dispose bool
|
||||||
|
ID int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (te TestEvent) Type() EventType {
|
||||||
|
return Test
|
||||||
|
}
|
||||||
|
|
||||||
|
func (te TestEvent) Time() time.Time {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (te TestEvent) Data() map[string]string {
|
||||||
|
m := map[string]string{}
|
||||||
|
if te.Dispose {
|
||||||
|
m[TestEventDisposeKey] = "t"
|
||||||
|
}
|
||||||
|
m[TestEventIDKey] = fmt.Sprintf("%d", te.ID)
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (te TestEvent) Validate() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (te TestEvent) Disposable() bool {
|
||||||
|
return te.Dispose
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
/* STATE
|
/* State module
|
||||||
* This package encapsulates various state information
|
* This package encapsulates various state information
|
||||||
* state is represented in various global singletons
|
* state is represented in various global singletons
|
||||||
* Additionally, this package offers a pub/sub interface
|
* Additionally, this package offers a pub/sub interface
|
||||||
|
|
@ -8,15 +8,15 @@ package state
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"os"
|
|
||||||
"errors"
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"gitlab.com/whom/bingobot/pkg/docbuf"
|
|
||||||
"gitlab.com/whom/bingobot/internal/logging"
|
"gitlab.com/whom/bingobot/internal/logging"
|
||||||
|
"gitlab.com/whom/bingobot/pkg/docbuf"
|
||||||
)
|
)
|
||||||
|
|
||||||
/* Event interface is meant to encapsulate a general interface
|
/* Event interface is meant to encapsulate a general interface
|
||||||
|
|
@ -36,14 +36,17 @@ const (
|
||||||
BadChallengeEvent = "failed to make Challenge Event"
|
BadChallengeEvent = "failed to make Challenge Event"
|
||||||
BadRestorationEvent = "failed to make Restoration Event"
|
BadRestorationEvent = "failed to make Restoration Event"
|
||||||
BadUserActiveEvent = "failed to make UserActive Event"
|
BadUserActiveEvent = "failed to make UserActive Event"
|
||||||
|
BadTestEvent = "failed to make Test Event"
|
||||||
BadEventObjMarshal = "error marshalling event"
|
BadEventObjMarshal = "error marshalling event"
|
||||||
BadEventObjUnmarshal = "failed to unmarshal event map"
|
BadEventObjUnmarshal = "failed to unmarshal event map"
|
||||||
BadEventUnmarshal = "failed to unmarshal event"
|
BadEventUnmarshal = "failed to unmarshal event"
|
||||||
BadEventMissingTypeKey = "event map missing type key"
|
BadEventMissingTypeKey = "event map missing type key"
|
||||||
)
|
StartBeforeInitError = "state machine not initialized"
|
||||||
|
ReopenStoreFileError = "failed to reopen store file"
|
||||||
|
|
||||||
const (
|
|
||||||
EventTypeMapKey = "type"
|
EventTypeMapKey = "type"
|
||||||
|
|
||||||
|
TmpDocBufBackingFile = "bingobot_events_processing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var eventMutex sync.RWMutex
|
var eventMutex sync.RWMutex
|
||||||
|
|
@ -60,7 +63,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if eventStream, err = docbuf.NewDocumentBuffer(
|
if eventStream, err = docbuf.NewDocumentBuffer(
|
||||||
eventMemCacheSize,
|
0, // temporary nil-cache buffer to replay events from
|
||||||
file,
|
file,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return errors.Join(errors.New(BadEventStreamInit), err)
|
return errors.Join(errors.New(BadEventStreamInit), err)
|
||||||
|
|
@ -71,6 +74,89 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// replay events and begin state machine
|
||||||
|
func Start() error {
|
||||||
|
if eventStream == nil {
|
||||||
|
return errors.New(StartBeforeInitError)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile)
|
||||||
|
if err != nil {
|
||||||
|
tmpBackFile.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// filter out disposable events into a reverse ordered on disk stack
|
||||||
|
var doc string
|
||||||
|
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
|
||||||
|
doc, err = eventStream.Pop()
|
||||||
|
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if doc == "" ||
|
||||||
|
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
ev, err := EventFromString(doc)
|
||||||
|
if err != nil {
|
||||||
|
logging.Warn("Discarding the following event for not unmarshaling: %s", doc)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ev.Disposable() {
|
||||||
|
tmpBuf.Push(doc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Truncate(eventStorageFileName, 0); err != nil {
|
||||||
|
return errors.Join(errors.New(ReopenStoreFileError), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Join(errors.New(BadEventStoreFilename), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file)
|
||||||
|
if err != nil {
|
||||||
|
// return error here will panic without truncating or writing to the file
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// unravel tmp stack into properly ordered properly allocated buffer
|
||||||
|
for err == nil || err.Error() != docbuf.PopFromEmptyBufferError {
|
||||||
|
doc, err := tmpBuf.Pop()
|
||||||
|
if err != nil && err.Error() != docbuf.PopFromEmptyBufferError {
|
||||||
|
logging.Warn("Could not handle the following error: %s", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if doc == "" ||
|
||||||
|
(err != nil && err.Error() == docbuf.PopFromEmptyBufferError) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
ev, err := EventFromString(doc)
|
||||||
|
if err != nil {
|
||||||
|
logging.Warn("Could not handle the following error: %s", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := PublishEvent(ev); err != nil {
|
||||||
|
logging.Warn("Could not handle the following error: %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// return no error. we are handling SIGINT
|
// return no error. we are handling SIGINT
|
||||||
func Teardown() {
|
func Teardown() {
|
||||||
// riskily ignore all locking....
|
// riskily ignore all locking....
|
||||||
|
|
@ -96,6 +182,7 @@ const (
|
||||||
Challenge
|
Challenge
|
||||||
Restoration
|
Restoration
|
||||||
UserActive
|
UserActive
|
||||||
|
Test
|
||||||
// ...
|
// ...
|
||||||
|
|
||||||
// leave this last
|
// leave this last
|
||||||
|
|
@ -113,6 +200,8 @@ func EventTypeFromString(doc string) EventType {
|
||||||
return Restoration
|
return Restoration
|
||||||
case "UserActive":
|
case "UserActive":
|
||||||
return UserActive
|
return UserActive
|
||||||
|
case "Test":
|
||||||
|
return Test
|
||||||
default:
|
default:
|
||||||
// error case
|
// error case
|
||||||
return NumEventTypes
|
return NumEventTypes
|
||||||
|
|
@ -125,6 +214,7 @@ func (et EventType) String() string {
|
||||||
"Challenge",
|
"Challenge",
|
||||||
"Restoration",
|
"Restoration",
|
||||||
"UserActive",
|
"UserActive",
|
||||||
|
"Test",
|
||||||
}
|
}
|
||||||
|
|
||||||
if et < 0 || et >= NumEventTypes {
|
if et < 0 || et >= NumEventTypes {
|
||||||
|
|
@ -194,6 +284,22 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) {
|
||||||
return nil, errors.Join(errors.New(BadUserActiveEvent), err)
|
return nil, errors.Join(errors.New(BadUserActiveEvent), err)
|
||||||
}
|
}
|
||||||
return UserActiveEvent{*e}, nil
|
return UserActiveEvent{*e}, nil
|
||||||
|
case Test:
|
||||||
|
disp := false
|
||||||
|
if v, ok := data[TestEventDisposeKey]; ok && v == "t" {
|
||||||
|
disp = true
|
||||||
|
}
|
||||||
|
id := -1
|
||||||
|
if v, ok := data[TestEventIDKey]; ok {
|
||||||
|
var err error
|
||||||
|
if id, err = strconv.Atoi(v); err != nil {
|
||||||
|
return nil, errors.Join(errors.New(BadTestEvent), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TestEvent{
|
||||||
|
Dispose: disp,
|
||||||
|
ID: id,
|
||||||
|
}, nil
|
||||||
default:
|
default:
|
||||||
return nil, errors.New(BadEventTypeError)
|
return nil, errors.New(BadEventTypeError)
|
||||||
}
|
}
|
||||||
|
|
@ -232,6 +338,9 @@ type Event interface {
|
||||||
|
|
||||||
// validates state of internal metadata per EventType
|
// validates state of internal metadata per EventType
|
||||||
Validate() error
|
Validate() error
|
||||||
|
|
||||||
|
// returns true if event can be discarded
|
||||||
|
Disposable() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func EventToString(e Event) (string, error) {
|
func EventToString(e Event) (string, error) {
|
||||||
|
|
@ -283,7 +392,7 @@ func PublishEvent(e Event) error {
|
||||||
blocking := false
|
blocking := false
|
||||||
|
|
||||||
for _, c := range eventSubscriptionCache[e.Type()] {
|
for _, c := range eventSubscriptionCache[e.Type()] {
|
||||||
if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) {
|
if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) {
|
||||||
if len(c) == maxEventsInMemory {
|
if len(c) == maxEventsInMemory {
|
||||||
logging.Warn(
|
logging.Warn(
|
||||||
"PublishEvent() blocking -- event channel full",
|
"PublishEvent() blocking -- event channel full",
|
||||||
|
|
@ -373,8 +482,6 @@ func GetMatchingEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
filter := func(e Event) bool {
|
filter := func(e Event) bool {
|
||||||
ev, er := EventToString(e)
|
|
||||||
fmt.Printf("Checking: %s (%e)\n", ev, er)
|
|
||||||
if e.Type() != t {
|
if e.Type() != t {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -385,7 +492,6 @@ func GetMatchingEvents(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Found Match")
|
|
||||||
matches = append(matches, e)
|
matches = append(matches, e)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,12 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitlab.com/whom/bingobot/pkg/docbuf"
|
|
||||||
"gitlab.com/whom/bingobot/internal/logging"
|
"gitlab.com/whom/bingobot/internal/logging"
|
||||||
|
"gitlab.com/whom/bingobot/pkg/docbuf"
|
||||||
)
|
)
|
||||||
|
|
||||||
/* WARNING:
|
/* WARNING:
|
||||||
|
|
@ -54,15 +55,11 @@ func TestEventMarshalUnmarshal(t *testing.T) {
|
||||||
t.Fatalf("error marshalling challenge: %e, %s", err, cstr)
|
t.Fatalf("error marshalling challenge: %e, %s", err, cstr)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("cstr: %s\n", cstr)
|
|
||||||
|
|
||||||
vstr, err := EventToString(vev);
|
vstr, err := EventToString(vev);
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error marshalling vote: %e, %s", err, vstr)
|
t.Fatalf("error marshalling vote: %e, %s", err, vstr)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("vstr: %s\n", vstr)
|
|
||||||
|
|
||||||
if ev, err := EventFromString(cstr); err != nil ||
|
if ev, err := EventFromString(cstr); err != nil ||
|
||||||
ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] ||
|
ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] ||
|
||||||
ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] {
|
ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] {
|
||||||
|
|
@ -270,3 +267,81 @@ func TestVoteEventValidations(t *testing.T) {
|
||||||
t.Errorf("Unexpected or no error: %e", err)
|
t.Errorf("Unexpected or no error: %e", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEventReplay(t *testing.T) {
|
||||||
|
tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed setting up tempfile: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tmpTestFile.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close initial handle to tempfile: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Init(2, tmpTestFile.Name()); err != nil {
|
||||||
|
t.Fatalf("failed initializing state machine: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := PublishEvent(TestEvent{
|
||||||
|
ID: 1,
|
||||||
|
Dispose: true,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("failed to publish event 1: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := PublishEvent(TestEvent{
|
||||||
|
ID: 2,
|
||||||
|
Dispose: false,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("failed to publish event 2: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := PublishEvent(TestEvent{
|
||||||
|
ID: 3,
|
||||||
|
Dispose: true,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("failed to publish event 3: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := PublishEvent(TestEvent{
|
||||||
|
ID: 4,
|
||||||
|
Dispose: false,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("failed to publish event 4: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
sub, err := Test.Subscribe()
|
||||||
|
if err := Start(); err != nil {
|
||||||
|
t.Fatalf("failed to start state machine: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ev1 := <- sub:
|
||||||
|
if ev1.Type() != Test {
|
||||||
|
t.Fatalf("wrong kind of event somehow: %+v", ev1)
|
||||||
|
}
|
||||||
|
if ev1.(TestEvent).ID != 2 {
|
||||||
|
t.Fatalf("misordered event: %+v", ev1)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatal("no events available from replay")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ev2 := <- sub:
|
||||||
|
if ev2.Type() != Test {
|
||||||
|
t.Fatalf("wrong kind of event somehow: %+v", ev2)
|
||||||
|
}
|
||||||
|
if ev2.(TestEvent).ID != 4 {
|
||||||
|
t.Fatalf("misordered event: %+v", ev2)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatal("only one event made it out")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <- sub:
|
||||||
|
t.Fatalf("superfluous events left in subscription")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
35
internal/web/web.go
Normal file
35
internal/web/web.go
Normal file
|
|
@ -0,0 +1,35 @@
|
||||||
|
package web
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"gitlab.com/whom/bingobot/internal/activity"
|
||||||
|
"gitlab.com/whom/bingobot/internal/config"
|
||||||
|
"gitlab.com/whom/bingobot/internal/logging"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Start() error {
|
||||||
|
frag := config.Get().LocalWebEndpointListen
|
||||||
|
http.HandleFunc("/", HandleHttpRequest)
|
||||||
|
go logging.Error(http.ListenAndServe(frag, nil).Error())
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func HandleHttpRequest(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprintf(w, "<html>")
|
||||||
|
fmt.Fprintf(w, "<h1> User Activity Metrics </h1>")
|
||||||
|
fmt.Fprintf(w, "<p> number of the last %d days a user has been active </p>",
|
||||||
|
config.Get().UserEventLifespanDays)
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "<table>")
|
||||||
|
fmt.Fprintf(w, "<tr><th>User</th><th>Days</th></tr>")
|
||||||
|
for k, v := range activity.GetActivitySnapshot() {
|
||||||
|
fmt.Fprintf(w, "<tr><td>%s</td><td>%d</td></tr>",
|
||||||
|
k, len(v))
|
||||||
|
}
|
||||||
|
fmt.Fprintf(w, "</table>")
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "</html>")
|
||||||
|
}
|
||||||
20
main.go
20
main.go
|
|
@ -4,10 +4,12 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
|
"gitlab.com/whom/bingobot/internal/activity"
|
||||||
"gitlab.com/whom/bingobot/internal/config"
|
"gitlab.com/whom/bingobot/internal/config"
|
||||||
"gitlab.com/whom/bingobot/internal/discord"
|
"gitlab.com/whom/bingobot/internal/discord"
|
||||||
"gitlab.com/whom/bingobot/internal/logging"
|
"gitlab.com/whom/bingobot/internal/logging"
|
||||||
"gitlab.com/whom/bingobot/internal/state"
|
"gitlab.com/whom/bingobot/internal/state"
|
||||||
|
"gitlab.com/whom/bingobot/internal/web"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -32,11 +34,23 @@ func main() {
|
||||||
); err != nil {
|
); err != nil {
|
||||||
log.Fatalf("couldn't initialize state engine: %s", err.Error())
|
log.Fatalf("couldn't initialize state engine: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := activity.Start(); err != nil {
|
||||||
|
// TODO: handle gracefully and continue?
|
||||||
|
log.Fatalf("failed to start activity module: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := web.Start(); err != nil {
|
||||||
|
log.Fatalf("failed to start local web server: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// start this LAST
|
||||||
|
if err := state.Start(); err != nil {
|
||||||
|
log.Fatalf("failed to start state machine: %s", err.Error())
|
||||||
|
}
|
||||||
defer state.Teardown()
|
defer state.Teardown()
|
||||||
|
|
||||||
err = startBot()
|
if err := startBot(); err != nil {
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -263,6 +263,11 @@ func NewDocumentBuffer(
|
||||||
* If cache is full oldest document is written to backing writer.
|
* If cache is full oldest document is written to backing writer.
|
||||||
*/
|
*/
|
||||||
func (b *DocBuf) Push(doc string) error {
|
func (b *DocBuf) Push(doc string) error {
|
||||||
|
if b.cacheSize == 0 {
|
||||||
|
b.writeToBackingStore(doc)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if len(b.cache) >= b.cacheSize {
|
if len(b.cache) >= b.cacheSize {
|
||||||
if err := b.demote(); err != nil {
|
if err := b.demote(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -280,6 +285,14 @@ func (b *DocBuf) Push(doc string) error {
|
||||||
* out of the backing ReaderWriter.
|
* out of the backing ReaderWriter.
|
||||||
*/
|
*/
|
||||||
func (b *DocBuf) Pop() (string, error) {
|
func (b *DocBuf) Pop() (string, error) {
|
||||||
|
if b.cacheSize == 0 {
|
||||||
|
d, e := b.readDocumentsFromDisk(1, true, false)
|
||||||
|
if len(d) > 0 {
|
||||||
|
return d[0], e
|
||||||
|
}
|
||||||
|
return "", e
|
||||||
|
}
|
||||||
|
|
||||||
if len(b.cache) < 1 {
|
if len(b.cache) < 1 {
|
||||||
if err := b.promote(true); err != nil {
|
if err := b.promote(true); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue