diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3093957..e67ef81 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -26,4 +26,4 @@ tests-config-pkg: tests-docbuf-pkg: stage: test script: - - go test ./internal/docbuf + - go test ./pkg/docbuf diff --git a/internal/activity/activity.go b/internal/activity/activity.go new file mode 100644 index 0000000..488de9d --- /dev/null +++ b/internal/activity/activity.go @@ -0,0 +1,112 @@ +package activity + +import ( + "errors" + "fmt" + "sync" + "time" + + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" +) + +/* Activity module + * This module sits and processes incoming + */ + +const ( + ActivityModuleStartFail = "failed to start activity module" +) + +var currentUserActivity = make(map[string][]state.UserActiveEvent) +var userActivityLock sync.RWMutex + +func Start() error { + ch, err := state.UserActive.Subscribe() + if err != nil { + return errors.Join( + errors.New(ActivityModuleStartFail), + err, + ) + } + + // process incoming events loop + go func() { + for { + ev := <- ch + emap := ev.Data() + user := emap[state.UserEventUserKey] + etime := ev.Time() + delta := time.Since(etime).Hours() / float64(24) + logging.Debug(fmt.Sprintf( + "processing UserActive event for %s", user, + )) + + + if delta <= float64(config.Get().UserEventLifespanDays) { + new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} + userActivityLock.Lock() + current, found := currentUserActivity[user] + if found { + new = append(new, current...) + } + currentUserActivity[user] = new + userActivityLock.Unlock() + } else { + logging.Warn("recieved expired useractive event") + } + } + }() + + // process expired events loop + go func() { + for { + delta := time.Hour * 24 + delta *= time.Duration(config.Get().UserEventLifespanDays) + tcur := time.Now().Add(delta) + + // get next soonest expiration + userActivityLock.RLock() + for _, evs := range currentUserActivity { + for _, ev := range evs { + hrs := time.Duration(24 * config.Get().UserEventLifespanDays) + t := ev.Time().Add(time.Hour * hrs) + if t.Before(tcur) { + tcur = t + } + } + } + userActivityLock.RUnlock() + time.Sleep(tcur.Sub(time.Now())) + + userActivityLock.Lock() + for k, v := range currentUserActivity { + new := []state.UserActiveEvent{} + for _, ev := range v { + if !ev.Disposable() { + new = append(new, ev) + } + } + currentUserActivity[k] = new + } + userActivityLock.Unlock() + } + }() + + return nil +} + +func GetActivitySnapshot() map[string][]state.UserActiveEvent { + userActivityLock.RLock() + defer userActivityLock.RUnlock() + + snapshot := make(map[string][]state.UserActiveEvent) + for k, v := range currentUserActivity { + newSl := make([]state.UserActiveEvent, len(v)) + copy(newSl, v) + snapshot[k] = newSl + } + + return snapshot +} diff --git a/internal/confession/confession.go b/internal/confession/confession.go new file mode 100644 index 0000000..d76aa38 --- /dev/null +++ b/internal/confession/confession.go @@ -0,0 +1,60 @@ +package confession + +import ( + "errors" + "fmt" + "sync" + + "github.com/bwmarrin/discordgo" + "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" +) + +/* Activity module + * This module posts anonymous confessions according to a linked channel map + */ + +const ( + ActivityModuleStartFail = "failed to start activity module" +) + +var ( + // guild ID to channel ID + linkLock sync.RWMutex + confessionChannelLinks = make(map[string]state.ConfessionsChannelLinkEvent) +) + +func Start() error { + ch, err := state.ConfessionsChannelLink.Subscribe() + if err != nil { + return errors.Join( + errors.New(ActivityModuleStartFail), + err, + ) + } + + // process incoming events loop + go func() { + for { + ev := <- ch + logging.Debug("recieved new confessional channel link") + e := ev.(state.ConfessionsChannelLinkEvent) + linkLock.Lock() + confessionChannelLinks[e.GuildID] = e + linkLock.Unlock() + } + }() + + return nil +} + +func MakeConfession(s *discordgo.Session, guildID string, content string) { + linkLock.RLock() + link, ok := confessionChannelLinks[guildID] + linkLock.RUnlock() + if !ok { + logging.Error(fmt.Sprintf("Failed to send confession in guild %s: no link exists in map", guildID)) + return + } + s.ChannelMessageSend(link.ChannelID, content) +} diff --git a/internal/config/config.go b/internal/config/config.go index eee917d..f32ae9a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,30 +17,34 @@ type AppConfig struct { LogCompression bool `yaml:"log_compression"` LogAddSource bool `yaml:"log_add_source"` - /* - how long (in seconds) a user needs to be in vc in order to generate a - UserActive event - */ + /* how long (in seconds) a user needs to be in vc in order to generate a + * UserActive event + */ VoiceActivityThresholdSeconds int `yaml:"voice_activity_threshold_seconds"` - /* - how long (in milliseconds) a voice activity timer sleeps at a time between - context cancellation checks. - - a higher value means the function sleeps longer which could be - useful for some reason in the future - - a higher value also means that the timer could take longer to cancel. - - current recommended value is 1000ms. - */ + /* how long (in milliseconds) a voice activity timer sleeps at a time between + * context cancellation checks. + * a higher value means the function sleeps longer which could be + * useful for some reason in the future + * a higher value also means that the timer could take longer to cancel. + * current recommended value is 1000ms. + */ VoiceActivityTimerSleepIntervalMillis int `yaml:"voice_activity_timer_sleep_interval_millis"` /* persistent state file store */ PersistentCacheStore string `yaml:"persistent_cache_store"` /* number of internal state events to cache in memory */ - InMemoryEventCacheSize int `yaml:"InMemoryEventCacheSize"` + InMemoryEventCacheSize int `yaml:"in_memory_event_cache_size"` + + /* number of days a useractive event is valid for + * increasing this will have a bell curve effect on + * voting weights + */ + UserEventLifespanDays int `yaml:"user_event_lifespan_days"` + + /* listen address for local http Server */ + LocalWebEndpointListen string `yaml:"local_web_endpoint_listen"` } var config *AppConfig @@ -110,4 +114,6 @@ func setDefaults() { viper.SetDefault("VoiceActivityTimerSleepIntervalMillis", 1000) viper.SetDefault("PersistentCacheStore", "/tmp/bingobot") viper.SetDefault("InMemoryEventCacheSize", 512) + viper.SetDefault("UserEventLifespanDays", 10) + viper.SetDefault("LocalWebEndpointListen", ":8080") } diff --git a/internal/discord/activity.go b/internal/discord/activity.go index afa6c8d..002f3b4 100644 --- a/internal/discord/activity.go +++ b/internal/discord/activity.go @@ -42,7 +42,7 @@ func NewActivityTimer(uid string) *UserActivityTimer { Start() initializes the timer and calls run() */ func (t *UserActivityTimer) Start(ctx context.Context) { - logging.Info("starting voiceActivityTimer", "uid", t.UID) + logging.Debug("starting voiceActivityTimer", "uid", t.UID) t.sleepDuration = time.Millisecond * time.Duration(config.Get().VoiceActivityTimerSleepIntervalMillis) activityTimerDuration := time.Second * time.Duration(config.Get().VoiceActivityThresholdSeconds) @@ -89,7 +89,7 @@ func (t *UserActivityTimer) run() { } // the timer's context has been cancelled or deadline expired. - logging.Info("voiceActivityTimer stopping", "uid", t.UID, "reason", context.Cause(t.ctx)) + logging.Debug("voiceActivityTimer stopping", "uid", t.UID, "reason", context.Cause(t.ctx)) if context.Cause(t.ctx) == ErrTimerExpired { /* @@ -128,7 +128,7 @@ func stopActivityTimer(uid string) { timerMutex.Lock() defer timerMutex.Unlock() - if _, ok := activityTimers[uid]; !ok { + if _, ok := activityTimers[uid]; ok { activityTimers[uid].Cancel() delete(activityTimers, uid) } @@ -147,5 +147,5 @@ func emitUserActiveEvent(uid string) { return } - logging.Info("published UserActiveEvent", "uid", uid) + logging.Debug("published UserActiveEvent", "uid", uid) } diff --git a/internal/discord/commands.go b/internal/discord/commands.go new file mode 100644 index 0000000..c774dc3 --- /dev/null +++ b/internal/discord/commands.go @@ -0,0 +1,129 @@ +package discord + +import ( + "fmt" + "time" + + "github.com/bwmarrin/discordgo" + + "gitlab.com/whom/bingobot/internal/confession" + "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/internal/state" +) + +var ( + // map of guildID to registeredCommands + registeredCommands = make(map[string][]*discordgo.ApplicationCommand) + + // all commands + commandList = []*discordgo.ApplicationCommand{ + // TODO: Limit usage somehow? + // maybe delete this and use the vote module instead + &discordgo.ApplicationCommand{ + Name: "confessional", + Description: "mark a channel as a designated confessional for a guild", + }, + + &discordgo.ApplicationCommand{ + Name: "confess", + Description: "anonymously post a confession in configured channel", + Options: []*discordgo.ApplicationCommandOption{ + &discordgo.ApplicationCommandOption{ + Type: discordgo.ApplicationCommandOptionString, + Name: "confession", + Description: "A confession to be posted anonymously", + Required: true, + }, + }, + }, + } + + commandHandlers = map[string]func( + s *discordgo.Session, + i *discordgo.InteractionCreate, + ) { + "confessional": func(s *discordgo.Session, i *discordgo.InteractionCreate) { + if err := state.PublishEvent(state.ConfessionsChannelLinkEvent{ + GuildID: i.GuildID, + ChannelID: i.ChannelID, + Created: time.Now(), + }); err != nil { + logging.Error(fmt.Sprintf( + "failed to publish confession channel link: %s", + err.Error(), + )) + } else { + logging.Info("published confession channel link") + } + }, + + // handle a confession + "confess": func(s *discordgo.Session, i *discordgo.InteractionCreate) { + for _, v := range i.ApplicationCommandData().Options { + if v.Name == "confession" { + confession.MakeConfession(s, i.GuildID, v.StringValue()) + } + } + }, + } +) + +func handleCommand(s *discordgo.Session, e *discordgo.InteractionCreate) { + name := e.ApplicationCommandData().Name + // TODO: audit log + if h, ok := commandHandlers[name]; ok { + h(s, e) + } else { + logging.Debug("no handler for command: %s", name) + } + + s.InteractionRespond(e.Interaction, &discordgo.InteractionResponse{ + Type: discordgo.InteractionResponseChannelMessageWithSource, + Data: &discordgo.InteractionResponseData{ + Flags: discordgo.MessageFlagsEphemeral, + Content: "Your cauldron bubbles...", + }, + }) +} + +func registerCommands(s *discordgo.Session) { + for _, guild := range s.State.Guilds { + cmds, err := s.ApplicationCommandBulkOverwrite( + s.State.Application.ID, + guild.ID, + commandList, + ) + + if err != nil { + logging.Error(fmt.Sprintf( + "Failed to register commands for guild %s: %s", + guild.ID, err.Error(), + )) + } else { + logging.Info(fmt.Sprintf("Registered commands for guild %s", guild.ID)) + registeredCommands[guild.ID] = cmds + } + } +} + +func deregisterCommands(s *discordgo.Session) { + for guild, commands := range registeredCommands { + for _, cmd := range commands { + if err := s.ApplicationCommandDelete( + s.State.Application.ID, + guild, + cmd.ID, + ); err != nil { + logging.Error(fmt.Sprintf( + "Failed to delete %s command (id: %s) from guild %s", + cmd.Name, cmd.ID, guild, + )) + } else { + logging.Info(fmt.Sprintf( + "Deregistered command %s (id: %s) from guild %s", + cmd.Name, cmd.ID, guild, + )) + } + } + } +} diff --git a/internal/discord/discord.go b/internal/discord/discord.go index 896ba1a..6c81929 100644 --- a/internal/discord/discord.go +++ b/internal/discord/discord.go @@ -36,10 +36,13 @@ func Connect(token string) error { return fmt.Errorf("failed to open discord session: %s", err) } + registerCommands(session.s) + return nil } func Close() { + deregisterCommands(session.s) err := session.s.Close() if err != nil { diff --git a/internal/discord/handlers.go b/internal/discord/handlers.go index c01d441..03b5434 100644 --- a/internal/discord/handlers.go +++ b/internal/discord/handlers.go @@ -9,6 +9,7 @@ func addHandlers() { session.s.AddHandler(handleConnect) session.s.AddHandler(handleDisconnect) session.s.AddHandler(handleVoiceStateUpdate) + session.s.AddHandler(handleCommand) // handles InteractionCreate } func handleConnect(s *discordgo.Session, e *discordgo.Connect) { diff --git a/internal/state/events.go b/internal/state/events.go new file mode 100644 index 0000000..44ad0f4 --- /dev/null +++ b/internal/state/events.go @@ -0,0 +1,272 @@ +package state + +import ( + "errors" + "fmt" + "time" + + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/logging" +) + +type VoteEvent map[string]string + +const ( + VoteMissingKeyError = "vote data not found: " + VoteCreatedKey = "created" + VoteRequesterKey = "requester" + VoteActionKey = "action" + + VoteResultKey = "result" + VoteResultPass = "pass" + VoteResultFail = "fail" + VoteResultTie = "tie" + VoteResultTimeout = "timeout" + VoteBadResultError = "vote has invalid result: " + VoteNotFinishedError = "vote has result but isnt finished" + VoteMissingResultError = "vote finished but missing result" + + VoteStatusKey = "status" + VoteStatusInProgress = "in_progress" + VoteStatusFinalized = "finalized" + VoteStatusTimeout = "timed_out" + VoteBadStatusError = "vote has invalid status: " + + VeryOldVote = "1990-01-01T00:00:00Z" +) + +func (ve VoteEvent) Type() EventType { + return Vote +} + +func (ve VoteEvent) Time() time.Time { + t, e := time.Parse(time.RFC3339, ve[VoteCreatedKey]) + if e != nil { + // we have a corrupted event + // return old time so that this event gets + // pruned from cache + logging.Warn( + "pruning corrupted vote event", + "event", + fmt.Sprintf("%+v", ve), + ) + tooOld, _ := time.Parse( + time.RFC3339, + VeryOldVote, + ) + return tooOld + } + return t +} + +func (ve VoteEvent) Data() map[string]string { + return map[string]string(ve) +} + +func (ve VoteEvent) Validate() error { + // make sure action, requester, and created are set + for _, key := range []string{ + VoteActionKey, + VoteRequesterKey, + VoteCreatedKey, + VoteStatusKey, + } { + if _, found := ve[key]; !found { + return errors.New(VoteMissingKeyError + key) + } + } + + status := ve[VoteStatusKey] + if status != VoteStatusTimeout && + status != VoteStatusInProgress && + status != VoteStatusFinalized { + return errors.New(VoteBadStatusError + status) + } + + result, hasResult := ve[VoteResultKey] + if hasResult && status == VoteStatusInProgress { + return errors.New(VoteNotFinishedError) + } + if status != VoteStatusInProgress && !hasResult { + return errors.New(VoteMissingResultError) + } + + if hasResult && + (result != VoteResultPass && + result != VoteResultFail && + result != VoteResultTie && + result != VoteResultTimeout) { + + return errors.New(VoteBadResultError + result) + } + + return nil +} + +func (ve VoteEvent) Disposable() bool { + // will be implemented with democracy module + logging.Warn("unimplemented: VoteEvent::Disposable") + return false +} + +type UserEvent struct { + uid string + created time.Time +} + +const ( + UserEventUserKey = "user" + UserEventCreatedKey = "created" + UserEventBadUserError = "event has bad user" +) + +func (ue UserEvent) Time() time.Time { + return ue.created +} + +func (ue UserEvent) Data() map[string]string { + return map[string]string{ + UserEventUserKey: ue.uid, + UserEventCreatedKey: ue.created.Format(time.RFC3339), + } +} + +func (ue UserEvent) Validate() error { + // empty for now, we may do some validation later. + return nil +} + +func (ue UserEvent) Disposable() bool { + // when I make a module for challenges, restorations, and UserActives + // then I should implement this + logging.Warn("unimplemented: UserEvent::Disposable") + return false +} + +type ChallengeEvent struct { + UserEvent +} + +func (ce ChallengeEvent) Type() EventType { + return Challenge +} + +func NewChallengeEvent(user string) ChallengeEvent { + return ChallengeEvent{UserEvent{ + uid: user, + created: time.Now(), + }} +} + +type RestorationEvent struct { + UserEvent +} + +func (re RestorationEvent) Type() EventType { + return Restoration +} + +func NewRestorationEvent(user string) RestorationEvent { + return RestorationEvent{UserEvent{ + uid: user, + created: time.Now(), + }} +} + +type UserActiveEvent struct { + UserEvent +} + +func (ua UserActiveEvent) Type() EventType { + return UserActive +} + +func (ua UserActiveEvent) Disposable() bool { + return (time.Since(ua.created).Hours() / 24) >= float64(config.Get().UserEventLifespanDays) +} + +func NewUserActiveEvent(user string) UserActiveEvent { + return UserActiveEvent{UserEvent{ + uid: user, + created: time.Now(), + }} +} + +const ( + TestEventDisposeKey = "dispose" + TestEventIDKey = "id" +) + +type TestEvent struct { + Dispose bool + ID int +} + +func (te TestEvent) Type() EventType { + return Test +} + +func (te TestEvent) Time() time.Time { + return time.Now() +} + +func (te TestEvent) Data() map[string]string { + m := map[string]string{} + if te.Dispose { + m[TestEventDisposeKey] = "t" + } + m[TestEventIDKey] = fmt.Sprintf("%d", te.ID) + return m +} + +func (te TestEvent) Validate() error { + return nil +} + +func (te TestEvent) Disposable() bool { + return te.Dispose +} + +const ( + ConfessionsLinkEventGuildKey = "guild_id" + ConfessionsLinkEventChannelKey = "channel_id" + ConfessionsLinkEventCreatedKey = "created" + ConfessionsLinkEventObsoleteKey = "obsolete" + + BadConfessionsLinkEventError = "link event doesnt have required fields" +) + +type ConfessionsChannelLinkEvent struct { + GuildID string + ChannelID string + Created time.Time +} + +func (e ConfessionsChannelLinkEvent) Type() EventType { + return ConfessionsChannelLink +} + +func (e ConfessionsChannelLinkEvent) Time() time.Time { + return e.Time() +} + +func (e ConfessionsChannelLinkEvent) Data() map[string]string { + return map[string]string{ + ConfessionsLinkEventGuildKey: e.GuildID, + ConfessionsLinkEventChannelKey: e.ChannelID, + ConfessionsLinkEventCreatedKey: e.Created.Format(time.RFC3339), + } +} + +func (e ConfessionsChannelLinkEvent) Validate() error { + if len(e.ChannelID) <= 1 || len(e.GuildID) <= 1 { + return errors.New(BadConfessionsLinkEventError) + } + + return nil +} + +func (e ConfessionsChannelLinkEvent) Disposable() bool { + // TODO + return false +} diff --git a/internal/state/state.go b/internal/state/state.go index 1325546..cc439f5 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -1,6 +1,6 @@ package state -/* STATE +/* State module * This package encapsulates various state information * state is represented in various global singletons * Additionally, this package offers a pub/sub interface @@ -8,15 +8,15 @@ package state */ import ( - "fmt" + "encoding/json" + "errors" + "os" + "strconv" "sync" "time" - "os" - "errors" - "encoding/json" - "gitlab.com/whom/bingobot/internal/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* Event interface is meant to encapsulate a general interface @@ -36,14 +36,17 @@ const ( BadChallengeEvent = "failed to make Challenge Event" BadRestorationEvent = "failed to make Restoration Event" BadUserActiveEvent = "failed to make UserActive Event" + BadTestEvent = "failed to make Test Event" BadEventObjMarshal = "error marshalling event" BadEventObjUnmarshal = "failed to unmarshal event map" BadEventUnmarshal = "failed to unmarshal event" BadEventMissingTypeKey = "event map missing type key" -) + StartBeforeInitError = "state machine not initialized" + ReopenStoreFileError = "failed to reopen store file" -const ( EventTypeMapKey = "type" + + TmpDocBufBackingFile = "bingobot_events_processing" ) var eventMutex sync.RWMutex @@ -60,7 +63,7 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { } if eventStream, err = docbuf.NewDocumentBuffer( - eventMemCacheSize, + 0, // temporary nil-cache buffer to replay events from file, ); err != nil { return errors.Join(errors.New(BadEventStreamInit), err) @@ -71,8 +74,92 @@ func Init(eventMemCacheSize int, eventStoreFileName string) error { return nil } +// replay events and begin state machine +func Start() error { + if eventStream == nil { + return errors.New(StartBeforeInitError) + } + + tmpBackFile, err := os.CreateTemp("", TmpDocBufBackingFile) + if err != nil { + return err + } + + tmpBuf, err := docbuf.NewDocumentBuffer(0, tmpBackFile) + if err != nil { + tmpBackFile.Close() + return err + } + + // filter out disposable events into a reverse ordered on disk stack + var doc string + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { + doc, err = eventStream.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + return err + } + + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Discarding the following event for not unmarshaling: %s", doc) + continue + } + + if !ev.Disposable() { + tmpBuf.Push(doc) + } + } + + if err := os.Truncate(eventStorageFileName, 0); err != nil { + return errors.Join(errors.New(ReopenStoreFileError), err) + } + + file, err := os.OpenFile(eventStorageFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return errors.Join(errors.New(BadEventStoreFilename), err) + } + + eventStream, err = docbuf.NewDocumentBuffer(maxEventsInMemory, file) + if err != nil { + // return error here will panic without truncating or writing to the file + return err + } + + // unravel tmp stack into properly ordered properly allocated buffer + for err == nil || err.Error() != docbuf.PopFromEmptyBufferError { + doc, err := tmpBuf.Pop() + if err != nil && err.Error() != docbuf.PopFromEmptyBufferError { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if doc == "" || + (err != nil && err.Error() == docbuf.PopFromEmptyBufferError) { + break + } + + ev, err := EventFromString(doc) + if err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + continue + } + + if err := PublishEvent(ev); err != nil { + logging.Warn("Could not handle the following error: %s", err.Error()) + } + } + + return nil +} + // return no error. we are handling SIGINT func Teardown() { + logging.Warn("Tearing down state engine") // riskily ignore all locking.... // we are handling a termination signal after all i, e := eventStream.Close() @@ -81,6 +168,7 @@ func Teardown() { logging.Warn("will attempt to truncate event store anyways") } + logging.Warn("teardown: events flushed") e = os.Truncate(eventStorageFileName, i) if e != nil { logging.Error("FAILED TO TRUNCATE EVENT STORE!!") @@ -96,6 +184,8 @@ const ( Challenge Restoration UserActive + Test + ConfessionsChannelLink // ... // leave this last @@ -113,6 +203,10 @@ func EventTypeFromString(doc string) EventType { return Restoration case "UserActive": return UserActive + case "Test": + return Test + case "ConfessionsChannelLink": + return ConfessionsChannelLink default: // error case return NumEventTypes @@ -125,6 +219,8 @@ func (et EventType) String() string { "Challenge", "Restoration", "UserActive", + "Test", + "ConfessionsChannelLink", } if et < 0 || et >= NumEventTypes { @@ -176,24 +272,71 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) { switch et { case Vote: return VoteEvent(data), nil + case Challenge: e, err := MakeUserEvent(data) if err != nil { return nil, errors.Join(errors.New(BadChallengeEvent), err) } return ChallengeEvent{*e}, nil + case Restoration: e, err := MakeUserEvent(data) if err != nil { return nil, errors.Join(errors.New(BadRestorationEvent), err) } return RestorationEvent{*e}, nil + case UserActive: e, err := MakeUserEvent(data) if err != nil { return nil, errors.Join(errors.New(BadUserActiveEvent), err) } return UserActiveEvent{*e}, nil + + case Test: + disp := false + if v, ok := data[TestEventDisposeKey]; ok && v == "t" { + disp = true + } + id := -1 + if v, ok := data[TestEventIDKey]; ok { + var err error + if id, err = strconv.Atoi(v); err != nil { + return nil, errors.Join(errors.New(BadTestEvent), err) + } + } + return TestEvent{ + Dispose: disp, + ID: id, + }, nil + + case ConfessionsChannelLink: + gid, ok := data[ConfessionsLinkEventGuildKey] + if !ok { + return nil, errors.New(BadConfessionsLinkEventError) + } + + cid, ok := data[ConfessionsLinkEventChannelKey] + if !ok { + return nil, errors.New(BadConfessionsLinkEventError) + } + + ti, ok := data[ConfessionsLinkEventCreatedKey] + if !ok { + return nil, errors.New(BadConfessionsLinkEventError) + } + t, err := time.Parse(time.RFC3339, ti) + if err != nil { + return nil, errors.Join(errors.New(BadConfessionsLinkEventError), err) + } + + return ConfessionsChannelLinkEvent{ + GuildID: gid, + ChannelID: cid, + Created: t, + }, nil + default: return nil, errors.New(BadEventTypeError) } @@ -232,6 +375,9 @@ type Event interface { // validates state of internal metadata per EventType Validate() error + + // returns true if event can be discarded + Disposable() bool } func EventToString(e Event) (string, error) { @@ -283,7 +429,7 @@ func PublishEvent(e Event) error { blocking := false for _, c := range eventSubscriptionCache[e.Type()] { - if float32(len(c)) > (float32(maxEventsInMemory) * 0.25) { + if float32(len(c)) > (float32(maxEventsInMemory) * 0.75) { if len(c) == maxEventsInMemory { logging.Warn( "PublishEvent() blocking -- event channel full", @@ -373,8 +519,6 @@ func GetMatchingEvents( } filter := func(e Event) bool { - ev, er := EventToString(e) - fmt.Printf("Checking: %s (%e)\n", ev, er) if e.Type() != t { return true } @@ -385,7 +529,6 @@ func GetMatchingEvents( } } - fmt.Println("Found Match") matches = append(matches, e) return true } @@ -394,168 +537,3 @@ func GetMatchingEvents( return matches, err } -type VoteEvent map[string]string - -const ( - VoteMissingKeyError = "vote data not found: " - VoteCreatedKey = "created" - VoteRequesterKey = "requester" - VoteActionKey = "action" - - VoteResultKey = "result" - VoteResultPass = "pass" - VoteResultFail = "fail" - VoteResultTie = "tie" - VoteResultTimeout = "timeout" - VoteBadResultError = "vote has invalid result: " - VoteNotFinishedError = "vote has result but isnt finished" - VoteMissingResultError = "vote finished but missing result" - - VoteStatusKey = "status" - VoteStatusInProgress = "in_progress" - VoteStatusFinalized = "finalized" - VoteStatusTimeout = "timed_out" - VoteBadStatusError = "vote has invalid status: " - - VeryOldVote = "1990-01-01T00:00:00Z" -) - -func (ve VoteEvent) Type() EventType { - return Vote -} - -func (ve VoteEvent) Time() time.Time { - t, e := time.Parse(time.RFC3339, ve[VoteCreatedKey]) - if e != nil { - // we have a corrupted event - // return old time so that this event gets - // pruned from cache - logging.Warn( - "pruning corrupted vote event", - "event", - fmt.Sprintf("%+v", ve), - ) - tooOld, _ := time.Parse( - time.RFC3339, - VeryOldVote, - ) - return tooOld - } - return t -} - -func (ve VoteEvent) Data() map[string]string { - return map[string]string(ve) -} - -func (ve VoteEvent) Validate() error { - // make sure action, requester, and created are set - for _, key := range []string{ - VoteActionKey, - VoteRequesterKey, - VoteCreatedKey, - VoteStatusKey, - } { - if _, found := ve[key]; !found { - return errors.New(VoteMissingKeyError + key) - } - } - - status := ve[VoteStatusKey] - if status != VoteStatusTimeout && - status != VoteStatusInProgress && - status != VoteStatusFinalized { - return errors.New(VoteBadStatusError + status) - } - - result, hasResult := ve[VoteResultKey] - if hasResult && status == VoteStatusInProgress { - return errors.New(VoteNotFinishedError) - } - if status != VoteStatusInProgress && !hasResult { - return errors.New(VoteMissingResultError) - } - - if hasResult && - (result != VoteResultPass && - result != VoteResultFail && - result != VoteResultTie && - result != VoteResultTimeout) { - - return errors.New(VoteBadResultError + result) - } - - return nil -} - -type UserEvent struct { - uid string - created time.Time -} - -const ( - UserEventUserKey = "user" - UserEventCreatedKey = "created" - UserEventBadUserError = "event has bad user" -) - -func (ue UserEvent) Time() time.Time { - return ue.created -} - -func (ue UserEvent) Data() map[string]string { - return map[string]string{ - UserEventUserKey: ue.uid, - UserEventCreatedKey: ue.created.Format(time.RFC3339), - } -} - -func (ue UserEvent) Validate() error { - // empty for now, we may do some validation later. - return nil -} - -type ChallengeEvent struct { - UserEvent -} - -func (ce ChallengeEvent) Type() EventType { - return Challenge -} - -func NewChallengeEvent(user string) ChallengeEvent { - return ChallengeEvent{UserEvent{ - uid: user, - created: time.Now(), - }} -} - -type RestorationEvent struct { - UserEvent -} - -func (re RestorationEvent) Type() EventType { - return Restoration -} - -func NewRestorationEvent(user string) RestorationEvent { - return RestorationEvent{UserEvent{ - uid: user, - created: time.Now(), - }} -} - -type UserActiveEvent struct { - UserEvent -} - -func (ua UserActiveEvent) Type() EventType { - return UserActive -} - -func NewUserActiveEvent(user string) UserActiveEvent { - return UserActiveEvent{UserEvent{ - uid: user, - created: time.Now(), - }} -} diff --git a/internal/state/state_test.go b/internal/state/state_test.go index e6d4073..877b724 100644 --- a/internal/state/state_test.go +++ b/internal/state/state_test.go @@ -2,11 +2,12 @@ package state import ( "fmt" + "os" "testing" "time" - "gitlab.com/whom/bingobot/internal/docbuf" "gitlab.com/whom/bingobot/internal/logging" + "gitlab.com/whom/bingobot/pkg/docbuf" ) /* WARNING: @@ -54,15 +55,11 @@ func TestEventMarshalUnmarshal(t *testing.T) { t.Fatalf("error marshalling challenge: %e, %s", err, cstr) } - t.Logf("cstr: %s\n", cstr) - vstr, err := EventToString(vev); if err != nil { t.Fatalf("error marshalling vote: %e, %s", err, vstr) } - t.Logf("vstr: %s\n", vstr) - if ev, err := EventFromString(cstr); err != nil || ev.Data()[UserEventUserKey] != cev.Data()[UserEventUserKey] || ev.Data()[UserEventCreatedKey] != cev.Data()[UserEventCreatedKey] { @@ -270,3 +267,81 @@ func TestVoteEventValidations(t *testing.T) { t.Errorf("Unexpected or no error: %e", err) } } + +func TestEventReplay(t *testing.T) { + tmpTestFile, err := os.CreateTemp("", "TestEventReplayBackingStore") + if err != nil { + t.Fatalf("failed setting up tempfile: %s", err.Error()) + } + + if err := tmpTestFile.Close(); err != nil { + t.Fatalf("failed to close initial handle to tempfile: %s", err.Error()) + } + + if err := Init(2, tmpTestFile.Name()); err != nil { + t.Fatalf("failed initializing state machine: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 1, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 1: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 2, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 2: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 3, + Dispose: true, + }); err != nil { + t.Fatalf("failed to publish event 3: %s", err.Error()) + } + + if err := PublishEvent(TestEvent{ + ID: 4, + Dispose: false, + }); err != nil { + t.Fatalf("failed to publish event 4: %s", err.Error()) + } + + sub, err := Test.Subscribe() + if err := Start(); err != nil { + t.Fatalf("failed to start state machine: %s", err.Error()) + } + + select { + case ev1 := <- sub: + if ev1.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev1) + } + if ev1.(TestEvent).ID != 2 { + t.Fatalf("misordered event: %+v", ev1) + } + default: + t.Fatal("no events available from replay") + } + + select { + case ev2 := <- sub: + if ev2.Type() != Test { + t.Fatalf("wrong kind of event somehow: %+v", ev2) + } + if ev2.(TestEvent).ID != 4 { + t.Fatalf("misordered event: %+v", ev2) + } + default: + t.Fatal("only one event made it out") + } + + select { + case <- sub: + t.Fatalf("superfluous events left in subscription") + default: + } +} diff --git a/internal/web/web.go b/internal/web/web.go new file mode 100644 index 0000000..fbe65eb --- /dev/null +++ b/internal/web/web.go @@ -0,0 +1,36 @@ +package web + +import ( + "fmt" + "net/http" + + "gitlab.com/whom/bingobot/internal/activity" + "gitlab.com/whom/bingobot/internal/config" + "gitlab.com/whom/bingobot/internal/logging" +) + +func Start() error { + frag := config.Get().LocalWebEndpointListen + http.HandleFunc("/", HandleHttpRequest) + go logging.Error(http.ListenAndServe(frag, nil).Error()) + + logging.Debug("Web handlers started") + return nil +} + +func HandleHttpRequest(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "") + fmt.Fprintf(w, "

User Activity Metrics

") + fmt.Fprintf(w, "

number of the last %d days a user has been active

", + config.Get().UserEventLifespanDays) + + fmt.Fprintf(w, "") + fmt.Fprintf(w, "") + for k, v := range activity.GetActivitySnapshot() { + fmt.Fprintf(w, "", + k, len(v)) + } + fmt.Fprintf(w, "
UserDays
%s%d
") + + fmt.Fprintf(w, "") +} diff --git a/main.go b/main.go index 21111e9..96e2e36 100644 --- a/main.go +++ b/main.go @@ -3,11 +3,16 @@ package main import ( "flag" "log" + "os" + "os/signal" + "gitlab.com/whom/bingobot/internal/activity" + "gitlab.com/whom/bingobot/internal/confession" "gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/state" + "gitlab.com/whom/bingobot/internal/web" ) var ( @@ -26,30 +31,51 @@ func main() { logging.Init() flag.Parse() + logging.Info("startup: initializing state engine") if err := state.Init( config.Get().InMemoryEventCacheSize, config.Get().PersistentCacheStore, ); err != nil { log.Fatalf("couldn't initialize state engine: %s", err.Error()) } - defer state.Teardown() - err = startBot() + logging.Info("startup: starting activity module") + if err := activity.Start(); err != nil { + log.Fatalf("failed to start activity module: %s", err.Error()) + } - if err != nil { + logging.Info("startup: starting confession module") + if err := confession.Start(); err != nil { + log.Fatalf("failed to start confession module: %s", err.Error()) + } + + logging.Info("startup: starting web module") + go func() { + if err := web.Start(); err != nil { + log.Fatalf("failed to start local web server: %s", err.Error()) + } + }() + + // start this LAST + logging.Info("startup: starting state engine") + if err := state.Start(); err != nil { + log.Fatalf("failed to start state machine: %s", err.Error()) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func(){ + for _ = range c { + state.Teardown() + discord.Close() + os.Exit(1) + } + }() + + logging.Info("startup: connecting to discord") + if err := discord.Connect(*token); err != nil { log.Fatal(err) } -} - -func startBot() error { - err := discord.Connect(*token) - - if err != nil { - return err - } - - logging.Info("shutting down gracefully", "type", "shutdown") - discord.Close() - - return nil + + for {} } diff --git a/internal/docbuf/docbuf.go b/pkg/docbuf/docbuf.go similarity index 98% rename from internal/docbuf/docbuf.go rename to pkg/docbuf/docbuf.go index cc0610e..cd3f633 100644 --- a/internal/docbuf/docbuf.go +++ b/pkg/docbuf/docbuf.go @@ -263,6 +263,11 @@ func NewDocumentBuffer( * If cache is full oldest document is written to backing writer. */ func (b *DocBuf) Push(doc string) error { + if b.cacheSize == 0 { + b.writeToBackingStore(doc) + return nil + } + if len(b.cache) >= b.cacheSize { if err := b.demote(); err != nil { return err @@ -280,6 +285,14 @@ func (b *DocBuf) Push(doc string) error { * out of the backing ReaderWriter. */ func (b *DocBuf) Pop() (string, error) { + if b.cacheSize == 0 { + d, e := b.readDocumentsFromDisk(1, true, false) + if len(d) > 0 { + return d[0], e + } + return "", e + } + if len(b.cache) < 1 { if err := b.promote(true); err != nil { return "", err diff --git a/internal/docbuf/docbuf_test.go b/pkg/docbuf/docbuf_test.go similarity index 100% rename from internal/docbuf/docbuf_test.go rename to pkg/docbuf/docbuf_test.go diff --git a/internal/docbuf/read_write_seek_string.go b/pkg/docbuf/read_write_seek_string.go similarity index 100% rename from internal/docbuf/read_write_seek_string.go rename to pkg/docbuf/read_write_seek_string.go diff --git a/internal/docbuf/read_write_seek_string_test.go b/pkg/docbuf/read_write_seek_string_test.go similarity index 100% rename from internal/docbuf/read_write_seek_string_test.go rename to pkg/docbuf/read_write_seek_string_test.go