implement throttling, refactor everything

trunk
vilmibm 2020-12-19 20:18:23 +00:00
parent bb69e9c613
commit c05634606e
3 changed files with 223 additions and 84 deletions

View File

@ -1,107 +1,74 @@
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/fsnotify/fsnotify"
"time"
)
// TODO flesh out event handling with an eye towards eventual sampling
type EventType int
func initHomeWatcher() (*fsnotify.Watcher, error) {
fmt.Fprintf(os.Stderr, "setting up home watcher...\n")
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
const (
eventHomeActivity = iota
eventLogin
eventLogout
)
out, err := exec.Command("sh", "-c", "stats | jq .active_users[] | tr -d '\"'").Output()
if err != nil {
return nil, fmt.Errorf("failed to call and process stats: %w", err)
}
scanner := bufio.NewScanner(bytes.NewReader(out))
for scanner.Scan() {
username := strings.TrimSpace(scanner.Text())
home := filepath.Join("/home", username)
addHome(watcher, home)
}
return watcher, nil
// TODO may compute flavor externally based on type + username...
type Event struct {
Username string
Type EventType
Flavor string
}
func addHome(watcher *fsnotify.Watcher, homePath string) error {
fileCount := 0
filepath.Walk(homePath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if info.IsDir() && strings.HasPrefix(info.Name(), ".") {
return filepath.SkipDir
}
fileCount++
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
err = watcher.Add(path)
if err != nil && err.Error() != "permission denied" && err.Error() != "no such file or directory" {
fmt.Printf("%#v\n", err)
fmt.Println("Died at ", fileCount)
fmt.Printf("%#v\n", info)
panic(err.Error())
} else {
fmt.Fprintf(os.Stderr, "watching %s\n", path)
}
return nil
})
return nil
func (e Event) String() string {
return e.Flavor
}
func watchHomes(watcher *fsnotify.Watcher) {
fmt.Fprintf(os.Stderr, "starting poll\n")
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
log.Println("event:", event)
if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("modified file:", event.Name)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
func NewLogger(w io.Writer) *log.Logger {
// TODO consider https://github.com/lestrrat-go/file-rotatelogs
return log.New(w, "", log.LstdFlags)
}
func cli(args []string) int {
fmt.Fprintf(os.Stderr, "starting\n")
watcher, err := initHomeWatcher()
fmt.Fprintf(os.Stderr, "watcher initialized\n")
// TODO less hardcoded
lf, err := os.OpenFile("bustle.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create watcher: %w\n", err)
log.Printf("failed to open log file: %s\n", err)
return 2
}
logger := NewLogger(lf)
rawEvents := make(chan Event)
throttler := NewThrottler(time.Hour)
hw, err := NewHomeWatcher(rawEvents)
if err != nil {
log.Printf("failed to create hw: %s\n", err)
return 1
}
defer watcher.Close()
defer hw.Close()
// TODO have an event bus to write to
// TODO what is this done for do i need this done
done := make(chan bool)
go watchHomes(watcher)
<-done
// need a channel of raw events that can take home watcher events + eventual other events like
// logging in
// listener grabs each raw event and checks throttler
// if not throttled, event is written to log file
go hw.Watch()
for {
// TODO prob use a select with an error channel, like fsnotify does
event, ok := <-rawEvents
if !ok {
log.Println("event error")
break
}
if throttler.Throttled(event.Username) {
continue
}
throttler.Touch(event.Username)
logger.Println(event)
}
return 0
}
@ -109,3 +76,8 @@ func cli(args []string) int {
func main() {
os.Exit(cli(os.Args))
}
/*
note to self: restarting the bustled nightly will fix a lot of presence problems. Instead of worrying about people creating directories and logging off, i can just do a daily restart. i /do/ want to capture people logging in, though, but that's not as hard as keeping track of files.
*/

View File

@ -0,0 +1,42 @@
package main
import "time"
type Throttler interface {
Touch(string)
Throttled(string) bool
SetInterval(time.Duration)
Now() time.Time
}
type MapThrottler struct {
interval time.Duration
users map[string]time.Time
}
func NewThrottler(interval time.Duration) Throttler {
return &MapThrottler{
interval: interval,
users: map[string]time.Time{},
}
}
func (t *MapThrottler) SetInterval(interval time.Duration) {
t.interval = interval
}
func (t *MapThrottler) Touch(username string) {
t.users[username] = t.Now()
}
func (t *MapThrottler) Now() time.Time {
return time.Now()
}
func (t *MapThrottler) Throttled(username string) bool {
lastTouch, ok := t.users[username]
if !ok {
return false
}
return t.Now().Sub(lastTouch) < t.interval
}

View File

@ -0,0 +1,125 @@
package main
import (
"bufio"
"bytes"
"fmt"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strings"
"github.com/fsnotify/fsnotify"
)
type Watcher interface {
AddDirectory(string) error
Watch()
Close()
Root() string
}
type HomeWatcher struct {
watcher *fsnotify.Watcher
root string
raw chan Event
}
func NewHomeWatcher(rawEvents chan Event) (Watcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
hw := &HomeWatcher{
watcher: watcher,
root: "/home",
raw: rawEvents,
}
out, err := exec.Command("sh", "-c", "stats | jq .active_users[] | tr -d '\"'").Output()
if err != nil {
return nil, fmt.Errorf("failed to call and process stats: %w", err)
}
scanner := bufio.NewScanner(bytes.NewReader(out))
for scanner.Scan() {
username := strings.TrimSpace(scanner.Text())
home := filepath.Join(hw.Root(), username)
if err := hw.AddDirectory(home); err != nil {
return nil, err
}
}
return hw, nil
}
func (hw *HomeWatcher) AddDirectory(path string) error {
fileCount := 0
filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if info.IsDir() && strings.HasPrefix(info.Name(), ".") {
return filepath.SkipDir
}
fileCount++
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
err = hw.watcher.Add(path)
if err != nil && err.Error() != "permission denied" && err.Error() != "no such file or directory" {
return fmt.Errorf("died at file %d. info: %#v error: %w\n", fileCount, info, err)
}
log.Printf("watching %s\n", path)
return nil
})
return nil
}
func (hw *HomeWatcher) Root() string {
return hw.root
}
func (hw *HomeWatcher) Close() {
hw.watcher.Close()
}
func (hw *HomeWatcher) Watch() {
unre := regexp.MustCompile("^/home/([^/]+)/.*$")
for {
select {
case event, ok := <-hw.watcher.Events:
if !ok {
return
}
matches := unre.FindStringSubmatch(event.Name)
if matches == nil {
log.Printf("could not extract username: %s\n", event)
}
un := matches[1]
hw.raw <- Event{
Username: un,
Type: eventHomeActivity,
Flavor: fmt.Sprintf("%s is knocking about in %s", un, path.Dir(event.Name)),
}
// event.Name is filepath
log.Println("event:", event)
fmt.Printf("DBG %#v\n", event)
if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("modified file:", event.Name)
}
case err, ok := <-hw.watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}