mirror of
https://github.com/jhillyerd/inbucket.git
synced 2026-01-08 04:01:55 +00:00
storage: emit AfterMessageDeleted events (#334)
* Ignore test lua script Signed-off-by: James Hillyerd <james@hillyerd.com> * Wire ExtHost into storage system imports Signed-off-by: James Hillyerd <james@hillyerd.com> * storage/file: emit deleted events Signed-off-by: James Hillyerd <james@hillyerd.com> * storage/mem: emit deleted events Signed-off-by: James Hillyerd <james@hillyerd.com> --------- Signed-off-by: James Hillyerd <james@hillyerd.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/config"
|
||||
"github.com/inbucket/inbucket/pkg/extension"
|
||||
"github.com/inbucket/inbucket/pkg/storage"
|
||||
"github.com/inbucket/inbucket/pkg/stringutil"
|
||||
"github.com/rs/zerolog/log"
|
||||
@@ -45,10 +46,11 @@ type Store struct {
|
||||
mailPath string
|
||||
messageCap int
|
||||
bufReaderPool sync.Pool
|
||||
extHost *extension.Host
|
||||
}
|
||||
|
||||
// New creates a new DataStore object using the specified path
|
||||
func New(cfg config.Storage) (storage.Store, error) {
|
||||
func New(cfg config.Storage, extHost *extension.Host) (storage.Store, error) {
|
||||
path := cfg.Params["path"]
|
||||
if path == "" {
|
||||
return nil, fmt.Errorf("'path' parameter not specified")
|
||||
@@ -70,6 +72,7 @@ func New(cfg config.Storage) (storage.Store, error) {
|
||||
return bufio.NewReader(nil)
|
||||
},
|
||||
},
|
||||
extHost: extHost,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/config"
|
||||
"github.com/inbucket/inbucket/pkg/extension"
|
||||
"github.com/inbucket/inbucket/pkg/extension/event"
|
||||
"github.com/inbucket/inbucket/pkg/message"
|
||||
"github.com/inbucket/inbucket/pkg/storage"
|
||||
@@ -23,18 +24,19 @@ import (
|
||||
|
||||
// TestSuite runs storage package test suite on file store.
|
||||
func TestSuite(t *testing.T) {
|
||||
test.StoreSuite(t, func(conf config.Storage) (storage.Store, func(), error) {
|
||||
ds, _ := setupDataStore(conf)
|
||||
destroy := func() {
|
||||
teardownDataStore(ds)
|
||||
}
|
||||
return ds, destroy, nil
|
||||
})
|
||||
test.StoreSuite(t,
|
||||
func(conf config.Storage, extHost *extension.Host) (storage.Store, func(), error) {
|
||||
ds, _ := setupDataStore(conf, extHost)
|
||||
destroy := func() {
|
||||
teardownDataStore(ds)
|
||||
}
|
||||
return ds, destroy, nil
|
||||
})
|
||||
}
|
||||
|
||||
// Test directory structure created by filestore
|
||||
func TestFSDirStructure(t *testing.T) {
|
||||
ds, logbuf := setupDataStore(config.Storage{})
|
||||
ds, logbuf := setupDataStore(config.Storage{}, extension.NewHost())
|
||||
defer teardownDataStore(ds)
|
||||
root := ds.path
|
||||
|
||||
@@ -112,7 +114,7 @@ func TestFSDirStructure(t *testing.T) {
|
||||
|
||||
// Test missing files
|
||||
func TestFSMissing(t *testing.T) {
|
||||
ds, logbuf := setupDataStore(config.Storage{})
|
||||
ds, logbuf := setupDataStore(config.Storage{}, extension.NewHost())
|
||||
defer teardownDataStore(ds)
|
||||
|
||||
mbName := "fred"
|
||||
@@ -147,7 +149,7 @@ func TestFSMissing(t *testing.T) {
|
||||
|
||||
// Test Get the latest message
|
||||
func TestGetLatestMessage(t *testing.T) {
|
||||
ds, logbuf := setupDataStore(config.Storage{})
|
||||
ds, logbuf := setupDataStore(config.Storage{}, extension.NewHost())
|
||||
defer teardownDataStore(ds)
|
||||
|
||||
// james hashes to 474ba67bdb289c6263b36dfd8a7bed6c85b04943
|
||||
@@ -189,22 +191,25 @@ func TestGetLatestMessage(t *testing.T) {
|
||||
}
|
||||
|
||||
// setupDataStore creates a new FileDataStore in a temporary directory
|
||||
func setupDataStore(cfg config.Storage) (*Store, *bytes.Buffer) {
|
||||
func setupDataStore(cfg config.Storage, extHost *extension.Host) (*Store, *bytes.Buffer) {
|
||||
path, err := ioutil.TempDir("", "inbucket")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Capture log output.
|
||||
buf := new(bytes.Buffer)
|
||||
log.SetOutput(buf)
|
||||
|
||||
if cfg.Params == nil {
|
||||
cfg.Params = make(map[string]string)
|
||||
}
|
||||
cfg.Params["path"] = path
|
||||
s, err := New(cfg)
|
||||
s, err := New(cfg, extHost)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return s.(*Store), buf
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/message"
|
||||
"github.com/inbucket/inbucket/pkg/storage"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
@@ -72,6 +73,10 @@ func (mb *mbox) removeMessage(id string) error {
|
||||
msg = m
|
||||
// Slice around message we are deleting
|
||||
mb.messages = append(mb.messages[:i], mb.messages[i+1:]...)
|
||||
|
||||
// Emit deleted event.
|
||||
mb.store.extHost.Events.AfterMessageDeleted.Emit(message.MakeMetadata(msg))
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/config"
|
||||
"github.com/inbucket/inbucket/pkg/extension"
|
||||
"github.com/inbucket/inbucket/pkg/message"
|
||||
"github.com/inbucket/inbucket/pkg/storage"
|
||||
)
|
||||
|
||||
@@ -18,6 +20,7 @@ type Store struct {
|
||||
cap int // Per-mailbox message cap.
|
||||
incoming chan *msgDone // New messages for size enforcer.
|
||||
remove chan *msgDone // Remove deleted messages from size enforcer.
|
||||
extHost *extension.Host
|
||||
}
|
||||
|
||||
type mbox struct {
|
||||
@@ -31,10 +34,11 @@ type mbox struct {
|
||||
var _ storage.Store = &Store{}
|
||||
|
||||
// New returns an emtpy memory store.
|
||||
func New(cfg config.Storage) (storage.Store, error) {
|
||||
func New(cfg config.Storage, extHost *extension.Host) (storage.Store, error) {
|
||||
s := &Store{
|
||||
boxes: make(map[string]*mbox),
|
||||
cap: cfg.MailboxMsgCap,
|
||||
boxes: make(map[string]*mbox),
|
||||
cap: cfg.MailboxMsgCap,
|
||||
extHost: extHost,
|
||||
}
|
||||
if str, ok := cfg.Params["maxkb"]; ok {
|
||||
maxKB, err := strconv.ParseInt(str, 10, 64)
|
||||
@@ -163,6 +167,11 @@ func (s *Store) removeMessage(mailbox, id string) *Message {
|
||||
delete(mb.messages, id)
|
||||
}
|
||||
})
|
||||
|
||||
if m != nil {
|
||||
s.extHost.Events.AfterMessageDeleted.Emit(message.MakeMetadata(m))
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
|
||||
@@ -6,27 +6,32 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/config"
|
||||
"github.com/inbucket/inbucket/pkg/extension"
|
||||
"github.com/inbucket/inbucket/pkg/storage"
|
||||
"github.com/inbucket/inbucket/pkg/test"
|
||||
)
|
||||
|
||||
// TestSuite runs storage package test suite on file store.
|
||||
func TestSuite(t *testing.T) {
|
||||
test.StoreSuite(t, func(conf config.Storage) (storage.Store, func(), error) {
|
||||
s, _ := New(conf)
|
||||
destroy := func() {}
|
||||
return s, destroy, nil
|
||||
})
|
||||
test.StoreSuite(t,
|
||||
func(conf config.Storage, extHost *extension.Host) (storage.Store, func(), error) {
|
||||
s, _ := New(conf, extHost)
|
||||
destroy := func() {}
|
||||
return s, destroy, nil
|
||||
})
|
||||
}
|
||||
|
||||
// TestMessageList verifies the operation of the global message list: mem.Store.messages.
|
||||
func TestMaxSize(t *testing.T) {
|
||||
extHost := extension.NewHost()
|
||||
maxSize := int64(2048)
|
||||
s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}})
|
||||
s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}}, extHost)
|
||||
boxes := []string{"alpha", "beta", "whiskey", "tango", "foxtrot"}
|
||||
|
||||
// Ensure capacity so we do not block population.
|
||||
n := 10
|
||||
// total := 50
|
||||
sizeChan := make(chan int64, len(boxes))
|
||||
|
||||
// Populate mailboxes concurrently.
|
||||
for _, mailbox := range boxes {
|
||||
go func(mailbox string) {
|
||||
@@ -38,11 +43,13 @@ func TestMaxSize(t *testing.T) {
|
||||
sizeChan <- size
|
||||
}(mailbox)
|
||||
}
|
||||
|
||||
// Wait for sizes.
|
||||
sentBytesTotal := int64(0)
|
||||
for range boxes {
|
||||
sentBytesTotal += <-sizeChan
|
||||
}
|
||||
|
||||
// Calculate actual size.
|
||||
gotSize := int64(0)
|
||||
s.VisitMailboxes(func(messages []storage.Message) bool {
|
||||
@@ -51,6 +58,7 @@ func TestMaxSize(t *testing.T) {
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Verify state. Messages are ~75 bytes each.
|
||||
if gotSize < 2048-75 {
|
||||
t.Errorf("Got total size %v, want greater than: %v", gotSize, 2048-75)
|
||||
@@ -58,6 +66,7 @@ func TestMaxSize(t *testing.T) {
|
||||
if gotSize > maxSize {
|
||||
t.Errorf("Got total size %v, want less than: %v", gotSize, maxSize)
|
||||
}
|
||||
|
||||
// Purge all messages concurrently, testing for deadlocks.
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(boxes))
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/config"
|
||||
"github.com/inbucket/inbucket/pkg/extension"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -19,7 +20,7 @@ var (
|
||||
ErrNotWritable = errors.New("Message not writable")
|
||||
|
||||
// Constructors tracks registered storage constructors
|
||||
Constructors = make(map[string]func(config.Storage) (Store, error))
|
||||
Constructors = make(map[string]func(config.Storage, *extension.Host) (Store, error))
|
||||
)
|
||||
|
||||
// Store is the interface Inbucket uses to interact with storage implementations.
|
||||
@@ -48,9 +49,9 @@ type Message interface {
|
||||
}
|
||||
|
||||
// FromConfig creates an instance of the Store based on the provided configuration.
|
||||
func FromConfig(c config.Storage) (store Store, err error) {
|
||||
func FromConfig(c config.Storage, extHost *extension.Host) (store Store, err error) {
|
||||
if cf := Constructors[c.Type]; cf != nil {
|
||||
return cf(c)
|
||||
return cf(c, extHost)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown storage type configured: %q", c.Type)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user