Compare commits

..

2 commits

Author SHA1 Message Date
e7d229c217 Early functionality modules
0. tests for event replay
   Tests are now included for the event replay features. This includes many fixes on top of
   the last commit as well as some tweaks to config module values.
1. activity module
   An activity module is created to track the useractive events and provide a counter for them.
   It also encapsulates logic to discard old useractive events.
2. web module
   A web module is created. This module serves a static webpage showing runtime information.
   Currently it only shows a snapshot of the user activity data. It is my intention that it
   eventually also shows an audit log, known users and channels, uptime, and more. Future work
   will also be needed in order to use HTML templating so that it doesn't look so... basic.
   Live updates to the information may also be desired.

Signed-off-by: Ava Affine <ava@sunnypup.io>
2025-01-07 16:43:55 -08:00
2515d396a0 Event replay and cleanup
This commit introduces the ability to reconfigure modules/event subscribers by
replaying previously stored events at startup. This way modules/subscribers can
always come up with the latest activity and data that was present at close at the
last run of the program. This process also includes the disposal of unneeded events
to minimize disk use over time. The changes include the following:

1. Event interface is extended with the Disposable() function
   unimplemented stubs for existing interface implementations
2. state.Init() is split into Init() and Start()
   Init() now initialized the file used by eventStream, and puts in place a 0 size
   memory cache as a temporary measure on top of the file.
   Start() reads Pop()s documents one by one from the temp nil-cache eventStream,
   it attempts to dispose of each event and if the event is not disposable it is
   pushed onto a second temporary no-memory-cache buffer which is then reverse ordered.
   The underlying file is truncated and reopened.
   Finally, the real eventStream is allocated, with in memory cache. All events in the
   second buffer are published (sent to subscribers as well as added to the new eventStream).
3. Updates to main() to support Init() vs Start()

Signed-off-by: Ava Affine <ava@sunnypup.io>
2024-12-06 13:00:50 -08:00
10 changed files with 24 additions and 317 deletions

View file

@ -2,13 +2,12 @@ package activity
import ( import (
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
"gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/internal/state" "gitlab.com/whom/bingobot/internal/state"
"gitlab.com/whom/bingobot/internal/logging"
) )
/* Activity module /* Activity module
@ -19,7 +18,7 @@ const (
ActivityModuleStartFail = "failed to start activity module" ActivityModuleStartFail = "failed to start activity module"
) )
var currentUserActivity = make(map[string][]state.UserActiveEvent) var currentUserActivity map[string][]state.UserActiveEvent
var userActivityLock sync.RWMutex var userActivityLock sync.RWMutex
func Start() error { func Start() error {
@ -39,10 +38,6 @@ func Start() error {
user := emap[state.UserEventUserKey] user := emap[state.UserEventUserKey]
etime := ev.Time() etime := ev.Time()
delta := time.Since(etime).Hours() / float64(24) delta := time.Since(etime).Hours() / float64(24)
logging.Debug(fmt.Sprintf(
"processing UserActive event for %s", user,
))
if delta <= float64(config.Get().UserEventLifespanDays) { if delta <= float64(config.Get().UserEventLifespanDays) {
new := []state.UserActiveEvent{ev.(state.UserActiveEvent)} new := []state.UserActiveEvent{ev.(state.UserActiveEvent)}
@ -51,7 +46,6 @@ func Start() error {
if found { if found {
new = append(new, current...) new = append(new, current...)
} }
currentUserActivity[user] = new
userActivityLock.Unlock() userActivityLock.Unlock()
} else { } else {
logging.Warn("recieved expired useractive event") logging.Warn("recieved expired useractive event")

View file

@ -1,60 +0,0 @@
package confession
import (
"errors"
"fmt"
"sync"
"github.com/bwmarrin/discordgo"
"gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/internal/state"
)
/* Activity module
* This module posts anonymous confessions according to a linked channel map
*/
const (
ActivityModuleStartFail = "failed to start activity module"
)
var (
// guild ID to channel ID
linkLock sync.RWMutex
confessionChannelLinks = make(map[string]state.ConfessionsChannelLinkEvent)
)
func Start() error {
ch, err := state.ConfessionsChannelLink.Subscribe()
if err != nil {
return errors.Join(
errors.New(ActivityModuleStartFail),
err,
)
}
// process incoming events loop
go func() {
for {
ev := <- ch
logging.Debug("recieved new confessional channel link")
e := ev.(state.ConfessionsChannelLinkEvent)
linkLock.Lock()
confessionChannelLinks[e.GuildID] = e
linkLock.Unlock()
}
}()
return nil
}
func MakeConfession(s *discordgo.Session, guildID string, content string) {
linkLock.RLock()
link, ok := confessionChannelLinks[guildID]
linkLock.RUnlock()
if !ok {
logging.Error(fmt.Sprintf("Failed to send confession in guild %s: no link exists in map", guildID))
return
}
s.ChannelMessageSend(link.ChannelID, content)
}

