1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

storage/mem: implement size enforcer for #88

This commit is contained in:
James Hillyerd
2018-03-24 20:27:05 -07:00
parent b42ea130ea
commit 412b62d6fa
5 changed files with 225 additions and 48 deletions

View File

@@ -0,0 +1,73 @@
package mem
import "container/list"
type msgDone struct {
msg *Message
done chan struct{}
}
// enforceMaxSize will delete the oldest message until the entire mail store is equal to or less
// than Store.maxSize bytes.
func (s *Store) maxSizeEnforcer(maxSize int64) {
all := &list.List{}
curSize := int64(0)
for {
select {
case md, ok := <-s.incoming:
if !ok {
return
}
// Add message to all.
m := md.msg
el := all.PushBack(m)
m.el = el
curSize += int64(m.Size())
for curSize > maxSize {
// Remove oldest message.
el := all.Front()
all.Remove(el)
m := el.Value.(*Message)
if s.removeMessage(m.mailbox, m.id) != nil {
curSize -= int64(m.Size())
}
}
close(md.done)
case md, ok := <-s.remove:
if !ok {
return
}
// Remove message from all.
m := md.msg
el := all.Remove(m.el)
if el != nil {
curSize -= int64(m.Size())
}
close(md.done)
}
}
}
// enforcerDeliver sends delivery to enforcer if configured, and waits for completion.
func (s *Store) enforcerDeliver(m *Message) {
if s.incoming != nil {
md := &msgDone{
msg: m,
done: make(chan struct{}),
}
s.incoming <- md
<-md.done
}
}
// enforcerRemove sends removal to enforcer if configured, and waits for completion.
func (s *Store) enforcerRemove(m *Message) {
if s.remove != nil {
md := &msgDone{
msg: m,
done: make(chan struct{}),
}
s.remove <- md
<-md.done
}
}

View File

