From f0d457b8f50712e3cdb8c59d9841016861fd84b2 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2023 21:30:47 -0800 Subject: [PATCH] extension: Add MessageStored event (#316) * Replace existing direct StoreManager->msghub communication with this event * For #280 #309 #312 #310 Signed-off-by: James Hillyerd Signed-off-by: James Hillyerd --- pkg/extension/event/events.go | 17 +++++++++++ pkg/extension/host.go | 23 +++++++++++++++ pkg/message/manager.go | 32 +++++++++++---------- pkg/msghub/hub.go | 35 +++++++++++----------- pkg/msghub/hub_test.go | 51 +++++++++++++++++---------------- pkg/rest/socketv1_controller.go | 16 ++++++----- pkg/server/lifecycle.go | 11 +++++-- pkg/test/integration_test.go | 6 ++-- 8 files changed, 124 insertions(+), 67 deletions(-) create mode 100644 pkg/extension/event/events.go create mode 100644 pkg/extension/host.go diff --git a/pkg/extension/event/events.go b/pkg/extension/event/events.go new file mode 100644 index 0000000..4f83e3b --- /dev/null +++ b/pkg/extension/event/events.go @@ -0,0 +1,17 @@ +package event + +import ( + "net/mail" + "time" +) + +// MessageMetadata contains the basic header data for a message event. +type MessageMetadata struct { + Mailbox string + ID string + From *mail.Address + To []*mail.Address + Date time.Time + Subject string + Size int64 +} diff --git a/pkg/extension/host.go b/pkg/extension/host.go new file mode 100644 index 0000000..ac86b7a --- /dev/null +++ b/pkg/extension/host.go @@ -0,0 +1,23 @@ +package extension + +import ( + "github.com/inbucket/inbucket/pkg/extension/event" +) + +// Host defines extension points for Inbucket. +type Host struct { + Events *Events +} + +// Events defines all the event types supported by the extension host. +type Events struct { + MessageStored EventBroker[event.MessageMetadata, Void] +} + +// Void indicates the event emitter will ignore any value returned by listeners. +type Void struct{} + +// NewHost creates a new extension host. +func NewHost() *Host { + return &Host{Events: &Events{}} +} diff --git a/pkg/message/manager.go b/pkg/message/manager.go index f942ec6..75c5f2b 100644 --- a/pkg/message/manager.go +++ b/pkg/message/manager.go @@ -7,10 +7,10 @@ import ( "strings" "time" - "github.com/inbucket/inbucket/pkg/msghub" + "github.com/inbucket/inbucket/pkg/extension" + "github.com/inbucket/inbucket/pkg/extension/event" "github.com/inbucket/inbucket/pkg/policy" "github.com/inbucket/inbucket/pkg/storage" - "github.com/inbucket/inbucket/pkg/stringutil" "github.com/jhillyerd/enmime" "github.com/rs/zerolog/log" ) @@ -37,7 +37,7 @@ type Manager interface { type StoreManager struct { AddrPolicy *policy.Addressing Store storage.Store - Hub *msghub.Hub + ExtHost *extension.Host } // Deliver submits a new message to the store. @@ -65,6 +65,7 @@ func (s *StoreManager) Deliver( toaddr[i] = &torecip.Address } } + log.Debug().Str("module", "message").Str("mailbox", to.Mailbox).Msg("Delivering message") delivery := &Delivery{ Meta: Metadata{ @@ -80,19 +81,20 @@ func (s *StoreManager) Deliver( if err != nil { return "", err } - if s.Hub != nil { - // Broadcast message information. - broadcast := msghub.Message{ - Mailbox: to.Mailbox, - ID: id, - From: stringutil.StringAddress(delivery.From()), - To: stringutil.StringAddressList(delivery.To()), - Subject: delivery.Subject(), - Date: delivery.Date(), - Size: delivery.Size(), - } - s.Hub.Dispatch(broadcast) + + // Emit message stored event. + event := event.MessageMetadata{ + Mailbox: to.Mailbox, + ID: id, + From: delivery.From(), + To: delivery.To(), + Subject: delivery.Subject(), + Date: delivery.Date(), + Size: delivery.Size(), } + // TODO Add a unit test to make sure we send this. + s.ExtHost.Events.MessageStored.Emit(&event) + return id, nil } diff --git a/pkg/msghub/hub.go b/pkg/msghub/hub.go index 59f5445..30c8bfa 100644 --- a/pkg/msghub/hub.go +++ b/pkg/msghub/hub.go @@ -3,26 +3,17 @@ package msghub import ( "container/ring" "context" - "time" + + "github.com/inbucket/inbucket/pkg/extension" + "github.com/inbucket/inbucket/pkg/extension/event" ) // Length of msghub operation queue const opChanLen = 100 -// Message contains the basic header data for a message -type Message struct { - Mailbox string - ID string - From string - To []string - Subject string - Date time.Time - Size int64 -} - // Listener receives the contents of the history buffer, followed by new messages type Listener interface { - Receive(msg Message) error + Receive(msg event.MessageMetadata) error } // Hub relays messages on to its listeners @@ -36,12 +27,21 @@ type Hub struct { // New constructs a new Hub which will cache historyLen messages in memory for playback to future // listeners. A goroutine is created to handle incoming messages; it will run until the provided // context is canceled. -func New(historyLen int) *Hub { - return &Hub{ +func New(historyLen int, extHost *extension.Host) *Hub { + hub := &Hub{ history: ring.New(historyLen), listeners: make(map[Listener]struct{}), opChan: make(chan func(h *Hub), opChanLen), } + + // Register an extension event listener for MessageStored. + extHost.Events.MessageStored.AddListener("msghub", + func(msg event.MessageMetadata) *extension.Void { + hub.Dispatch(msg) + return nil + }) + + return hub } // Start Hub processing loop. @@ -60,12 +60,13 @@ func (hub *Hub) Start(ctx context.Context) { // Dispatch queues a message for broadcast by the hub. The message will be placed into the // history buffer and then relayed to all registered listeners. -func (hub *Hub) Dispatch(msg Message) { +func (hub *Hub) Dispatch(msg event.MessageMetadata) { hub.opChan <- func(h *Hub) { if h.history != nil { // Add to history buffer h.history.Value = msg h.history = h.history.Next() + // Deliver message to all listeners, removing listeners if they return an error for l := range h.listeners { if err := l.Receive(msg); err != nil { @@ -82,7 +83,7 @@ func (hub *Hub) AddListener(l Listener) { // Playback log h.history.Do(func(v interface{}) { if v != nil { - l.Receive(v.(Message)) + l.Receive(v.(event.MessageMetadata)) } }) diff --git a/pkg/msghub/hub_test.go b/pkg/msghub/hub_test.go index 5e0ec5c..a269c47 100644 --- a/pkg/msghub/hub_test.go +++ b/pkg/msghub/hub_test.go @@ -5,13 +5,16 @@ import ( "fmt" "testing" "time" + + "github.com/inbucket/inbucket/pkg/extension" + "github.com/inbucket/inbucket/pkg/extension/event" ) // testListener implements the Listener interface, mock for unit tests type testListener struct { - messages []*Message // received messages - wantMessages int // how many messages this listener wants to receive - errorAfter int // when != 0, messages until Receive() begins returning error + messages []*event.MessageMetadata // received messages + wantMessages int // how many messages this listener wants to receive + errorAfter int // when != 0, messages until Receive() begins returning error done chan struct{} // closed once we have received wantMessages overflow chan struct{} // closed if we receive wantMessages+1 @@ -19,7 +22,7 @@ type testListener struct { func newTestListener(want int) *testListener { l := &testListener{ - messages: make([]*Message, 0, want*2), + messages: make([]*event.MessageMetadata, 0, want*2), wantMessages: want, done: make(chan struct{}), overflow: make(chan struct{}), @@ -32,7 +35,7 @@ func newTestListener(want int) *testListener { // Receive a Message, store it in the messages slice, close applicable channels, and return an error // if instructed -func (l *testListener) Receive(msg Message) error { +func (l *testListener) Receive(msg event.MessageMetadata) error { l.messages = append(l.messages, &msg) if len(l.messages) == l.wantMessages { close(l.done) @@ -52,7 +55,7 @@ func (l *testListener) String() string { } func TestHubNew(t *testing.T) { - hub := New(5) + hub := New(5, extension.NewHost()) if hub == nil { t.Fatal("New() == nil, expected a new Hub") } @@ -61,9 +64,9 @@ func TestHubNew(t *testing.T) { func TestHubZeroLen(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(0) + hub := New(0, extension.NewHost()) go hub.Start(ctx) - m := Message{} + m := event.MessageMetadata{} for i := 0; i < 100; i++ { hub.Dispatch(m) } @@ -73,9 +76,9 @@ func TestHubZeroLen(t *testing.T) { func TestHubZeroListeners(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(5) + hub := New(5, extension.NewHost()) go hub.Start(ctx) - m := Message{} + m := event.MessageMetadata{} for i := 0; i < 100; i++ { hub.Dispatch(m) } @@ -85,9 +88,9 @@ func TestHubZeroListeners(t *testing.T) { func TestHubOneListener(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(5) + hub := New(5, extension.NewHost()) go hub.Start(ctx) - m := Message{} + m := event.MessageMetadata{} l := newTestListener(1) hub.AddListener(l) @@ -104,9 +107,9 @@ func TestHubOneListener(t *testing.T) { func TestHubRemoveListener(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(5) + hub := New(5, extension.NewHost()) go hub.Start(ctx) - m := Message{} + m := event.MessageMetadata{} l := newTestListener(1) hub.AddListener(l) @@ -127,9 +130,9 @@ func TestHubRemoveListener(t *testing.T) { func TestHubRemoveListenerOnError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(5) + hub := New(5, extension.NewHost()) go hub.Start(ctx) - m := Message{} + m := event.MessageMetadata{} // error after 1 means listener should receive 2 messages before being removed l := newTestListener(2) @@ -154,15 +157,15 @@ func TestHubRemoveListenerOnError(t *testing.T) { func TestHubHistoryReplay(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(100) + hub := New(100, extension.NewHost()) go hub.Start(ctx) l1 := newTestListener(3) hub.AddListener(l1) // Broadcast 3 messages with no listeners - msgs := make([]Message, 3) + msgs := make([]event.MessageMetadata, 3) for i := 0; i < len(msgs); i++ { - msgs[i] = Message{ + msgs[i] = event.MessageMetadata{ Subject: fmt.Sprintf("subj %v", i), } hub.Dispatch(msgs[i]) @@ -198,15 +201,15 @@ func TestHubHistoryReplay(t *testing.T) { func TestHubHistoryReplayWrap(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(5) + hub := New(5, extension.NewHost()) go hub.Start(ctx) l1 := newTestListener(20) hub.AddListener(l1) // Broadcast more messages than the hub can hold - msgs := make([]Message, 20) + msgs := make([]event.MessageMetadata, 20) for i := 0; i < len(msgs); i++ { - msgs[i] = Message{ + msgs[i] = event.MessageMetadata{ Subject: fmt.Sprintf("subj %v", i), } hub.Dispatch(msgs[i]) @@ -241,9 +244,9 @@ func TestHubHistoryReplayWrap(t *testing.T) { func TestHubContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - hub := New(5) + hub := New(5, extension.NewHost()) go hub.Start(ctx) - m := Message{} + m := event.MessageMetadata{} l := newTestListener(1) hub.AddListener(l) diff --git a/pkg/rest/socketv1_controller.go b/pkg/rest/socketv1_controller.go index 64e84c2..99f2bcd 100644 --- a/pkg/rest/socketv1_controller.go +++ b/pkg/rest/socketv1_controller.go @@ -5,9 +5,11 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/inbucket/inbucket/pkg/extension/event" "github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/rest/model" "github.com/inbucket/inbucket/pkg/server/web" + "github.com/inbucket/inbucket/pkg/stringutil" "github.com/rs/zerolog/log" ) @@ -33,9 +35,9 @@ var upgrader = websocket.Upgrader{ // msgListener handles messages from the msghub type msgListener struct { - hub *msghub.Hub // Global message hub - c chan msghub.Message // Queue of messages from Receive() - mailbox string // Name of mailbox to monitor, "" == all mailboxes + hub *msghub.Hub // Global message hub + c chan event.MessageMetadata // Queue of messages from Receive() + mailbox string // Name of mailbox to monitor, "" == all mailboxes } // newMsgListener creates a listener and registers it. Optional mailbox parameter will restrict @@ -43,7 +45,7 @@ type msgListener struct { func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener { ml := &msgListener{ hub: hub, - c: make(chan msghub.Message, 100), + c: make(chan event.MessageMetadata, 100), mailbox: mailbox, } hub.AddListener(ml) @@ -51,7 +53,7 @@ func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener { } // Receive handles an incoming message -func (ml *msgListener) Receive(msg msghub.Message) error { +func (ml *msgListener) Receive(msg event.MessageMetadata) error { if ml.mailbox != "" && ml.mailbox != msg.Mailbox { // Did not match mailbox name return nil @@ -112,8 +114,8 @@ func (ml *msgListener) WSWriter(conn *websocket.Conn) { header := &model.JSONMessageHeaderV1{ Mailbox: msg.Mailbox, ID: msg.ID, - From: msg.From, - To: msg.To, + From: stringutil.StringAddress(msg.From), + To: stringutil.StringAddressList(msg.To), Subject: msg.Subject, Date: msg.Date, PosixMillis: msg.Date.UnixNano() / 1000000, diff --git a/pkg/server/lifecycle.go b/pkg/server/lifecycle.go index 79b477d..fa5be3c 100644 --- a/pkg/server/lifecycle.go +++ b/pkg/server/lifecycle.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/policy" @@ -24,12 +25,16 @@ type Services struct { RetentionScanner *storage.RetentionScanner SMTPServer *smtp.Server WebServer *web.Server + ExtHost *extension.Host notify chan error // Combined notification for failed services. ready *sync.WaitGroup // Tracks services that have not reported ready. } // FullAssembly wires up a complete Inbucket environment. func FullAssembly(conf *config.Root) (*Services, error) { + // Configure extensions. + extHost := extension.NewHost() + // Configure storage. store, err := storage.FromConfig(conf.Storage) if err != nil { @@ -37,8 +42,9 @@ func FullAssembly(conf *config.Root) (*Services, error) { } addrPolicy := &policy.Addressing{Config: conf} - msgHub := msghub.New(conf.Web.MonitorHistory) - mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} + // Configure shared components. + msgHub := msghub.New(conf.Web.MonitorHistory, extHost) + mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, ExtHost: extHost} // Start Retention scanner. retentionScanner := storage.NewRetentionScanner(conf.Storage, store) @@ -58,6 +64,7 @@ func FullAssembly(conf *config.Root) (*Services, error) { POP3Server: pop3Server, SMTPServer: smtpServer, WebServer: webServer, + ExtHost: extHost, ready: &sync.WaitGroup{}, }, nil } diff --git a/pkg/test/integration_test.go b/pkg/test/integration_test.go index 14cf476..ae0b01a 100644 --- a/pkg/test/integration_test.go +++ b/pkg/test/integration_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/policy" @@ -230,9 +231,10 @@ func startServer() (func(), error) { } // TODO Test should not pass with unstarted msghub. - msgHub := msghub.New(conf.Web.MonitorHistory) addrPolicy := &policy.Addressing{Config: conf} - mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} + extHost := extension.NewHost() + msgHub := msghub.New(conf.Web.MonitorHistory, extHost) + mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, ExtHost: extHost} // Start HTTP server. webui.SetupRoutes(web.Router.PathPrefix("/serve/").Subrouter())