View file

@ -42,7 +42,7 @@ func NewActivityTimer(uid string) *UserActivityTimer {
Start() initializes the timer and calls run() Start() initializes the timer and calls run()
*/ */
func (t *UserActivityTimer) Start(ctx context.Context) { func (t *UserActivityTimer) Start(ctx context.Context) {
logging.Debug("starting voiceActivityTimer", "uid", t.UID) logging.Info("starting voiceActivityTimer", "uid", t.UID)
t.sleepDuration = time.Millisecond * time.Duration(config.Get().VoiceActivityTimerSleepIntervalMillis) t.sleepDuration = time.Millisecond * time.Duration(config.Get().VoiceActivityTimerSleepIntervalMillis)
activityTimerDuration := time.Second * time.Duration(config.Get().VoiceActivityThresholdSeconds) activityTimerDuration := time.Second * time.Duration(config.Get().VoiceActivityThresholdSeconds)
@ -89,7 +89,7 @@ func (t *UserActivityTimer) run() {
} }
// the timer's context has been cancelled or deadline expired. // the timer's context has been cancelled or deadline expired.
logging.Debug("voiceActivityTimer stopping", "uid", t.UID, "reason", context.Cause(t.ctx)) logging.Info("voiceActivityTimer stopping", "uid", t.UID, "reason", context.Cause(t.ctx))
if context.Cause(t.ctx) == ErrTimerExpired { if context.Cause(t.ctx) == ErrTimerExpired {
/* /*
@ -128,7 +128,7 @@ func stopActivityTimer(uid string) {
timerMutex.Lock() timerMutex.Lock()
defer timerMutex.Unlock() defer timerMutex.Unlock()
if _, ok := activityTimers[uid]; ok { if _, ok := activityTimers[uid]; !ok {
activityTimers[uid].Cancel() activityTimers[uid].Cancel()
delete(activityTimers, uid) delete(activityTimers, uid)
} }
@ -147,5 +147,5 @@ func emitUserActiveEvent(uid string) {
return return
} }
logging.Debug("published UserActiveEvent", "uid", uid) logging.Info("published UserActiveEvent", "uid", uid)
} }

View file

@ -1,129 +0,0 @@
package discord
import (
"fmt"
"time"
"github.com/bwmarrin/discordgo"
"gitlab.com/whom/bingobot/internal/confession"
"gitlab.com/whom/bingobot/internal/logging"
"gitlab.com/whom/bingobot/internal/state"
)
var (
// map of guildID to registeredCommands
registeredCommands = make(map[string][]*discordgo.ApplicationCommand)
// all commands
commandList = []*discordgo.ApplicationCommand{
// TODO: Limit usage somehow?
// maybe delete this and use the vote module instead
&discordgo.ApplicationCommand{
Name: "confessional",
Description: "mark a channel as a designated confessional for a guild",
},
&discordgo.ApplicationCommand{
Name: "confess",
Description: "anonymously post a confession in configured channel",
Options: []*discordgo.ApplicationCommandOption{
&discordgo.ApplicationCommandOption{
Type: discordgo.ApplicationCommandOptionString,
Name: "confession",
Description: "A confession to be posted anonymously",
Required: true,
},
},
},
}
commandHandlers = map[string]func(
s *discordgo.Session,
i *discordgo.InteractionCreate,
) {
"confessional": func(s *discordgo.Session, i *discordgo.InteractionCreate) {
if err := state.PublishEvent(state.ConfessionsChannelLinkEvent{
GuildID: i.GuildID,
ChannelID: i.ChannelID,
Created: time.Now(),
}); err != nil {
logging.Error(fmt.Sprintf(
"failed to publish confession channel link: %s",
err.Error(),
))
} else {
logging.Info("published confession channel link")
}
},
// handle a confession
"confess": func(s *discordgo.Session, i *discordgo.InteractionCreate) {
for _, v := range i.ApplicationCommandData().Options {
if v.Name == "confession" {
confession.MakeConfession(s, i.GuildID, v.StringValue())
}
}
},
}
)
func handleCommand(s *discordgo.Session, e *discordgo.InteractionCreate) {
name := e.ApplicationCommandData().Name
// TODO: audit log
if h, ok := commandHandlers[name]; ok {
h(s, e)
} else {
logging.Debug("no handler for command: %s", name)
}
s.InteractionRespond(e.Interaction, &discordgo.InteractionResponse{
Type: discordgo.InteractionResponseChannelMessageWithSource,
Data: &discordgo.InteractionResponseData{
Flags: discordgo.MessageFlagsEphemeral,
Content: "Your cauldron bubbles...",
},
})
}
func registerCommands(s *discordgo.Session) {
for _, guild := range s.State.Guilds {
cmds, err := s.ApplicationCommandBulkOverwrite(
s.State.Application.ID,
guild.ID,
commandList,
)
if err != nil {
logging.Error(fmt.Sprintf(
"Failed to register commands for guild %s: %s",
guild.ID, err.Error(),
))
} else {
logging.Info(fmt.Sprintf("Registered commands for guild %s", guild.ID))
registeredCommands[guild.ID] = cmds
}
}
}
func deregisterCommands(s *discordgo.Session) {
for guild, commands := range registeredCommands {
for _, cmd := range commands {
if err := s.ApplicationCommandDelete(
s.State.Application.ID,
guild,
cmd.ID,
); err != nil {
logging.Error(fmt.Sprintf(
"Failed to delete %s command (id: %s) from guild %s",
cmd.Name, cmd.ID, guild,
))
} else {
logging.Info(fmt.Sprintf(
"Deregistered command %s (id: %s) from guild %s",
cmd.Name, cmd.ID, guild,
))
}
}
}
}

View file

@ -36,13 +36,10 @@ func Connect(token string) error {
return fmt.Errorf("failed to open discord session: %s", err) return fmt.Errorf("failed to open discord session: %s", err)
} }
registerCommands(session.s)
return nil return nil
} }
func Close() { func Close() {
deregisterCommands(session.s)
err := session.s.Close() err := session.s.Close()
if err != nil { if err != nil {

View file

@ -9,7 +9,6 @@ func addHandlers() {
session.s.AddHandler(handleConnect) session.s.AddHandler(handleConnect)
session.s.AddHandler(handleDisconnect) session.s.AddHandler(handleDisconnect)
session.s.AddHandler(handleVoiceStateUpdate) session.s.AddHandler(handleVoiceStateUpdate)
session.s.AddHandler(handleCommand) // handles InteractionCreate
} }
func handleConnect(s *discordgo.Session, e *discordgo.Connect) { func handleConnect(s *discordgo.Session, e *discordgo.Connect) {

View file

@ -226,47 +226,3 @@ func (te TestEvent) Validate() error {
func (te TestEvent) Disposable() bool { func (te TestEvent) Disposable() bool {
return te.Dispose return te.Dispose
} }
const (
ConfessionsLinkEventGuildKey = "guild_id"
ConfessionsLinkEventChannelKey = "channel_id"
ConfessionsLinkEventCreatedKey = "created"
ConfessionsLinkEventObsoleteKey = "obsolete"
BadConfessionsLinkEventError = "link event doesnt have required fields"
)
type ConfessionsChannelLinkEvent struct {
GuildID string
ChannelID string
Created time.Time
}
func (e ConfessionsChannelLinkEvent) Type() EventType {
return ConfessionsChannelLink
}
func (e ConfessionsChannelLinkEvent) Time() time.Time {
return e.Time()
}
func (e ConfessionsChannelLinkEvent) Data() map[string]string {
return map[string]string{
ConfessionsLinkEventGuildKey: e.GuildID,
ConfessionsLinkEventChannelKey: e.ChannelID,
ConfessionsLinkEventCreatedKey: e.Created.Format(time.RFC3339),
}
}
func (e ConfessionsChannelLinkEvent) Validate() error {
if len(e.ChannelID) <= 1 || len(e.GuildID) <= 1 {
return errors.New(BadConfessionsLinkEventError)
}
return nil
}
func (e ConfessionsChannelLinkEvent) Disposable() bool {
// TODO
return false
}

View file

@ -159,7 +159,6 @@ func Start() error {
// return no error. we are handling SIGINT // return no error. we are handling SIGINT
func Teardown() { func Teardown() {
logging.Warn("Tearing down state engine")
// riskily ignore all locking.... // riskily ignore all locking....
// we are handling a termination signal after all // we are handling a termination signal after all
i, e := eventStream.Close() i, e := eventStream.Close()
@ -168,7 +167,6 @@ func Teardown() {
logging.Warn("will attempt to truncate event store anyways") logging.Warn("will attempt to truncate event store anyways")
} }
logging.Warn("teardown: events flushed")
e = os.Truncate(eventStorageFileName, i) e = os.Truncate(eventStorageFileName, i)
if e != nil { if e != nil {
logging.Error("FAILED TO TRUNCATE EVENT STORE!!") logging.Error("FAILED TO TRUNCATE EVENT STORE!!")
@ -185,7 +183,6 @@ const (
Restoration Restoration
UserActive UserActive
Test Test
ConfessionsChannelLink
// ... // ...
// leave this last // leave this last
@ -205,8 +202,6 @@ func EventTypeFromString(doc string) EventType {
return UserActive return UserActive
case "Test": case "Test":
return Test return Test
case "ConfessionsChannelLink":
return ConfessionsChannelLink
default: default:
// error case // error case
return NumEventTypes return NumEventTypes
@ -220,7 +215,6 @@ func (et EventType) String() string {
"Restoration", "Restoration",
"UserActive", "UserActive",
"Test", "Test",
"ConfessionsChannelLink",
} }
if et < 0 || et >= NumEventTypes { if et < 0 || et >= NumEventTypes {
@ -272,28 +266,24 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) {
switch et { switch et {
case Vote: case Vote:
return VoteEvent(data), nil return VoteEvent(data), nil
case Challenge: case Challenge:
e, err := MakeUserEvent(data) e, err := MakeUserEvent(data)
if err != nil { if err != nil {
return nil, errors.Join(errors.New(BadChallengeEvent), err) return nil, errors.Join(errors.New(BadChallengeEvent), err)
} }
return ChallengeEvent{*e}, nil return ChallengeEvent{*e}, nil
case Restoration: case Restoration:
e, err := MakeUserEvent(data) e, err := MakeUserEvent(data)
if err != nil { if err != nil {
return nil, errors.Join(errors.New(BadRestorationEvent), err) return nil, errors.Join(errors.New(BadRestorationEvent), err)
} }
return RestorationEvent{*e}, nil return RestorationEvent{*e}, nil
case UserActive: case UserActive:
e, err := MakeUserEvent(data) e, err := MakeUserEvent(data)
if err != nil { if err != nil {
return nil, errors.Join(errors.New(BadUserActiveEvent), err) return nil, errors.Join(errors.New(BadUserActiveEvent), err)
} }
return UserActiveEvent{*e}, nil return UserActiveEvent{*e}, nil
case Test: case Test:
disp := false disp := false
if v, ok := data[TestEventDisposeKey]; ok && v == "t" { if v, ok := data[TestEventDisposeKey]; ok && v == "t" {
@ -310,33 +300,6 @@ func (et EventType) MakeEvent(data map[string]string) (Event, error) {
Dispose: disp, Dispose: disp,
ID: id, ID: id,
}, nil }, nil
case ConfessionsChannelLink:
gid, ok := data[ConfessionsLinkEventGuildKey]
if !ok {
return nil, errors.New(BadConfessionsLinkEventError)
}
cid, ok := data[ConfessionsLinkEventChannelKey]
if !ok {
return nil, errors.New(BadConfessionsLinkEventError)
}
ti, ok := data[ConfessionsLinkEventCreatedKey]
if !ok {
return nil, errors.New(BadConfessionsLinkEventError)
}
t, err := time.Parse(time.RFC3339, ti)
if err != nil {
return nil, errors.Join(errors.New(BadConfessionsLinkEventError), err)
}
return ConfessionsChannelLinkEvent{
GuildID: gid,
ChannelID: cid,
Created: t,
}, nil
default: default:
return nil, errors.New(BadEventTypeError) return nil, errors.New(BadEventTypeError)
} }

View file

@ -14,7 +14,6 @@ func Start() error {
http.HandleFunc("/", HandleHttpRequest) http.HandleFunc("/", HandleHttpRequest)
go logging.Error(http.ListenAndServe(frag, nil).Error()) go logging.Error(http.ListenAndServe(frag, nil).Error())
logging.Debug("Web handlers started")
return nil return nil
} }

48
main.go
View file

@ -3,11 +3,8 @@ package main
import ( import (
"flag" "flag"
"log" "log"
"os"
"os/signal"
"gitlab.com/whom/bingobot/internal/activity" "gitlab.com/whom/bingobot/internal/activity"
"gitlab.com/whom/bingobot/internal/confession"
"gitlab.com/whom/bingobot/internal/config" "gitlab.com/whom/bingobot/internal/config"
"gitlab.com/whom/bingobot/internal/discord" "gitlab.com/whom/bingobot/internal/discord"
"gitlab.com/whom/bingobot/internal/logging" "gitlab.com/whom/bingobot/internal/logging"
@ -31,7 +28,6 @@ func main() {
logging.Init() logging.Init()
flag.Parse() flag.Parse()
logging.Info("startup: initializing state engine")
if err := state.Init( if err := state.Init(
config.Get().InMemoryEventCacheSize, config.Get().InMemoryEventCacheSize,
config.Get().PersistentCacheStore, config.Get().PersistentCacheStore,
@ -39,43 +35,35 @@ func main() {
log.Fatalf("couldn't initialize state engine: %s", err.Error()) log.Fatalf("couldn't initialize state engine: %s", err.Error())
} }
logging.Info("startup: starting activity module")
if err := activity.Start(); err != nil { if err := activity.Start(); err != nil {
// TODO: handle gracefully and continue?
log.Fatalf("failed to start activity module: %s", err.Error()) log.Fatalf("failed to start activity module: %s", err.Error())
} }
logging.Info("startup: starting confession module") if err := web.Start(); err != nil {
if err := confession.Start(); err != nil { log.Fatalf("failed to start local web server: %s", err.Error())
log.Fatalf("failed to start confession module: %s", err.Error())
} }
logging.Info("startup: starting web module")
go func() {
if err := web.Start(); err != nil {
log.Fatalf("failed to start local web server: %s", err.Error())
}
}()
// start this LAST // start this LAST
logging.Info("startup: starting state engine")
if err := state.Start(); err != nil { if err := state.Start(); err != nil {
log.Fatalf("failed to start state machine: %s", err.Error()) log.Fatalf("failed to start state machine: %s", err.Error())
} }
defer state.Teardown()
c := make(chan os.Signal, 1) if err := startBot(); err != nil {
signal.Notify(c, os.Interrupt)
go func(){
for _ = range c {
state.Teardown()
discord.Close()
os.Exit(1)
}
}()
logging.Info("startup: connecting to discord")
if err := discord.Connect(*token); err != nil {
log.Fatal(err) log.Fatal(err)
} }
}
for {}
func startBot() error {
err := discord.Connect(*token)
if err != nil {
return err
}
logging.Info("shutting down gracefully", "type", "shutdown")
discord.Close()
return nil
} }