mirror of
https://github.com/jhillyerd/inbucket.git
synced 2026-05-13 09:03:47 +00:00
Merge in branch for msg monitor feature, closes #44
This commit is contained in:
@@ -33,13 +33,15 @@ type POP3Config struct {
|
||||
|
||||
// WebConfig contains the HTTP server configuration
|
||||
type WebConfig struct {
|
||||
IP4address net.IP
|
||||
IP4port int
|
||||
TemplateDir string
|
||||
TemplateCache bool
|
||||
PublicDir string
|
||||
GreetingFile string
|
||||
CookieAuthKey string
|
||||
IP4address net.IP
|
||||
IP4port int
|
||||
TemplateDir string
|
||||
TemplateCache bool
|
||||
PublicDir string
|
||||
GreetingFile string
|
||||
CookieAuthKey string
|
||||
MonitorVisible bool
|
||||
MonitorHistory int
|
||||
}
|
||||
|
||||
// DataStoreConfig contains the mail store configuration
|
||||
@@ -130,6 +132,8 @@ func LoadConfig(filename string) error {
|
||||
requireOption(messages, "web", "template.dir")
|
||||
requireOption(messages, "web", "template.cache")
|
||||
requireOption(messages, "web", "public.dir")
|
||||
requireOption(messages, "web", "monitor.visible")
|
||||
requireOption(messages, "web", "monitor.history")
|
||||
requireOption(messages, "datastore", "path")
|
||||
requireOption(messages, "datastore", "retention.minutes")
|
||||
requireOption(messages, "datastore", "retention.sleep.millis")
|
||||
@@ -349,6 +353,19 @@ func parseWebConfig() error {
|
||||
}
|
||||
webConfig.GreetingFile = str
|
||||
|
||||
option = "monitor.visible"
|
||||
flag, err = Config.Bool(section, option)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err)
|
||||
}
|
||||
webConfig.MonitorVisible = flag
|
||||
|
||||
option = "monitor.history"
|
||||
webConfig.MonitorHistory, err = Config.Int(section, option)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err)
|
||||
}
|
||||
|
||||
option = "cookie.auth.key"
|
||||
if Config.HasOption(section, option) {
|
||||
str, err = Config.String(section, option)
|
||||
|
||||
@@ -91,6 +91,18 @@ greeting.file=%(install.dir)s/themes/greeting.html
|
||||
# and previous sessions will be invalidated.
|
||||
cookie.auth.key=secret-inbucket-session-cookie-key
|
||||
|
||||
# Enable or disable the live message monitor tab for the web UI. This will let
|
||||
# anybody see all messages delivered to Inbucket. This setting has no impact
|
||||
# on the availability of the underlying WebSocket.
|
||||
monitor.visible=true
|
||||
|
||||
# How many historical message headers should be cached for display by new
|
||||
# monitor connections. It does not limit the number of messages displayed by
|
||||
# the browser once the monitor is open; all freshly received messages will be
|
||||
# appended to the on screen list. This setting also affects the underlying
|
||||
# API/WebSocket.
|
||||
monitor.history=30
|
||||
|
||||
#############################################################################
|
||||
[datastore]
|
||||
|
||||
|
||||
@@ -93,6 +93,18 @@ greeting.file=/con/configuration/greeting.html
|
||||
# and previous sessions will be invalidated.
|
||||
#cookie.auth.key=secret-inbucket-session-cookie-key
|
||||
|
||||
# Enable or disable the live message monitor tab for the web UI. This will let
|
||||
# anybody see all messages delivered to Inbucket. This setting has no impact
|
||||
# on the availability of the underlying WebSocket.
|
||||
monitor.visible=true
|
||||
|
||||
# How many historical message headers should be cached for display by new
|
||||
# monitor connections. It does not limit the number of messages displayed by
|
||||
# the browser once the monitor is open; all freshly received messages will be
|
||||
# appended to the on screen list. This setting also affects the underlying
|
||||
# API/WebSocket.
|
||||
monitor.history=30
|
||||
|
||||
#############################################################################
|
||||
[datastore]
|
||||
|
||||
|
||||
@@ -93,6 +93,18 @@ greeting.file=%(themes.dir)s/greeting.html
|
||||
# and previous sessions will be invalidated.
|
||||
cookie.auth.key=secret-inbucket-session-cookie-key
|
||||
|
||||
# Enable or disable the live message monitor tab for the web UI. This will let
|
||||
# anybody see all messages delivered to Inbucket. This setting has no impact
|
||||
# on the availability of the underlying WebSocket.
|
||||
monitor.visible=true
|
||||
|
||||
# How many historical message headers should be cached for display by new
|
||||
# monitor connections. It does not limit the number of messages displayed by
|
||||
# the browser once the monitor is open; all freshly received messages will be
|
||||
# appended to the on screen list. This setting also affects the underlying
|
||||
# API/WebSocket.
|
||||
monitor.history=30
|
||||
|
||||
#############################################################################
|
||||
[datastore]
|
||||
|
||||
|
||||
@@ -91,6 +91,18 @@ greeting.file=%(install.dir)s/themes/greeting.html
|
||||
# and previous sessions will be invalidated.
|
||||
#cookie.auth.key=secret-inbucket-session-cookie-key
|
||||
|
||||
# Enable or disable the live message monitor tab for the web UI. This will let
|
||||
# anybody see all messages delivered to Inbucket. This setting has no impact
|
||||
# on the availability of the underlying WebSocket.
|
||||
monitor.visible=true
|
||||
|
||||
# How many historical message headers should be cached for display by new
|
||||
# monitor connections. It does not limit the number of messages displayed by
|
||||
# the browser once the monitor is open; all freshly received messages will be
|
||||
# appended to the on screen list. This setting also affects the underlying
|
||||
# API/WebSocket.
|
||||
monitor.history=30
|
||||
|
||||
#############################################################################
|
||||
[datastore]
|
||||
|
||||
|
||||
@@ -91,6 +91,18 @@ greeting.file=%(install.dir)s/themes/greeting.html
|
||||
# and previous sessions will be invalidated.
|
||||
#cookie.auth.key=secret-inbucket-session-cookie-key
|
||||
|
||||
# Enable or disable the live message monitor tab for the web UI. This will let
|
||||
# anybody see all messages delivered to Inbucket. This setting has no impact
|
||||
# on the availability of the underlying WebSocket.
|
||||
monitor.visible=true
|
||||
|
||||
# How many historical message headers should be cached for display by new
|
||||
# monitor connections. It does not limit the number of messages displayed by
|
||||
# the browser once the monitor is open; all freshly received messages will be
|
||||
# appended to the on screen list. This setting also affects the underlying
|
||||
# API/WebSocket.
|
||||
monitor.history=30
|
||||
|
||||
#############################################################################
|
||||
[datastore]
|
||||
|
||||
|
||||
@@ -91,6 +91,18 @@ greeting.file=%(install.dir)s\themes\greeting.html
|
||||
# and previous sessions will be invalidated.
|
||||
#cookie.auth.key=secret-inbucket-session-cookie-key
|
||||
|
||||
# Enable or disable the live message monitor tab for the web UI. This will let
|
||||
# anybody see all messages delivered to Inbucket. This setting has no impact
|
||||
# on the availability of the underlying WebSocket.
|
||||
monitor.visible=true
|
||||
|
||||
# How many historical message headers should be cached for display by new
|
||||
# monitor connections. It does not limit the number of messages displayed by
|
||||
# the browser once the monitor is open; all freshly received messages will be
|
||||
# appended to the on screen list. This setting also affects the underlying
|
||||
# API/WebSocket.
|
||||
monitor.history=30
|
||||
|
||||
#############################################################################
|
||||
[datastore]
|
||||
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/sessions"
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
"github.com/jhillyerd/inbucket/smtpd"
|
||||
)
|
||||
|
||||
@@ -14,6 +16,8 @@ type Context struct {
|
||||
Vars map[string]string
|
||||
Session *sessions.Session
|
||||
DataStore smtpd.DataStore
|
||||
MsgHub *msghub.Hub
|
||||
WebConfig config.WebConfig
|
||||
IsJSON bool
|
||||
}
|
||||
|
||||
@@ -56,6 +60,8 @@ func NewContext(req *http.Request) (*Context, error) {
|
||||
Vars: vars,
|
||||
Session: sess,
|
||||
DataStore: DataStore,
|
||||
MsgHub: msgHub,
|
||||
WebConfig: webConfig,
|
||||
IsJSON: headerMatch(req, "Accept", "application/json"),
|
||||
}
|
||||
return ctx, err
|
||||
|
||||
@@ -3,17 +3,18 @@ package httpd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/goods/httpbuf"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/securecookie"
|
||||
"github.com/gorilla/sessions"
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
"github.com/jhillyerd/inbucket/smtpd"
|
||||
)
|
||||
|
||||
@@ -24,6 +25,9 @@ var (
|
||||
// DataStore is where all the mailboxes and messages live
|
||||
DataStore smtpd.DataStore
|
||||
|
||||
// msgHub holds a reference to the message pub/sub system
|
||||
msgHub *msghub.Hub
|
||||
|
||||
// Router is shared between httpd, webui and rest packages. It sends
|
||||
// incoming requests to the correct handler function
|
||||
Router = mux.NewRouter()
|
||||
@@ -33,15 +37,29 @@ var (
|
||||
listener net.Listener
|
||||
sessionStore sessions.Store
|
||||
globalShutdown chan bool
|
||||
|
||||
// ExpWebSocketConnectsCurrent tracks the number of open WebSockets
|
||||
ExpWebSocketConnectsCurrent = new(expvar.Int)
|
||||
)
|
||||
|
||||
func init() {
|
||||
m := expvar.NewMap("http")
|
||||
m.Set("WebSocketConnectsCurrent", ExpWebSocketConnectsCurrent)
|
||||
}
|
||||
|
||||
// Initialize sets up things for unit tests or the Start() method
|
||||
func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool) {
|
||||
func Initialize(
|
||||
cfg config.WebConfig,
|
||||
shutdownChan chan bool,
|
||||
ds smtpd.DataStore,
|
||||
mh *msghub.Hub) {
|
||||
|
||||
webConfig = cfg
|
||||
globalShutdown = shutdownChan
|
||||
|
||||
// NewContext() will use this DataStore for the web handlers
|
||||
DataStore = ds
|
||||
msgHub = mh
|
||||
|
||||
// Content Paths
|
||||
log.Infof("HTTP templates mapped to %q", cfg.TemplateDir)
|
||||
@@ -122,26 +140,13 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
defer ctx.Close()
|
||||
|
||||
// Run the handler, grab the error, and report it
|
||||
buf := new(httpbuf.Buffer)
|
||||
log.Tracef("HTTP[%v] %v %v %q", req.RemoteAddr, req.Proto, req.Method, req.RequestURI)
|
||||
err = h(buf, req, ctx)
|
||||
err = h(w, req, ctx)
|
||||
if err != nil {
|
||||
log.Errorf("HTTP error handling %q: %v", req.RequestURI, err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Save the session
|
||||
if err = ctx.Session.Save(req, buf); err != nil {
|
||||
log.Errorf("HTTP failed to save session: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Apply the buffered response to the writer
|
||||
if _, err = buf.Apply(w); err != nil {
|
||||
log.Errorf("HTTP failed to write response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func emergencyShutdown() {
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/httpd"
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
"github.com/jhillyerd/inbucket/pop3d"
|
||||
"github.com/jhillyerd/inbucket/rest"
|
||||
"github.com/jhillyerd/inbucket/smtpd"
|
||||
@@ -95,11 +96,14 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// Create message hub
|
||||
msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory)
|
||||
|
||||
// Grab our datastore
|
||||
ds := smtpd.DefaultFileDataStore()
|
||||
|
||||
// Start HTTP server
|
||||
httpd.Initialize(config.GetWebConfig(), ds, shutdownChan)
|
||||
httpd.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub)
|
||||
webui.SetupRoutes(httpd.Router)
|
||||
rest.SetupRoutes(httpd.Router)
|
||||
go httpd.Start(rootCtx)
|
||||
@@ -110,7 +114,7 @@ func main() {
|
||||
go pop3Server.Start(rootCtx)
|
||||
|
||||
// Startup SMTP server
|
||||
smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan)
|
||||
smtpServer = smtpd.NewServer(config.GetSMTPConfig(), shutdownChan, ds, msgHub)
|
||||
go smtpServer.Start(rootCtx)
|
||||
|
||||
// Loop forever waiting for signals or shutdown channel
|
||||
|
||||
116
msghub/hub.go
Normal file
116
msghub/hub.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package msghub
|
||||
|
||||
import (
|
||||
"container/ring"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Message contains the basic header data for a message
|
||||
type Message struct {
|
||||
Mailbox string
|
||||
ID string
|
||||
From string
|
||||
To []string
|
||||
Subject string
|
||||
Date time.Time
|
||||
Size int64
|
||||
}
|
||||
|
||||
// Listener receives the contents of the log, followed by new messages
|
||||
type Listener interface {
|
||||
Receive(msg Message) error
|
||||
}
|
||||
|
||||
// Hub relays messages on to its listeners
|
||||
type Hub struct {
|
||||
// log stores history, points next spot to write. First non-nil entry is oldest Message
|
||||
log *ring.Ring
|
||||
logMx sync.RWMutex
|
||||
|
||||
// listeners interested in new messages
|
||||
listeners map[Listener]struct{}
|
||||
listenersMx sync.RWMutex
|
||||
|
||||
// broadcast receives new messages
|
||||
broadcast chan Message
|
||||
}
|
||||
|
||||
// New constructs a new Hub which will cache logSize messages in memory for playback to future
|
||||
// listeners. A goroutine is created to handle incoming messages; it will run until the provided
|
||||
// context is canceled.
|
||||
func New(ctx context.Context, logSize int) *Hub {
|
||||
h := &Hub{
|
||||
log: ring.New(logSize),
|
||||
listeners: make(map[Listener]struct{}),
|
||||
broadcast: make(chan Message, 100),
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Shutdown
|
||||
close(h.broadcast)
|
||||
h.broadcast = nil
|
||||
return
|
||||
case msg := <-h.broadcast:
|
||||
// Log message
|
||||
h.logMx.Lock()
|
||||
h.log.Value = msg
|
||||
h.log = h.log.Next()
|
||||
h.logMx.Unlock()
|
||||
// Deliver message to listeners
|
||||
h.deliver(msg)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// Broadcast queues a message for processing by the hub. The message will be placed into the
|
||||
// in-memory log and relayed to all registered listeners.
|
||||
func (h *Hub) Broadcast(msg Message) {
|
||||
if h.broadcast != nil {
|
||||
h.broadcast <- msg
|
||||
}
|
||||
}
|
||||
|
||||
// AddListener registers a listener to receive broadcasted messages.
|
||||
func (h *Hub) AddListener(l Listener) {
|
||||
// Playback log
|
||||
h.logMx.RLock()
|
||||
h.log.Do(func(v interface{}) {
|
||||
if v != nil {
|
||||
l.Receive(v.(Message))
|
||||
}
|
||||
})
|
||||
h.logMx.RUnlock()
|
||||
|
||||
// Add to listeners
|
||||
h.listenersMx.Lock()
|
||||
h.listeners[l] = struct{}{}
|
||||
h.listenersMx.Unlock()
|
||||
}
|
||||
|
||||
// RemoveListener deletes a listener registration, it will cease to receive messages.
|
||||
func (h *Hub) RemoveListener(l Listener) {
|
||||
h.listenersMx.Lock()
|
||||
defer h.listenersMx.Unlock()
|
||||
if _, ok := h.listeners[l]; ok {
|
||||
delete(h.listeners, l)
|
||||
}
|
||||
}
|
||||
|
||||
// deliver message to all listeners, removing listeners if they return an error
|
||||
func (h *Hub) deliver(msg Message) {
|
||||
h.listenersMx.RLock()
|
||||
defer h.listenersMx.RUnlock()
|
||||
for l := range h.listeners {
|
||||
if err := l.Receive(msg); err != nil {
|
||||
h.RemoveListener(l)
|
||||
}
|
||||
}
|
||||
}
|
||||
243
msghub/hub_test.go
Normal file
243
msghub/hub_test.go
Normal file
@@ -0,0 +1,243 @@
|
||||
package msghub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// testListener implements the Listener interface, mock for unit tests
|
||||
type testListener struct {
|
||||
messages []*Message // received messages
|
||||
wantMessages int // how many messages this listener wants to receive
|
||||
errorAfter int // when != 0, messages until Receive() begins returning error
|
||||
|
||||
done chan struct{} // closed once we have received wantMessages
|
||||
overflow chan struct{} // closed if we receive wantMessages+1
|
||||
}
|
||||
|
||||
func newTestListener(want int) *testListener {
|
||||
l := &testListener{
|
||||
messages: make([]*Message, 0, want*2),
|
||||
wantMessages: want,
|
||||
done: make(chan struct{}),
|
||||
overflow: make(chan struct{}),
|
||||
}
|
||||
if want == 0 {
|
||||
close(l.done)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// Receive a Message, store it in the messages slice, close applicable channels, and return an error
|
||||
// if instructed
|
||||
func (l *testListener) Receive(msg Message) error {
|
||||
l.messages = append(l.messages, &msg)
|
||||
if len(l.messages) == l.wantMessages {
|
||||
close(l.done)
|
||||
}
|
||||
if len(l.messages) == l.wantMessages+1 {
|
||||
close(l.overflow)
|
||||
}
|
||||
if l.errorAfter > 0 && len(l.messages) > l.errorAfter {
|
||||
return fmt.Errorf("Too many messages")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String formats the got vs wanted message counts
|
||||
func (l *testListener) String() string {
|
||||
return fmt.Sprintf("got %v messages, wanted %v", len(l.messages), l.wantMessages)
|
||||
}
|
||||
|
||||
func TestHubNew(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 5)
|
||||
if hub == nil {
|
||||
t.Fatal("New() == nil, expected a new Hub")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubZeroListeners(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 5)
|
||||
m := Message{}
|
||||
for i := 0; i < 100; i++ {
|
||||
hub.Broadcast(m)
|
||||
}
|
||||
// Just making sure Hub doesn't panic
|
||||
}
|
||||
|
||||
func TestHubOneListener(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 5)
|
||||
m := Message{}
|
||||
l := newTestListener(1)
|
||||
|
||||
hub.AddListener(l)
|
||||
hub.Broadcast(m)
|
||||
|
||||
// Wait for messages
|
||||
select {
|
||||
case <-l.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Error("Timeout:", l)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubRemoveListener(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 5)
|
||||
m := Message{}
|
||||
l := newTestListener(1)
|
||||
|
||||
hub.AddListener(l)
|
||||
hub.Broadcast(m)
|
||||
hub.RemoveListener(l)
|
||||
hub.Broadcast(m)
|
||||
|
||||
// Wait for messages
|
||||
select {
|
||||
case <-l.overflow:
|
||||
t.Error(l)
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
// Expected result, no overflow
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubRemoveListenerOnError(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 5)
|
||||
m := Message{}
|
||||
|
||||
// error after 1 means listener should receive 2 messages before being removed
|
||||
l := newTestListener(2)
|
||||
l.errorAfter = 1
|
||||
|
||||
hub.AddListener(l)
|
||||
hub.Broadcast(m)
|
||||
hub.Broadcast(m)
|
||||
hub.Broadcast(m)
|
||||
hub.Broadcast(m)
|
||||
|
||||
// Wait for messages
|
||||
select {
|
||||
case <-l.overflow:
|
||||
t.Error(l)
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
// Expected result, no overflow
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubLogReplay(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 100)
|
||||
l1 := newTestListener(3)
|
||||
hub.AddListener(l1)
|
||||
|
||||
// Broadcast 3 messages with no listeners
|
||||
msgs := make([]Message, 3)
|
||||
for i := 0; i < len(msgs); i++ {
|
||||
msgs[i] = Message{
|
||||
Subject: fmt.Sprintf("subj %v", i),
|
||||
}
|
||||
hub.Broadcast(msgs[i])
|
||||
}
|
||||
|
||||
// Wait for messages (live)
|
||||
select {
|
||||
case <-l1.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timeout:", l1)
|
||||
}
|
||||
|
||||
// Add a new listener
|
||||
l2 := newTestListener(3)
|
||||
hub.AddListener(l2)
|
||||
|
||||
// Wait for messages (log)
|
||||
select {
|
||||
case <-l2.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timeout:", l2)
|
||||
}
|
||||
|
||||
for i := 0; i < len(msgs); i++ {
|
||||
got := l2.messages[i].Subject
|
||||
want := msgs[i].Subject
|
||||
if got != want {
|
||||
t.Errorf("msg[%v].Subject == %q, want %q", i, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubLogReplayWrap(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(ctx, 5)
|
||||
l1 := newTestListener(20)
|
||||
hub.AddListener(l1)
|
||||
|
||||
// Broadcast more messages than the hub can hold
|
||||
msgs := make([]Message, 20)
|
||||
for i := 0; i < len(msgs); i++ {
|
||||
msgs[i] = Message{
|
||||
Subject: fmt.Sprintf("subj %v", i),
|
||||
}
|
||||
hub.Broadcast(msgs[i])
|
||||
}
|
||||
|
||||
// Wait for messages (live)
|
||||
select {
|
||||
case <-l1.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timeout:", l1)
|
||||
}
|
||||
|
||||
// Add a new listener
|
||||
l2 := newTestListener(5)
|
||||
hub.AddListener(l2)
|
||||
|
||||
// Wait for messages (log)
|
||||
select {
|
||||
case <-l2.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timeout:", l2)
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
got := l2.messages[i].Subject
|
||||
want := msgs[i+15].Subject
|
||||
if got != want {
|
||||
t.Errorf("msg[%v].Subject == %q, want %q", i, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubContextCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
hub := New(ctx, 5)
|
||||
m := Message{}
|
||||
l := newTestListener(1)
|
||||
|
||||
hub.AddListener(l)
|
||||
hub.Broadcast(m)
|
||||
cancel()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
hub.Broadcast(m)
|
||||
|
||||
// Wait for messages
|
||||
select {
|
||||
case <-l.overflow:
|
||||
t.Error(l)
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
// Expected result, no overflow
|
||||
}
|
||||
}
|
||||
@@ -6,9 +6,16 @@ import "github.com/jhillyerd/inbucket/httpd"
|
||||
// SetupRoutes populates the routes for the REST interface
|
||||
func SetupRoutes(r *mux.Router) {
|
||||
// API v1
|
||||
r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET")
|
||||
r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE")
|
||||
r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET")
|
||||
r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE")
|
||||
r.Path("/api/v1/mailbox/{name}/{id}/source").Handler(httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET")
|
||||
r.Path("/api/v1/mailbox/{name}").Handler(
|
||||
httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET")
|
||||
r.Path("/api/v1/mailbox/{name}").Handler(
|
||||
httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE")
|
||||
r.Path("/api/v1/mailbox/{name}/{id}").Handler(
|
||||
httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET")
|
||||
r.Path("/api/v1/mailbox/{name}/{id}").Handler(
|
||||
httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE")
|
||||
r.Path("/api/v1/mailbox/{name}/{id}/source").Handler(
|
||||
httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET")
|
||||
r.Path("/api/v1/monitor/all/messages").Handler(
|
||||
httpd.Handler(MonitorAllMessagesV1)).Name("MonitorAllMessagesV1").Methods("GET")
|
||||
}
|
||||
|
||||
160
rest/socketv1_controller.go
Normal file
160
rest/socketv1_controller.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/jhillyerd/inbucket/httpd"
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
"github.com/jhillyerd/inbucket/rest/model"
|
||||
)
|
||||
|
||||
const (
|
||||
// Time allowed to write a message to the peer.
|
||||
writeWait = 10 * time.Second
|
||||
|
||||
// Send pings to peer with this period. Must be less than pongWait.
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
|
||||
// Time allowed to read the next pong message from the peer.
|
||||
pongWait = 60 * time.Second
|
||||
|
||||
// Maximum message size allowed from peer.
|
||||
maxMessageSize = 512
|
||||
)
|
||||
|
||||
// options for gorilla connection upgrader
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
// msgListener handles messages from the msghub
|
||||
type msgListener struct {
|
||||
hub *msghub.Hub
|
||||
c chan msghub.Message
|
||||
}
|
||||
|
||||
// newMsgListener creates a listener and registers it
|
||||
func newMsgListener(hub *msghub.Hub) *msgListener {
|
||||
ml := &msgListener{
|
||||
hub: hub,
|
||||
c: make(chan msghub.Message, 100),
|
||||
}
|
||||
hub.AddListener(ml)
|
||||
return ml
|
||||
}
|
||||
|
||||
// Receive handles an incoming message
|
||||
func (ml *msgListener) Receive(msg msghub.Message) error {
|
||||
ml.c <- msg
|
||||
return nil
|
||||
}
|
||||
|
||||
// WSReader makes sure the websocket client is still connected
|
||||
func (ml *msgListener) WSReader(conn *websocket.Conn) {
|
||||
defer ml.Close()
|
||||
conn.SetReadLimit(maxMessageSize)
|
||||
conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
conn.SetPongHandler(func(string) error {
|
||||
log.Tracef("HTTP[%v] Got WebSocket pong", conn.RemoteAddr())
|
||||
conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
if _, _, err := conn.ReadMessage(); err != nil {
|
||||
if websocket.IsUnexpectedCloseError(
|
||||
err,
|
||||
websocket.CloseNormalClosure,
|
||||
websocket.CloseGoingAway,
|
||||
websocket.CloseNoStatusReceived,
|
||||
) {
|
||||
// Unexpected close code
|
||||
log.Warnf("HTTP[%v] WebSocket error: %v", conn.RemoteAddr(), err)
|
||||
} else {
|
||||
log.Tracef("HTTP[%v] Closing WebSocket", conn.RemoteAddr())
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WSWriter makes sure the websocket client is still connected
|
||||
func (ml *msgListener) WSWriter(conn *websocket.Conn) {
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
ml.Close()
|
||||
}()
|
||||
|
||||
// Handle messages from hub until msgListener is closed
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-ml.c:
|
||||
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if !ok {
|
||||
// msgListener closed, exit
|
||||
conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
header := &model.JSONMessageHeaderV1{
|
||||
Mailbox: msg.Mailbox,
|
||||
ID: msg.ID,
|
||||
From: msg.From,
|
||||
To: msg.To,
|
||||
Subject: msg.Subject,
|
||||
Date: msg.Date,
|
||||
Size: msg.Size,
|
||||
}
|
||||
if conn.WriteJSON(header) != nil {
|
||||
// Write failed
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
// Send ping
|
||||
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if conn.WriteMessage(websocket.PingMessage, []byte{}) != nil {
|
||||
// Write error
|
||||
return
|
||||
}
|
||||
log.Tracef("HTTP[%v] Sent WebSocket ping", conn.RemoteAddr())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close removes the listener registration
|
||||
func (ml *msgListener) Close() {
|
||||
select {
|
||||
case <-ml.c:
|
||||
// Already closed
|
||||
default:
|
||||
ml.hub.RemoveListener(ml)
|
||||
close(ml.c)
|
||||
}
|
||||
}
|
||||
|
||||
func MonitorAllMessagesV1(
|
||||
w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) {
|
||||
// Upgrade to Websocket
|
||||
conn, err := upgrader.Upgrade(w, req, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
httpd.ExpWebSocketConnectsCurrent.Add(1)
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
httpd.ExpWebSocketConnectsCurrent.Add(-1)
|
||||
}()
|
||||
|
||||
log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr)
|
||||
|
||||
// Create, register listener; then interact with conn
|
||||
ml := newMsgListener(ctx.MsgHub)
|
||||
go ml.WSWriter(conn)
|
||||
ml.WSReader(conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -50,6 +50,11 @@ func (m *MockMailbox) NewMessage() (smtpd.Message, error) {
|
||||
return args.Get(0).(smtpd.Message), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) Name() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) String() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/jhillyerd/enmime"
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/httpd"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
"github.com/jhillyerd/inbucket/smtpd"
|
||||
)
|
||||
|
||||
@@ -199,7 +200,7 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer {
|
||||
PublicDir: "../themes/bootstrap/public",
|
||||
}
|
||||
shutdownChan := make(chan bool)
|
||||
httpd.Initialize(cfg, ds, shutdownChan)
|
||||
httpd.Initialize(cfg, shutdownChan, ds, &msghub.Hub{})
|
||||
SetupRoutes(httpd.Router)
|
||||
|
||||
return buf
|
||||
|
||||
@@ -29,6 +29,7 @@ type Mailbox interface {
|
||||
GetMessage(id string) (Message, error)
|
||||
Purge() error
|
||||
NewMessage() (Message, error)
|
||||
Name() string
|
||||
String() string
|
||||
}
|
||||
|
||||
|
||||
@@ -148,6 +148,10 @@ type FileMailbox struct {
|
||||
messages []*FileMessage
|
||||
}
|
||||
|
||||
func (mb *FileMailbox) Name() string {
|
||||
return mb.name
|
||||
}
|
||||
|
||||
func (mb *FileMailbox) String() string {
|
||||
return mb.name + "[" + mb.dirName + "]"
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
)
|
||||
|
||||
// State tracks the current mode of our SMTP state machine
|
||||
@@ -67,6 +68,12 @@ var commands = map[string]bool{
|
||||
"TURN": true,
|
||||
}
|
||||
|
||||
// recipientDetails for message delivery
|
||||
type recipientDetails struct {
|
||||
address, localPart, domainPart string
|
||||
mailbox Mailbox
|
||||
}
|
||||
|
||||
// Session holds the state of an SMTP session
|
||||
type Session struct {
|
||||
server *Server
|
||||
@@ -341,13 +348,7 @@ func (ss *Session) mailHandler(cmd string, arg string) {
|
||||
|
||||
// DATA
|
||||
func (ss *Session) dataHandler() {
|
||||
type RecipientDetails struct {
|
||||
address, localPart, domainPart string
|
||||
mailbox Mailbox
|
||||
}
|
||||
recipients := make([]RecipientDetails, 0, ss.recipients.Len())
|
||||
// Timestamp for Received header
|
||||
stamp := time.Now().Format(timeStampFormat)
|
||||
recipients := make([]recipientDetails, 0, ss.recipients.Len())
|
||||
// Get a Mailbox and a new Message for each recipient
|
||||
msgSize := 0
|
||||
if ss.server.storeMessages {
|
||||
@@ -369,7 +370,7 @@ func (ss *Session) dataHandler() {
|
||||
ss.reset()
|
||||
return
|
||||
}
|
||||
recipients = append(recipients, RecipientDetails{recip, local, domain, mb})
|
||||
recipients = append(recipients, recipientDetails{recip, local, domain, mb})
|
||||
} else {
|
||||
log.Tracef("Not storing message for %q", recip)
|
||||
}
|
||||
@@ -399,39 +400,15 @@ func (ss *Session) dataHandler() {
|
||||
if ss.server.storeMessages {
|
||||
// Create a message for each valid recipient
|
||||
for _, r := range recipients {
|
||||
msg, err := r.mailbox.NewMessage()
|
||||
if err != nil {
|
||||
ss.logError("Failed to create message for %q: %s", r.localPart, err)
|
||||
ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart))
|
||||
if ok := ss.deliverMessage(r, msgBuf); ok {
|
||||
expReceivedTotal.Add(1)
|
||||
} else {
|
||||
// Delivery failure
|
||||
ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart))
|
||||
ss.reset()
|
||||
return
|
||||
}
|
||||
|
||||
// Generate Received header
|
||||
recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
|
||||
ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp)
|
||||
if err := msg.Append([]byte(recd)); err != nil {
|
||||
ss.logError("Failed to write received header for %q: %s", r.localPart, err)
|
||||
ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart))
|
||||
ss.reset()
|
||||
return
|
||||
}
|
||||
// Append lines from msgBuf
|
||||
for _, line = range msgBuf {
|
||||
if err := msg.Append(line); err != nil {
|
||||
ss.logError("Failed to append to mailbox %v: %v",
|
||||
r.mailbox, err)
|
||||
ss.send("554 Something went wrong")
|
||||
ss.reset()
|
||||
// Should really cleanup the crap on filesystem
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := msg.Close(); err != nil {
|
||||
ss.logError("Error: %v while writing message", err)
|
||||
}
|
||||
expReceivedTotal.Add(1)
|
||||
} // end for
|
||||
}
|
||||
} else {
|
||||
expReceivedTotal.Add(1)
|
||||
}
|
||||
@@ -458,6 +435,51 @@ func (ss *Session) dataHandler() {
|
||||
} // end for
|
||||
}
|
||||
|
||||
// deliverMessage creates and populates a new Message for the specified recipient
|
||||
func (ss *Session) deliverMessage(r recipientDetails, msgBuf [][]byte) (ok bool) {
|
||||
msg, err := r.mailbox.NewMessage()
|
||||
if err != nil {
|
||||
ss.logError("Failed to create message for %q: %s", r.localPart, err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Generate Received header
|
||||
stamp := time.Now().Format(timeStampFormat)
|
||||
recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
|
||||
ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp)
|
||||
if err := msg.Append([]byte(recd)); err != nil {
|
||||
ss.logError("Failed to write received header for %q: %s", r.localPart, err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Append lines from msgBuf
|
||||
for _, line := range msgBuf {
|
||||
if err := msg.Append(line); err != nil {
|
||||
ss.logError("Failed to append to mailbox %v: %v", r.mailbox, err)
|
||||
// Should really cleanup the crap on filesystem
|
||||
return false
|
||||
}
|
||||
}
|
||||
if err := msg.Close(); err != nil {
|
||||
ss.logError("Error while closing message for %v: %v", r.mailbox, err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Broadcast message information
|
||||
broadcast := msghub.Message{
|
||||
Mailbox: r.mailbox.Name(),
|
||||
ID: msg.ID(),
|
||||
From: msg.From(),
|
||||
To: msg.To(),
|
||||
Subject: msg.Subject(),
|
||||
Date: msg.Date(),
|
||||
Size: msg.Size(),
|
||||
}
|
||||
ss.server.msgHub.Broadcast(broadcast)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (ss *Session) enterState(state State) {
|
||||
ss.state = state
|
||||
ss.logTrace("Entering state %v", state)
|
||||
|
||||
@@ -5,13 +5,15 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"log"
|
||||
"net"
|
||||
"net/textproto"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
)
|
||||
|
||||
type scriptStep struct {
|
||||
@@ -153,6 +155,13 @@ func TestMailState(t *testing.T) {
|
||||
msg1 := &MockMessage{}
|
||||
mds.On("MailboxFor").Return(mb1, nil)
|
||||
mb1.On("NewMessage").Return(msg1, nil)
|
||||
mb1.On("Name").Return("u1")
|
||||
msg1.On("ID").Return("")
|
||||
msg1.On("From").Return("")
|
||||
msg1.On("To").Return(make([]string, 0))
|
||||
msg1.On("Date").Return(time.Time{})
|
||||
msg1.On("Subject").Return("")
|
||||
msg1.On("Size").Return(0)
|
||||
msg1.On("Close").Return(nil)
|
||||
|
||||
server, logbuf := setupSMTPServer(mds)
|
||||
@@ -263,6 +272,13 @@ func TestDataState(t *testing.T) {
|
||||
msg1 := &MockMessage{}
|
||||
mds.On("MailboxFor").Return(mb1, nil)
|
||||
mb1.On("NewMessage").Return(msg1, nil)
|
||||
mb1.On("Name").Return("u1")
|
||||
msg1.On("ID").Return("")
|
||||
msg1.On("From").Return("")
|
||||
msg1.On("To").Return(make([]string, 0))
|
||||
msg1.On("Date").Return(time.Time{})
|
||||
msg1.On("Subject").Return("")
|
||||
msg1.On("Size").Return(0)
|
||||
msg1.On("Close").Return(nil)
|
||||
|
||||
server, logbuf := setupSMTPServer(mds)
|
||||
@@ -378,7 +394,7 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) {
|
||||
|
||||
// Create a server, don't start it
|
||||
shutdownChan := make(chan bool)
|
||||
return NewServer(cfg, ds, shutdownChan), buf
|
||||
return NewServer(cfg, shutdownChan, ds, &msghub.Hub{}), buf
|
||||
}
|
||||
|
||||
var sessionNum int
|
||||
|
||||
@@ -12,24 +12,27 @@ import (
|
||||
|
||||
"github.com/jhillyerd/inbucket/config"
|
||||
"github.com/jhillyerd/inbucket/log"
|
||||
"github.com/jhillyerd/inbucket/msghub"
|
||||
)
|
||||
|
||||
// Server holds the configuration and state of our SMTP server
|
||||
type Server struct {
|
||||
// Configuration
|
||||
domain string
|
||||
domainNoStore string
|
||||
maxRecips int
|
||||
maxIdleSeconds int
|
||||
maxMessageBytes int
|
||||
dataStore DataStore
|
||||
storeMessages bool
|
||||
listener net.Listener
|
||||
|
||||
// globalShutdown is the signal Inbucket needs to shut down
|
||||
globalShutdown chan bool
|
||||
// Dependencies
|
||||
dataStore DataStore // Mailbox/message store
|
||||
globalShutdown chan bool // Shuts down Inbucket
|
||||
msgHub *msghub.Hub // Pub/sub for message info
|
||||
|
||||
// waitgroup tracks individual sessions
|
||||
waitgroup *sync.WaitGroup
|
||||
// State
|
||||
listener net.Listener // Incoming network connections
|
||||
waitgroup *sync.WaitGroup // Waitgroup tracks individual sessions
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -54,17 +57,22 @@ var (
|
||||
)
|
||||
|
||||
// NewServer creates a new Server instance with the specificed config
|
||||
func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *Server {
|
||||
func NewServer(
|
||||
cfg config.SMTPConfig,
|
||||
globalShutdown chan bool,
|
||||
ds DataStore,
|
||||
msgHub *msghub.Hub) *Server {
|
||||
return &Server{
|
||||
dataStore: ds,
|
||||
domain: cfg.Domain,
|
||||
domainNoStore: strings.ToLower(cfg.DomainNoStore),
|
||||
maxRecips: cfg.MaxRecipients,
|
||||
maxIdleSeconds: cfg.MaxIdleSeconds,
|
||||
maxMessageBytes: cfg.MaxMessageBytes,
|
||||
storeMessages: cfg.StoreMessages,
|
||||
domainNoStore: strings.ToLower(cfg.DomainNoStore),
|
||||
waitgroup: new(sync.WaitGroup),
|
||||
globalShutdown: globalShutdown,
|
||||
dataStore: ds,
|
||||
msgHub: msgHub,
|
||||
waitgroup: new(sync.WaitGroup),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -106,6 +106,11 @@ func (m *MockMailbox) NewMessage() (Message, error) {
|
||||
return args.Get(0).(Message), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) Name() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *MockMailbox) String() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
|
||||
@@ -31,7 +31,7 @@ export SWAKS_OPT_to="$to@inbucket.local"
|
||||
swaks $* --h-Subject: "Swaks Plain Text" --body text.txt
|
||||
|
||||
# Multi-recipient test
|
||||
swaks $* --to="$to@inbucket.local,Alt User <alternate@inbucket.local>" --h-Subject: "Swaks Multi-Recipient" \
|
||||
swaks $* --to="$to@inbucket.local,alternate@inbucket.local" --h-Subject: "Swaks Multi-Recipient" \
|
||||
--body text.txt
|
||||
|
||||
# HTML test
|
||||
|
||||
@@ -81,3 +81,12 @@ table.metrics {
|
||||
width: 200px;
|
||||
}
|
||||
|
||||
/* Monitor */
|
||||
#monitor-message-list td {
|
||||
cursor: pointer;
|
||||
font-size: 12px;
|
||||
}
|
||||
|
||||
#conn-status {
|
||||
font-style: italic;
|
||||
}
|
||||
|
||||
@@ -104,6 +104,7 @@ function displayMetrics(data, textStatus, jqXHR) {
|
||||
metric('memstatsHeapSys', data.memstats.HeapSys, sizeFilter, true);
|
||||
metric('memstatsHeapObjects', data.memstats.HeapObjects, numberFilter, true);
|
||||
metric('smtpConnectsCurrent', data.smtp.ConnectsCurrent, numberFilter, true);
|
||||
metric('httpWebSocketConnectsCurrent', data.http.WebSocketConnectsCurrent, numberFilter, true);
|
||||
|
||||
// Server-side history
|
||||
metric('smtpReceivedTotal', data.smtp.ReceivedTotal, numberFilter, false);
|
||||
|
||||
45
themes/bootstrap/public/monitor.js
Normal file
45
themes/bootstrap/public/monitor.js
Normal file
@@ -0,0 +1,45 @@
|
||||
var baseURL = window.location.protocol + '//' + window.location.host;
|
||||
|
||||
function startMonitor() {
|
||||
$.addTemplateFormatter({
|
||||
"date": function(value, template) {
|
||||
return moment(value).calendar();
|
||||
},
|
||||
"subject": function(value, template) {
|
||||
if (value == null || value.length == 0) {
|
||||
return "(No Subject)";
|
||||
}
|
||||
return value;
|
||||
}
|
||||
});
|
||||
|
||||
var uri = '/api/v1/monitor/all/messages'
|
||||
var l = window.location;
|
||||
var url = ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + uri
|
||||
var ws = new WebSocket(url);
|
||||
|
||||
ws.addEventListener('open', function (e) {
|
||||
$('#conn-status').text('Connected.');
|
||||
});
|
||||
ws.addEventListener('message', function (e) {
|
||||
var msg = JSON.parse(e.data);
|
||||
msg['href'] = '/mailbox?name=' + msg.mailbox + '&id=' + msg.id;
|
||||
$('#monitor-message-list').loadTemplate(
|
||||
$('#message-template'),
|
||||
msg,
|
||||
{ append: true });
|
||||
});
|
||||
ws.addEventListener('close', function (e) {
|
||||
$('#conn-status').text('Disconnected!');
|
||||
});
|
||||
}
|
||||
|
||||
function messageClick(node) {
|
||||
var href = node.attributes['href'].value;
|
||||
var url = baseURL + href;
|
||||
window.location.assign(url);
|
||||
}
|
||||
|
||||
function clearClick() {
|
||||
$('#monitor-message-list').empty();
|
||||
}
|
||||
@@ -48,7 +48,10 @@
|
||||
</ul>
|
||||
</li>
|
||||
{{end}}
|
||||
<li id="nav-status"><a href="/status" accesskey="2">Status</a></li>
|
||||
{{if .ctx.WebConfig.MonitorVisible}}
|
||||
<li id="nav-monitor"><a href="/monitor" accesskey="2">Monitor</a></li>
|
||||
{{end}}
|
||||
<li id="nav-status"><a href="/status" accesskey="3">Status</a></li>
|
||||
</ul>
|
||||
<form class="navbar-form navbar-right" action="{{reverse "MailboxIndex"}}" method="GET">
|
||||
<div class="form-group">
|
||||
@@ -68,9 +71,9 @@
|
||||
</nav>
|
||||
|
||||
<div class="container">
|
||||
{{with .ctx.Session.Flashes "errors"}}
|
||||
{{with .errorFlash}}
|
||||
<div class="alert alert-danger">
|
||||
<p>Please fix the following errors and resubmit:<p>
|
||||
<p>Please fix the following errors and try again:<p>
|
||||
<ul>
|
||||
{{range .}}
|
||||
<li>{{.}}</li>
|
||||
|
||||
@@ -64,16 +64,6 @@ $(document).ready(function() {
|
||||
</div>
|
||||
</div>
|
||||
<div id="message-container" class="col-md-9">
|
||||
{{with .ctx.Session.Flashes "errors"}}
|
||||
<div class="errors">
|
||||
<p>Please fix the following errors and resubmit:<p>
|
||||
<ul>
|
||||
{{range .}}
|
||||
<li>{{.}}</li>
|
||||
{{end}}
|
||||
</ul>
|
||||
</div>
|
||||
{{end}}
|
||||
<div id="message-content">
|
||||
<p>Select a message at left, or enter a different username into the box on upper right.</p>
|
||||
</div>
|
||||
|
||||
53
themes/bootstrap/templates/root/monitor.html
Normal file
53
themes/bootstrap/templates/root/monitor.html
Normal file
@@ -0,0 +1,53 @@
|
||||
{{define "title"}}Inbucket Monitor{{end}}
|
||||
|
||||
{{define "script"}}
|
||||
<script src="/public/monitor.js" type="text/javascript" charset="utf-8"></script>
|
||||
<script>
|
||||
$(document).ready(function () {
|
||||
$('#nav-monitor').addClass('active');
|
||||
startMonitor();
|
||||
});
|
||||
</script>
|
||||
<script type="text/html" id="message-template">
|
||||
<tr data-href="href" onclick="messageClick(this);">
|
||||
<td data-content="date" data-format="date"/>
|
||||
<td data-content-text="from"/>
|
||||
<td data-content-text="mailbox"/>
|
||||
<td data-content-text="subject" data-format="subject"/>
|
||||
</tr>
|
||||
</script>
|
||||
{{end}}
|
||||
|
||||
{{define "menu"}}
|
||||
<div id="logo">
|
||||
<h1><a href="/">inbucket</a></h1>
|
||||
<h2>email testing service</h2>
|
||||
</div>
|
||||
{{end}}
|
||||
|
||||
{{define "content"}}
|
||||
<h2>Inbucket Monitor</h2>
|
||||
|
||||
<div class="pull-right">
|
||||
<button class="btn btn-primary" onclick="clearClick();">Clear</button>
|
||||
</div>
|
||||
|
||||
<p class="small">Messages will be listed here shortly after delivery.</p>
|
||||
|
||||
<div class="table-responsive clearfix">
|
||||
<table class="table table-condensed table-hover">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Date</th>
|
||||
<th>From</th>
|
||||
<th>Mailbox</th>
|
||||
<th>Subject</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="monitor-message-list"></tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<div id="conn-status">Connecting...</div>
|
||||
{{end}}
|
||||
|
||||
@@ -107,6 +107,12 @@ $(document).ready(
|
||||
<div class="col-sm-4"><span id="s-memstatsHeapObjects">.</span></div>
|
||||
<div class="col-sm-2 hidden-xs">(10min)</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-sm-3 col-xs-7"><b>Open WebSockets:</b></div>
|
||||
<div class="col-sm-3 col-xs-5"><span id="m-httpWebSocketConnectsCurrent">.</span></div>
|
||||
<div class="col-sm-4"><span id="s-httpWebSocketConnectsCurrent">.</span></div>
|
||||
<div class="col-sm-2 hidden-xs">(10min)</div>
|
||||
</div>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -17,27 +17,32 @@ func MailboxIndex(w http.ResponseWriter, req *http.Request, ctx *httpd.Context)
|
||||
// Form values must be validated manually
|
||||
name := req.FormValue("name")
|
||||
selected := req.FormValue("id")
|
||||
|
||||
if len(name) == 0 {
|
||||
ctx.Session.AddFlash("Account name is required", "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
|
||||
name, err = smtpd.ParseMailboxName(name)
|
||||
if err != nil {
|
||||
ctx.Session.AddFlash(err.Error(), "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remember this mailbox was visited
|
||||
RememberMailbox(ctx, name)
|
||||
|
||||
// Get flash messages, save session
|
||||
errorFlash := ctx.Session.Flashes("errors")
|
||||
if err = ctx.Session.Save(req, w); err != nil {
|
||||
return err
|
||||
}
|
||||
// Render template
|
||||
return httpd.RenderTemplate("mailbox/index.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"name": name,
|
||||
"selected": selected,
|
||||
"ctx": ctx,
|
||||
"errorFlash": errorFlash,
|
||||
"name": name,
|
||||
"selected": selected,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -48,16 +53,17 @@ func MailboxLink(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (
|
||||
name, err := smtpd.ParseMailboxName(ctx.Vars["name"])
|
||||
if err != nil {
|
||||
ctx.Session.AddFlash(err.Error(), "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build redirect
|
||||
uri := fmt.Sprintf("%s?name=%s&id=%s", httpd.Reverse("MailboxIndex"), name, id)
|
||||
http.Redirect(w, req, uri, http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
|
||||
// MailboxList renders a list of messages in a mailbox. Renders JSON or a partial
|
||||
// MailboxList renders a list of messages in a mailbox. Renders a partial
|
||||
func MailboxList(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) {
|
||||
// Don't have to validate these aren't empty, Gorilla returns 404
|
||||
name, err := smtpd.ParseMailboxName(ctx.Vars["name"])
|
||||
@@ -75,7 +81,7 @@ func MailboxList(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (
|
||||
return fmt.Errorf("Failed to get messages for %v: %v", name, err)
|
||||
}
|
||||
log.Tracef("Got %v messsages", len(messages))
|
||||
|
||||
// Render partial template
|
||||
return httpd.RenderPartial("mailbox/_list.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"name": name,
|
||||
@@ -109,10 +115,9 @@ func MailboxShow(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (
|
||||
if err != nil {
|
||||
return fmt.Errorf("ReadBody(%q) failed: %v", id, err)
|
||||
}
|
||||
|
||||
body := template.HTML(httpd.TextToHTML(mime.Text))
|
||||
htmlAvailable := mime.HTML != ""
|
||||
|
||||
// Render partial template
|
||||
return httpd.RenderPartial("mailbox/_show.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"name": name,
|
||||
@@ -150,7 +155,7 @@ func MailboxHTML(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (
|
||||
if err != nil {
|
||||
return fmt.Errorf("ReadBody(%q) failed: %v", id, err)
|
||||
}
|
||||
|
||||
// Render partial template
|
||||
w.Header().Set("Content-Type", "text/html; charset=UTF-8")
|
||||
return httpd.RenderPartial("mailbox/_html.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
@@ -187,7 +192,7 @@ func MailboxSource(w http.ResponseWriter, req *http.Request, ctx *httpd.Context)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ReadRaw(%q) failed: %v", id, err)
|
||||
}
|
||||
|
||||
// Output message source
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
if _, err := io.WriteString(w, *raw); err != nil {
|
||||
return err
|
||||
@@ -203,6 +208,7 @@ func MailboxDownloadAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.
|
||||
name, err := smtpd.ParseMailboxName(ctx.Vars["name"])
|
||||
if err != nil {
|
||||
ctx.Session.AddFlash(err.Error(), "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
@@ -210,10 +216,10 @@ func MailboxDownloadAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.
|
||||
num, err := strconv.ParseUint(numStr, 10, 32)
|
||||
if err != nil {
|
||||
ctx.Session.AddFlash("Attachment number must be unsigned numeric", "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
|
||||
mb, err := ctx.DataStore.MailboxFor(name)
|
||||
if err != nil {
|
||||
// This doesn't indicate not found, likely an IO error
|
||||
@@ -234,11 +240,12 @@ func MailboxDownloadAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.
|
||||
}
|
||||
if int(num) >= len(body.Attachments) {
|
||||
ctx.Session.AddFlash("Attachment number too high", "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
part := body.Attachments[num]
|
||||
|
||||
// Output attachment
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Content-Disposition", "attachment")
|
||||
if _, err := io.Copy(w, part); err != nil {
|
||||
@@ -253,6 +260,7 @@ func MailboxViewAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.Cont
|
||||
name, err := smtpd.ParseMailboxName(ctx.Vars["name"])
|
||||
if err != nil {
|
||||
ctx.Session.AddFlash(err.Error(), "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
@@ -261,10 +269,10 @@ func MailboxViewAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.Cont
|
||||
num, err := strconv.ParseUint(numStr, 10, 32)
|
||||
if err != nil {
|
||||
ctx.Session.AddFlash("Attachment number must be unsigned numeric", "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
|
||||
mb, err := ctx.DataStore.MailboxFor(name)
|
||||
if err != nil {
|
||||
// This doesn't indicate not found, likely an IO error
|
||||
@@ -285,11 +293,12 @@ func MailboxViewAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.Cont
|
||||
}
|
||||
if int(num) >= len(body.Attachments) {
|
||||
ctx.Session.AddFlash("Attachment number too high", "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
part := body.Attachments[num]
|
||||
|
||||
// Output attachment
|
||||
w.Header().Set("Content-Type", part.ContentType)
|
||||
if _, err := io.Copy(w, part); err != nil {
|
||||
return err
|
||||
|
||||
@@ -16,10 +16,36 @@ func RootIndex(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (er
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to load greeting: %v", err)
|
||||
}
|
||||
|
||||
// Get flash messages, save session
|
||||
errorFlash := ctx.Session.Flashes("errors")
|
||||
if err = ctx.Session.Save(req, w); err != nil {
|
||||
return err
|
||||
}
|
||||
// Render template
|
||||
return httpd.RenderTemplate("root/index.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"greeting": template.HTML(string(greeting)),
|
||||
"ctx": ctx,
|
||||
"errorFlash": errorFlash,
|
||||
"greeting": template.HTML(string(greeting)),
|
||||
})
|
||||
}
|
||||
|
||||
// RootMonitor serves the Inbucket monitor page
|
||||
func RootMonitor(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) {
|
||||
if !config.GetWebConfig().MonitorVisible {
|
||||
ctx.Session.AddFlash("Monitor is disabled in configuration", "errors")
|
||||
_ = ctx.Session.Save(req, w)
|
||||
http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther)
|
||||
return nil
|
||||
}
|
||||
// Get flash messages, save session
|
||||
errorFlash := ctx.Session.Flashes("errors")
|
||||
if err = ctx.Session.Save(req, w); err != nil {
|
||||
return err
|
||||
}
|
||||
// Render template
|
||||
return httpd.RenderTemplate("root/monitor.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"errorFlash": errorFlash,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -31,8 +57,15 @@ func RootStatus(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (e
|
||||
config.GetPOP3Config().IP4port)
|
||||
webListener := fmt.Sprintf("%s:%d", config.GetWebConfig().IP4address.String(),
|
||||
config.GetWebConfig().IP4port)
|
||||
// Get flash messages, save session
|
||||
errorFlash := ctx.Session.Flashes("errors")
|
||||
if err = ctx.Session.Save(req, w); err != nil {
|
||||
return err
|
||||
}
|
||||
// Render template
|
||||
return httpd.RenderTemplate("root/status.html", w, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"errorFlash": errorFlash,
|
||||
"version": config.Version,
|
||||
"buildDate": config.BuildDate,
|
||||
"smtpListener": smtpListener,
|
||||
|
||||
@@ -8,14 +8,26 @@ import (
|
||||
|
||||
// SetupRoutes populates routes for the webui into the provided Router
|
||||
func SetupRoutes(r *mux.Router) {
|
||||
r.Path("/").Handler(httpd.Handler(RootIndex)).Name("RootIndex").Methods("GET")
|
||||
r.Path("/status").Handler(httpd.Handler(RootStatus)).Name("RootStatus").Methods("GET")
|
||||
r.Path("/link/{name}/{id}").Handler(httpd.Handler(MailboxLink)).Name("MailboxLink").Methods("GET")
|
||||
r.Path("/mailbox").Handler(httpd.Handler(MailboxIndex)).Name("MailboxIndex").Methods("GET")
|
||||
r.Path("/mailbox/{name}").Handler(httpd.Handler(MailboxList)).Name("MailboxList").Methods("GET")
|
||||
r.Path("/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxShow)).Name("MailboxShow").Methods("GET")
|
||||
r.Path("/mailbox/{name}/{id}/html").Handler(httpd.Handler(MailboxHTML)).Name("MailboxHtml").Methods("GET")
|
||||
r.Path("/mailbox/{name}/{id}/source").Handler(httpd.Handler(MailboxSource)).Name("MailboxSource").Methods("GET")
|
||||
r.Path("/mailbox/dattach/{name}/{id}/{num}/{file}").Handler(httpd.Handler(MailboxDownloadAttach)).Name("MailboxDownloadAttach").Methods("GET")
|
||||
r.Path("/mailbox/vattach/{name}/{id}/{num}/{file}").Handler(httpd.Handler(MailboxViewAttach)).Name("MailboxViewAttach").Methods("GET")
|
||||
r.Path("/").Handler(
|
||||
httpd.Handler(RootIndex)).Name("RootIndex").Methods("GET")
|
||||
r.Path("/monitor").Handler(
|
||||
httpd.Handler(RootMonitor)).Name("RootMonitor").Methods("GET")
|
||||
r.Path("/status").Handler(
|
||||
httpd.Handler(RootStatus)).Name("RootStatus").Methods("GET")
|
||||
r.Path("/link/{name}/{id}").Handler(
|
||||
httpd.Handler(MailboxLink)).Name("MailboxLink").Methods("GET")
|
||||
r.Path("/mailbox").Handler(
|
||||
httpd.Handler(MailboxIndex)).Name("MailboxIndex").Methods("GET")
|
||||
r.Path("/mailbox/{name}").Handler(
|
||||
httpd.Handler(MailboxList)).Name("MailboxList").Methods("GET")
|
||||
r.Path("/mailbox/{name}/{id}").Handler(
|
||||
httpd.Handler(MailboxShow)).Name("MailboxShow").Methods("GET")
|
||||
r.Path("/mailbox/{name}/{id}/html").Handler(
|
||||
httpd.Handler(MailboxHTML)).Name("MailboxHtml").Methods("GET")
|
||||
r.Path("/mailbox/{name}/{id}/source").Handler(
|
||||
httpd.Handler(MailboxSource)).Name("MailboxSource").Methods("GET")
|
||||
r.Path("/mailbox/dattach/{name}/{id}/{num}/{file}").Handler(
|
||||
httpd.Handler(MailboxDownloadAttach)).Name("MailboxDownloadAttach").Methods("GET")
|
||||
r.Path("/mailbox/vattach/{name}/{id}/{num}/{file}").Handler(
|
||||
httpd.Handler(MailboxViewAttach)).Name("MailboxViewAttach").Methods("GET")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user