1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

extension: Add MessageStored event (#316)

* Replace existing direct StoreManager->msghub communication with this
  event
* For #280 #309 #312 #310

Signed-off-by: James Hillyerd <james@hillyerd.com>

Signed-off-by: James Hillyerd <james@hillyerd.com>
This commit is contained in:
James Hillyerd
2023-01-16 21:30:47 -08:00
committed by GitHub
parent 3bf4b5c39b
commit f0d457b8f5
8 changed files with 124 additions and 67 deletions

View File

@@ -0,0 +1,17 @@
package event
import (
"net/mail"
"time"
)
// MessageMetadata contains the basic header data for a message event.
type MessageMetadata struct {
Mailbox string
ID string
From *mail.Address
To []*mail.Address
Date time.Time
Subject string
Size int64
}

23
pkg/extension/host.go Normal file
View File

@@ -0,0 +1,23 @@
package extension
import (
"github.com/inbucket/inbucket/pkg/extension/event"
)
// Host defines extension points for Inbucket.
type Host struct {
Events *Events
}
// Events defines all the event types supported by the extension host.
type Events struct {
MessageStored EventBroker[event.MessageMetadata, Void]
}
// Void indicates the event emitter will ignore any value returned by listeners.
type Void struct{}
// NewHost creates a new extension host.
func NewHost() *Host {
return &Host{Events: &Events{}}
}

View File

@@ -7,10 +7,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/extension"
"github.com/inbucket/inbucket/pkg/extension/event"
"github.com/inbucket/inbucket/pkg/policy" "github.com/inbucket/inbucket/pkg/policy"
"github.com/inbucket/inbucket/pkg/storage" "github.com/inbucket/inbucket/pkg/storage"
"github.com/inbucket/inbucket/pkg/stringutil"
"github.com/jhillyerd/enmime" "github.com/jhillyerd/enmime"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@@ -37,7 +37,7 @@ type Manager interface {
type StoreManager struct { type StoreManager struct {
AddrPolicy *policy.Addressing AddrPolicy *policy.Addressing
Store storage.Store Store storage.Store
Hub *msghub.Hub ExtHost *extension.Host
} }
// Deliver submits a new message to the store. // Deliver submits a new message to the store.
@@ -65,6 +65,7 @@ func (s *StoreManager) Deliver(
toaddr[i] = &torecip.Address toaddr[i] = &torecip.Address
} }
} }
log.Debug().Str("module", "message").Str("mailbox", to.Mailbox).Msg("Delivering message") log.Debug().Str("module", "message").Str("mailbox", to.Mailbox).Msg("Delivering message")
delivery := &Delivery{ delivery := &Delivery{
Meta: Metadata{ Meta: Metadata{
@@ -80,19 +81,20 @@ func (s *StoreManager) Deliver(
if err != nil { if err != nil {
return "", err return "", err
} }
if s.Hub != nil {
// Broadcast message information. // Emit message stored event.
broadcast := msghub.Message{ event := event.MessageMetadata{
Mailbox: to.Mailbox, Mailbox: to.Mailbox,
ID: id, ID: id,
From: stringutil.StringAddress(delivery.From()), From: delivery.From(),
To: stringutil.StringAddressList(delivery.To()), To: delivery.To(),
Subject: delivery.Subject(), Subject: delivery.Subject(),
Date: delivery.Date(), Date: delivery.Date(),
Size: delivery.Size(), Size: delivery.Size(),
}
s.Hub.Dispatch(broadcast)
} }
// TODO Add a unit test to make sure we send this.
s.ExtHost.Events.MessageStored.Emit(&event)
return id, nil return id, nil
} }

View File

@@ -3,26 +3,17 @@ package msghub
import ( import (
"container/ring" "container/ring"
"context" "context"
"time"
"github.com/inbucket/inbucket/pkg/extension"
"github.com/inbucket/inbucket/pkg/extension/event"
) )
// Length of msghub operation queue // Length of msghub operation queue
const opChanLen = 100 const opChanLen = 100
// Message contains the basic header data for a message
type Message struct {
Mailbox string
ID string
From string
To []string
Subject string
Date time.Time
Size int64
}
// Listener receives the contents of the history buffer, followed by new messages // Listener receives the contents of the history buffer, followed by new messages
type Listener interface { type Listener interface {
Receive(msg Message) error Receive(msg event.MessageMetadata) error
} }
// Hub relays messages on to its listeners // Hub relays messages on to its listeners
@@ -36,12 +27,21 @@ type Hub struct {
// New constructs a new Hub which will cache historyLen messages in memory for playback to future // New constructs a new Hub which will cache historyLen messages in memory for playback to future
// listeners. A goroutine is created to handle incoming messages; it will run until the provided // listeners. A goroutine is created to handle incoming messages; it will run until the provided
// context is canceled. // context is canceled.
func New(historyLen int) *Hub { func New(historyLen int, extHost *extension.Host) *Hub {
return &Hub{ hub := &Hub{
history: ring.New(historyLen), history: ring.New(historyLen),
listeners: make(map[Listener]struct{}), listeners: make(map[Listener]struct{}),
opChan: make(chan func(h *Hub), opChanLen), opChan: make(chan func(h *Hub), opChanLen),
} }
// Register an extension event listener for MessageStored.
extHost.Events.MessageStored.AddListener("msghub",
func(msg event.MessageMetadata) *extension.Void {
hub.Dispatch(msg)
return nil
})
return hub
} }
// Start Hub processing loop. // Start Hub processing loop.
@@ -60,12 +60,13 @@ func (hub *Hub) Start(ctx context.Context) {
// Dispatch queues a message for broadcast by the hub. The message will be placed into the // Dispatch queues a message for broadcast by the hub. The message will be placed into the
// history buffer and then relayed to all registered listeners. // history buffer and then relayed to all registered listeners.
func (hub *Hub) Dispatch(msg Message) { func (hub *Hub) Dispatch(msg event.MessageMetadata) {
hub.opChan <- func(h *Hub) { hub.opChan <- func(h *Hub) {
if h.history != nil { if h.history != nil {
// Add to history buffer // Add to history buffer
h.history.Value = msg h.history.Value = msg
h.history = h.history.Next() h.history = h.history.Next()
// Deliver message to all listeners, removing listeners if they return an error // Deliver message to all listeners, removing listeners if they return an error
for l := range h.listeners { for l := range h.listeners {
if err := l.Receive(msg); err != nil { if err := l.Receive(msg); err != nil {
@@ -82,7 +83,7 @@ func (hub *Hub) AddListener(l Listener) {
// Playback log // Playback log
h.history.Do(func(v interface{}) { h.history.Do(func(v interface{}) {
if v != nil { if v != nil {
l.Receive(v.(Message)) l.Receive(v.(event.MessageMetadata))
} }
}) })

View File

@@ -5,13 +5,16 @@ import (
"fmt" "fmt"
"testing" "testing"
"time" "time"
"github.com/inbucket/inbucket/pkg/extension"
"github.com/inbucket/inbucket/pkg/extension/event"
) )
// testListener implements the Listener interface, mock for unit tests // testListener implements the Listener interface, mock for unit tests
type testListener struct { type testListener struct {
messages []*Message // received messages messages []*event.MessageMetadata // received messages
wantMessages int // how many messages this listener wants to receive wantMessages int // how many messages this listener wants to receive
errorAfter int // when != 0, messages until Receive() begins returning error errorAfter int // when != 0, messages until Receive() begins returning error
done chan struct{} // closed once we have received wantMessages done chan struct{} // closed once we have received wantMessages
overflow chan struct{} // closed if we receive wantMessages+1 overflow chan struct{} // closed if we receive wantMessages+1
@@ -19,7 +22,7 @@ type testListener struct {
func newTestListener(want int) *testListener { func newTestListener(want int) *testListener {
l := &testListener{ l := &testListener{
messages: make([]*Message, 0, want*2), messages: make([]*event.MessageMetadata, 0, want*2),
wantMessages: want, wantMessages: want,
done: make(chan struct{}), done: make(chan struct{}),
overflow: make(chan struct{}), overflow: make(chan struct{}),
@@ -32,7 +35,7 @@ func newTestListener(want int) *testListener {
// Receive a Message, store it in the messages slice, close applicable channels, and return an error // Receive a Message, store it in the messages slice, close applicable channels, and return an error
// if instructed // if instructed
func (l *testListener) Receive(msg Message) error { func (l *testListener) Receive(msg event.MessageMetadata) error {
l.messages = append(l.messages, &msg) l.messages = append(l.messages, &msg)
if len(l.messages) == l.wantMessages { if len(l.messages) == l.wantMessages {
close(l.done) close(l.done)
@@ -52,7 +55,7 @@ func (l *testListener) String() string {
} }
func TestHubNew(t *testing.T) { func TestHubNew(t *testing.T) {
hub := New(5) hub := New(5, extension.NewHost())
if hub == nil { if hub == nil {
t.Fatal("New() == nil, expected a new Hub") t.Fatal("New() == nil, expected a new Hub")
} }
@@ -61,9 +64,9 @@ func TestHubNew(t *testing.T) {
func TestHubZeroLen(t *testing.T) { func TestHubZeroLen(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(0) hub := New(0, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
m := Message{} m := event.MessageMetadata{}
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
hub.Dispatch(m) hub.Dispatch(m)
} }
@@ -73,9 +76,9 @@ func TestHubZeroLen(t *testing.T) {
func TestHubZeroListeners(t *testing.T) { func TestHubZeroListeners(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(5) hub := New(5, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
m := Message{} m := event.MessageMetadata{}
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
hub.Dispatch(m) hub.Dispatch(m)
} }
@@ -85,9 +88,9 @@ func TestHubZeroListeners(t *testing.T) {
func TestHubOneListener(t *testing.T) { func TestHubOneListener(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(5) hub := New(5, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
m := Message{} m := event.MessageMetadata{}
l := newTestListener(1) l := newTestListener(1)
hub.AddListener(l) hub.AddListener(l)
@@ -104,9 +107,9 @@ func TestHubOneListener(t *testing.T) {
func TestHubRemoveListener(t *testing.T) { func TestHubRemoveListener(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(5) hub := New(5, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
m := Message{} m := event.MessageMetadata{}
l := newTestListener(1) l := newTestListener(1)
hub.AddListener(l) hub.AddListener(l)
@@ -127,9 +130,9 @@ func TestHubRemoveListener(t *testing.T) {
func TestHubRemoveListenerOnError(t *testing.T) { func TestHubRemoveListenerOnError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(5) hub := New(5, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
m := Message{} m := event.MessageMetadata{}
// error after 1 means listener should receive 2 messages before being removed // error after 1 means listener should receive 2 messages before being removed
l := newTestListener(2) l := newTestListener(2)
@@ -154,15 +157,15 @@ func TestHubRemoveListenerOnError(t *testing.T) {
func TestHubHistoryReplay(t *testing.T) { func TestHubHistoryReplay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(100) hub := New(100, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
l1 := newTestListener(3) l1 := newTestListener(3)
hub.AddListener(l1) hub.AddListener(l1)
// Broadcast 3 messages with no listeners // Broadcast 3 messages with no listeners
msgs := make([]Message, 3) msgs := make([]event.MessageMetadata, 3)
for i := 0; i < len(msgs); i++ { for i := 0; i < len(msgs); i++ {
msgs[i] = Message{ msgs[i] = event.MessageMetadata{
Subject: fmt.Sprintf("subj %v", i), Subject: fmt.Sprintf("subj %v", i),
} }
hub.Dispatch(msgs[i]) hub.Dispatch(msgs[i])
@@ -198,15 +201,15 @@ func TestHubHistoryReplay(t *testing.T) {
func TestHubHistoryReplayWrap(t *testing.T) { func TestHubHistoryReplayWrap(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(5) hub := New(5, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
l1 := newTestListener(20) l1 := newTestListener(20)
hub.AddListener(l1) hub.AddListener(l1)
// Broadcast more messages than the hub can hold // Broadcast more messages than the hub can hold
msgs := make([]Message, 20) msgs := make([]event.MessageMetadata, 20)
for i := 0; i < len(msgs); i++ { for i := 0; i < len(msgs); i++ {
msgs[i] = Message{ msgs[i] = event.MessageMetadata{
Subject: fmt.Sprintf("subj %v", i), Subject: fmt.Sprintf("subj %v", i),
} }
hub.Dispatch(msgs[i]) hub.Dispatch(msgs[i])
@@ -241,9 +244,9 @@ func TestHubHistoryReplayWrap(t *testing.T) {
func TestHubContextCancel(t *testing.T) { func TestHubContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
hub := New(5) hub := New(5, extension.NewHost())
go hub.Start(ctx) go hub.Start(ctx)
m := Message{} m := event.MessageMetadata{}
l := newTestListener(1) l := newTestListener(1)
hub.AddListener(l) hub.AddListener(l)

View File

@@ -5,9 +5,11 @@ import (
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/inbucket/inbucket/pkg/extension/event"
"github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/msghub"
"github.com/inbucket/inbucket/pkg/rest/model" "github.com/inbucket/inbucket/pkg/rest/model"
"github.com/inbucket/inbucket/pkg/server/web" "github.com/inbucket/inbucket/pkg/server/web"
"github.com/inbucket/inbucket/pkg/stringutil"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@@ -33,9 +35,9 @@ var upgrader = websocket.Upgrader{
// msgListener handles messages from the msghub // msgListener handles messages from the msghub
type msgListener struct { type msgListener struct {
hub *msghub.Hub // Global message hub hub *msghub.Hub // Global message hub
c chan msghub.Message // Queue of messages from Receive() c chan event.MessageMetadata // Queue of messages from Receive()
mailbox string // Name of mailbox to monitor, "" == all mailboxes mailbox string // Name of mailbox to monitor, "" == all mailboxes
} }
// newMsgListener creates a listener and registers it. Optional mailbox parameter will restrict // newMsgListener creates a listener and registers it. Optional mailbox parameter will restrict
@@ -43,7 +45,7 @@ type msgListener struct {
func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener { func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener {
ml := &msgListener{ ml := &msgListener{
hub: hub, hub: hub,
c: make(chan msghub.Message, 100), c: make(chan event.MessageMetadata, 100),
mailbox: mailbox, mailbox: mailbox,
} }
hub.AddListener(ml) hub.AddListener(ml)
@@ -51,7 +53,7 @@ func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener {
} }
// Receive handles an incoming message // Receive handles an incoming message
func (ml *msgListener) Receive(msg msghub.Message) error { func (ml *msgListener) Receive(msg event.MessageMetadata) error {
if ml.mailbox != "" && ml.mailbox != msg.Mailbox { if ml.mailbox != "" && ml.mailbox != msg.Mailbox {
// Did not match mailbox name // Did not match mailbox name
return nil return nil
@@ -112,8 +114,8 @@ func (ml *msgListener) WSWriter(conn *websocket.Conn) {
header := &model.JSONMessageHeaderV1{ header := &model.JSONMessageHeaderV1{
Mailbox: msg.Mailbox, Mailbox: msg.Mailbox,
ID: msg.ID, ID: msg.ID,
From: msg.From, From: stringutil.StringAddress(msg.From),
To: msg.To, To: stringutil.StringAddressList(msg.To),
Subject: msg.Subject, Subject: msg.Subject,
Date: msg.Date, Date: msg.Date,
PosixMillis: msg.Date.UnixNano() / 1000000, PosixMillis: msg.Date.UnixNano() / 1000000,

View File

@@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/inbucket/inbucket/pkg/config" "github.com/inbucket/inbucket/pkg/config"
"github.com/inbucket/inbucket/pkg/extension"
"github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/message"
"github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/msghub"
"github.com/inbucket/inbucket/pkg/policy" "github.com/inbucket/inbucket/pkg/policy"
@@ -24,12 +25,16 @@ type Services struct {
RetentionScanner *storage.RetentionScanner RetentionScanner *storage.RetentionScanner
SMTPServer *smtp.Server SMTPServer *smtp.Server
WebServer *web.Server WebServer *web.Server
ExtHost *extension.Host
notify chan error // Combined notification for failed services. notify chan error // Combined notification for failed services.
ready *sync.WaitGroup // Tracks services that have not reported ready. ready *sync.WaitGroup // Tracks services that have not reported ready.
} }
// FullAssembly wires up a complete Inbucket environment. // FullAssembly wires up a complete Inbucket environment.
func FullAssembly(conf *config.Root) (*Services, error) { func FullAssembly(conf *config.Root) (*Services, error) {
// Configure extensions.
extHost := extension.NewHost()
// Configure storage. // Configure storage.
store, err := storage.FromConfig(conf.Storage) store, err := storage.FromConfig(conf.Storage)
if err != nil { if err != nil {
@@ -37,8 +42,9 @@ func FullAssembly(conf *config.Root) (*Services, error) {
} }
addrPolicy := &policy.Addressing{Config: conf} addrPolicy := &policy.Addressing{Config: conf}
msgHub := msghub.New(conf.Web.MonitorHistory) // Configure shared components.
mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} msgHub := msghub.New(conf.Web.MonitorHistory, extHost)
mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, ExtHost: extHost}
// Start Retention scanner. // Start Retention scanner.
retentionScanner := storage.NewRetentionScanner(conf.Storage, store) retentionScanner := storage.NewRetentionScanner(conf.Storage, store)
@@ -58,6 +64,7 @@ func FullAssembly(conf *config.Root) (*Services, error) {
POP3Server: pop3Server, POP3Server: pop3Server,
SMTPServer: smtpServer, SMTPServer: smtpServer,
WebServer: webServer, WebServer: webServer,
ExtHost: extHost,
ready: &sync.WaitGroup{}, ready: &sync.WaitGroup{},
}, nil }, nil
} }

View File

@@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/inbucket/inbucket/pkg/config" "github.com/inbucket/inbucket/pkg/config"
"github.com/inbucket/inbucket/pkg/extension"
"github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/message"
"github.com/inbucket/inbucket/pkg/msghub" "github.com/inbucket/inbucket/pkg/msghub"
"github.com/inbucket/inbucket/pkg/policy" "github.com/inbucket/inbucket/pkg/policy"
@@ -230,9 +231,10 @@ func startServer() (func(), error) {
} }
// TODO Test should not pass with unstarted msghub. // TODO Test should not pass with unstarted msghub.
msgHub := msghub.New(conf.Web.MonitorHistory)
addrPolicy := &policy.Addressing{Config: conf} addrPolicy := &policy.Addressing{Config: conf}
mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} extHost := extension.NewHost()
msgHub := msghub.New(conf.Web.MonitorHistory, extHost)
mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, ExtHost: extHost}
// Start HTTP server. // Start HTTP server.
webui.SetupRoutes(web.Router.PathPrefix("/serve/").Subrouter()) webui.SetupRoutes(web.Router.PathPrefix("/serve/").Subrouter())