diff --git a/.gitignore b/.gitignore index b9ae205..8842df9 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ repl-temp-* # Dependency directories /ui/node_modules /ui/.parcel-cache + +# Test lua files +/inbucket.lua diff --git a/pkg/extension/broker.go b/pkg/extension/broker.go index 93caad3..fa711eb 100644 --- a/pkg/extension/broker.go +++ b/pkg/extension/broker.go @@ -1,6 +1,10 @@ package extension -import "sync" +import ( + "errors" + "sync" + "time" +) // EventBroker maintains a list of listeners interested in a specific type // of event. @@ -55,3 +59,38 @@ func (eb *EventBroker[E, R]) lockedRemoveListener(name string) { } } } + +// AsyncTestListener returns a func that will wait for an event and return it, or timeout +// with an error. +func (eb *EventBroker[E, R]) AsyncTestListener(capacity int) func() (*E, error) { + const name = "asyncTestListener" + + // Send event down channel. + events := make(chan E, capacity) + eb.AddListener(name, + func(msg E) *R { + events <- msg + return nil + }) + + count := 0 + + return func() (*E, error) { + count++ + + defer func() { + if count >= capacity { + eb.RemoveListener(name) + close(events) + } + }() + + select { + case event := <-events: + return &event, nil + + case <-time.After(time.Second * 2): + return nil, errors.New("Timeout waiting for event") + } + } +} diff --git a/pkg/extension/host.go b/pkg/extension/host.go index 5d3ab3e..bef9846 100644 --- a/pkg/extension/host.go +++ b/pkg/extension/host.go @@ -20,8 +20,9 @@ type Host struct { // processed asynchronously with respect to the rest of Inbuckets operation. However, an event // listener will not be called until the one before it complets. type Events struct { - AfterMessageStored EventBroker[event.MessageMetadata, Void] - BeforeMailAccepted EventBroker[event.AddressParts, bool] + AfterMessageDeleted EventBroker[event.MessageMetadata, Void] + AfterMessageStored EventBroker[event.MessageMetadata, Void] + BeforeMailAccepted EventBroker[event.AddressParts, bool] } // Void indicates the event emitter will ignore any value returned by listeners. diff --git a/pkg/message/manager.go b/pkg/message/manager.go index b62f268..bd89d79 100644 --- a/pkg/message/manager.go +++ b/pkg/message/manager.go @@ -98,7 +98,7 @@ func (s *StoreManager) GetMetadata(mailbox string) ([]*event.MessageMetadata, er } metas := make([]*event.MessageMetadata, len(messages)) for i, sm := range messages { - metas[i] = makeMetadata(sm) + metas[i] = MakeMetadata(sm) } return metas, nil } @@ -118,7 +118,7 @@ func (s *StoreManager) GetMessage(mailbox, id string) (*Message, error) { return nil, err } _ = r.Close() - header := makeMetadata(sm) + header := MakeMetadata(sm) return &Message{MessageMetadata: *header, env: env}, nil } @@ -153,8 +153,8 @@ func (s *StoreManager) MailboxForAddress(mailbox string) (string, error) { return s.AddrPolicy.ExtractMailbox(mailbox) } -// makeMetadata populates Metadata from a storage.Message. -func makeMetadata(m storage.Message) *event.MessageMetadata { +// MakeMetadata populates Metadata from a storage.Message. +func MakeMetadata(m storage.Message) *event.MessageMetadata { return &event.MessageMetadata{ Mailbox: m.Mailbox(), ID: m.ID(), diff --git a/pkg/server/lifecycle.go b/pkg/server/lifecycle.go index 757b397..a6b6faa 100644 --- a/pkg/server/lifecycle.go +++ b/pkg/server/lifecycle.go @@ -42,7 +42,7 @@ func FullAssembly(conf *config.Root) (*Services, error) { } // Configure storage. - store, err := storage.FromConfig(conf.Storage) + store, err := storage.FromConfig(conf.Storage, extHost) if err != nil { return nil, err } diff --git a/pkg/storage/file/fstore.go b/pkg/storage/file/fstore.go index b08b021..d1e6231 100644 --- a/pkg/storage/file/fstore.go +++ b/pkg/storage/file/fstore.go @@ -10,6 +10,7 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/storage" "github.com/inbucket/inbucket/pkg/stringutil" "github.com/rs/zerolog/log" @@ -45,10 +46,11 @@ type Store struct { mailPath string messageCap int bufReaderPool sync.Pool + extHost *extension.Host } // New creates a new DataStore object using the specified path -func New(cfg config.Storage) (storage.Store, error) { +func New(cfg config.Storage, extHost *extension.Host) (storage.Store, error) { path := cfg.Params["path"] if path == "" { return nil, fmt.Errorf("'path' parameter not specified") @@ -70,6 +72,7 @@ func New(cfg config.Storage) (storage.Store, error) { return bufio.NewReader(nil) }, }, + extHost: extHost, }, nil } diff --git a/pkg/storage/file/fstore_test.go b/pkg/storage/file/fstore_test.go index 71c6c48..bd325cd 100644 --- a/pkg/storage/file/fstore_test.go +++ b/pkg/storage/file/fstore_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/extension/event" "github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/storage" @@ -23,18 +24,19 @@ import ( // TestSuite runs storage package test suite on file store. func TestSuite(t *testing.T) { - test.StoreSuite(t, func(conf config.Storage) (storage.Store, func(), error) { - ds, _ := setupDataStore(conf) - destroy := func() { - teardownDataStore(ds) - } - return ds, destroy, nil - }) + test.StoreSuite(t, + func(conf config.Storage, extHost *extension.Host) (storage.Store, func(), error) { + ds, _ := setupDataStore(conf, extHost) + destroy := func() { + teardownDataStore(ds) + } + return ds, destroy, nil + }) } // Test directory structure created by filestore func TestFSDirStructure(t *testing.T) { - ds, logbuf := setupDataStore(config.Storage{}) + ds, logbuf := setupDataStore(config.Storage{}, extension.NewHost()) defer teardownDataStore(ds) root := ds.path @@ -112,7 +114,7 @@ func TestFSDirStructure(t *testing.T) { // Test missing files func TestFSMissing(t *testing.T) { - ds, logbuf := setupDataStore(config.Storage{}) + ds, logbuf := setupDataStore(config.Storage{}, extension.NewHost()) defer teardownDataStore(ds) mbName := "fred" @@ -147,7 +149,7 @@ func TestFSMissing(t *testing.T) { // Test Get the latest message func TestGetLatestMessage(t *testing.T) { - ds, logbuf := setupDataStore(config.Storage{}) + ds, logbuf := setupDataStore(config.Storage{}, extension.NewHost()) defer teardownDataStore(ds) // james hashes to 474ba67bdb289c6263b36dfd8a7bed6c85b04943 @@ -189,22 +191,25 @@ func TestGetLatestMessage(t *testing.T) { } // setupDataStore creates a new FileDataStore in a temporary directory -func setupDataStore(cfg config.Storage) (*Store, *bytes.Buffer) { +func setupDataStore(cfg config.Storage, extHost *extension.Host) (*Store, *bytes.Buffer) { path, err := ioutil.TempDir("", "inbucket") if err != nil { panic(err) } + // Capture log output. buf := new(bytes.Buffer) log.SetOutput(buf) + if cfg.Params == nil { cfg.Params = make(map[string]string) } cfg.Params["path"] = path - s, err := New(cfg) + s, err := New(cfg, extHost) if err != nil { panic(err) } + return s.(*Store), buf } diff --git a/pkg/storage/file/mbox.go b/pkg/storage/file/mbox.go index 42fe652..035f450 100644 --- a/pkg/storage/file/mbox.go +++ b/pkg/storage/file/mbox.go @@ -9,6 +9,7 @@ import ( "path/filepath" "sync" + "github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/storage" "github.com/rs/zerolog/log" ) @@ -72,6 +73,10 @@ func (mb *mbox) removeMessage(id string) error { msg = m // Slice around message we are deleting mb.messages = append(mb.messages[:i], mb.messages[i+1:]...) + + // Emit deleted event. + mb.store.extHost.Events.AfterMessageDeleted.Emit(message.MakeMetadata(msg)) + break } } diff --git a/pkg/storage/mem/store.go b/pkg/storage/mem/store.go index 8b4d017..fd1ceae 100644 --- a/pkg/storage/mem/store.go +++ b/pkg/storage/mem/store.go @@ -8,6 +8,8 @@ import ( "sync" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" + "github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/storage" ) @@ -18,6 +20,7 @@ type Store struct { cap int // Per-mailbox message cap. incoming chan *msgDone // New messages for size enforcer. remove chan *msgDone // Remove deleted messages from size enforcer. + extHost *extension.Host } type mbox struct { @@ -31,10 +34,11 @@ type mbox struct { var _ storage.Store = &Store{} // New returns an emtpy memory store. -func New(cfg config.Storage) (storage.Store, error) { +func New(cfg config.Storage, extHost *extension.Host) (storage.Store, error) { s := &Store{ - boxes: make(map[string]*mbox), - cap: cfg.MailboxMsgCap, + boxes: make(map[string]*mbox), + cap: cfg.MailboxMsgCap, + extHost: extHost, } if str, ok := cfg.Params["maxkb"]; ok { maxKB, err := strconv.ParseInt(str, 10, 64) @@ -163,6 +167,11 @@ func (s *Store) removeMessage(mailbox, id string) *Message { delete(mb.messages, id) } }) + + if m != nil { + s.extHost.Events.AfterMessageDeleted.Emit(message.MakeMetadata(m)) + } + return m } diff --git a/pkg/storage/mem/store_test.go b/pkg/storage/mem/store_test.go index 1fa27b7..efdf2bf 100644 --- a/pkg/storage/mem/store_test.go +++ b/pkg/storage/mem/store_test.go @@ -6,27 +6,32 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/storage" "github.com/inbucket/inbucket/pkg/test" ) // TestSuite runs storage package test suite on file store. func TestSuite(t *testing.T) { - test.StoreSuite(t, func(conf config.Storage) (storage.Store, func(), error) { - s, _ := New(conf) - destroy := func() {} - return s, destroy, nil - }) + test.StoreSuite(t, + func(conf config.Storage, extHost *extension.Host) (storage.Store, func(), error) { + s, _ := New(conf, extHost) + destroy := func() {} + return s, destroy, nil + }) } // TestMessageList verifies the operation of the global message list: mem.Store.messages. func TestMaxSize(t *testing.T) { + extHost := extension.NewHost() maxSize := int64(2048) - s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}}) + s, _ := New(config.Storage{Params: map[string]string{"maxkb": "2"}}, extHost) boxes := []string{"alpha", "beta", "whiskey", "tango", "foxtrot"} + + // Ensure capacity so we do not block population. n := 10 - // total := 50 sizeChan := make(chan int64, len(boxes)) + // Populate mailboxes concurrently. for _, mailbox := range boxes { go func(mailbox string) { @@ -38,11 +43,13 @@ func TestMaxSize(t *testing.T) { sizeChan <- size }(mailbox) } + // Wait for sizes. sentBytesTotal := int64(0) for range boxes { sentBytesTotal += <-sizeChan } + // Calculate actual size. gotSize := int64(0) s.VisitMailboxes(func(messages []storage.Message) bool { @@ -51,6 +58,7 @@ func TestMaxSize(t *testing.T) { } return true }) + // Verify state. Messages are ~75 bytes each. if gotSize < 2048-75 { t.Errorf("Got total size %v, want greater than: %v", gotSize, 2048-75) @@ -58,6 +66,7 @@ func TestMaxSize(t *testing.T) { if gotSize > maxSize { t.Errorf("Got total size %v, want less than: %v", gotSize, maxSize) } + // Purge all messages concurrently, testing for deadlocks. wg := &sync.WaitGroup{} wg.Add(len(boxes)) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d644429..a9c00ec 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -9,6 +9,7 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" ) var ( @@ -19,7 +20,7 @@ var ( ErrNotWritable = errors.New("Message not writable") // Constructors tracks registered storage constructors - Constructors = make(map[string]func(config.Storage) (Store, error)) + Constructors = make(map[string]func(config.Storage, *extension.Host) (Store, error)) ) // Store is the interface Inbucket uses to interact with storage implementations. @@ -48,9 +49,9 @@ type Message interface { } // FromConfig creates an instance of the Store based on the provided configuration. -func FromConfig(c config.Storage) (store Store, err error) { +func FromConfig(c config.Storage, extHost *extension.Host) (store Store, err error) { if cf := Constructors[c.Type]; cf != nil { - return cf(c) + return cf(c, extHost) } return nil, fmt.Errorf("unknown storage type configured: %q", c.Type) } diff --git a/pkg/test/integration_test.go b/pkg/test/integration_test.go index 4f54ef7..60d29bc 100644 --- a/pkg/test/integration_test.go +++ b/pkg/test/integration_test.go @@ -217,6 +217,10 @@ func formatMessage(m *client.Message) []byte { func startServer() (func(), error) { // TODO Move integration setup into lifecycle. log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: true}) + + extHost := extension.NewHost() + + // Storage setup. storage.Constructors["memory"] = mem.New os.Clearenv() conf, err := config.Process() @@ -224,7 +228,7 @@ func startServer() (func(), error) { return nil, err } svcCtx, svcCancel := context.WithCancel(context.Background()) - store, err := storage.FromConfig(conf.Storage) + store, err := storage.FromConfig(conf.Storage, extHost) if err != nil { svcCancel() return nil, err @@ -232,7 +236,6 @@ func startServer() (func(), error) { // TODO Test should not pass with unstarted msghub. addrPolicy := &policy.Addressing{Config: conf} - extHost := extension.NewHost() msgHub := msghub.New(conf.Web.MonitorHistory, extHost) mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, ExtHost: extHost} diff --git a/pkg/test/storage_suite.go b/pkg/test/storage_suite.go index 3befcdd..fa13260 100644 --- a/pkg/test/storage_suite.go +++ b/pkg/test/storage_suite.go @@ -10,19 +10,23 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/extension/event" "github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // StoreFactory returns a new store for the test suite. -type StoreFactory func(config.Storage) (store storage.Store, destroy func(), err error) +type StoreFactory func( + config.Storage, *extension.Host) (store storage.Store, destroy func(), err error) // StoreSuite runs a set of general tests on the provided Store. func StoreSuite(t *testing.T, factory StoreFactory) { testCases := []struct { name string - test func(*testing.T, storage.Store) + test func(*testing.T, storage.Store, *extension.Host) conf config.Storage }{ {"metadata", testMetadata, config.Storage{}}, @@ -40,18 +44,19 @@ func StoreSuite(t *testing.T, factory StoreFactory) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - store, destroy, err := factory(tc.conf) + extHost := extension.NewHost() + store, destroy, err := factory(tc.conf, extHost) if err != nil { t.Fatal(err) } - tc.test(t, store) + tc.test(t, store, extHost) destroy() }) } } // testMetadata verifies message metadata is stored and retrieved correctly. -func testMetadata(t *testing.T, store storage.Store) { +func testMetadata(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "testmailbox" from := &mail.Address{Name: "From Person", Address: "from@person.com"} to := []*mail.Address{ @@ -118,7 +123,7 @@ func testMetadata(t *testing.T, store storage.Store) { } // testContent generates some binary content and makes sure it is correctly retrieved. -func testContent(t *testing.T, store storage.Store) { +func testContent(t *testing.T, store storage.Store, extHost *extension.Host) { content := make([]byte, 5000) for i := 0; i < len(content); i++ { content[i] = byte(i % 256) @@ -175,7 +180,7 @@ func testContent(t *testing.T, store storage.Store) { // testDeliveryOrder delivers several messages to the same mailbox, meanwhile querying its contents // with a new GetMessages call each cycle. -func testDeliveryOrder(t *testing.T, store storage.Store) { +func testDeliveryOrder(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "fred" subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for i, subj := range subjects { @@ -195,7 +200,7 @@ func testDeliveryOrder(t *testing.T, store storage.Store) { // testLatest delivers several messages to the same mailbox, and confirms the id `latest` returns // the last message sent. -func testLatest(t *testing.T, store storage.Store) { +func testLatest(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "fred" subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for _, subj := range subjects { @@ -217,14 +222,14 @@ func testLatest(t *testing.T, store storage.Store) { } // testNaming ensures the store does not enforce local part mailbox naming. -func testNaming(t *testing.T, store storage.Store) { +func testNaming(t *testing.T, store storage.Store, extHost *extension.Host) { DeliverToStore(t, store, "fred@fish.net", "disk #27", time.Now()) GetAndCountMessages(t, store, "fred", 0) GetAndCountMessages(t, store, "fred@fish.net", 1) } // testSize verifies message content size metadata values. -func testSize(t *testing.T, store storage.Store) { +func testSize(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "fred" subjects := []string{"a", "br", "much longer than the others"} sentIds := make([]string, len(subjects)) @@ -248,7 +253,7 @@ func testSize(t *testing.T, store storage.Store) { } // testSeen verifies a message can be marked as seen. -func testSeen(t *testing.T, store storage.Store) { +func testSeen(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "lisa" id1, _ := DeliverToStore(t, store, mailbox, "whatever", time.Now()) id2, _ := DeliverToStore(t, store, mailbox, "hello?", time.Now()) @@ -284,22 +289,24 @@ func testSeen(t *testing.T, store storage.Store) { } // testDelete creates and deletes some messages. -func testDelete(t *testing.T, store storage.Store) { +func testDelete(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "fred" subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for _, subj := range subjects { DeliverToStore(t, store, mailbox, subj, time.Now()) } msgs := GetAndCountMessages(t, store, mailbox, len(subjects)) + + // Subscribe to events. + eventListener := extHost.Events.AfterMessageDeleted.AsyncTestListener(2) + // Delete a couple messages. - err := store.RemoveMessage(mailbox, msgs[1].ID()) - if err != nil { - t.Fatal(err) - } - err = store.RemoveMessage(mailbox, msgs[3].ID()) - if err != nil { - t.Fatal(err) + deleteIDs := []string{msgs[1].ID(), msgs[3].ID()} + for _, id := range deleteIDs { + err := store.RemoveMessage(mailbox, id) + require.NoError(t, err) } + // Confirm deletion. subjects = []string{"alpha", "charlie", "echo"} msgs = GetAndCountMessages(t, store, mailbox, len(subjects)) @@ -309,6 +316,17 @@ func testDelete(t *testing.T, store storage.Store) { t.Errorf("Got subject %q, want %q", got, want) } } + + // Capture events and check correct IDs were emitted. + ev1, err := eventListener() + require.NoError(t, err) + ev2, err := eventListener() + require.NoError(t, err) + eventIDs := []string{ev1.ID, ev2.ID} + for _, id := range deleteIDs { + assert.Contains(t, eventIDs, id) + } + // Try appending one more. DeliverToStore(t, store, mailbox, "foxtrot", time.Now()) subjects = []string{"alpha", "charlie", "echo", "foxtrot"} @@ -322,7 +340,7 @@ func testDelete(t *testing.T, store storage.Store) { } // testPurge makes sure mailboxes can be purged. -func testPurge(t *testing.T, store storage.Store) { +func testPurge(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "fred" subjects := []string{"alpha", "bravo", "charlie", "delta", "echo"} for _, subj := range subjects { @@ -338,7 +356,7 @@ func testPurge(t *testing.T, store storage.Store) { } // testMsgCap verifies the message cap is enforced. -func testMsgCap(t *testing.T, store storage.Store) { +func testMsgCap(t *testing.T, store storage.Store, extHost *extension.Host) { mbCap := 10 mailbox := "captain" for i := 0; i < 20; i++ { @@ -365,7 +383,7 @@ func testMsgCap(t *testing.T, store storage.Store) { } // testNoMsgCap verfies a cap of 0 is not enforced. -func testNoMsgCap(t *testing.T, store storage.Store) { +func testNoMsgCap(t *testing.T, store storage.Store, extHost *extension.Host) { mailbox := "captain" for i := 0; i < 20; i++ { subj := fmt.Sprintf("subject %v", i) @@ -376,7 +394,7 @@ func testNoMsgCap(t *testing.T, store storage.Store) { // testVisitMailboxes creates some mailboxes and confirms the VisitMailboxes method visits all of // them. -func testVisitMailboxes(t *testing.T, ds storage.Store) { +func testVisitMailboxes(t *testing.T, ds storage.Store, extHost *extension.Host) { boxes := []string{"abby", "bill", "christa", "donald", "evelyn"} for _, name := range boxes { DeliverToStore(t, ds, name, "Old Message", time.Now().Add(-24*time.Hour)) diff --git a/shell.nix b/shell.nix index f1c5931..263b912 100644 --- a/shell.nix +++ b/shell.nix @@ -15,6 +15,7 @@ pkgs.mkShell { buildInputs = with pkgs; [ act dpkg + delve elmPackages.elm elmPackages.elm-analyse elmPackages.elm-format