From 412b62d6fad94c33329285f5449eabd9769cb0d2 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sat, 24 Mar 2018 20:27:05 -0700 Subject: [PATCH] storage/mem: implement size enforcer for #88 --- pkg/storage/mem/maxsize.go | 73 +++++++++++++++++++++++++++ pkg/storage/mem/message.go | 2 + pkg/storage/mem/store.go | 92 +++++++++++++++++++++++++---------- pkg/storage/mem/store_test.go | 64 ++++++++++++++++++++++++ pkg/test/storage_suite.go | 42 ++++++++-------- 5 files changed, 225 insertions(+), 48 deletions(-) create mode 100644 pkg/storage/mem/maxsize.go diff --git a/pkg/storage/mem/maxsize.go b/pkg/storage/mem/maxsize.go new file mode 100644 index 0000000..888247b --- /dev/null +++ b/pkg/storage/mem/maxsize.go @@ -0,0 +1,73 @@ +package mem + +import "container/list" + +type msgDone struct { + msg *Message + done chan struct{} +} + +// enforceMaxSize will delete the oldest message until the entire mail store is equal to or less +// than Store.maxSize bytes. +func (s *Store) maxSizeEnforcer(maxSize int64) { + all := &list.List{} + curSize := int64(0) + for { + select { + case md, ok := <-s.incoming: + if !ok { + return + } + // Add message to all. + m := md.msg + el := all.PushBack(m) + m.el = el + curSize += int64(m.Size()) + for curSize > maxSize { + // Remove oldest message. + el := all.Front() + all.Remove(el) + m := el.Value.(*Message) + if s.removeMessage(m.mailbox, m.id) != nil { + curSize -= int64(m.Size()) + } + } + close(md.done) + case md, ok := <-s.remove: + if !ok { + return + } + // Remove message from all. + m := md.msg + el := all.Remove(m.el) + if el != nil { + curSize -= int64(m.Size()) + } + close(md.done) + } + } +} + +// enforcerDeliver sends delivery to enforcer if configured, and waits for completion. +func (s *Store) enforcerDeliver(m *Message) { + if s.incoming != nil { + md := &msgDone{ + msg: m, + done: make(chan struct{}), + } + s.incoming <- md + <-md.done + } +} + +// enforcerRemove sends removal to enforcer if configured, and waits for completion. +func (s *Store) enforcerRemove(m *Message) { + if s.remove != nil { + md := &msgDone{ + msg: m, + done: make(chan struct{}), + } + s.remove <- md + <-md.done + } +} diff --git a/pkg/storage/mem/message.go b/pkg/storage/mem/message.go index ea0d351..b5ca498 100644 --- a/pkg/storage/mem/message.go +++ b/pkg/storage/mem/message.go @@ -2,6 +2,7 @@ package mem import ( "bytes" + "container/list" "io" "io/ioutil" "net/mail" @@ -20,6 +21,7 @@ type Message struct { date time.Time subject string source []byte + el *list.Element // This message in Store.messages } var _ storage.Message = &Message{} diff --git a/pkg/storage/mem/store.go b/pkg/storage/mem/store.go index 0559a5b..c37b241 100644 --- a/pkg/storage/mem/store.go +++ b/pkg/storage/mem/store.go @@ -1,6 +1,7 @@ package mem import ( + "fmt" "io/ioutil" "sort" "strconv" @@ -13,8 +14,10 @@ import ( // Store implements an in-memory message store. type Store struct { sync.Mutex - boxes map[string]*mbox - cap int + boxes map[string]*mbox + cap int // Per-mailbox message cap. + incoming chan *msgDone // New messages for size enforcer. + remove chan *msgDone // Remove deleted messages from size enforcer. } type mbox struct { @@ -29,38 +32,51 @@ var _ storage.Store = &Store{} // New returns an emtpy memory store. func New(cfg config.Storage) (storage.Store, error) { - return &Store{ + s := &Store{ boxes: make(map[string]*mbox), cap: cfg.MailboxMsgCap, - }, nil + } + if str, ok := cfg.Params["maxkb"]; ok { + maxKB, err := strconv.ParseInt(str, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse maxkb: %v", err) + } + if maxKB > 0 { + // Setup enforcer. + s.incoming = make(chan *msgDone) + s.remove = make(chan *msgDone) + go s.maxSizeEnforcer(maxKB * 1024) + } + } + return s, nil } // AddMessage stores the message, message ID and Size will be ignored. func (s *Store) AddMessage(message storage.Message) (id string, err error) { + r, ierr := message.Source() + if ierr != nil { + err = ierr + return + } + source, ierr := ioutil.ReadAll(r) + if ierr != nil { + err = ierr + return + } + m := &Message{ + mailbox: message.Mailbox(), + from: message.From(), + to: message.To(), + date: message.Date(), + subject: message.Subject(), + } s.withMailbox(message.Mailbox(), true, func(mb *mbox) { - r, ierr := message.Source() - if ierr != nil { - err = ierr - return - } - source, ierr := ioutil.ReadAll(r) - if ierr != nil { - err = ierr - return - } // Generate message ID. mb.last++ + m.index = mb.last id = strconv.Itoa(mb.last) - m := &Message{ - index: mb.last, - mailbox: message.Mailbox(), - id: id, - from: message.From(), - to: message.To(), - date: message.Date(), - subject: message.Subject(), - source: source, - } + m.id = id + m.source = source mb.messages[id] = m if s.cap > 0 { // Enforce cap. @@ -70,6 +86,7 @@ func (s *Store) AddMessage(message storage.Message) (id string, err error) { } } }) + s.enforcerDeliver(m) return id, err } @@ -97,17 +114,38 @@ func (s *Store) GetMessages(mailbox string) (ms []storage.Message, err error) { // PurgeMessages deletes the contents of a mailbox. func (s *Store) PurgeMessages(mailbox string) error { + var messages map[string]*Message s.withMailbox(mailbox, true, func(mb *mbox) { + messages = mb.messages mb.messages = make(map[string]*Message) }) + if len(messages) > 0 && s.remove != nil { + for _, m := range messages { + s.enforcerRemove(m) + } + } return nil } +// removeMessage deletes a single message without notifying the size enforcer. Returns the message +// that was removed. +func (s *Store) removeMessage(mailbox, id string) *Message { + var m *Message + s.withMailbox(mailbox, true, func(mb *mbox) { + m = mb.messages[id] + if m != nil { + delete(mb.messages, id) + } + }) + return m +} + // RemoveMessage deletes a single message. func (s *Store) RemoveMessage(mailbox, id string) error { - s.withMailbox(mailbox, true, func(mb *mbox) { - delete(mb.messages, id) - }) + m := s.removeMessage(mailbox, id) + if m != nil { + s.enforcerRemove(m) + } return nil } diff --git a/pkg/storage/mem/store_test.go b/pkg/storage/mem/store_test.go index 8a45ec1..c48bca8 100644 --- a/pkg/storage/mem/store_test.go +++ b/pkg/storage/mem/store_test.go @@ -1,7 +1,9 @@ package mem import ( + "sync" "testing" + "time" "github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/storage" @@ -16,3 +18,65 @@ func TestSuite(t *testing.T) { return s, destroy, nil }) } + +// TestMessageList verifies the operation of the global message list: mem.Store.messages. +func TestMaxSize(t *testing.T) { + maxSize := int64(2048) + s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}}) + boxes := []string{"alpha", "beta", "whiskey", "tango", "foxtrot"} + n := 10 + // total := 50 + sizeChan := make(chan int64, len(boxes)) + // Populate mailboxes concurrently. + for _, mailbox := range boxes { + go func(mailbox string) { + size := int64(0) + for i := 0; i < n; i++ { + _, nbytes := test.DeliverToStore(t, s, mailbox, "subject", time.Now()) + size += nbytes + } + sizeChan <- size + }(mailbox) + } + // Wait for sizes. + sentBytesTotal := int64(0) + for range boxes { + sentBytesTotal += <-sizeChan + } + // Calculate actual size. + gotSize := int64(0) + s.VisitMailboxes(func(messages []storage.Message) bool { + for _, m := range messages { + gotSize += m.Size() + } + return true + }) + // Verify state. Messages are ~75 bytes each. + if gotSize < 2048-75 { + t.Errorf("Got total size %v, want greater than: %v", gotSize, 2048-75) + } + if gotSize > maxSize { + t.Errorf("Got total size %v, want less than: %v", gotSize, maxSize) + } + // Purge all messages concurrently, testing for deadlocks. + wg := &sync.WaitGroup{} + wg.Add(len(boxes)) + for _, mailbox := range boxes { + go func(mailbox string) { + err := s.PurgeMessages(mailbox) + if err != nil { + t.Fatal(err) + } + wg.Done() + }(mailbox) + } + wg.Wait() + count := 0 + s.VisitMailboxes(func(messages []storage.Message) bool { + count += len(messages) + return true + }) + if count != 0 { + t.Errorf("Got %v total messages, want: %v", count, 0) + } +} diff --git a/pkg/test/storage_suite.go b/pkg/test/storage_suite.go index 1038dac..9936145 100644 --- a/pkg/test/storage_suite.go +++ b/pkg/test/storage_suite.go @@ -173,11 +173,11 @@ func testDeliveryOrder(t *testing.T, store storage.Store) { subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for i, subj := range subjects { // Check mailbox count. - getAndCountMessages(t, store, mailbox, i) - deliverMessage(t, store, mailbox, subj, time.Now()) + GetAndCountMessages(t, store, mailbox, i) + DeliverToStore(t, store, mailbox, subj, time.Now()) } // Confirm delivery order. - msgs := getAndCountMessages(t, store, mailbox, 5) + msgs := GetAndCountMessages(t, store, mailbox, 5) for i, want := range subjects { got := msgs[i].Subject() if got != want { @@ -193,7 +193,7 @@ func testSize(t *testing.T, store storage.Store) { sentIds := make([]string, len(subjects)) sentSizes := make([]int64, len(subjects)) for i, subj := range subjects { - id, size := deliverMessage(t, store, mailbox, subj, time.Now()) + id, size := DeliverToStore(t, store, mailbox, subj, time.Now()) sentIds[i] = id sentSizes[i] = size } @@ -215,9 +215,9 @@ func testDelete(t *testing.T, store storage.Store) { mailbox := "fred" subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for _, subj := range subjects { - deliverMessage(t, store, mailbox, subj, time.Now()) + DeliverToStore(t, store, mailbox, subj, time.Now()) } - msgs := getAndCountMessages(t, store, mailbox, len(subjects)) + msgs := GetAndCountMessages(t, store, mailbox, len(subjects)) // Delete a couple messages. err := store.RemoveMessage(mailbox, msgs[1].ID()) if err != nil { @@ -229,7 +229,7 @@ func testDelete(t *testing.T, store storage.Store) { } // Confirm deletion. subjects = []string{"alpha", "charlie", "echo"} - msgs = getAndCountMessages(t, store, mailbox, len(subjects)) + msgs = GetAndCountMessages(t, store, mailbox, len(subjects)) for i, want := range subjects { got := msgs[i].Subject() if got != want { @@ -237,9 +237,9 @@ func testDelete(t *testing.T, store storage.Store) { } } // Try appending one more. - deliverMessage(t, store, mailbox, "foxtrot", time.Now()) + DeliverToStore(t, store, mailbox, "foxtrot", time.Now()) subjects = []string{"alpha", "charlie", "echo", "foxtrot"} - msgs = getAndCountMessages(t, store, mailbox, len(subjects)) + msgs = GetAndCountMessages(t, store, mailbox, len(subjects)) for i, want := range subjects { got := msgs[i].Subject() if got != want { @@ -253,15 +253,15 @@ func testPurge(t *testing.T, store storage.Store) { mailbox := "fred" subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for _, subj := range subjects { - deliverMessage(t, store, mailbox, subj, time.Now()) + DeliverToStore(t, store, mailbox, subj, time.Now()) } - getAndCountMessages(t, store, mailbox, len(subjects)) + GetAndCountMessages(t, store, mailbox, len(subjects)) // Purge and verify. err := store.PurgeMessages(mailbox) if err != nil { t.Fatal(err) } - getAndCountMessages(t, store, mailbox, 0) + GetAndCountMessages(t, store, mailbox, 0) } // testMsgCap verifies the message cap is enforced. @@ -270,7 +270,7 @@ func testMsgCap(t *testing.T, store storage.Store) { mailbox := "captain" for i := 0; i < 20; i++ { subj := fmt.Sprintf("subject %v", i) - deliverMessage(t, store, mailbox, subj, time.Now()) + DeliverToStore(t, store, mailbox, subj, time.Now()) msgs, err := store.GetMessages(mailbox) if err != nil { t.Fatalf("Failed to GetMessages for %q: %v", mailbox, err) @@ -296,8 +296,8 @@ func testNoMsgCap(t *testing.T, store storage.Store) { mailbox := "captain" for i := 0; i < 20; i++ { subj := fmt.Sprintf("subject %v", i) - deliverMessage(t, store, mailbox, subj, time.Now()) - getAndCountMessages(t, store, mailbox, i+1) + DeliverToStore(t, store, mailbox, subj, time.Now()) + GetAndCountMessages(t, store, mailbox, i+1) } } @@ -306,8 +306,8 @@ func testNoMsgCap(t *testing.T, store storage.Store) { func testVisitMailboxes(t *testing.T, ds storage.Store) { boxes := []string{"abby", "bill", "christa", "donald", "evelyn"} for _, name := range boxes { - deliverMessage(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour)) - deliverMessage(t, ds, name, "New Message", time.Now()) + DeliverToStore(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour)) + DeliverToStore(t, ds, name, "New Message", time.Now()) } seen := 0 err := ds.VisitMailboxes(func(messages []storage.Message) bool { @@ -326,9 +326,9 @@ func testVisitMailboxes(t *testing.T, ds storage.Store) { } } -// deliverMessage creates and delivers a message to the specific mailbox, returning the size of the +// DeliverToStore creates and delivers a message to the specific mailbox, returning the size of the // generated message. -func deliverMessage( +func DeliverToStore( t *testing.T, store storage.Store, mailbox string, @@ -356,9 +356,9 @@ func deliverMessage( return id, int64(len(testMsg)) } -// getAndCountMessages is a test helper that expects to receive count messages or fails the test, it +// GetAndCountMessages is a test helper that expects to receive count messages or fails the test, it // also checks return error. -func getAndCountMessages(t *testing.T, s storage.Store, mailbox string, count int) []storage.Message { +func GetAndCountMessages(t *testing.T, s storage.Store, mailbox string, count int) []storage.Message { t.Helper() msgs, err := s.GetMessages(mailbox) if err != nil {