diff --git a/pkg/rest/model/apiv1_model.go b/pkg/rest/model/apiv1_model.go index 8c14d21..9569587 100644 --- a/pkg/rest/model/apiv1_model.go +++ b/pkg/rest/model/apiv1_model.go @@ -47,10 +47,3 @@ type JSONMessageBodyV1 struct { Text string `json:"text"` HTML string `json:"html"` } - -// JSONMonitorEventV1 contains events for the Inbucket mailbox and monitor tabs. -type JSONMonitorEventV1 struct { - // Event variant: `message-deleted`, `message-stored`. - Variant string `json:"variant"` - Header *JSONMessageHeaderV1 `json:"header"` -} diff --git a/pkg/rest/model/apiv2_model.go b/pkg/rest/model/apiv2_model.go new file mode 100644 index 0000000..1fed72f --- /dev/null +++ b/pkg/rest/model/apiv2_model.go @@ -0,0 +1,15 @@ +package model + +// JSONMessageIDV2 uniquely identifies a message. +type JSONMessageIDV2 struct { + Mailbox string `json:"mailbox"` + ID string `json:"id"` +} + +// JSONMonitorEventV2 contains events for the Inbucket mailbox and monitor tabs. +type JSONMonitorEventV2 struct { + // Event variant: `message-deleted`, `message-stored`. + Variant string `json:"variant"` + Identifier *JSONMessageIDV2 `json:"identifier"` + Header *JSONMessageHeaderV1 `json:"header"` +} diff --git a/pkg/rest/routes.go b/pkg/rest/routes.go index 8d14873..3fc5b40 100644 --- a/pkg/rest/routes.go +++ b/pkg/rest/routes.go @@ -22,4 +22,10 @@ func SetupRoutes(r *mux.Router) { web.Handler(MonitorAllMessagesV1)).Name("MonitorAllMessagesV1").Methods("GET") r.Path("/v1/monitor/messages/{name}").Handler( web.Handler(MonitorMailboxMessagesV1)).Name("MonitorMailboxMessagesV1").Methods("GET") + + // API v2 + r.Path("/v2/monitor/messages").Handler( + web.Handler(MonitorAllMessagesV2)).Name("MonitorAllMessagesV2").Methods("GET") + r.Path("/v2/monitor/messages/{name}").Handler( + web.Handler(MonitorMailboxMessagesV2)).Name("MonitorMailboxMessagesV2").Methods("GET") } diff --git a/pkg/rest/socketv1_controller.go b/pkg/rest/socketv1_controller.go index 9b4053a..c564ae5 100644 --- a/pkg/rest/socketv1_controller.go +++ b/pkg/rest/socketv1_controller.go @@ -15,37 +15,37 @@ import ( const ( // Time allowed to write a message to the peer. - writeWait = 10 * time.Second + writeWaitV1 = 10 * time.Second // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 + pingPeriodV1 = (pongWaitV1 * 9) / 10 // Time allowed to read the next pong message from the peer. - pongWait = 60 * time.Second + pongWaitV1 = 60 * time.Second // Maximum message size allowed from peer. - maxMessageSize = 512 + maxMessageSizeV1 = 512 ) // options for gorilla connection upgrader -var upgrader = websocket.Upgrader{ +var upgraderV1 = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } -// msgListener handles messages from the msghub -type msgListener struct { - hub *msghub.Hub // Global message hub. - c chan *model.JSONMonitorEventV1 // Queue of incoming events. - mailbox string // Name of mailbox to monitor, "" == all mailboxes. +// msgListenerV1 handles messages from the msghub +type msgListenerV1 struct { + hub *msghub.Hub // Global message hub + c chan event.MessageMetadata // Queue of messages from Receive() + mailbox string // Name of mailbox to monitor, "" == all mailboxes } -// newMsgListener creates a listener and registers it. Optional mailbox parameter will restrict +// newMsgListenerV1 creates a listener and registers it. Optional mailbox parameter will restrict // messages sent to WebSocket to that mailbox only. -func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener { - ml := &msgListener{ +func newMsgListenerV1(hub *msghub.Hub, mailbox string) *msgListenerV1 { + ml := &msgListenerV1{ hub: hub, - c: make(chan *model.JSONMonitorEventV1, 100), + c: make(chan event.MessageMetadata, 100), mailbox: mailbox, } hub.AddListener(ml) @@ -53,50 +53,31 @@ func newMsgListener(hub *msghub.Hub, mailbox string) *msgListener { } // Receive handles an incoming message. -func (ml *msgListener) Receive(msg event.MessageMetadata) error { +func (ml *msgListenerV1) Receive(msg event.MessageMetadata) error { if ml.mailbox != "" && ml.mailbox != msg.Mailbox { // Did not match the watched mailbox name. return nil } - - // Enqueue for websocket. - ml.c <- &model.JSONMonitorEventV1{ - Variant: "message-stored", - Header: metadataToHeader(&msg), - } - + ml.c <- msg return nil } // Delete handles a deleted message. -func (ml *msgListener) Delete(mailbox string, id string) error { - if ml.mailbox != "" && ml.mailbox != mailbox { - // Did not match watched mailbox name. - return nil - } - - // Enqueue for websocket. - ml.c <- &model.JSONMonitorEventV1{ - Variant: "message-deleted", - Header: &model.JSONMessageHeaderV1{ - Mailbox: mailbox, - ID: id, - }, - } - +func (ml *msgListenerV1) Delete(mailbox string, id string) error { + // Deletes are ignored in socketv1 API. return nil } // WSReader makes sure the websocket client is still connected, discards any messages from client -func (ml *msgListener) WSReader(conn *websocket.Conn) { +func (ml *msgListenerV1) WSReader(conn *websocket.Conn) { slog := log.With().Str("module", "rest").Str("proto", "WebSocket"). Str("remote", conn.RemoteAddr().String()).Logger() defer ml.Close() - conn.SetReadLimit(maxMessageSize) - conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetReadLimit(maxMessageSizeV1) + conn.SetReadDeadline(time.Now().Add(pongWaitV1)) conn.SetPongHandler(func(string) error { slog.Debug().Msg("Got pong") - conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetReadDeadline(time.Now().Add(pongWaitV1)) return nil }) @@ -119,8 +100,8 @@ func (ml *msgListener) WSReader(conn *websocket.Conn) { } // WSWriter makes sure the websocket client is still connected -func (ml *msgListener) WSWriter(conn *websocket.Conn) { - ticker := time.NewTicker(pingPeriod) +func (ml *msgListenerV1) WSWriter(conn *websocket.Conn) { + ticker := time.NewTicker(pingPeriodV1) defer func() { ticker.Stop() ml.Close() @@ -129,20 +110,20 @@ func (ml *msgListener) WSWriter(conn *websocket.Conn) { // Handle messages from hub until msgListener is closed for { select { - case event, ok := <-ml.c: - conn.SetWriteDeadline(time.Now().Add(writeWait)) + case msg, ok := <-ml.c: + conn.SetWriteDeadline(time.Now().Add(writeWaitV1)) if !ok { // msgListener closed, exit conn.WriteMessage(websocket.CloseMessage, []byte{}) return } - if conn.WriteJSON(event) != nil { + if conn.WriteJSON(metadataToHeader(&msg)) != nil { // Write failed return } case <-ticker.C: // Send ping - conn.SetWriteDeadline(time.Now().Add(writeWait)) + conn.SetWriteDeadline(time.Now().Add(writeWaitV1)) if conn.WriteMessage(websocket.PingMessage, []byte{}) != nil { // Write error return @@ -154,7 +135,7 @@ func (ml *msgListener) WSWriter(conn *websocket.Conn) { } // Close removes the listener registration -func (ml *msgListener) Close() { +func (ml *msgListenerV1) Close() { select { case <-ml.c: // Already closed @@ -169,7 +150,7 @@ func (ml *msgListener) Close() { func MonitorAllMessagesV1( w http.ResponseWriter, req *http.Request, ctx *web.Context) (err error) { // Upgrade to Websocket. - conn, err := upgrader.Upgrade(w, req, nil) + conn, err := upgraderV1.Upgrade(w, req, nil) if err != nil { return err } @@ -181,7 +162,7 @@ func MonitorAllMessagesV1( log.Debug().Str("module", "rest").Str("proto", "WebSocket"). Str("remote", conn.RemoteAddr().String()).Msg("Upgraded to WebSocket") // Create, register listener; then interact with conn. - ml := newMsgListener(ctx.MsgHub, "") + ml := newMsgListenerV1(ctx.MsgHub, "") go ml.WSWriter(conn) ml.WSReader(conn) return nil @@ -196,7 +177,7 @@ func MonitorMailboxMessagesV1( return err } // Upgrade to Websocket. - conn, err := upgrader.Upgrade(w, req, nil) + conn, err := upgraderV1.Upgrade(w, req, nil) if err != nil { return err } @@ -208,7 +189,7 @@ func MonitorMailboxMessagesV1( log.Debug().Str("module", "rest").Str("proto", "WebSocket"). Str("remote", conn.RemoteAddr().String()).Msg("Upgraded to WebSocket") // Create, register listener; then interact with conn. - ml := newMsgListener(ctx.MsgHub, name) + ml := newMsgListenerV1(ctx.MsgHub, name) go ml.WSWriter(conn) ml.WSReader(conn) return nil diff --git a/pkg/rest/socketv2_controller.go b/pkg/rest/socketv2_controller.go new file mode 100644 index 0000000..fa70c1c --- /dev/null +++ b/pkg/rest/socketv2_controller.go @@ -0,0 +1,214 @@ +package rest + +import ( + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/inbucket/inbucket/pkg/extension/event" + "github.com/inbucket/inbucket/pkg/msghub" + "github.com/inbucket/inbucket/pkg/rest/model" + "github.com/inbucket/inbucket/pkg/server/web" + "github.com/rs/zerolog/log" +) + +const ( + // Time allowed to write a message to the peer. + writeWaitV2 = 10 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriodV2 = (pongWaitV2 * 9) / 10 + + // Time allowed to read the next pong message from the peer. + pongWaitV2 = 60 * time.Second + + // Maximum message size allowed from peer. + maxMessageSizeV2 = 512 +) + +// options for gorilla connection upgrader +var upgraderV2 = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// msgListenerV2 handles messages from the msghub +type msgListenerV2 struct { + hub *msghub.Hub // Global message hub. + c chan *model.JSONMonitorEventV2 // Queue of incoming events. + mailbox string // Name of mailbox to monitor, "" == all mailboxes. +} + +// newMsgListenerV2 creates a listener and registers it. Optional mailbox parameter will restrict +// messages sent to WebSocket to that mailbox only. +func newMsgListenerV2(hub *msghub.Hub, mailbox string) *msgListenerV2 { + ml := &msgListenerV2{ + hub: hub, + c: make(chan *model.JSONMonitorEventV2, 100), + mailbox: mailbox, + } + hub.AddListener(ml) + return ml +} + +// Receive handles an incoming message. +func (ml *msgListenerV2) Receive(msg event.MessageMetadata) error { + if ml.mailbox != "" && ml.mailbox != msg.Mailbox { + // Did not match the watched mailbox name. + return nil + } + + // Enqueue for websocket. + ml.c <- &model.JSONMonitorEventV2{ + Variant: "message-stored", + Header: metadataToHeader(&msg), + } + + return nil +} + +// Delete handles a deleted message. +func (ml *msgListenerV2) Delete(mailbox string, id string) error { + if ml.mailbox != "" && ml.mailbox != mailbox { + // Did not match watched mailbox name. + return nil + } + + // Enqueue for websocket. + ml.c <- &model.JSONMonitorEventV2{ + Variant: "message-deleted", + Identifier: &model.JSONMessageIDV2{ + Mailbox: mailbox, + ID: id, + }, + } + + return nil +} + +// WSReader makes sure the websocket client is still connected, discards any messages from client +func (ml *msgListenerV2) WSReader(conn *websocket.Conn) { + slog := log.With().Str("module", "rest").Str("proto", "WebSocket"). + Str("remote", conn.RemoteAddr().String()).Logger() + defer ml.Close() + conn.SetReadLimit(maxMessageSizeV2) + conn.SetReadDeadline(time.Now().Add(pongWaitV2)) + conn.SetPongHandler(func(string) error { + slog.Debug().Msg("Got pong") + conn.SetReadDeadline(time.Now().Add(pongWaitV2)) + return nil + }) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + if websocket.IsUnexpectedCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived, + ) { + // Unexpected close code + slog.Warn().Err(err).Msg("Socket error") + } else { + slog.Debug().Msg("Closing socket") + } + break + } + } +} + +// WSWriter makes sure the websocket client is still connected +func (ml *msgListenerV2) WSWriter(conn *websocket.Conn) { + ticker := time.NewTicker(pingPeriodV2) + defer func() { + ticker.Stop() + ml.Close() + }() + + // Handle messages from hub until msgListener is closed + for { + select { + case event, ok := <-ml.c: + conn.SetWriteDeadline(time.Now().Add(writeWaitV2)) + if !ok { + // msgListener closed, exit + conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + if conn.WriteJSON(event) != nil { + // Write failed + return + } + case <-ticker.C: + // Send ping + conn.SetWriteDeadline(time.Now().Add(writeWaitV2)) + if conn.WriteMessage(websocket.PingMessage, []byte{}) != nil { + // Write error + return + } + log.Debug().Str("module", "rest").Str("proto", "WebSocket"). + Str("remote", conn.RemoteAddr().String()).Msg("Sent ping") + } + } +} + +// Close removes the listener registration +func (ml *msgListenerV2) Close() { + select { + case <-ml.c: + // Already closed + default: + ml.hub.RemoveListener(ml) + close(ml.c) + } +} + +// MonitorAllMessagesV2 is a web handler which upgrades the connection to a websocket and notifies +// the client of all messages received. +func MonitorAllMessagesV2( + w http.ResponseWriter, req *http.Request, ctx *web.Context) (err error) { + // Upgrade to Websocket. + conn, err := upgraderV2.Upgrade(w, req, nil) + if err != nil { + return err + } + web.ExpWebSocketConnectsCurrent.Add(1) + defer func() { + _ = conn.Close() + web.ExpWebSocketConnectsCurrent.Add(-1) + }() + log.Debug().Str("module", "rest").Str("proto", "WebSocket"). + Str("remote", conn.RemoteAddr().String()).Msg("Upgraded to WebSocket") + // Create, register listener; then interact with conn. + ml := newMsgListenerV2(ctx.MsgHub, "") + go ml.WSWriter(conn) + ml.WSReader(conn) + return nil +} + +// MonitorMailboxMessagesV2 is a web handler which upgrades the connection to a websocket and +// notifies the client of messages received by a particular mailbox. +func MonitorMailboxMessagesV2( + w http.ResponseWriter, req *http.Request, ctx *web.Context) (err error) { + name, err := ctx.Manager.MailboxForAddress(ctx.Vars["name"]) + if err != nil { + return err + } + // Upgrade to Websocket. + conn, err := upgraderV2.Upgrade(w, req, nil) + if err != nil { + return err + } + web.ExpWebSocketConnectsCurrent.Add(1) + defer func() { + _ = conn.Close() + web.ExpWebSocketConnectsCurrent.Add(-1) + }() + log.Debug().Str("module", "rest").Str("proto", "WebSocket"). + Str("remote", conn.RemoteAddr().String()).Msg("Upgraded to WebSocket") + // Create, register listener; then interact with conn. + ml := newMsgListenerV2(ctx.MsgHub, name) + go ml.WSWriter(conn) + ml.WSReader(conn) + return nil +} diff --git a/ui/src/Api.elm b/ui/src/Api.elm index 669f49a..6736299 100644 --- a/ui/src/Api.elm +++ b/ui/src/Api.elm @@ -127,7 +127,7 @@ markMessageSeen session msg mailboxName id = monitorUri : Session -> String monitorUri session = - apiV1Url session [ "monitor", "messages" ] + apiV2Url session [ "monitor", "messages" ] purgeMailbox : Session -> HttpResult msg -> String -> Cmd msg @@ -135,14 +135,24 @@ purgeMailbox session msg mailboxName = HttpUtil.delete msg (apiV1Url session [ "mailbox", mailboxName ]) +apiV1Url : Session -> List String -> String +apiV1Url = + apiUrl "v1" + + +apiV2Url : Session -> List String -> String +apiV2Url = + apiUrl "v2" + + {-| Builds a public REST API URL (see wiki). -} -apiV1Url : Session -> List String -> String -apiV1Url session elements = +apiUrl : String -> Session -> List String -> String +apiUrl version session elements = Url.Builder.absolute (List.concat [ splitBasePath session.config.basePath - , [ "api", "v1" ] + , [ "api", version ] , elements ] ) diff --git a/ui/src/Data/MonitorEvent.elm b/ui/src/Data/MonitorEvent.elm index 905e7aa..2314704 100644 --- a/ui/src/Data/MonitorEvent.elm +++ b/ui/src/Data/MonitorEvent.elm @@ -27,7 +27,7 @@ variantDecoder variant = case variant of "message-deleted" -> succeed MessageDeleted - |> required "header" messageIdDecoder + |> required "identifier" messageIdDecoder "message-stored" -> succeed MessageStored