mirror of
https://github.com/jhillyerd/inbucket.git
synced 2026-01-02 17:37:04 +00:00
Wire in retention
- Update README - Add retention metrics - Start retention scanner if configured
This commit is contained in:
@@ -30,3 +30,4 @@ type Message interface {
|
||||
Delete() error
|
||||
String() string
|
||||
}
|
||||
|
||||
|
||||
@@ -65,6 +65,10 @@ func (s *Server) Start() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Start retention scanner
|
||||
StartRetentionScanner(s.dataStore)
|
||||
|
||||
// Handle incoming connections
|
||||
for sid := 1; ; sid++ {
|
||||
if conn, err := ln.Accept(); err != nil {
|
||||
// TODO Implement a max error counter before shutdown?
|
||||
@@ -86,6 +90,7 @@ func metricsTicker(t *time.Ticker) {
|
||||
expConnectsHist.Set(pushMetric(connectsHist, expConnectsTotal))
|
||||
expErrorsHist.Set(pushMetric(errorsHist, expErrorsTotal))
|
||||
expWarnsHist.Set(pushMetric(warnsHist, expWarnsTotal))
|
||||
expRetentionDeletesHist.Set(pushMetric(retentionDeletesHist, expRetentionDeletesTotal))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,58 @@
|
||||
package smtpd
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"expvar"
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// retentionScan does a single pass of all mailboxes looking for messages that can be purged
|
||||
func retentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) error {
|
||||
var retentionScanCompleted time.Time
|
||||
var retentionScanCompletedMu sync.RWMutex
|
||||
|
||||
var expRetentionDeletesTotal = new(expvar.Int)
|
||||
|
||||
// History of certain stats
|
||||
var retentionDeletesHist = list.New()
|
||||
|
||||
// History rendered as comma delim string
|
||||
var expRetentionDeletesHist = new(expvar.String)
|
||||
|
||||
func StartRetentionScanner(ds DataStore) {
|
||||
cfg := config.GetDataStoreConfig()
|
||||
if cfg.RetentionMinutes > 0 {
|
||||
// Retention scanning enabled
|
||||
log.Info("Retention configured for %v minutes", cfg.RetentionMinutes)
|
||||
go retentionScanner(ds, time.Duration(cfg.RetentionMinutes) * time.Minute,
|
||||
time.Duration(cfg.RetentionSleep) * time.Millisecond)
|
||||
} else {
|
||||
log.Info("Retention scanner disabled")
|
||||
}
|
||||
}
|
||||
|
||||
func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) {
|
||||
start := time.Now()
|
||||
for {
|
||||
// Prevent scanner from running more than once a minute
|
||||
since := time.Since(start)
|
||||
if since < time.Minute {
|
||||
dur := time.Minute - since
|
||||
log.Trace("Retention scanner sleeping for %v", dur)
|
||||
time.Sleep(dur)
|
||||
}
|
||||
start = time.Now()
|
||||
|
||||
// Kickoff scan
|
||||
if err := doRetentionScan(ds, maxAge, sleep); err != nil {
|
||||
log.Error("Error during retention scan: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// doRetentionScan does a single pass of all mailboxes looking for messages that can be purged
|
||||
func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) error {
|
||||
log.Trace("Starting retention scan")
|
||||
cutoff := time.Now().Add(-1 * maxAge)
|
||||
mboxes, err := ds.AllMailboxes()
|
||||
@@ -26,6 +72,8 @@ func retentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) erro
|
||||
if err != nil {
|
||||
// Log but don't abort
|
||||
log.Error("Failed to purge message %v: %v", msg.Id(), err)
|
||||
} else {
|
||||
expRetentionDeletesTotal.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -33,5 +81,32 @@ func retentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) erro
|
||||
time.Sleep(sleep)
|
||||
}
|
||||
|
||||
setRetentionScanCompleted(time.Now())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setRetentionScanCompleted(t time.Time) {
|
||||
retentionScanCompletedMu.Lock()
|
||||
defer retentionScanCompletedMu.Unlock()
|
||||
|
||||
retentionScanCompleted = t
|
||||
}
|
||||
|
||||
func getRetentionScanCompleted() time.Time {
|
||||
retentionScanCompletedMu.RLock()
|
||||
defer retentionScanCompletedMu.RUnlock()
|
||||
|
||||
return retentionScanCompleted
|
||||
}
|
||||
|
||||
func secondsSinceRetentionScanCompleted() interface{} {
|
||||
return time.Since(getRetentionScanCompleted()) / time.Second
|
||||
}
|
||||
|
||||
func init() {
|
||||
rm := expvar.NewMap("retention")
|
||||
rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted))
|
||||
rm.Set("DeletesHist", expRetentionDeletesHist)
|
||||
rm.Set("DeletesTotal", expRetentionDeletesTotal)
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRetentionScanner(t *testing.T) {
|
||||
func TestDoRetentionScan(t *testing.T) {
|
||||
// Create mock objects
|
||||
mds := &MockDataStore{}
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestRetentionScanner(t *testing.T) {
|
||||
mb3.On("GetMessages").Return([]Message{new3}, nil)
|
||||
|
||||
// Test 4 hour retention
|
||||
retentionScan(mds, 4*time.Hour, 0)
|
||||
doRetentionScan(mds, 4*time.Hour, 0)
|
||||
|
||||
// Check our assertions
|
||||
mds.AssertExpectations(t)
|
||||
|
||||
Reference in New Issue
Block a user