From bb0fb410c1cb7822dcaa12b17967332cbea636ca Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Thu, 22 Mar 2018 22:02:08 -0700 Subject: [PATCH] mem: Initial in-memory store implementation for #88 - Reduce default retention sleep, change description. --- pkg/config/config.go | 2 +- pkg/storage/mem/message.go | 51 ++++++++++++ pkg/storage/mem/store.go | 148 ++++++++++++++++++++++++++++++++++ pkg/storage/mem/store_test.go | 17 ++++ 4 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 pkg/storage/mem/message.go create mode 100644 pkg/storage/mem/store.go create mode 100644 pkg/storage/mem/store_test.go diff --git a/pkg/config/config.go b/pkg/config/config.go index d6b4b68..25f805a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -71,7 +71,7 @@ type Web struct { type Storage struct { Path string `required:"true" default:"/tmp/inbucket" desc:"Mail store path"` RetentionPeriod time.Duration `required:"true" default:"24h" desc:"Duration to retain messages"` - RetentionSleep time.Duration `required:"true" default:"100ms" desc:"Duration to sleep between deletes"` + RetentionSleep time.Duration `required:"true" default:"50ms" desc:"Duration to sleep between mailboxes"` MailboxMsgCap int `required:"true" default:"500" desc:"Maximum messages per mailbox"` } diff --git a/pkg/storage/mem/message.go b/pkg/storage/mem/message.go new file mode 100644 index 0000000..ea0d351 --- /dev/null +++ b/pkg/storage/mem/message.go @@ -0,0 +1,51 @@ +package mem + +import ( + "bytes" + "io" + "io/ioutil" + "net/mail" + "time" + + "github.com/jhillyerd/inbucket/pkg/storage" +) + +// Message is a memory store message. +type Message struct { + index int + mailbox string + id string + from *mail.Address + to []*mail.Address + date time.Time + subject string + source []byte +} + +var _ storage.Message = &Message{} + +// Mailbox returns the mailbox name. +func (m *Message) Mailbox() string { return m.mailbox } + +// ID the message ID. +func (m *Message) ID() string { return m.id } + +// From returns the from address. +func (m *Message) From() *mail.Address { return m.from } + +// To returns the to address list. +func (m *Message) To() []*mail.Address { return m.to } + +// Date returns the date received. +func (m *Message) Date() time.Time { return m.date } + +// Subject returns the subject line. +func (m *Message) Subject() string { return m.subject } + +// Source returns a reader for the message source. +func (m *Message) Source() (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewReader(m.source)), nil +} + +// Size returns the message size in bytes. +func (m *Message) Size() int64 { return int64(len(m.source)) } diff --git a/pkg/storage/mem/store.go b/pkg/storage/mem/store.go new file mode 100644 index 0000000..f70d5be --- /dev/null +++ b/pkg/storage/mem/store.go @@ -0,0 +1,148 @@ +package mem + +import ( + "io/ioutil" + "sort" + "strconv" + "sync" + + "github.com/jhillyerd/inbucket/pkg/storage" +) + +// Store implements an in-memory message store. +type Store struct { + sync.Mutex + boxes map[string]*mbox +} + +type mbox struct { + sync.RWMutex + name string + counter int + messages map[string]*Message +} + +var _ storage.Store = &Store{} + +// New returns an emtpy memory store. +func New() *Store { + return &Store{ + boxes: make(map[string]*mbox), + } +} + +// AddMessage stores the message, message ID and Size will be ignored. +func (s *Store) AddMessage(message storage.Message) (id string, err error) { + s.withMailbox(message.Mailbox(), true, func(mb *mbox) { + r, ierr := message.Source() + if ierr != nil { + err = ierr + return + } + source, ierr := ioutil.ReadAll(r) + if ierr != nil { + err = ierr + return + } + // Generate message ID. + mb.counter++ + id = strconv.Itoa(mb.counter) + m := &Message{ + index: mb.counter, + mailbox: message.Mailbox(), + id: id, + from: message.From(), + to: message.To(), + date: message.Date(), + subject: message.Subject(), + source: source, + } + mb.messages[id] = m + }) + return id, err +} + +// GetMessage gets a mesage. +func (s *Store) GetMessage(mailbox, id string) (m storage.Message, err error) { + s.withMailbox(mailbox, false, func(mb *mbox) { + m = mb.messages[id] + }) + return m, err +} + +// GetMessages gets a list of messages. +func (s *Store) GetMessages(mailbox string) (ms []storage.Message, err error) { + s.withMailbox(mailbox, false, func(mb *mbox) { + ms = make([]storage.Message, 0, len(mb.messages)) + for _, v := range mb.messages { + ms = append(ms, v) + } + sort.Slice(ms, func(i, j int) bool { + return ms[i].(*Message).index < ms[j].(*Message).index + }) + }) + return ms, err +} + +// PurgeMessages deletes the contents of a mailbox. +func (s *Store) PurgeMessages(mailbox string) error { + s.withMailbox(mailbox, true, func(mb *mbox) { + mb.messages = make(map[string]*Message) + }) + return nil +} + +// RemoveMessage deletes a single message. +func (s *Store) RemoveMessage(mailbox, id string) error { + s.withMailbox(mailbox, true, func(mb *mbox) { + delete(mb.messages, id) + }) + return nil +} + +// VisitMailboxes visits each mailbox in the store. +func (s *Store) VisitMailboxes(f func([]storage.Message) (cont bool)) error { + // Lock store, get names of all mailboxes. + s.Lock() + boxNames := make([]string, 0, len(s.boxes)) + for k := range s.boxes { + boxNames = append(boxNames, k) + } + s.Unlock() + // Process mailboxes. + for _, mailbox := range boxNames { + ms, _ := s.GetMessages(mailbox) + if !f(ms) { + break + } + } + return nil +} + +// withMailbox gets or creates a mailbox, locks it, then calls f. +func (s *Store) withMailbox(mailbox string, rw bool, f func(mb *mbox)) { + s.Lock() + mb, ok := s.boxes[mailbox] + if !ok { + // Create mailbox + mb = &mbox{ + name: mailbox, + messages: make(map[string]*Message), + } + s.boxes[mailbox] = mb + } + s.Unlock() + if rw { + mb.Lock() + } else { + mb.RLock() + } + defer func() { + if rw { + mb.Unlock() + } else { + mb.RUnlock() + } + }() + f(mb) +} diff --git a/pkg/storage/mem/store_test.go b/pkg/storage/mem/store_test.go new file mode 100644 index 0000000..cbfdb05 --- /dev/null +++ b/pkg/storage/mem/store_test.go @@ -0,0 +1,17 @@ +package mem + +import ( + "testing" + + "github.com/jhillyerd/inbucket/pkg/storage" + "github.com/jhillyerd/inbucket/pkg/test" +) + +// TestSuite runs storage package test suite on file store. +func TestSuite(t *testing.T) { + test.StoreSuite(t, func() (storage.Store, func(), error) { + s := New() + destroy := func() {} + return s, destroy, nil + }) +}