From a4ad7b13cc398fcb804a974c36445a4bd38ecb98 Mon Sep 17 00:00:00 2001
From: James Hillyerd
Date: Fri, 26 Oct 2012 13:38:59 -0700
Subject: [PATCH] Wire in retention
- Update README
- Add retention metrics
- Start retention scanner if configured
---
README.md | 1 +
config/config.go | 15 ++--
etc/devel.conf | 2 +-
smtpd/datastore.go | 1 +
smtpd/listener.go | 5 ++
smtpd/retention.go | 79 +++++++++++++++++++++-
smtpd/retention_test.go | 4 +-
themes/integral/templates/root/status.html | 19 ++++++
8 files changed, 116 insertions(+), 10 deletions(-)
diff --git a/README.md b/README.md
index cbe77a8..be0f5eb 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ It can:
* Display the source of a message (headers + body text)
* Display the HTML version of a message (in a new window)
* Delete a message
+ * Purge messages after a configurable amount of time
It does not yet:
diff --git a/config/config.go b/config/config.go
index 56a45cf..fd6b6cd 100644
--- a/config/config.go
+++ b/config/config.go
@@ -109,14 +109,19 @@ func LoadConfig(filename string) error {
return fmt.Errorf("Failed to validate configuration")
}
- err = parseSmtpConfig()
- if err != nil {
- return nil
+ if err = parseSmtpConfig(); err != nil {
+ return err
}
- err = parseWebConfig()
+ if err = parseWebConfig(); err != nil {
+ return err
+ }
- return err
+ if err = parseDataStoreConfig(); err != nil {
+ return err
+ }
+
+ return nil
}
// parseLoggingConfig trying to catch config errors early
diff --git a/etc/devel.conf b/etc/devel.conf
index 4f2e360..8d7f213 100644
--- a/etc/devel.conf
+++ b/etc/devel.conf
@@ -66,7 +66,7 @@ path=/tmp/inbucket
# How many minutes after receipt should a message be stored until it's
# automatically purged. To retain messages until manually deleted, set this
# to 0
-retention.minutes=1
+retention.minutes=0
# How many milliseconds to sleep after purging messages from a mailbox.
# This should help reduce disk I/O when there are a large number of messages
diff --git a/smtpd/datastore.go b/smtpd/datastore.go
index 8b67c88..d81947f 100644
--- a/smtpd/datastore.go
+++ b/smtpd/datastore.go
@@ -30,3 +30,4 @@ type Message interface {
Delete() error
String() string
}
+
diff --git a/smtpd/listener.go b/smtpd/listener.go
index 72f348d..aace2e0 100644
--- a/smtpd/listener.go
+++ b/smtpd/listener.go
@@ -65,6 +65,10 @@ func (s *Server) Start() {
panic(err)
}
+ // Start retention scanner
+ StartRetentionScanner(s.dataStore)
+
+ // Handle incoming connections
for sid := 1; ; sid++ {
if conn, err := ln.Accept(); err != nil {
// TODO Implement a max error counter before shutdown?
@@ -86,6 +90,7 @@ func metricsTicker(t *time.Ticker) {
expConnectsHist.Set(pushMetric(connectsHist, expConnectsTotal))
expErrorsHist.Set(pushMetric(errorsHist, expErrorsTotal))
expWarnsHist.Set(pushMetric(warnsHist, expWarnsTotal))
+ expRetentionDeletesHist.Set(pushMetric(retentionDeletesHist, expRetentionDeletesTotal))
}
}
diff --git a/smtpd/retention.go b/smtpd/retention.go
index ddfe260..0c6bc52 100644
--- a/smtpd/retention.go
+++ b/smtpd/retention.go
@@ -1,12 +1,58 @@
package smtpd
import (
+ "container/list"
+ "expvar"
+ "github.com/jhillyerd/inbucket/config"
"github.com/jhillyerd/inbucket/log"
+ "sync"
"time"
)
-// retentionScan does a single pass of all mailboxes looking for messages that can be purged
-func retentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) error {
+var retentionScanCompleted time.Time
+var retentionScanCompletedMu sync.RWMutex
+
+var expRetentionDeletesTotal = new(expvar.Int)
+
+// History of certain stats
+var retentionDeletesHist = list.New()
+
+// History rendered as comma delim string
+var expRetentionDeletesHist = new(expvar.String)
+
+func StartRetentionScanner(ds DataStore) {
+ cfg := config.GetDataStoreConfig()
+ if cfg.RetentionMinutes > 0 {
+ // Retention scanning enabled
+ log.Info("Retention configured for %v minutes", cfg.RetentionMinutes)
+ go retentionScanner(ds, time.Duration(cfg.RetentionMinutes) * time.Minute,
+ time.Duration(cfg.RetentionSleep) * time.Millisecond)
+ } else {
+ log.Info("Retention scanner disabled")
+ }
+}
+
+func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) {
+ start := time.Now()
+ for {
+ // Prevent scanner from running more than once a minute
+ since := time.Since(start)
+ if since < time.Minute {
+ dur := time.Minute - since
+ log.Trace("Retention scanner sleeping for %v", dur)
+ time.Sleep(dur)
+ }
+ start = time.Now()
+
+ // Kickoff scan
+ if err := doRetentionScan(ds, maxAge, sleep); err != nil {
+ log.Error("Error during retention scan: %v", err)
+ }
+ }
+}
+
+// doRetentionScan does a single pass of all mailboxes looking for messages that can be purged
+func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) error {
log.Trace("Starting retention scan")
cutoff := time.Now().Add(-1 * maxAge)
mboxes, err := ds.AllMailboxes()
@@ -26,6 +72,8 @@ func retentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) erro
if err != nil {
// Log but don't abort
log.Error("Failed to purge message %v: %v", msg.Id(), err)
+ } else {
+ expRetentionDeletesTotal.Add(1)
}
}
}
@@ -33,5 +81,32 @@ func retentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) erro
time.Sleep(sleep)
}
+ setRetentionScanCompleted(time.Now())
+
return nil
}
+
+func setRetentionScanCompleted(t time.Time) {
+ retentionScanCompletedMu.Lock()
+ defer retentionScanCompletedMu.Unlock()
+
+ retentionScanCompleted = t
+}
+
+func getRetentionScanCompleted() time.Time {
+ retentionScanCompletedMu.RLock()
+ defer retentionScanCompletedMu.RUnlock()
+
+ return retentionScanCompleted
+}
+
+func secondsSinceRetentionScanCompleted() interface{} {
+ return time.Since(getRetentionScanCompleted()) / time.Second
+}
+
+func init() {
+ rm := expvar.NewMap("retention")
+ rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted))
+ rm.Set("DeletesHist", expRetentionDeletesHist)
+ rm.Set("DeletesTotal", expRetentionDeletesTotal)
+}
diff --git a/smtpd/retention_test.go b/smtpd/retention_test.go
index 835e57b..bf0113d 100644
--- a/smtpd/retention_test.go
+++ b/smtpd/retention_test.go
@@ -8,7 +8,7 @@ import (
"time"
)
-func TestRetentionScanner(t *testing.T) {
+func TestDoRetentionScan(t *testing.T) {
// Create mock objects
mds := &MockDataStore{}
@@ -33,7 +33,7 @@ func TestRetentionScanner(t *testing.T) {
mb3.On("GetMessages").Return([]Message{new3}, nil)
// Test 4 hour retention
- retentionScan(mds, 4*time.Hour, 0)
+ doRetentionScan(mds, 4*time.Hour, 0)
// Check our assertions
mds.AssertExpectations(t)
diff --git a/themes/integral/templates/root/status.html b/themes/integral/templates/root/status.html
index c69f079..efbb6c7 100644
--- a/themes/integral/templates/root/status.html
+++ b/themes/integral/templates/root/status.html
@@ -85,6 +85,7 @@
function displayMetrics(data, textStatus, jqXHR) {
// Non graphing
metric('uptime', data.uptime, timeFilter, false)
+ metric('retentionScanCompleted', data.retention.SecondsSinceScanCompleted, timeFilter, false)
// JavaScript history
metric('memstatsSys', data.memstats.Sys, sizeFilter, true)
@@ -102,6 +103,8 @@
setHistory('smtpWarnsTotal', data.smtp.WarnsHist)
metric('smtpErrorsTotal', data.smtp.ErrorsTotal, numberFilter, false)
setHistory('smtpErrorsTotal', data.smtp.ErrorsHist)
+ metric('retentionDeletesTotal', data.retention.DeletesTotal, numberFilter, false)
+ setHistory('retentionDeletesTotal', data.retention.DeletesHist)
}
function loadMetrics() {
@@ -202,5 +205,21 @@ values over time.
+
+
Data Store Metrics
+
+
+ | Retention Scan: |
+ Completed . ago |
+
+
+ | Retention Deletes: |
+ . |
+ |
+ (60s) |
+
+
+
+
{{end}}