mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-17 17:47:03 +00:00
Merge branch 'feature/memstore' into develop
This commit is contained in:
@@ -7,6 +7,11 @@ This project adheres to [Semantic Versioning](http://semver.org/).
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- Inbucket is now configured using environment variables instead of a config
|
||||
file.
|
||||
- In-memory storage option, best for small installations and desktops. Will be
|
||||
used by default.
|
||||
- Storage type is now displayed on Status page.
|
||||
- Store size is now calculated during retention scan and displayed on the Status
|
||||
page.
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/jhillyerd/inbucket/pkg/server/web"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage/file"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage/mem"
|
||||
"github.com/jhillyerd/inbucket/pkg/webui"
|
||||
)
|
||||
|
||||
@@ -45,6 +46,10 @@ func init() {
|
||||
expvar.Publish("goroutines", expvar.Func(func() interface{} {
|
||||
return runtime.NumGoroutine()
|
||||
}))
|
||||
|
||||
// Register storage implementations.
|
||||
storage.Constructors["file"] = file.New
|
||||
storage.Constructors["memory"] = mem.New
|
||||
}
|
||||
|
||||
func main() {
|
||||
@@ -97,8 +102,13 @@ func main() {
|
||||
// Configure internal services.
|
||||
rootCtx, rootCancel := context.WithCancel(context.Background())
|
||||
shutdownChan := make(chan bool)
|
||||
store, err := storage.FromConfig(conf.Storage)
|
||||
if err != nil {
|
||||
log.Errorf("Fatal storage error: %v", err)
|
||||
removePIDFile(*pidfile)
|
||||
os.Exit(1)
|
||||
}
|
||||
msgHub := msghub.New(rootCtx, conf.Web.MonitorHistory)
|
||||
store := file.New(conf.Storage)
|
||||
addrPolicy := &policy.Addressing{Config: conf.SMTP}
|
||||
mmanager := &message.StoreManager{Store: store, Hub: msgHub}
|
||||
// Start Retention scanner.
|
||||
|
||||
@@ -69,10 +69,11 @@ type Web struct {
|
||||
|
||||
// Storage contains the mail store configuration.
|
||||
type Storage struct {
|
||||
Path string `required:"true" default:"/tmp/inbucket" desc:"Mail store path"`
|
||||
RetentionPeriod time.Duration `required:"true" default:"24h" desc:"Duration to retain messages"`
|
||||
RetentionSleep time.Duration `required:"true" default:"100ms" desc:"Duration to sleep between deletes"`
|
||||
MailboxMsgCap int `required:"true" default:"500" desc:"Maximum messages per mailbox"`
|
||||
Type string `required:"true" default:"memory" desc:"Storage impl: file or memory"`
|
||||
Params map[string]string `desc:"Storage impl parameters, see docs."`
|
||||
RetentionPeriod time.Duration `required:"true" default:"24h" desc:"Duration to retain messages"`
|
||||
RetentionSleep time.Duration `required:"true" default:"50ms" desc:"Duration to sleep between mailboxes"`
|
||||
MailboxMsgCap int `required:"true" default:"500" desc:"Maximum messages per mailbox"`
|
||||
}
|
||||
|
||||
// Process loads and parses configuration from the environment.
|
||||
|
||||
@@ -48,11 +48,10 @@ type Store struct {
|
||||
}
|
||||
|
||||
// New creates a new DataStore object using the specified path
|
||||
func New(cfg config.Storage) storage.Store {
|
||||
path := cfg.Path
|
||||
func New(cfg config.Storage) (storage.Store, error) {
|
||||
path := cfg.Params["path"]
|
||||
if path == "" {
|
||||
log.Errorf("No value configured for datastore path")
|
||||
return nil
|
||||
return nil, fmt.Errorf("'path' parameter not specified")
|
||||
}
|
||||
mailPath := filepath.Join(path, "mail")
|
||||
if _, err := os.Stat(mailPath); err != nil {
|
||||
@@ -61,7 +60,7 @@ func New(cfg config.Storage) storage.Store {
|
||||
log.Errorf("Error creating dir %q: %v", mailPath, err)
|
||||
}
|
||||
}
|
||||
return &Store{path: path, mailPath: mailPath, messageCap: cfg.MailboxMsgCap}
|
||||
return &Store{path: path, mailPath: mailPath, messageCap: cfg.MailboxMsgCap}, nil
|
||||
}
|
||||
|
||||
// AddMessage adds a message to the specified mailbox.
|
||||
|
||||
@@ -22,8 +22,8 @@ import (
|
||||
|
||||
// TestSuite runs storage package test suite on file store.
|
||||
func TestSuite(t *testing.T) {
|
||||
test.StoreSuite(t, func() (storage.Store, func(), error) {
|
||||
ds, _ := setupDataStore(config.Storage{})
|
||||
test.StoreSuite(t, func(conf config.Storage) (storage.Store, func(), error) {
|
||||
ds, _ := setupDataStore(conf)
|
||||
destroy := func() {
|
||||
teardownDataStore(ds)
|
||||
}
|
||||
@@ -144,78 +144,6 @@ func TestFSMissing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test delivering several messages to the same mailbox, see if message cap works
|
||||
func TestFSMessageCap(t *testing.T) {
|
||||
mbCap := 10
|
||||
ds, logbuf := setupDataStore(config.Storage{MailboxMsgCap: mbCap})
|
||||
defer teardownDataStore(ds)
|
||||
|
||||
mbName := "captain"
|
||||
for i := 0; i < 20; i++ {
|
||||
// Add a message
|
||||
subj := fmt.Sprintf("subject %v", i)
|
||||
deliverMessage(ds, mbName, subj, time.Now())
|
||||
t.Logf("Delivered %q", subj)
|
||||
|
||||
// Check number of messages
|
||||
msgs, err := ds.GetMessages(mbName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to GetMessages for %q: %v", mbName, err)
|
||||
}
|
||||
if len(msgs) > mbCap {
|
||||
t.Errorf("Mailbox should be capped at %v messages, but has %v", mbCap, len(msgs))
|
||||
}
|
||||
|
||||
// Check that the first message is correct
|
||||
first := i - mbCap + 1
|
||||
if first < 0 {
|
||||
first = 0
|
||||
}
|
||||
firstSubj := fmt.Sprintf("subject %v", first)
|
||||
if firstSubj != msgs[0].Subject() {
|
||||
t.Errorf("Expected first subject to be %q, got %q", firstSubj, msgs[0].Subject())
|
||||
}
|
||||
}
|
||||
|
||||
if t.Failed() {
|
||||
// Wait for handler to finish logging
|
||||
time.Sleep(2 * time.Second)
|
||||
// Dump buffered log data if there was a failure
|
||||
_, _ = io.Copy(os.Stderr, logbuf)
|
||||
}
|
||||
}
|
||||
|
||||
// Test delivering several messages to the same mailbox, see if no message cap works
|
||||
func TestFSNoMessageCap(t *testing.T) {
|
||||
mbCap := 0
|
||||
ds, logbuf := setupDataStore(config.Storage{MailboxMsgCap: mbCap})
|
||||
defer teardownDataStore(ds)
|
||||
|
||||
mbName := "captain"
|
||||
for i := 0; i < 20; i++ {
|
||||
// Add a message
|
||||
subj := fmt.Sprintf("subject %v", i)
|
||||
deliverMessage(ds, mbName, subj, time.Now())
|
||||
t.Logf("Delivered %q", subj)
|
||||
|
||||
// Check number of messages
|
||||
msgs, err := ds.GetMessages(mbName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to GetMessages for %q: %v", mbName, err)
|
||||
}
|
||||
if len(msgs) != i+1 {
|
||||
t.Errorf("Expected %v messages, got %v", i+1, len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
if t.Failed() {
|
||||
// Wait for handler to finish logging
|
||||
time.Sleep(2 * time.Second)
|
||||
// Dump buffered log data if there was a failure
|
||||
_, _ = io.Copy(os.Stderr, logbuf)
|
||||
}
|
||||
}
|
||||
|
||||
// Test Get the latest message
|
||||
func TestGetLatestMessage(t *testing.T) {
|
||||
ds, logbuf := setupDataStore(config.Storage{})
|
||||
@@ -265,13 +193,18 @@ func setupDataStore(cfg config.Storage) (*Store, *bytes.Buffer) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Capture log output
|
||||
// Capture log output.
|
||||
buf := new(bytes.Buffer)
|
||||
log.SetOutput(buf)
|
||||
|
||||
cfg.Path = path
|
||||
return New(cfg).(*Store), buf
|
||||
if cfg.Params == nil {
|
||||
cfg.Params = make(map[string]string)
|
||||
}
|
||||
cfg.Params["path"] = path
|
||||
s, err := New(cfg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return s.(*Store), buf
|
||||
}
|
||||
|
||||
// deliverMessage creates and delivers a message to the specific mailbox, returning
|
||||
|
||||
73
pkg/storage/mem/maxsize.go
Normal file
73
pkg/storage/mem/maxsize.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package mem
|
||||
|
||||
import "container/list"
|
||||
|
||||
type msgDone struct {
|
||||
msg *Message
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// enforceMaxSize will delete the oldest message until the entire mail store is equal to or less
|
||||
// than Store.maxSize bytes.
|
||||
func (s *Store) maxSizeEnforcer(maxSize int64) {
|
||||
all := &list.List{}
|
||||
curSize := int64(0)
|
||||
for {
|
||||
select {
|
||||
case md, ok := <-s.incoming:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Add message to all.
|
||||
m := md.msg
|
||||
el := all.PushBack(m)
|
||||
m.el = el
|
||||
curSize += int64(m.Size())
|
||||
for curSize > maxSize {
|
||||
// Remove oldest message.
|
||||
el := all.Front()
|
||||
all.Remove(el)
|
||||
m := el.Value.(*Message)
|
||||
if s.removeMessage(m.mailbox, m.id) != nil {
|
||||
curSize -= int64(m.Size())
|
||||
}
|
||||
}
|
||||
close(md.done)
|
||||
case md, ok := <-s.remove:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Remove message from all.
|
||||
m := md.msg
|
||||
el := all.Remove(m.el)
|
||||
if el != nil {
|
||||
curSize -= int64(m.Size())
|
||||
}
|
||||
close(md.done)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// enforcerDeliver sends delivery to enforcer if configured, and waits for completion.
|
||||
func (s *Store) enforcerDeliver(m *Message) {
|
||||
if s.incoming != nil {
|
||||
md := &msgDone{
|
||||
msg: m,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
s.incoming <- md
|
||||
<-md.done
|
||||
}
|
||||
}
|
||||
|
||||
// enforcerRemove sends removal to enforcer if configured, and waits for completion.
|
||||
func (s *Store) enforcerRemove(m *Message) {
|
||||
if s.remove != nil {
|
||||
md := &msgDone{
|
||||
msg: m,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
s.remove <- md
|
||||
<-md.done
|
||||
}
|
||||
}
|
||||
53
pkg/storage/mem/message.go
Normal file
53
pkg/storage/mem/message.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package mem
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/mail"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
)
|
||||
|
||||
// Message is a memory store message.
|
||||
type Message struct {
|
||||
index int
|
||||
mailbox string
|
||||
id string
|
||||
from *mail.Address
|
||||
to []*mail.Address
|
||||
date time.Time
|
||||
subject string
|
||||
source []byte
|
||||
el *list.Element // This message in Store.messages
|
||||
}
|
||||
|
||||
var _ storage.Message = &Message{}
|
||||
|
||||
// Mailbox returns the mailbox name.
|
||||
func (m *Message) Mailbox() string { return m.mailbox }
|
||||
|
||||
// ID the message ID.
|
||||
func (m *Message) ID() string { return m.id }
|
||||
|
||||
// From returns the from address.
|
||||
func (m *Message) From() *mail.Address { return m.from }
|
||||
|
||||
// To returns the to address list.
|
||||
func (m *Message) To() []*mail.Address { return m.to }
|
||||
|
||||
// Date returns the date received.
|
||||
func (m *Message) Date() time.Time { return m.date }
|
||||
|
||||
// Subject returns the subject line.
|
||||
func (m *Message) Subject() string { return m.subject }
|
||||
|
||||
// Source returns a reader for the message source.
|
||||
func (m *Message) Source() (io.ReadCloser, error) {
|
||||
return ioutil.NopCloser(bytes.NewReader(m.source)), nil
|
||||
}
|
||||
|
||||
// Size returns the message size in bytes.
|
||||
func (m *Message) Size() int64 { return int64(len(m.source)) }
|
||||
197
pkg/storage/mem/store.go
Normal file
197
pkg/storage/mem/store.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package mem
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/jhillyerd/inbucket/pkg/config"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
)
|
||||
|
||||
// Store implements an in-memory message store.
|
||||
type Store struct {
|
||||
sync.Mutex
|
||||
boxes map[string]*mbox
|
||||
cap int // Per-mailbox message cap.
|
||||
incoming chan *msgDone // New messages for size enforcer.
|
||||
remove chan *msgDone // Remove deleted messages from size enforcer.
|
||||
}
|
||||
|
||||
type mbox struct {
|
||||
sync.RWMutex
|
||||
name string
|
||||
last int
|
||||
first int
|
||||
messages map[string]*Message
|
||||
}
|
||||
|
||||
var _ storage.Store = &Store{}
|
||||
|
||||
// New returns an emtpy memory store.
|
||||
func New(cfg config.Storage) (storage.Store, error) {
|
||||
s := &Store{
|
||||
boxes: make(map[string]*mbox),
|
||||
cap: cfg.MailboxMsgCap,
|
||||
}
|
||||
if str, ok := cfg.Params["maxkb"]; ok {
|
||||
maxKB, err := strconv.ParseInt(str, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse maxkb: %v", err)
|
||||
}
|
||||
if maxKB > 0 {
|
||||
// Setup enforcer.
|
||||
s.incoming = make(chan *msgDone)
|
||||
s.remove = make(chan *msgDone)
|
||||
go s.maxSizeEnforcer(maxKB * 1024)
|
||||
}
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// AddMessage stores the message, message ID and Size will be ignored.
|
||||
func (s *Store) AddMessage(message storage.Message) (id string, err error) {
|
||||
r, ierr := message.Source()
|
||||
if ierr != nil {
|
||||
err = ierr
|
||||
return
|
||||
}
|
||||
source, ierr := ioutil.ReadAll(r)
|
||||
if ierr != nil {
|
||||
err = ierr
|
||||
return
|
||||
}
|
||||
m := &Message{
|
||||
mailbox: message.Mailbox(),
|
||||
from: message.From(),
|
||||
to: message.To(),
|
||||
date: message.Date(),
|
||||
subject: message.Subject(),
|
||||
}
|
||||
s.withMailbox(message.Mailbox(), true, func(mb *mbox) {
|
||||
// Generate message ID.
|
||||
mb.last++
|
||||
m.index = mb.last
|
||||
id = strconv.Itoa(mb.last)
|
||||
m.id = id
|
||||
m.source = source
|
||||
mb.messages[id] = m
|
||||
if s.cap > 0 {
|
||||
// Enforce cap.
|
||||
for len(mb.messages) > s.cap {
|
||||
delete(mb.messages, strconv.Itoa(mb.first))
|
||||
mb.first++
|
||||
}
|
||||
}
|
||||
})
|
||||
s.enforcerDeliver(m)
|
||||
return id, err
|
||||
}
|
||||
|
||||
// GetMessage gets a mesage.
|
||||
func (s *Store) GetMessage(mailbox, id string) (m storage.Message, err error) {
|
||||
s.withMailbox(mailbox, false, func(mb *mbox) {
|
||||
m = mb.messages[id]
|
||||
})
|
||||
return m, err
|
||||
}
|
||||
|
||||
// GetMessages gets a list of messages.
|
||||
func (s *Store) GetMessages(mailbox string) (ms []storage.Message, err error) {
|
||||
s.withMailbox(mailbox, false, func(mb *mbox) {
|
||||
ms = make([]storage.Message, 0, len(mb.messages))
|
||||
for _, v := range mb.messages {
|
||||
ms = append(ms, v)
|
||||
}
|
||||
sort.Slice(ms, func(i, j int) bool {
|
||||
return ms[i].(*Message).index < ms[j].(*Message).index
|
||||
})
|
||||
})
|
||||
return ms, err
|
||||
}
|
||||
|
||||
// PurgeMessages deletes the contents of a mailbox.
|
||||
func (s *Store) PurgeMessages(mailbox string) error {
|
||||
var messages map[string]*Message
|
||||
s.withMailbox(mailbox, true, func(mb *mbox) {
|
||||
messages = mb.messages
|
||||
mb.messages = make(map[string]*Message)
|
||||
})
|
||||
if len(messages) > 0 && s.remove != nil {
|
||||
for _, m := range messages {
|
||||
s.enforcerRemove(m)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeMessage deletes a single message without notifying the size enforcer. Returns the message
|
||||
// that was removed.
|
||||
func (s *Store) removeMessage(mailbox, id string) *Message {
|
||||
var m *Message
|
||||
s.withMailbox(mailbox, true, func(mb *mbox) {
|
||||
m = mb.messages[id]
|
||||
if m != nil {
|
||||
delete(mb.messages, id)
|
||||
}
|
||||
})
|
||||
return m
|
||||
}
|
||||
|
||||
// RemoveMessage deletes a single message.
|
||||
func (s *Store) RemoveMessage(mailbox, id string) error {
|
||||
m := s.removeMessage(mailbox, id)
|
||||
if m != nil {
|
||||
s.enforcerRemove(m)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// VisitMailboxes visits each mailbox in the store.
|
||||
func (s *Store) VisitMailboxes(f func([]storage.Message) (cont bool)) error {
|
||||
// Lock store, get names of all mailboxes.
|
||||
s.Lock()
|
||||
boxNames := make([]string, 0, len(s.boxes))
|
||||
for k := range s.boxes {
|
||||
boxNames = append(boxNames, k)
|
||||
}
|
||||
s.Unlock()
|
||||
// Process mailboxes.
|
||||
for _, mailbox := range boxNames {
|
||||
ms, _ := s.GetMessages(mailbox)
|
||||
if !f(ms) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// withMailbox gets or creates a mailbox, locks it, then calls f.
|
||||
func (s *Store) withMailbox(mailbox string, writeLock bool, f func(mb *mbox)) {
|
||||
s.Lock()
|
||||
mb, ok := s.boxes[mailbox]
|
||||
if !ok {
|
||||
// Create mailbox
|
||||
mb = &mbox{
|
||||
name: mailbox,
|
||||
messages: make(map[string]*Message),
|
||||
}
|
||||
s.boxes[mailbox] = mb
|
||||
}
|
||||
s.Unlock()
|
||||
if writeLock {
|
||||
mb.Lock()
|
||||
} else {
|
||||
mb.RLock()
|
||||
}
|
||||
defer func() {
|
||||
if writeLock {
|
||||
mb.Unlock()
|
||||
} else {
|
||||
mb.RUnlock()
|
||||
}
|
||||
}()
|
||||
f(mb)
|
||||
}
|
||||
82
pkg/storage/mem/store_test.go
Normal file
82
pkg/storage/mem/store_test.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package mem
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/pkg/config"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
"github.com/jhillyerd/inbucket/pkg/test"
|
||||
)
|
||||
|
||||
// TestSuite runs storage package test suite on file store.
|
||||
func TestSuite(t *testing.T) {
|
||||
test.StoreSuite(t, func(conf config.Storage) (storage.Store, func(), error) {
|
||||
s, _ := New(conf)
|
||||
destroy := func() {}
|
||||
return s, destroy, nil
|
||||
})
|
||||
}
|
||||
|
||||
// TestMessageList verifies the operation of the global message list: mem.Store.messages.
|
||||
func TestMaxSize(t *testing.T) {
|
||||
maxSize := int64(2048)
|
||||
s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}})
|
||||
boxes := []string{"alpha", "beta", "whiskey", "tango", "foxtrot"}
|
||||
n := 10
|
||||
// total := 50
|
||||
sizeChan := make(chan int64, len(boxes))
|
||||
// Populate mailboxes concurrently.
|
||||
for _, mailbox := range boxes {
|
||||
go func(mailbox string) {
|
||||
size := int64(0)
|
||||
for i := 0; i < n; i++ {
|
||||
_, nbytes := test.DeliverToStore(t, s, mailbox, "subject", time.Now())
|
||||
size += nbytes
|
||||
}
|
||||
sizeChan <- size
|
||||
}(mailbox)
|
||||
}
|
||||
// Wait for sizes.
|
||||
sentBytesTotal := int64(0)
|
||||
for range boxes {
|
||||
sentBytesTotal += <-sizeChan
|
||||
}
|
||||
// Calculate actual size.
|
||||
gotSize := int64(0)
|
||||
s.VisitMailboxes(func(messages []storage.Message) bool {
|
||||
for _, m := range messages {
|
||||
gotSize += m.Size()
|
||||
}
|
||||
return true
|
||||
})
|
||||
// Verify state. Messages are ~75 bytes each.
|
||||
if gotSize < 2048-75 {
|
||||
t.Errorf("Got total size %v, want greater than: %v", gotSize, 2048-75)
|
||||
}
|
||||
if gotSize > maxSize {
|
||||
t.Errorf("Got total size %v, want less than: %v", gotSize, maxSize)
|
||||
}
|
||||
// Purge all messages concurrently, testing for deadlocks.
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(boxes))
|
||||
for _, mailbox := range boxes {
|
||||
go func(mailbox string) {
|
||||
err := s.PurgeMessages(mailbox)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wg.Done()
|
||||
}(mailbox)
|
||||
}
|
||||
wg.Wait()
|
||||
count := 0
|
||||
s.VisitMailboxes(func(messages []storage.Message) bool {
|
||||
count += len(messages)
|
||||
return true
|
||||
})
|
||||
if count != 0 {
|
||||
t.Errorf("Got %v total messages, want: %v", count, 0)
|
||||
}
|
||||
}
|
||||
@@ -3,9 +3,12 @@ package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/mail"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/pkg/config"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -14,6 +17,9 @@ var (
|
||||
|
||||
// ErrNotWritable indicates the message is closed; no longer writable
|
||||
ErrNotWritable = errors.New("Message not writable")
|
||||
|
||||
// Constructors tracks registered storage constructors
|
||||
Constructors = make(map[string]func(config.Storage) (Store, error))
|
||||
)
|
||||
|
||||
// Store is the interface Inbucket uses to interact with storage implementations.
|
||||
@@ -38,3 +44,11 @@ type Message interface {
|
||||
Source() (io.ReadCloser, error)
|
||||
Size() int64
|
||||
}
|
||||
|
||||
// FromConfig creates an instance of the Store based on the provided configuration.
|
||||
func FromConfig(c config.Storage) (store Store, err error) {
|
||||
if cf := Constructors[c.Type]; cf != nil {
|
||||
return cf(c)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown storage type configured: %q", c.Type)
|
||||
}
|
||||
|
||||
@@ -9,30 +9,34 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/pkg/config"
|
||||
"github.com/jhillyerd/inbucket/pkg/message"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
)
|
||||
|
||||
// StoreFactory returns a new store for the test suite.
|
||||
type StoreFactory func() (store storage.Store, destroy func(), err error)
|
||||
type StoreFactory func(config.Storage) (store storage.Store, destroy func(), err error)
|
||||
|
||||
// StoreSuite runs a set of general tests on the provided Store.
|
||||
func StoreSuite(t *testing.T, factory StoreFactory) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
test func(*testing.T, storage.Store)
|
||||
conf config.Storage
|
||||
}{
|
||||
{"metadata", testMetadata},
|
||||
{"content", testContent},
|
||||
{"delivery order", testDeliveryOrder},
|
||||
{"size", testSize},
|
||||
{"delete", testDelete},
|
||||
{"purge", testPurge},
|
||||
{"visit mailboxes", testVisitMailboxes},
|
||||
{"metadata", testMetadata, config.Storage{}},
|
||||
{"content", testContent, config.Storage{}},
|
||||
{"delivery order", testDeliveryOrder, config.Storage{}},
|
||||
{"size", testSize, config.Storage{}},
|
||||
{"delete", testDelete, config.Storage{}},
|
||||
{"purge", testPurge, config.Storage{}},
|
||||
{"cap=10", testMsgCap, config.Storage{MailboxMsgCap: 10}},
|
||||
{"cap=0", testNoMsgCap, config.Storage{MailboxMsgCap: 0}},
|
||||
{"visit mailboxes", testVisitMailboxes, config.Storage{}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
store, destroy, err := factory()
|
||||
store, destroy, err := factory(tc.conf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -169,11 +173,11 @@ func testDeliveryOrder(t *testing.T, store storage.Store) {
|
||||
subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"}
|
||||
for i, subj := range subjects {
|
||||
// Check mailbox count.
|
||||
getAndCountMessages(t, store, mailbox, i)
|
||||
deliverMessage(t, store, mailbox, subj, time.Now())
|
||||
GetAndCountMessages(t, store, mailbox, i)
|
||||
DeliverToStore(t, store, mailbox, subj, time.Now())
|
||||
}
|
||||
// Confirm delivery order.
|
||||
msgs := getAndCountMessages(t, store, mailbox, 5)
|
||||
msgs := GetAndCountMessages(t, store, mailbox, 5)
|
||||
for i, want := range subjects {
|
||||
got := msgs[i].Subject()
|
||||
if got != want {
|
||||
@@ -189,7 +193,7 @@ func testSize(t *testing.T, store storage.Store) {
|
||||
sentIds := make([]string, len(subjects))
|
||||
sentSizes := make([]int64, len(subjects))
|
||||
for i, subj := range subjects {
|
||||
id, size := deliverMessage(t, store, mailbox, subj, time.Now())
|
||||
id, size := DeliverToStore(t, store, mailbox, subj, time.Now())
|
||||
sentIds[i] = id
|
||||
sentSizes[i] = size
|
||||
}
|
||||
@@ -211,9 +215,9 @@ func testDelete(t *testing.T, store storage.Store) {
|
||||
mailbox := "fred"
|
||||
subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"}
|
||||
for _, subj := range subjects {
|
||||
deliverMessage(t, store, mailbox, subj, time.Now())
|
||||
DeliverToStore(t, store, mailbox, subj, time.Now())
|
||||
}
|
||||
msgs := getAndCountMessages(t, store, mailbox, len(subjects))
|
||||
msgs := GetAndCountMessages(t, store, mailbox, len(subjects))
|
||||
// Delete a couple messages.
|
||||
err := store.RemoveMessage(mailbox, msgs[1].ID())
|
||||
if err != nil {
|
||||
@@ -225,7 +229,7 @@ func testDelete(t *testing.T, store storage.Store) {
|
||||
}
|
||||
// Confirm deletion.
|
||||
subjects = []string{"alpha", "charlie", "echo"}
|
||||
msgs = getAndCountMessages(t, store, mailbox, len(subjects))
|
||||
msgs = GetAndCountMessages(t, store, mailbox, len(subjects))
|
||||
for i, want := range subjects {
|
||||
got := msgs[i].Subject()
|
||||
if got != want {
|
||||
@@ -233,9 +237,9 @@ func testDelete(t *testing.T, store storage.Store) {
|
||||
}
|
||||
}
|
||||
// Try appending one more.
|
||||
deliverMessage(t, store, mailbox, "foxtrot", time.Now())
|
||||
DeliverToStore(t, store, mailbox, "foxtrot", time.Now())
|
||||
subjects = []string{"alpha", "charlie", "echo", "foxtrot"}
|
||||
msgs = getAndCountMessages(t, store, mailbox, len(subjects))
|
||||
msgs = GetAndCountMessages(t, store, mailbox, len(subjects))
|
||||
for i, want := range subjects {
|
||||
got := msgs[i].Subject()
|
||||
if got != want {
|
||||
@@ -249,15 +253,52 @@ func testPurge(t *testing.T, store storage.Store) {
|
||||
mailbox := "fred"
|
||||
subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"}
|
||||
for _, subj := range subjects {
|
||||
deliverMessage(t, store, mailbox, subj, time.Now())
|
||||
DeliverToStore(t, store, mailbox, subj, time.Now())
|
||||
}
|
||||
getAndCountMessages(t, store, mailbox, len(subjects))
|
||||
GetAndCountMessages(t, store, mailbox, len(subjects))
|
||||
// Purge and verify.
|
||||
err := store.PurgeMessages(mailbox)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
getAndCountMessages(t, store, mailbox, 0)
|
||||
GetAndCountMessages(t, store, mailbox, 0)
|
||||
}
|
||||
|
||||
// testMsgCap verifies the message cap is enforced.
|
||||
func testMsgCap(t *testing.T, store storage.Store) {
|
||||
mbCap := 10
|
||||
mailbox := "captain"
|
||||
for i := 0; i < 20; i++ {
|
||||
subj := fmt.Sprintf("subject %v", i)
|
||||
DeliverToStore(t, store, mailbox, subj, time.Now())
|
||||
msgs, err := store.GetMessages(mailbox)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to GetMessages for %q: %v", mailbox, err)
|
||||
}
|
||||
if len(msgs) > mbCap {
|
||||
t.Errorf("Mailbox has %v messages, should be capped at %v", len(msgs), mbCap)
|
||||
break
|
||||
}
|
||||
// Check that the first message is correct.
|
||||
first := i - mbCap + 1
|
||||
if first < 0 {
|
||||
first = 0
|
||||
}
|
||||
firstSubj := fmt.Sprintf("subject %v", first)
|
||||
if firstSubj != msgs[0].Subject() {
|
||||
t.Errorf("Got subject %q, wanted first subject: %q", msgs[0].Subject(), firstSubj)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// testNoMsgCap verfies a cap of 0 is not enforced.
|
||||
func testNoMsgCap(t *testing.T, store storage.Store) {
|
||||
mailbox := "captain"
|
||||
for i := 0; i < 20; i++ {
|
||||
subj := fmt.Sprintf("subject %v", i)
|
||||
DeliverToStore(t, store, mailbox, subj, time.Now())
|
||||
GetAndCountMessages(t, store, mailbox, i+1)
|
||||
}
|
||||
}
|
||||
|
||||
// testVisitMailboxes creates some mailboxes and confirms the VisitMailboxes method visits all of
|
||||
@@ -265,8 +306,8 @@ func testPurge(t *testing.T, store storage.Store) {
|
||||
func testVisitMailboxes(t *testing.T, ds storage.Store) {
|
||||
boxes := []string{"abby", "bill", "christa", "donald", "evelyn"}
|
||||
for _, name := range boxes {
|
||||
deliverMessage(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour))
|
||||
deliverMessage(t, ds, name, "New Message", time.Now())
|
||||
DeliverToStore(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour))
|
||||
DeliverToStore(t, ds, name, "New Message", time.Now())
|
||||
}
|
||||
seen := 0
|
||||
err := ds.VisitMailboxes(func(messages []storage.Message) bool {
|
||||
@@ -285,9 +326,9 @@ func testVisitMailboxes(t *testing.T, ds storage.Store) {
|
||||
}
|
||||
}
|
||||
|
||||
// deliverMessage creates and delivers a message to the specific mailbox, returning the size of the
|
||||
// DeliverToStore creates and delivers a message to the specific mailbox, returning the size of the
|
||||
// generated message.
|
||||
func deliverMessage(
|
||||
func DeliverToStore(
|
||||
t *testing.T,
|
||||
store storage.Store,
|
||||
mailbox string,
|
||||
@@ -315,9 +356,9 @@ func deliverMessage(
|
||||
return id, int64(len(testMsg))
|
||||
}
|
||||
|
||||
// getAndCountMessages is a test helper that expects to receive count messages or fails the test, it
|
||||
// GetAndCountMessages is a test helper that expects to receive count messages or fails the test, it
|
||||
// also checks return error.
|
||||
func getAndCountMessages(t *testing.T, s storage.Store, mailbox string, count int) []storage.Message {
|
||||
func GetAndCountMessages(t *testing.T, s storage.Store, mailbox string, count int) []storage.Message {
|
||||
t.Helper()
|
||||
msgs, err := s.GetMessages(mailbox)
|
||||
if err != nil {
|
||||
|
||||
@@ -171,6 +171,10 @@ $(document).ready(
|
||||
</div>
|
||||
<div class="panel-body">
|
||||
<table class="metrics">
|
||||
<div class="row">
|
||||
<div class="col-sm-3 col-xs-7"><b>Store Type:</b></div>
|
||||
<div class="col-sm-8 col-xs-5">{{ .storageConfig.Type }}</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-sm-3 col-xs-7"><b>Retention Period:</b></div>
|
||||
<div class="col-sm-8 col-xs-5">
|
||||
|
||||
Reference in New Issue
Block a user