mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-22 03:57:02 +00:00
Move retention scanner into datastore pkg for #67
This commit is contained in:
@@ -33,8 +33,6 @@ func init() {
|
||||
expConnectsHist.Set(log.PushMetric(connectsHist, expConnectsTotal))
|
||||
expErrorsHist.Set(log.PushMetric(errorsHist, expErrorsTotal))
|
||||
expWarnsHist.Set(log.PushMetric(warnsHist, expWarnsTotal))
|
||||
expRetentionDeletesHist.Set(log.PushMetric(retentionDeletesHist, expRetentionDeletesTotal))
|
||||
expRetainedHist.Set(log.PushMetric(retainedHist, expRetainedCurrent))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -50,10 +48,10 @@ type Server struct {
|
||||
storeMessages bool
|
||||
|
||||
// Dependencies
|
||||
dataStore datastore.DataStore // Mailbox/message store
|
||||
globalShutdown chan bool // Shuts down Inbucket
|
||||
msgHub *msghub.Hub // Pub/sub for message info
|
||||
retentionScanner *RetentionScanner // Deletes expired messages
|
||||
dataStore datastore.DataStore // Mailbox/message store
|
||||
globalShutdown chan bool // Shuts down Inbucket
|
||||
msgHub *msghub.Hub // Pub/sub for message info
|
||||
retentionScanner *datastore.RetentionScanner // Deletes expired messages
|
||||
|
||||
// State
|
||||
listener net.Listener // Incoming network connections
|
||||
@@ -98,7 +96,7 @@ func NewServer(
|
||||
globalShutdown: globalShutdown,
|
||||
dataStore: ds,
|
||||
msgHub: msgHub,
|
||||
retentionScanner: NewRetentionScanner(ds, globalShutdown),
|
||||
retentionScanner: datastore.NewRetentionScanner(ds, globalShutdown),
|
||||
waitgroup: new(sync.WaitGroup),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,178 +0,0 @@
|
||||
package smtpd
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"expvar"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/datastore"
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
)
|
||||
|
||||
var (
|
||||
retentionScanCompleted = time.Now()
|
||||
retentionScanCompletedMu sync.RWMutex
|
||||
|
||||
// History counters
|
||||
expRetentionDeletesTotal = new(expvar.Int)
|
||||
expRetentionPeriod = new(expvar.Int)
|
||||
expRetainedCurrent = new(expvar.Int)
|
||||
|
||||
// History of certain stats
|
||||
retentionDeletesHist = list.New()
|
||||
retainedHist = list.New()
|
||||
|
||||
// History rendered as comma delimited string
|
||||
expRetentionDeletesHist = new(expvar.String)
|
||||
expRetainedHist = new(expvar.String)
|
||||
)
|
||||
|
||||
func init() {
|
||||
rm := expvar.NewMap("retention")
|
||||
rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted))
|
||||
rm.Set("DeletesHist", expRetentionDeletesHist)
|
||||
rm.Set("DeletesTotal", expRetentionDeletesTotal)
|
||||
rm.Set("Period", expRetentionPeriod)
|
||||
rm.Set("RetainedHist", expRetainedHist)
|
||||
rm.Set("RetainedCurrent", expRetainedCurrent)
|
||||
}
|
||||
|
||||
// RetentionScanner looks for messages older than the configured retention period and deletes them.
|
||||
type RetentionScanner struct {
|
||||
globalShutdown chan bool // Closes when Inbucket needs to shut down
|
||||
retentionShutdown chan bool // Closed after the scanner has shut down
|
||||
ds datastore.DataStore
|
||||
retentionPeriod time.Duration
|
||||
retentionSleep time.Duration
|
||||
}
|
||||
|
||||
// NewRetentionScanner launches a go-routine that scans for expired
|
||||
// messages, following the configured interval
|
||||
func NewRetentionScanner(ds datastore.DataStore, shutdownChannel chan bool) *RetentionScanner {
|
||||
cfg := config.GetDataStoreConfig()
|
||||
rs := &RetentionScanner{
|
||||
globalShutdown: shutdownChannel,
|
||||
retentionShutdown: make(chan bool),
|
||||
ds: ds,
|
||||
retentionPeriod: time.Duration(cfg.RetentionMinutes) * time.Minute,
|
||||
retentionSleep: time.Duration(cfg.RetentionSleep) * time.Millisecond,
|
||||
}
|
||||
// expRetentionPeriod is displayed on the status page
|
||||
expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60))
|
||||
return rs
|
||||
}
|
||||
|
||||
// Start up the retention scanner if retention period > 0
|
||||
func (rs *RetentionScanner) Start() {
|
||||
if rs.retentionPeriod <= 0 {
|
||||
log.Infof("Retention scanner disabled")
|
||||
close(rs.retentionShutdown)
|
||||
return
|
||||
}
|
||||
log.Infof("Retention configured for %v", rs.retentionPeriod)
|
||||
go rs.run()
|
||||
}
|
||||
|
||||
// run loops to kick off the scanner on the correct schedule
|
||||
func (rs *RetentionScanner) run() {
|
||||
start := time.Now()
|
||||
retentionLoop:
|
||||
for {
|
||||
// Prevent scanner from starting more than once a minute
|
||||
since := time.Since(start)
|
||||
if since < time.Minute {
|
||||
dur := time.Minute - since
|
||||
log.Tracef("Retention scanner sleeping for %v", dur)
|
||||
select {
|
||||
case _ = <-rs.globalShutdown:
|
||||
break retentionLoop
|
||||
case _ = <-time.After(dur):
|
||||
}
|
||||
}
|
||||
// Kickoff scan
|
||||
start = time.Now()
|
||||
if err := rs.doScan(); err != nil {
|
||||
log.Errorf("Error during retention scan: %v", err)
|
||||
}
|
||||
// Check for global shutdown
|
||||
select {
|
||||
case _ = <-rs.globalShutdown:
|
||||
break retentionLoop
|
||||
default:
|
||||
}
|
||||
}
|
||||
log.Tracef("Retention scanner shut down")
|
||||
close(rs.retentionShutdown)
|
||||
}
|
||||
|
||||
// doScan does a single pass of all mailboxes looking for messages that can be purged
|
||||
func (rs *RetentionScanner) doScan() error {
|
||||
log.Tracef("Starting retention scan")
|
||||
cutoff := time.Now().Add(-1 * rs.retentionPeriod)
|
||||
mboxes, err := rs.ds.AllMailboxes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
retained := 0
|
||||
// Loop over all mailboxes
|
||||
for _, mb := range mboxes {
|
||||
messages, err := mb.GetMessages()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Loop over all messages in mailbox
|
||||
for _, msg := range messages {
|
||||
if msg.Date().Before(cutoff) {
|
||||
log.Tracef("Purging expired message %v", msg.ID())
|
||||
err = msg.Delete()
|
||||
if err != nil {
|
||||
// Log but don't abort
|
||||
log.Errorf("Failed to purge message %v: %v", msg.ID(), err)
|
||||
} else {
|
||||
expRetentionDeletesTotal.Add(1)
|
||||
}
|
||||
} else {
|
||||
retained++
|
||||
}
|
||||
}
|
||||
// Sleep after completing a mailbox
|
||||
select {
|
||||
case <-rs.globalShutdown:
|
||||
log.Tracef("Retention scan aborted due to shutdown")
|
||||
return nil
|
||||
case <-time.After(rs.retentionSleep):
|
||||
// Reduce disk thrashing
|
||||
}
|
||||
}
|
||||
// Update metrics
|
||||
setRetentionScanCompleted(time.Now())
|
||||
expRetainedCurrent.Set(int64(retained))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Join does not retun until the retention scanner has shut down
|
||||
func (rs *RetentionScanner) Join() {
|
||||
if rs.retentionShutdown != nil {
|
||||
select {
|
||||
case <-rs.retentionShutdown:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func setRetentionScanCompleted(t time.Time) {
|
||||
retentionScanCompletedMu.Lock()
|
||||
defer retentionScanCompletedMu.Unlock()
|
||||
retentionScanCompleted = t
|
||||
}
|
||||
|
||||
func getRetentionScanCompleted() time.Time {
|
||||
retentionScanCompletedMu.RLock()
|
||||
defer retentionScanCompletedMu.RUnlock()
|
||||
return retentionScanCompleted
|
||||
}
|
||||
|
||||
func secondsSinceRetentionScanCompleted() interface{} {
|
||||
return time.Since(getRetentionScanCompleted()) / time.Second
|
||||
}
|
||||
@@ -1,198 +0,0 @@
|
||||
package smtpd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/mail"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/enmime"
|
||||
"github.com/jhillyerd/inbucket/datastore"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func TestDoRetentionScan(t *testing.T) {
|
||||
// Create mock objects
|
||||
mds := &MockDataStore{}
|
||||
|
||||
mb1 := &MockMailbox{}
|
||||
mb2 := &MockMailbox{}
|
||||
mb3 := &MockMailbox{}
|
||||
|
||||
// Mockup some different aged messages (num is in hours)
|
||||
new1 := mockMessage(0)
|
||||
new2 := mockMessage(1)
|
||||
new3 := mockMessage(2)
|
||||
old1 := mockMessage(4)
|
||||
old2 := mockMessage(12)
|
||||
old3 := mockMessage(24)
|
||||
|
||||
// First it should ask for all mailboxes
|
||||
mds.On("AllMailboxes").Return([]datastore.Mailbox{mb1, mb2, mb3}, nil)
|
||||
|
||||
// Then for all messages on each box
|
||||
mb1.On("GetMessages").Return([]datastore.Message{new1, old1, old2}, nil)
|
||||
mb2.On("GetMessages").Return([]datastore.Message{old3, new2}, nil)
|
||||
mb3.On("GetMessages").Return([]datastore.Message{new3}, nil)
|
||||
|
||||
// Test 4 hour retention
|
||||
rs := &RetentionScanner{
|
||||
ds: mds,
|
||||
retentionPeriod: 4*time.Hour - time.Minute,
|
||||
retentionSleep: 0,
|
||||
}
|
||||
if err := rs.doScan(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Check our assertions
|
||||
mds.AssertExpectations(t)
|
||||
mb1.AssertExpectations(t)
|
||||
mb2.AssertExpectations(t)
|
||||
mb3.AssertExpectations(t)
|
||||
|
||||
// Delete should not have been called on new messages
|
||||
new1.AssertNotCalled(t, "Delete")
|
||||
new2.AssertNotCalled(t, "Delete")
|
||||
new3.AssertNotCalled(t, "Delete")
|
||||
|
||||
// Delete should have been called once on old messages
|
||||
old1.AssertNumberOfCalls(t, "Delete", 1)
|
||||
old2.AssertNumberOfCalls(t, "Delete", 1)
|
||||
old3.AssertNumberOfCalls(t, "Delete", 1)
|
||||
}
|
||||
|
||||
// Make a MockMessage of a specific age
|
||||
func mockMessage(ageHours int) *MockMessage {
|
||||
msg := &MockMessage{}
|
||||
msg.On("ID").Return(fmt.Sprintf("MSG[age=%vh]", ageHours))
|
||||
msg.On("Date").Return(time.Now().Add(time.Duration(ageHours*-1) * time.Hour))
|
||||
msg.On("Delete").Return(nil)
|
||||
return msg
|
||||
}
|
||||
|
||||
// Mock DataStore object
|
||||
type MockDataStore struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockDataStore) MailboxFor(name string) (datastore.Mailbox, error) {
|
||||
args := m.Called(name)
|
||||
return args.Get(0).(datastore.Mailbox), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockDataStore) AllMailboxes() ([]datastore.Mailbox, error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).([]datastore.Mailbox), args.Error(1)
|
||||
}
|
||||
|
||||
// Mock Mailbox object
|
||||
type MockMailbox struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockMailbox) GetMessages() ([]datastore.Message, error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).([]datastore.Message), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) GetMessage(id string) (datastore.Message, error) {
|
||||
args := m.Called(id)
|
||||
return args.Get(0).(datastore.Message), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) Purge() error {
|
||||
args := m.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) NewMessage() (datastore.Message, error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).(datastore.Message), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) Name() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) String() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
// Mock Message object
|
||||
type MockMessage struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockMessage) ID() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *MockMessage) From() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *MockMessage) To() []string {
|
||||
args := m.Called()
|
||||
return args.Get(0).([]string)
|
||||
}
|
||||
|
||||
func (m *MockMessage) Date() time.Time {
|
||||
args := m.Called()
|
||||
return args.Get(0).(time.Time)
|
||||
}
|
||||
|
||||
func (m *MockMessage) Subject() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *MockMessage) ReadHeader() (msg *mail.Message, err error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).(*mail.Message), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMessage) ReadBody() (body *enmime.Envelope, err error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).(*enmime.Envelope), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMessage) ReadRaw() (raw *string, err error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).(*string), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMessage) RawReader() (reader io.ReadCloser, err error) {
|
||||
args := m.Called()
|
||||
return args.Get(0).(io.ReadCloser), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMessage) Size() int64 {
|
||||
args := m.Called()
|
||||
return int64(args.Int(0))
|
||||
}
|
||||
|
||||
func (m *MockMessage) Append(data []byte) error {
|
||||
// []byte arg seems to mess up testify/mock
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockMessage) Close() error {
|
||||
args := m.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockMessage) Delete() error {
|
||||
args := m.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockMessage) String() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
Reference in New Issue
Block a user