diff --git a/pkg/msghub/hub.go b/pkg/msghub/hub.go index fdbb380..714f493 100644 --- a/pkg/msghub/hub.go +++ b/pkg/msghub/hub.go @@ -15,6 +15,7 @@ const opChanLen = 100 // Listener receives the contents of the history buffer, followed by new messages type Listener interface { Receive(msg event.MessageMetadata) error + Delete(mailbox string, id string) error } // Hub relays messages on to its listeners @@ -42,6 +43,12 @@ func New(historyLen int, extHost *extension.Host) *Hub { return nil }) + extHost.Events.AfterMessageDeleted.AddListener("msghub", + func(msg event.MessageMetadata) *extension.Void { + hub.Delete(msg.Mailbox, msg.ID) + return nil + }) + return hub } @@ -68,7 +75,7 @@ func (hub *Hub) Dispatch(msg event.MessageMetadata) { h.history.Value = msg h.history = h.history.Next() - // Deliver message to all listeners, removing listeners if they return an error + // Relay event to all listeners, removing listeners if they return an error. for l := range h.listeners { if err := l.Receive(msg); err != nil { delete(h.listeners, l) @@ -78,6 +85,37 @@ func (hub *Hub) Dispatch(msg event.MessageMetadata) { } } +// Delete removes the message from the history buffer and instructs listeners to do the same. +func (hub *Hub) Delete(mailbox string, id string) { + hub.opChan <- func(h *Hub) { + if h.history == nil { + return + } + + // Locate and remove history entry. + p := h.history + end := p + for { + if next, ok := p.Next().Value.(event.MessageMetadata); ok { + if mailbox == next.Mailbox && id == next.ID { + p.Unlink(1) // Remove next node. + break + } + } + if p = p.Next(); p == end { + break + } + } + + // Relay event to all listeners, removing listeners if they return an error. + for l := range h.listeners { + if err := l.Delete(mailbox, id); err != nil { + delete(h.listeners, l) + } + } + } +} + // AddListener registers a listener to receive broadcasted messages. func (hub *Hub) AddListener(l Listener) { hub.opChan <- func(h *Hub) { diff --git a/pkg/msghub/hub_test.go b/pkg/msghub/hub_test.go index a269c47..d357cf3 100644 --- a/pkg/msghub/hub_test.go +++ b/pkg/msghub/hub_test.go @@ -3,6 +3,7 @@ package msghub import ( "context" "fmt" + "strconv" "testing" "time" @@ -12,9 +13,11 @@ import ( // testListener implements the Listener interface, mock for unit tests type testListener struct { - messages []*event.MessageMetadata // received messages - wantMessages int // how many messages this listener wants to receive - errorAfter int // when != 0, messages until Receive() begins returning error + messages []*event.MessageMetadata // received messages + deletes []string // received deletes + wantEvents int // how many events this listener wants to receive + errorAfter int // when != 0, event count until Receive() begins returning error + gotEvents int done chan struct{} // closed once we have received wantMessages overflow chan struct{} // closed if we receive wantMessages+1 @@ -22,10 +25,11 @@ type testListener struct { func newTestListener(want int) *testListener { l := &testListener{ - messages: make([]*event.MessageMetadata, 0, want*2), - wantMessages: want, - done: make(chan struct{}), - overflow: make(chan struct{}), + messages: make([]*event.MessageMetadata, 0, want*2), + deletes: make([]string, 0, want*2), + wantEvents: want, + done: make(chan struct{}), + overflow: make(chan struct{}), } if want == 0 { close(l.done) @@ -36,22 +40,29 @@ func newTestListener(want int) *testListener { // Receive a Message, store it in the messages slice, close applicable channels, and return an error // if instructed func (l *testListener) Receive(msg event.MessageMetadata) error { + l.gotEvents++ l.messages = append(l.messages, &msg) - if len(l.messages) == l.wantMessages { + if l.gotEvents == l.wantEvents { close(l.done) } - if len(l.messages) == l.wantMessages+1 { + if l.gotEvents == l.wantEvents+1 { close(l.overflow) } - if l.errorAfter > 0 && len(l.messages) > l.errorAfter { + if l.errorAfter > 0 && l.gotEvents > l.errorAfter { return fmt.Errorf("Too many messages") } return nil } +func (l *testListener) Delete(mailbox string, id string) error { + l.gotEvents++ + l.deletes = append(l.deletes, mailbox+"/"+id) + return nil +} + // String formats the got vs wanted message counts func (l *testListener) String() string { - return fmt.Sprintf("got %v messages, wanted %v", len(l.messages), l.wantMessages) + return fmt.Sprintf("got %v messages, wanted %v", len(l.messages), l.wantEvents) } func TestHubNew(t *testing.T) { @@ -198,6 +209,55 @@ func TestHubHistoryReplay(t *testing.T) { } } +func TestHubHistoryDelete(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(100, extension.NewHost()) + go hub.Start(ctx) + l1 := newTestListener(3) + hub.AddListener(l1) + + // Broadcast 3 messages with no listeners + msgs := make([]event.MessageMetadata, 3) + for i := 0; i < len(msgs); i++ { + msgs[i] = event.MessageMetadata{ + Mailbox: "hub", + ID: strconv.Itoa(i), + Subject: fmt.Sprintf("subj %v", i), + } + hub.Dispatch(msgs[i]) + } + + // Wait for messages (live) + select { + case <-l1.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l1) + } + + hub.Delete("hub", "1") // Delete a message + hub.Delete("zzz", "0") // Attempt to delete non-existent mailbox message + + // Add a new listener, waits for 2 messages + l2 := newTestListener(2) + hub.AddListener(l2) + + // Wait for messages (history) + select { + case <-l2.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l2) + } + + want := []string{"subj 0", "subj 2"} + for i := 0; i < len(want); i++ { + got := l2.messages[i].Subject + if got != want[i] { + t.Errorf("msg[%v].Subject == %q, want %q", i, got, want[i]) + } + } +} + func TestHubHistoryReplayWrap(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/rest/model/apiv1_model.go b/pkg/rest/model/apiv1_model.go index 9feff81..8c14d21 100644 --- a/pkg/rest/model/apiv1_model.go +++ b/pkg/rest/model/apiv1_model.go @@ -4,7 +4,7 @@ import ( "time" ) -// JSONMessageHeaderV1 contains the basic header data for a message +// JSONMessageHeaderV1 contains the basic header data for a message. type JSONMessageHeaderV1 struct { Mailbox string `json:"mailbox"` ID string `json:"id"` @@ -17,7 +17,7 @@ type JSONMessageHeaderV1 struct { Seen bool `json:"seen"` } -// JSONMessageV1 contains the same data as the header plus a JSONMessageBody +// JSONMessageV1 contains the same data as the header plus a JSONMessageBody. type JSONMessageV1 struct { Mailbox string `json:"mailbox"` ID string `json:"id"` @@ -33,7 +33,7 @@ type JSONMessageV1 struct { Attachments []*JSONMessageAttachmentV1 `json:"attachments"` } -// JSONMessageAttachmentV1 contains information about a MIME attachment +// JSONMessageAttachmentV1 contains information about a MIME attachment. type JSONMessageAttachmentV1 struct { FileName string `json:"filename"` ContentType string `json:"content-type"` @@ -42,8 +42,15 @@ type JSONMessageAttachmentV1 struct { MD5 string `json:"md5"` } -// JSONMessageBodyV1 contains the Text and HTML versions of the message body +// JSONMessageBodyV1 contains the Text and HTML versions of the message body. type JSONMessageBodyV1 struct { Text string `json:"text"` HTML string `json:"html"` } + +// JSONMonitorEventV1 contains events for the Inbucket mailbox and monitor tabs. +type JSONMonitorEventV1 struct { + // Event variant: `message-deleted`, `message-stored`. + Variant string `json:"variant"` + Header *JSONMessageHeaderV1 `json:"header"` +} diff --git a/pkg/rest/socketv1_controller.go b/pkg/rest/socketv1_controller.go index 99f2bcd..9b4053a 100644 --- a/pkg/rest/socketv1_controller.go +++ b/pkg/rest/socketv1_controller.go @@ -35,9 +35,9 @@ var upgrader = websocket.Upgrader{ // msgListener handles messages from the msghub type msgListener struct { - hub *msghub.Hub // Global message hub - c chan event.MessageMetadata // Queue of messages from Receive() - mailbox string // Name of mailbox to monitor, "" == all mailboxes + hub *msghub.Hub // Global message hub. + c chan *model.JSONMonitorEventV1 // Queue of incoming events. + mailbox string // Name of mailbox to monitor, "" == all mailboxes. } // newMsgListener creates a listener and registers it. Optional mailbox parameter will restrict @@ -45,20 +45,45 @@ type msgListener struct { func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener { ml := &msgListener{ hub: hub, - c: make(chan event.MessageMetadata, 100), + c: make(chan *model.JSONMonitorEventV1, 100), mailbox: mailbox, } hub.AddListener(ml) return ml } -// Receive handles an incoming message +// Receive handles an incoming message. func (ml *msgListener) Receive(msg event.MessageMetadata) error { if ml.mailbox != "" && ml.mailbox != msg.Mailbox { - // Did not match mailbox name + // Did not match the watched mailbox name. return nil } - ml.c <- msg + + // Enqueue for websocket. + ml.c <- &model.JSONMonitorEventV1{ + Variant: "message-stored", + Header: metadataToHeader(&msg), + } + + return nil +} + +// Delete handles a deleted message. +func (ml *msgListener) Delete(mailbox string, id string) error { + if ml.mailbox != "" && ml.mailbox != mailbox { + // Did not match watched mailbox name. + return nil + } + + // Enqueue for websocket. + ml.c <- &model.JSONMonitorEventV1{ + Variant: "message-deleted", + Header: &model.JSONMessageHeaderV1{ + Mailbox: mailbox, + ID: id, + }, + } + return nil } @@ -104,24 +129,14 @@ func (ml *msgListener) WSWriter(conn *websocket.Conn) { // Handle messages from hub until msgListener is closed for { select { - case msg, ok := <-ml.c: + case event, ok := <-ml.c: conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // msgListener closed, exit conn.WriteMessage(websocket.CloseMessage, []byte{}) return } - header := &model.JSONMessageHeaderV1{ - Mailbox: msg.Mailbox, - ID: msg.ID, - From: stringutil.StringAddress(msg.From), - To: stringutil.StringAddressList(msg.To), - Subject: msg.Subject, - Date: msg.Date, - PosixMillis: msg.Date.UnixNano() / 1000000, - Size: msg.Size, - } - if conn.WriteJSON(header) != nil { + if conn.WriteJSON(event) != nil { // Write failed return } @@ -198,3 +213,16 @@ func MonitorMailboxMessagesV1( ml.WSReader(conn) return nil } + +func metadataToHeader(msg *event.MessageMetadata) *model.JSONMessageHeaderV1 { + return &model.JSONMessageHeaderV1{ + Mailbox: msg.Mailbox, + ID: msg.ID, + From: stringutil.StringAddress(msg.From), + To: stringutil.StringAddressList(msg.To), + Subject: msg.Subject, + Date: msg.Date, + PosixMillis: msg.Date.UnixNano() / 1000000, + Size: msg.Size, + } +} diff --git a/ui/src/Data/MonitorEvent.elm b/ui/src/Data/MonitorEvent.elm new file mode 100644 index 0000000..905e7aa --- /dev/null +++ b/ui/src/Data/MonitorEvent.elm @@ -0,0 +1,44 @@ +module Data.MonitorEvent exposing (MessageID, MonitorEvent(..), decoder) + +import Data.MessageHeader as MessageHeader exposing (MessageHeader) +import Json.Decode exposing (Decoder, andThen, fail, field, string, succeed) +import Json.Decode.Pipeline exposing (required) + + +type alias MessageID = + { mailbox : String + , id : String + } + + +type MonitorEvent + = MessageStored MessageHeader + | MessageDeleted MessageID + + +decoder : Decoder MonitorEvent +decoder = + field "variant" string + |> andThen variantDecoder + + +variantDecoder : String -> Decoder MonitorEvent +variantDecoder variant = + case variant of + "message-deleted" -> + succeed MessageDeleted + |> required "header" messageIdDecoder + + "message-stored" -> + succeed MessageStored + |> required "header" MessageHeader.decoder + + unknown -> + fail <| "Unknown variant: " ++ unknown + + +messageIdDecoder : Decoder MessageID +messageIdDecoder = + succeed MessageID + |> required "mailbox" string + |> required "id" string diff --git a/ui/src/Page/Monitor.elm b/ui/src/Page/Monitor.elm index 016c27c..7e01dd6 100644 --- a/ui/src/Page/Monitor.elm +++ b/ui/src/Page/Monitor.elm @@ -1,7 +1,8 @@ module Page.Monitor exposing (Model, Msg, init, update, view) import Api -import Data.MessageHeader as MessageHeader exposing (MessageHeader) +import Data.MessageHeader exposing (MessageHeader) +import Data.MonitorEvent as MonitorEvent import Data.Session exposing (Session) import DateFormat as DF import Effect exposing (Effect) @@ -68,11 +69,23 @@ update msg model = ( { model | connected = False }, Effect.none ) MessageReceived value -> - case D.decodeValue (MessageHeader.decoder |> D.at [ "detail" ]) value of - Ok header -> - ( { model | messages = header :: List.take 500 model.messages } - , Effect.none - ) + case D.decodeValue (MonitorEvent.decoder |> D.at [ "detail" ]) value of + Ok event -> + case event of + MonitorEvent.MessageDeleted deleted -> + ( { model + | messages = + List.filter + (\x -> x.mailbox /= deleted.mailbox || x.id /= deleted.id) + model.messages + } + , Effect.none + ) + + MonitorEvent.MessageStored header -> + ( { model | messages = header :: List.take 500 model.messages } + , Effect.none + ) Err err -> let