@@ -2,6 +2,7 @@ package mem
import (
"bytes"
"container/list"
"io"
"io/ioutil"
"net/mail"
@@ -20,6 +21,7 @@ type Message struct {
date time.Time
subject string
source []byte
el *list.Element // This message in Store.messages
}
var _ storage.Message = &Message{}

View File

@@ -1,6 +1,7 @@
package mem
import (
"fmt"
"io/ioutil"
"sort"
"strconv"
@@ -13,8 +14,10 @@ import (
// Store implements an in-memory message store.
type Store struct {
sync.Mutex
boxes map[string]*mbox
cap int
boxes map[string]*mbox
cap int // Per-mailbox message cap.
incoming chan *msgDone // New messages for size enforcer.
remove chan *msgDone // Remove deleted messages from size enforcer.
}
type mbox struct {
@@ -29,38 +32,51 @@ var _ storage.Store = &Store{}
// New returns an emtpy memory store.
func New(cfg config.Storage) (storage.Store, error) {
return &Store{
s := &Store{
boxes: make(map[string]*mbox),
cap: cfg.MailboxMsgCap,
}, nil
}
if str, ok := cfg.Params["maxkb"]; ok {
maxKB, err := strconv.ParseInt(str, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse maxkb: %v", err)
}
if maxKB > 0 {
// Setup enforcer.
s.incoming = make(chan *msgDone)
s.remove = make(chan *msgDone)
go s.maxSizeEnforcer(maxKB * 1024)
}
}
return s, nil
}
// AddMessage stores the message, message ID and Size will be ignored.
func (s *Store) AddMessage(message storage.Message) (id string, err error) {
r, ierr := message.Source()
if ierr != nil {
err = ierr
return
}
source, ierr := ioutil.ReadAll(r)
if ierr != nil {
err = ierr
return
}
m := &Message{
mailbox: message.Mailbox(),
from: message.From(),
to: message.To(),
date: message.Date(),
subject: message.Subject(),
}
s.withMailbox(message.Mailbox(), true, func(mb *mbox) {
r, ierr := message.Source()
if ierr != nil {
err = ierr
return
}
source, ierr := ioutil.ReadAll(r)
if ierr != nil {
err = ierr
return
}
// Generate message ID.
mb.last++
m.index = mb.last
id = strconv.Itoa(mb.last)
m := &Message{
index: mb.last,
mailbox: message.Mailbox(),
id: id,
from: message.From(),
to: message.To(),
date: message.Date(),
subject: message.Subject(),
source: source,
}
m.id = id
m.source = source
mb.messages[id] = m
if s.cap > 0 {
// Enforce cap.
@@ -70,6 +86,7 @@ func (s *Store) AddMessage(message storage.Message) (id string, err error) {
}
}
})
s.enforcerDeliver(m)
return id, err
}
@@ -97,17 +114,38 @@ func (s *Store) GetMessages(mailbox string) (ms []storage.Message, err error) {
// PurgeMessages deletes the contents of a mailbox.
func (s *Store) PurgeMessages(mailbox string) error {
var messages map[string]*Message
s.withMailbox(mailbox, true, func(mb *mbox) {
messages = mb.messages
mb.messages = make(map[string]*Message)
})
if len(messages) > 0 && s.remove != nil {
for _, m := range messages {
s.enforcerRemove(m)
}
}
return nil
}
// removeMessage deletes a single message without notifying the size enforcer. Returns the message
// that was removed.
func (s *Store) removeMessage(mailbox, id string) *Message {
var m *Message
s.withMailbox(mailbox, true, func(mb *mbox) {
m = mb.messages[id]
if m != nil {
delete(mb.messages, id)
}
})
return m
}
// RemoveMessage deletes a single message.
func (s *Store) RemoveMessage(mailbox, id string) error {
s.withMailbox(mailbox, true, func(mb *mbox) {
delete(mb.messages, id)
})
m := s.removeMessage(mailbox, id)
if m != nil {
s.enforcerRemove(m)
}
return nil
}

View File

@@ -1,7 +1,9 @@
package mem
import (
"sync"
"testing"
"time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/storage"
@@ -16,3 +18,65 @@ func TestSuite(t *testing.T) {
return s, destroy, nil
})
}
// TestMessageList verifies the operation of the global message list: mem.Store.messages.
func TestMaxSize(t *testing.T) {
maxSize := int64(2048)
s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}})
boxes := []string{"alpha", "beta", "whiskey", "tango", "foxtrot"}
n := 10
// total := 50
sizeChan := make(chan int64, len(boxes))
// Populate mailboxes concurrently.
for _, mailbox := range boxes {
go func(mailbox string) {
size := int64(0)
for i := 0; i < n; i++ {
_, nbytes := test.DeliverToStore(t, s, mailbox, "subject", time.Now())
size += nbytes
}
sizeChan <- size
}(mailbox)
}
// Wait for sizes.
sentBytesTotal := int64(0)
for range boxes {
sentBytesTotal += <-sizeChan
}
// Calculate actual size.
gotSize := int64(0)
s.VisitMailboxes(func(messages []storage.Message) bool {
for _, m := range messages {
gotSize += m.Size()
}
return true
})
// Verify state. Messages are ~75 bytes each.
if gotSize < 2048-75 {
t.Errorf("Got total size %v, want greater than: %v", gotSize, 2048-75)
}
if gotSize > maxSize {
t.Errorf("Got total size %v, want less than: %v", gotSize, maxSize)
}
// Purge all messages concurrently, testing for deadlocks.
wg := &sync.WaitGroup{}
wg.Add(len(boxes))
for _, mailbox := range boxes {
go func(mailbox string) {
err := s.PurgeMessages(mailbox)
if err != nil {
t.Fatal(err)
}
wg.Done()
}(mailbox)
}
wg.Wait()
count := 0
s.VisitMailboxes(func(messages []storage.Message) bool {
count += len(messages)
return true
})
if count != 0 {
t.Errorf("Got %v total messages, want: %v", count, 0)
}
}

View File

@@ -173,11 +173,11 @@ func testDeliveryOrder(t *testing.T, store storage.Store) {
subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"}
for i, subj := range subjects {
// Check mailbox count.
getAndCountMessages(t, store, mailbox, i)
deliverMessage(t, store, mailbox, subj, time.Now())
GetAndCountMessages(t, store, mailbox, i)
DeliverToStore(t, store, mailbox, subj, time.Now())
}
// Confirm delivery order.
msgs := getAndCountMessages(t, store, mailbox, 5)
msgs := GetAndCountMessages(t, store, mailbox, 5)
for i, want := range subjects {
got := msgs[i].Subject()
if got != want {
@@ -193,7 +193,7 @@ func testSize(t *testing.T, store storage.Store) {
sentIds := make([]string, len(subjects))
sentSizes := make([]int64, len(subjects))
for i, subj := range subjects {
id, size := deliverMessage(t, store, mailbox, subj, time.Now())
id, size := DeliverToStore(t, store, mailbox, subj, time.Now())
sentIds[i] = id
sentSizes[i] = size
}
@@ -215,9 +215,9 @@ func testDelete(t *testing.T, store storage.Store) {
mailbox := "fred"
subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"}
for _, subj := range subjects {
deliverMessage(t, store, mailbox, subj, time.Now())
DeliverToStore(t, store, mailbox, subj, time.Now())
}
msgs := getAndCountMessages(t, store, mailbox, len(subjects))
msgs := GetAndCountMessages(t, store, mailbox, len(subjects))
// Delete a couple messages.
err := store.RemoveMessage(mailbox, msgs[1].ID())
if err != nil {
@@ -229,7 +229,7 @@ func testDelete(t *testing.T, store storage.Store) {
}
// Confirm deletion.
subjects = []string{"alpha", "charlie", "echo"}
msgs = getAndCountMessages(t, store, mailbox, len(subjects))
msgs = GetAndCountMessages(t, store, mailbox, len(subjects))
for i, want := range subjects {
got := msgs[i].Subject()
if got != want {
@@ -237,9 +237,9 @@ func testDelete(t *testing.T, store storage.Store) {
}
}
// Try appending one more.
deliverMessage(t, store, mailbox, "foxtrot", time.Now())
DeliverToStore(t, store, mailbox, "foxtrot", time.Now())
subjects = []string{"alpha", "charlie", "echo", "foxtrot"}
msgs = getAndCountMessages(t, store, mailbox, len(subjects))
msgs = GetAndCountMessages(t, store, mailbox, len(subjects))
for i, want := range subjects {
got := msgs[i].Subject()
if got != want {
@@ -253,15 +253,15 @@ func testPurge(t *testing.T, store storage.Store) {
mailbox := "fred"
subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"}
for _, subj := range subjects {
deliverMessage(t, store, mailbox, subj, time.Now())
DeliverToStore(t, store, mailbox, subj, time.Now())
}
getAndCountMessages(t, store, mailbox, len(subjects))
GetAndCountMessages(t, store, mailbox, len(subjects))
// Purge and verify.
err := store.PurgeMessages(mailbox)
if err != nil {
t.Fatal(err)
}
getAndCountMessages(t, store, mailbox, 0)
GetAndCountMessages(t, store, mailbox, 0)
}
// testMsgCap verifies the message cap is enforced.
@@ -270,7 +270,7 @@ func testMsgCap(t *testing.T, store storage.Store) {
mailbox := "captain"
for i := 0; i < 20; i++ {
subj := fmt.Sprintf("subject %v", i)
deliverMessage(t, store, mailbox, subj, time.Now())
DeliverToStore(t, store, mailbox, subj, time.Now())
msgs, err := store.GetMessages(mailbox)
if err != nil {
t.Fatalf("Failed to GetMessages for %q: %v", mailbox, err)
@@ -296,8 +296,8 @@ func testNoMsgCap(t *testing.T, store storage.Store) {
mailbox := "captain"
for i := 0; i < 20; i++ {
subj := fmt.Sprintf("subject %v", i)
deliverMessage(t, store, mailbox, subj, time.Now())
getAndCountMessages(t, store, mailbox, i+1)
DeliverToStore(t, store, mailbox, subj, time.Now())
GetAndCountMessages(t, store, mailbox, i+1)
}
}
@@ -306,8 +306,8 @@ func testNoMsgCap(t *testing.T, store storage.Store) {
func testVisitMailboxes(t *testing.T, ds storage.Store) {
boxes := []string{"abby", "bill", "christa", "donald", "evelyn"}
for _, name := range boxes {
deliverMessage(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour))
deliverMessage(t, ds, name, "New Message", time.Now())
DeliverToStore(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour))
DeliverToStore(t, ds, name, "New Message", time.Now())
}
seen := 0
err := ds.VisitMailboxes(func(messages []storage.Message) bool {
@@ -326,9 +326,9 @@ func testVisitMailboxes(t *testing.T, ds storage.Store) {
}
}
// deliverMessage creates and delivers a message to the specific mailbox, returning the size of the
// DeliverToStore creates and delivers a message to the specific mailbox, returning the size of the
// generated message.
func deliverMessage(
func DeliverToStore(
t *testing.T,
store storage.Store,
mailbox string,
@@ -356,9 +356,9 @@ func deliverMessage(
return id, int64(len(testMsg))
}
// getAndCountMessages is a test helper that expects to receive count messages or fails the test, it
// GetAndCountMessages is a test helper that expects to receive count messages or fails the test, it
// also checks return error.
func getAndCountMessages(t *testing.T, s storage.Store, mailbox string, count int) []storage.Message {
func GetAndCountMessages(t *testing.T, s storage.Store, mailbox string, count int) []storage.Message {
t.Helper()
msgs, err := s.GetMessages(mailbox)
if err != nil {