
Dependency injection improvements (#288)

Refactor server life-cycle into its own file; make service startup and monitoring more consistent and testable.

* Extract services creation in preparation for DI
* pop3: rename New to NewServer
* lifecycle: Add fatal error Notify()
* web: Introduce Server struct w/ Notify()
* Extract Start in lifecycle
* Add Start() to Hub
* RetentionScanner startup consistent with other svcs
* Remove global shutdown channel
* Implement a readiness notification system
James Hillyerd, 2022-08-13 13:22:34 -07:00, committed by GitHub
parent 29d1ed1e7f, commit eae4926b23
12 changed files with 288 additions and 188 deletions
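Taken together, the changes below replace the hand-wired startup in main.go with a single assembly point. The following is a condensed sketch of the new flow, assuming only the APIs visible in this diff; the function name runServices and the sketch package are illustrative, and signal handling, flag parsing, and PID-file management from the real main.go are omitted:

package sketch

import (
	"context"

	"github.com/inbucket/inbucket/pkg/config"
	"github.com/inbucket/inbucket/pkg/server"
	"github.com/rs/zerolog/log"
)

// runServices assembles and runs Inbucket until a service reports a fatal
// error. It mirrors the new main.go flow with signals and PID files elided.
func runServices(conf *config.Root) error {
	svcCtx, svcCancel := context.WithCancel(context.Background())
	defer svcCancel()

	// One call wires storage, msghub, retention, POP3, SMTP, and web.
	services, err := server.FullAssembly(conf)
	if err != nil {
		return err
	}

	// Start returns immediately; the callback fires once every server has
	// invoked its per-service ready function.
	services.Start(svcCtx, func() {
		log.Debug().Msg("All services report ready")
	})

	// Notify fans in each server's fatal-error channel; receiving blocks
	// until one of them fails.
	err = <-services.Notify()

	// Cancel the shared context and drain, as main.go does on shutdown.
	svcCancel()
	services.SMTPServer.Drain()
	services.POP3Server.Drain()
	services.RetentionScanner.Join()
	return err
}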


@@ -15,18 +15,10 @@ import (
"time" "time"
"github.com/inbucket/inbucket/pkg/config" "github.com/inbucket/inbucket/pkg/config"
"github.com/inbucket/inbucket/pkg/message" "github.com/inbucket/inbucket/pkg/server"
"github.com/inbucket/inbucket/pkg/msghub"
"github.com/inbucket/inbucket/pkg/policy"
"github.com/inbucket/inbucket/pkg/rest"
"github.com/inbucket/inbucket/pkg/server/pop3"
"github.com/inbucket/inbucket/pkg/server/smtp"
"github.com/inbucket/inbucket/pkg/server/web"
"github.com/inbucket/inbucket/pkg/storage" "github.com/inbucket/inbucket/pkg/storage"
"github.com/inbucket/inbucket/pkg/storage/file" "github.com/inbucket/inbucket/pkg/storage/file"
"github.com/inbucket/inbucket/pkg/storage/mem" "github.com/inbucket/inbucket/pkg/storage/mem"
"github.com/inbucket/inbucket/pkg/stringutil"
"github.com/inbucket/inbucket/pkg/webui"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@@ -114,36 +106,16 @@ func main() {
         }
     }

-    // Configure internal services.
-    rootCtx, rootCancel := context.WithCancel(context.Background())
-    shutdownChan := make(chan bool)
-    store, err := storage.FromConfig(conf.Storage)
+    // Configure and start internal services.
+    svcCtx, svcCancel := context.WithCancel(context.Background())
+    services, err := server.FullAssembly(conf)
     if err != nil {
+        startupLog.Fatal().Err(err).Msg("Fatal error during startup")
         removePIDFile(*pidfile)
-        startupLog.Fatal().Err(err).Str("module", "storage").Msg("Fatal storage error")
     }
-
-    msgHub := msghub.New(rootCtx, conf.Web.MonitorHistory)
-    addrPolicy := &policy.Addressing{Config: conf}
-    mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub}
-
-    // Start Retention scanner.
-    retentionScanner := storage.NewRetentionScanner(conf.Storage, store, shutdownChan)
-    retentionScanner.Start()
-
-    // Configure routes and start HTTP server.
-    prefix := stringutil.MakePathPrefixer(conf.Web.BasePath)
-    webui.SetupRoutes(web.Router.PathPrefix(prefix("/serve/")).Subrouter())
-    rest.SetupRoutes(web.Router.PathPrefix(prefix("/api/")).Subrouter())
-    web.Initialize(conf, shutdownChan, mmanager, msgHub)
-    go web.Start(rootCtx)
-
-    // Start POP3 server.
-    pop3Server := pop3.New(conf.POP3, shutdownChan, store)
-    go pop3Server.Start(rootCtx)
-
-    // Start SMTP server.
-    smtpServer := smtp.NewServer(conf.SMTP, shutdownChan, mmanager, addrPolicy)
-    go smtpServer.Start(rootCtx)
+    services.Start(svcCtx, func() {
+        startupLog.Debug().Msg("All services report ready")
+    })

     // Loop forever waiting for signals or shutdown channel.
 signalLoop:
@@ -155,24 +127,27 @@ signalLoop:
                 // Shutdown requested
                 log.Info().Str("phase", "shutdown").Str("signal", "SIGINT").
                     Msg("Received SIGINT, shutting down")
-                close(shutdownChan)
+                svcCancel()
+                break signalLoop
             case syscall.SIGTERM:
                 // Shutdown requested
                 log.Info().Str("phase", "shutdown").Str("signal", "SIGTERM").
                     Msg("Received SIGTERM, shutting down")
-                close(shutdownChan)
+                svcCancel()
+                break signalLoop
             }
-        case <-shutdownChan:
-            rootCancel()
+        case <-services.Notify():
+            log.Info().Str("phase", "shutdown").Msg("Shutting down due to service failure")
+            svcCancel()
             break signalLoop
         }
     }

     // Wait for active connections to finish.
     go timedExit(*pidfile)
-    smtpServer.Drain()
-    pop3Server.Drain()
-    retentionScanner.Join()
+    services.SMTPServer.Drain()
+    services.POP3Server.Drain()
+    services.RetentionScanner.Join()
     removePIDFile(*pidfile)
     closeLog()
 }


@@ -36,27 +36,26 @@ type Hub struct {
 // New constructs a new Hub which will cache historyLen messages in memory for playback to future
 // listeners. A goroutine is created to handle incoming messages; it will run until the provided
 // context is canceled.
-func New(ctx context.Context, historyLen int) *Hub {
-    h := &Hub{
+func New(historyLen int) *Hub {
+    return &Hub{
         history:   ring.New(historyLen),
         listeners: make(map[Listener]struct{}),
         opChan:    make(chan func(h *Hub), opChanLen),
     }
+}

-    go func() {
-        for {
-            select {
-            case <-ctx.Done():
-                // Shutdown
-                close(h.opChan)
-                return
-            case op := <-h.opChan:
-                op(h)
-            }
-        }
-    }()
-
-    return h
+// Start Hub processing loop.
+func (hub *Hub) Start(ctx context.Context) {
+    for {
+        select {
+        case <-ctx.Done():
+            // Shutdown
+            close(hub.opChan)
+            return
+        case op := <-hub.opChan:
+            op(hub)
+        }
+    }
 }

 // Dispatch queues a message for broadcast by the hub. The message will be placed into the
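With this change, constructing a Hub no longer spawns a goroutine; the caller owns the processing loop's lifetime. A minimal usage sketch, mirroring the pattern the updated tests below use (all identifiers are from this diff):

package main

import (
	"context"

	"github.com/inbucket/inbucket/pkg/msghub"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hub := msghub.New(100) // New only builds the Hub now.
	go hub.Start(ctx)      // Processing loop; exits when ctx is canceled.

	hub.Dispatch(msghub.Message{}) // Queued on opChan, handled by Start.
}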


@@ -52,9 +52,7 @@ func (l *testListener) String() string {
 }

 func TestHubNew(t *testing.T) {
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    hub := New(ctx, 5)
+    hub := New(5)
     if hub == nil {
         t.Fatal("New() == nil, expected a new Hub")
     }
@@ -63,29 +61,32 @@ func TestHubNew(t *testing.T) {
 func TestHubZeroLen(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 0)
+    hub := New(0)
+    go hub.Start(ctx)
     m := Message{}
     for i := 0; i < 100; i++ {
         hub.Dispatch(m)
     }
-    // Just making sure Hub doesn't panic
+    // Ensures Hub doesn't panic
 }

 func TestHubZeroListeners(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 5)
+    hub := New(5)
+    go hub.Start(ctx)
     m := Message{}
     for i := 0; i < 100; i++ {
         hub.Dispatch(m)
     }
-    // Just making sure Hub doesn't panic
+    // Ensures Hub doesn't panic
 }

 func TestHubOneListener(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 5)
+    hub := New(5)
+    go hub.Start(ctx)

     m := Message{}
     l := newTestListener(1)
@@ -103,7 +104,8 @@ func TestHubOneListener(t *testing.T) {
 func TestHubRemoveListener(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 5)
+    hub := New(5)
+    go hub.Start(ctx)

     m := Message{}
     l := newTestListener(1)
@@ -125,7 +127,8 @@ func TestHubRemoveListener(t *testing.T) {
 func TestHubRemoveListenerOnError(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 5)
+    hub := New(5)
+    go hub.Start(ctx)

     m := Message{}
     // error after 1 means listener should receive 2 messages before being removed
@@ -151,7 +154,8 @@ func TestHubRemoveListenerOnError(t *testing.T) {
 func TestHubHistoryReplay(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 100)
+    hub := New(100)
+    go hub.Start(ctx)

     l1 := newTestListener(3)
     hub.AddListener(l1)
@@ -194,7 +198,8 @@ func TestHubHistoryReplay(t *testing.T) {
 func TestHubHistoryReplayWrap(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    hub := New(ctx, 5)
+    hub := New(5)
+    go hub.Start(ctx)

     l1 := newTestListener(20)
     hub.AddListener(l1)
@@ -236,7 +241,8 @@ func TestHubHistoryReplayWrap(t *testing.T) {
 func TestHubContextCancel(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
-    hub := New(ctx, 5)
+    hub := New(5)
+    go hub.Start(ctx)

     m := Message{}
     l := newTestListener(1)


@@ -48,9 +48,8 @@ func setupWebServer(mm message.Manager) *bytes.Buffer {
             UIDir: "../ui",
         },
     }
-    shutdownChan := make(chan bool)
     SetupRoutes(web.Router.PathPrefix("/api/").Subrouter())
-    web.Initialize(cfg, shutdownChan, mm, &msghub.Hub{})
+    web.NewServer(cfg, mm, &msghub.Hub{})

     return buf
 }

pkg/server/lifecycle.go (new file, +105 lines)

@@ -0,0 +1,105 @@
+package server
+
+import (
+    "context"
+    "sync"
+
+    "github.com/inbucket/inbucket/pkg/config"
+    "github.com/inbucket/inbucket/pkg/message"
+    "github.com/inbucket/inbucket/pkg/msghub"
+    "github.com/inbucket/inbucket/pkg/policy"
+    "github.com/inbucket/inbucket/pkg/rest"
+    "github.com/inbucket/inbucket/pkg/server/pop3"
+    "github.com/inbucket/inbucket/pkg/server/smtp"
+    "github.com/inbucket/inbucket/pkg/server/web"
+    "github.com/inbucket/inbucket/pkg/storage"
+    "github.com/inbucket/inbucket/pkg/stringutil"
+    "github.com/inbucket/inbucket/pkg/webui"
+)
+
+// Services holds the configured services.
+type Services struct {
+    MsgHub           *msghub.Hub
+    POP3Server       *pop3.Server
+    RetentionScanner *storage.RetentionScanner
+    SMTPServer       *smtp.Server
+    WebServer        *web.Server
+
+    notify chan error      // Combined notification for failed services.
+    ready  *sync.WaitGroup // Tracks services that have not reported ready.
+}
+
+// FullAssembly wires up a complete Inbucket environment.
+func FullAssembly(conf *config.Root) (*Services, error) {
+    // Configure storage.
+    store, err := storage.FromConfig(conf.Storage)
+    if err != nil {
+        return nil, err
+    }
+
+    addrPolicy := &policy.Addressing{Config: conf}
+    msgHub := msghub.New(conf.Web.MonitorHistory)
+    mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub}
+
+    // Start Retention scanner.
+    retentionScanner := storage.NewRetentionScanner(conf.Storage, store)
+
+    // Configure routes and build HTTP server.
+    prefix := stringutil.MakePathPrefixer(conf.Web.BasePath)
+    webui.SetupRoutes(web.Router.PathPrefix(prefix("/serve/")).Subrouter())
+    rest.SetupRoutes(web.Router.PathPrefix(prefix("/api/")).Subrouter())
+    webServer := web.NewServer(conf, mmanager, msgHub)
+
+    pop3Server := pop3.NewServer(conf.POP3, store)
+    smtpServer := smtp.NewServer(conf.SMTP, mmanager, addrPolicy)
+
+    return &Services{
+        MsgHub:           msgHub,
+        RetentionScanner: retentionScanner,
+        POP3Server:       pop3Server,
+        SMTPServer:       smtpServer,
+        WebServer:        webServer,
+        ready:            &sync.WaitGroup{},
+    }, nil
+}
+
+// Start all services, returns immediately. Callers may use Notify to detect failed services.
+func (s *Services) Start(ctx context.Context, readyFunc func()) {
+    go s.MsgHub.Start(ctx)
+    go s.WebServer.Start(ctx, s.makeReadyFunc())
+    go s.SMTPServer.Start(ctx, s.makeReadyFunc())
+    go s.POP3Server.Start(ctx, s.makeReadyFunc())
+    go s.RetentionScanner.Start(ctx)
+
+    // Notify when all services report ready.
+    go func() {
+        s.ready.Wait()
+        readyFunc()
+    }()
+}
+
+// Notify merges the error notification channels of all fallible services, allowing the process to
+// be shutdown if needed.
+func (s *Services) Notify() <-chan error {
+    c := make(chan error, 1)
+    go func() {
+        // TODO: What level to log failure.
+        select {
+        case err := <-s.POP3Server.Notify():
+            c <- err
+        case err := <-s.SMTPServer.Notify():
+            c <- err
+        case err := <-s.WebServer.Notify():
+            c <- err
+        }
+    }()
+    return c
+}
+
+func (s *Services) makeReadyFunc() func() {
+    s.ready.Add(1)
+    var once sync.Once
+    return func() {
+        once.Do(s.ready.Done)
+    }
+}


@@ -13,47 +13,53 @@ import (
 // Server defines an instance of the POP3 server.
 type Server struct {
     config   config.POP3   // POP3 configuration.
     store    storage.Store // Mail store.
     listener net.Listener  // TCP listener.
-    globalShutdown chan bool   // Inbucket shutdown signal.
     wg     *sync.WaitGroup // Waitgroup tracking sessions.
+    notify chan error      // Notify on fatal error.
 }

-// New creates a new Server struct.
-func New(pop3Config config.POP3, shutdownChan chan bool, store storage.Store) *Server {
+// NewServer creates a new, unstarted, POP3 server.
+func NewServer(pop3Config config.POP3, store storage.Store) *Server {
     return &Server{
         config: pop3Config,
         store:  store,
-        globalShutdown: shutdownChan,
         wg:     new(sync.WaitGroup),
+        notify: make(chan error, 1),
     }
 }

 // Start the server and listen for connections
-func (s *Server) Start(ctx context.Context) {
+func (s *Server) Start(ctx context.Context, readyFunc func()) {
     slog := log.With().Str("module", "pop3").Str("phase", "startup").Logger()
     addr, err := net.ResolveTCPAddr("tcp4", s.config.Addr)
     if err != nil {
         slog.Error().Err(err).Msg("Failed to build tcp4 address")
-        s.emergencyShutdown()
+        s.notify <- err
+        close(s.notify)
         return
     }
     slog.Info().Str("addr", addr.String()).Msg("POP3 listening on tcp4")
     s.listener, err = net.ListenTCP("tcp4", addr)
     if err != nil {
         slog.Error().Err(err).Msg("Failed to start tcp4 listener")
-        s.emergencyShutdown()
+        s.notify <- err
+        close(s.notify)
         return
     }

-    // Listener go routine.
+    // Start listener go routine.
     go s.serve(ctx)
+    readyFunc()

     // Wait for shutdown.
     select {
     case _ = <-ctx.Done():
     }

     slog = log.With().Str("module", "pop3").Str("phase", "shutdown").Logger()
     slog.Debug().Msg("POP3 shutdown requested, connections will be drained")

     // Closing the listener will cause the serve() go routine to exit.
     if err := s.listener.Close(); err != nil {
         slog.Error().Err(err).Msg("Failed to close POP3 listener")
@@ -88,7 +94,8 @@ func (s *Server) serve(ctx context.Context) {
             return
         default:
             // Something went wrong.
-            s.emergencyShutdown()
+            s.notify <- err
+            close(s.notify)
             return
         }
     }
@@ -100,18 +107,14 @@ func (s *Server) serve(ctx context.Context) {
     }
 }

-func (s *Server) emergencyShutdown() {
-    // Shutdown Inbucket
-    select {
-    case _ = <-s.globalShutdown:
-    default:
-        close(s.globalShutdown)
-    }
-}
-
 // Drain causes the caller to block until all active POP3 sessions have finished
 func (s *Server) Drain() {
     // Wait for sessions to close
     s.wg.Wait()
     log.Debug().Str("module", "pop3").Str("phase", "shutdown").Msg("POP3 connections have drained")
 }
+
+// Notify allows the running POP3 server to be monitored for a fatal error.
+func (s *Server) Notify() <-chan error {
+    return s.notify
+}


@@ -484,13 +484,11 @@ func setupSMTPServer(ds storage.Store) (s *Server, buf *bytes.Buffer, teardown f
     buf = new(bytes.Buffer)
     log.SetOutput(buf)

     // Create a server, don't start it.
-    shutdownChan := make(chan bool)
-    teardown = func() {
-        close(shutdownChan)
-    }
+    // TODO Remove teardown.
+    teardown = func() {}
     addrPolicy := &policy.Addressing{Config: cfg}
     manager := &message.StoreManager{Store: ds}
-    s = NewServer(cfg.SMTP, shutdownChan, manager, addrPolicy)
+    s = NewServer(cfg.SMTP, manager, addrPolicy)

     return s, buf, teardown
 }


@@ -58,19 +58,18 @@ func init() {
 // Server holds the configuration and state of our SMTP server.
 type Server struct {
     config     config.SMTP        // SMTP configuration.
     addrPolicy *policy.Addressing // Address policy.
-    globalShutdown chan bool   // Shuts down Inbucket.
     manager    message.Manager // Used to deliver messages.
     listener   net.Listener    // Incoming network connections.
     wg         *sync.WaitGroup // Waitgroup tracks individual sessions.
-    tlsConfig  *tls.Config
+    tlsConfig  *tls.Config // TLS encryption configuration.
+    notify     chan error  // Notify on fatal error.
 }

-// NewServer creates a new Server instance with the specificed config.
+// NewServer creates a new, unstarted, SMTP server instance with the specificed config.
 func NewServer(
     smtpConfig config.SMTP,
-    globalShutdown chan bool,
     manager message.Manager,
     apolicy *policy.Addressing,
 ) *Server {
@@ -90,37 +89,43 @@ func NewServer(
     }

     return &Server{
         config:     smtpConfig,
-        globalShutdown: globalShutdown,
         manager:    manager,
         addrPolicy: apolicy,
         wg:         new(sync.WaitGroup),
         tlsConfig:  tlsConfig,
+        notify:     make(chan error, 1),
     }
 }

 // Start the listener and handle incoming connections.
-func (s *Server) Start(ctx context.Context) {
+func (s *Server) Start(ctx context.Context, readyFunc func()) {
     slog := log.With().Str("module", "smtp").Str("phase", "startup").Logger()
     addr, err := net.ResolveTCPAddr("tcp4", s.config.Addr)
     if err != nil {
         slog.Error().Err(err).Msg("Failed to build tcp4 address")
-        s.emergencyShutdown()
+        s.notify <- err
+        close(s.notify)
         return
     }
     slog.Info().Str("addr", addr.String()).Msg("SMTP listening on tcp4")
     s.listener, err = net.ListenTCP("tcp4", addr)
     if err != nil {
         slog.Error().Err(err).Msg("Failed to start tcp4 listener")
-        s.emergencyShutdown()
+        s.notify <- err
+        close(s.notify)
         return
     }

-    // Listener go routine.
+    // Start listener go routine.
     go s.serve(ctx)
+    readyFunc()

     // Wait for shutdown.
     <-ctx.Done()

     slog = log.With().Str("module", "smtp").Str("phase", "shutdown").Logger()
     slog.Debug().Msg("SMTP shutdown requested, connections will be drained")

     // Closing the listener will cause the serve() go routine to exit.
     if err := s.listener.Close(); err != nil {
         slog.Error().Err(err).Msg("Failed to close SMTP listener")
@@ -156,7 +161,8 @@ func (s *Server) serve(ctx context.Context) {
             return
         default:
             // Something went wrong.
-            s.emergencyShutdown()
+            s.notify <- err
+            close(s.notify)
             return
         }
     }
@@ -169,18 +175,14 @@ func (s *Server) serve(ctx context.Context) {
     }
 }

-func (s *Server) emergencyShutdown() {
-    // Shutdown Inbucket.
-    select {
-    case <-s.globalShutdown:
-    default:
-        close(s.globalShutdown)
-    }
-}
-
 // Drain causes the caller to block until all active SMTP sessions have finished
 func (s *Server) Drain() {
     // Wait for sessions to close.
     s.wg.Wait()
     log.Debug().Str("module", "smtp").Str("phase", "shutdown").Msg("SMTP connections have drained")
 }
+
+// Notify allows the running SMTP server to be monitored for a fatal error.
+func (s *Server) Notify() <-chan error {
+    return s.notify
+}


@@ -31,10 +31,9 @@ var (
     // incoming requests to the correct handler function
     Router = mux.NewRouter()

     rootConfig *config.Root
     server     *http.Server
     listener   net.Listener
-    globalShutdown chan bool

     // ExpWebSocketConnectsCurrent tracks the number of open WebSockets
     ExpWebSocketConnectsCurrent = new(expvar.Int)
@@ -45,15 +44,19 @@ func init() {
     m.Set("WebSocketConnectsCurrent", ExpWebSocketConnectsCurrent)
 }

-// Initialize sets up things for unit tests or the Start() method.
-func Initialize(
+// Server defines an instance of the Web server.
+type Server struct {
+    // TODO Migrate global vars here.
+    notify chan error // Notify on fatal error.
+}
+
+// NewServer sets up things for unit tests or the Start() method.
+func NewServer(
     conf *config.Root,
-    shutdownChan chan bool,
     mm message.Manager,
-    mh *msghub.Hub) {
+    mh *msghub.Hub) *Server {
     rootConfig = conf
-    globalShutdown = shutdownChan

     // NewContext() will use this DataStore for the web handlers.
     msgHub = mh
@@ -118,10 +121,16 @@ func Initialize(
         http.StatusNotFound, "No route matches URI path")
     Router.MethodNotAllowedHandler = noMatchHandler(
         http.StatusMethodNotAllowed, "Method not allowed for URI path")
+
+    s := &Server{
+        notify: make(chan error, 1),
+    }
+    return s
 }

 // Start begins listening for HTTP requests
-func Start(ctx context.Context) {
+func (s *Server) Start(ctx context.Context, readyFunc func()) {
     server = &http.Server{
         Addr:    rootConfig.Web.Addr,
         Handler: requestLoggingWrapper(Router),
@@ -137,12 +146,14 @@ func Start(ctx context.Context) {
     if err != nil {
         log.Error().Str("module", "web").Str("phase", "startup").Err(err).
             Msg("HTTP failed to start TCP4 listener")
-        emergencyShutdown()
+        s.notify <- err
+        close(s.notify)
         return
     }

-    // Listener go routine
-    go serve(ctx)
+    // Start listener go routine
+    go s.serve(ctx)
+    readyFunc()

     // Wait for shutdown
     select {
@@ -176,7 +187,7 @@ func appConfigCookie(webConfig config.Web) *http.Cookie {
 }

 // serve begins serving HTTP requests
-func serve(ctx context.Context) {
+func (s *Server) serve(ctx context.Context) {
     // server.Serve blocks until we close the listener
     err := server.Serve(listener)
@@ -186,16 +197,13 @@ func serve(ctx context.Context) {
     default:
         log.Error().Str("module", "web").Str("phase", "startup").Err(err).
             Msg("HTTP server failed")
-        emergencyShutdown()
+        s.notify <- err
+        close(s.notify)
         return
     }
 }

-func emergencyShutdown() {
-    // Shutdown Inbucket
-    select {
-    case _ = <-globalShutdown:
-    default:
-        close(globalShutdown)
-    }
+// Notify allows the running Web server to be monitored for a fatal error.
+func (s *Server) Notify() <-chan error {
+    return s.notify
 }


@@ -2,6 +2,7 @@ package storage
 import (
     "container/list"
+    "context"
     "expvar"
     "time"
@@ -50,7 +51,6 @@ func init() {
 // RetentionScanner looks for messages older than the configured retention period and deletes them.
 type RetentionScanner struct {
-    globalShutdown    chan bool // Closes when Inbucket needs to shut down
     retentionShutdown chan bool // Closed after the scanner has shut down
     ds                Store
     retentionPeriod   time.Duration
@@ -61,10 +61,8 @@ type RetentionScanner struct {
 func NewRetentionScanner(
     cfg config.Storage,
     ds Store,
-    shutdownChannel chan bool,
 ) *RetentionScanner {
     rs := &RetentionScanner{
-        globalShutdown:    shutdownChannel,
         retentionShutdown: make(chan bool),
         ds:                ds,
         retentionPeriod:   cfg.RetentionPeriod,
@@ -76,20 +74,16 @@ func NewRetentionScanner(
 }

 // Start up the retention scanner if retention period > 0
-func (rs *RetentionScanner) Start() {
+func (rs *RetentionScanner) Start(ctx context.Context) {
+    slog := log.With().Str("module", "storage").Logger()
     if rs.retentionPeriod <= 0 {
-        log.Info().Str("phase", "startup").Str("module", "storage").Msg("Retention scanner disabled")
+        slog.Info().Str("phase", "startup").Msg("Retention scanner disabled")
         close(rs.retentionShutdown)
         return
     }
-    log.Info().Str("phase", "startup").Str("module", "storage").
-        Msgf("Retention configured for %v", rs.retentionPeriod)
-    go rs.run()
-}
-
-// run loops to kick off the scanner on the correct schedule
-func (rs *RetentionScanner) run() {
-    slog := log.With().Str("module", "storage").Logger()
+    slog.Info().Str("phase", "startup").Msgf("Retention configured for %v", rs.retentionPeriod)

     start := time.Now()
 retentionLoop:
     for {
@@ -99,19 +93,19 @@ retentionLoop:
             dur := time.Minute - since
             slog.Debug().Msgf("Retention scanner sleeping for %v", dur)
             select {
-            case <-rs.globalShutdown:
+            case <-ctx.Done():
                 break retentionLoop
             case <-time.After(dur):
             }
         }

         // Kickoff scan
         start = time.Now()
-        if err := rs.DoScan(); err != nil {
+        if err := rs.DoScan(ctx); err != nil {
             slog.Error().Err(err).Msg("Error during retention scan")
         }

         // Check for global shutdown
         select {
-        case <-rs.globalShutdown:
+        case <-ctx.Done():
             break retentionLoop
         default:
         }
@@ -121,13 +115,14 @@ retentionLoop:
 }

 // DoScan does a single pass of all mailboxes looking for messages that can be purged.
-func (rs *RetentionScanner) DoScan() error {
+func (rs *RetentionScanner) DoScan(ctx context.Context) error {
     slog := log.With().Str("module", "storage").Logger()
     slog.Debug().Msg("Starting retention scan")
     cutoff := time.Now().Add(-1 * rs.retentionPeriod)
-    // Loop over all mailboxes.
     retained := 0
     storeSize := int64(0)
+
+    // Loop over all mailboxes.
     err := rs.ds.VisitMailboxes(func(messages []Message) bool {
         for _, msg := range messages {
             if msg.Date().Before(cutoff) {
@@ -145,7 +140,7 @@ func (rs *RetentionScanner) DoScan() error {
             }
         }
         select {
-        case <-rs.globalShutdown:
+        case <-ctx.Done():
             slog.Debug().Str("phase", "shutdown").Msg("Retention scan aborted due to shutdown")
             return false
         case <-time.After(rs.retentionSleep):
@@ -156,10 +151,12 @@ func (rs *RetentionScanner) DoScan() error {
     if err != nil {
         return err
     }
+
     // Update metrics
     scanCompletedMillis.Set(time.Now().UnixNano() / 1000000)
     expRetainedCurrent.Set(int64(retained))
     expRetainedSize.Set(storeSize)
+
     return nil
 }


@@ -1,6 +1,7 @@
 package storage_test

 import (
+    "context"
     "fmt"
     "testing"
     "time"
@@ -13,6 +14,7 @@ import (
 func TestDoRetentionScan(t *testing.T) {
     ds := test.NewStore()
+
     // Mockup some different aged messages (num is in hours)
     new1 := stubMessage("mb1", 0)
     new2 := stubMessage("mb2", 1)
@@ -26,22 +28,26 @@ func TestDoRetentionScan(t *testing.T) {
     ds.AddMessage(old3)
     ds.AddMessage(new2)
     ds.AddMessage(new3)
+
     // Test 4 hour retention
     cfg := config.Storage{
         RetentionPeriod: 239 * time.Minute,
         RetentionSleep:  0,
     }
-    shutdownChan := make(chan bool)
-    rs := storage.NewRetentionScanner(cfg, ds, shutdownChan)
-    if err := rs.DoScan(); err != nil {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    rs := storage.NewRetentionScanner(cfg, ds)
+    if err := rs.DoScan(ctx); err != nil {
         t.Error(err)
     }
+
     // Delete should not have been called on new messages
     for _, m := range []storage.Message{new1, new2, new3} {
         if ds.MessageDeleted(m) {
             t.Errorf("Expected %v to be present, was deleted", m.ID())
         }
     }
+
     // Delete should have been called once on old messages
     for _, m := range []storage.Message{old1, old2, old3} {
         if !ds.MessageDeleted(m) {


@@ -214,7 +214,7 @@ func formatMessage(m *client.Message) []byte {
 }

 func startServer() (func(), error) {
-    // TODO Refactor inbucket/main.go so we don't need to repeat all this here.
+    // TODO Move integration setup into lifecycle.
     log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: true})
     storage.Constructors["memory"] = mem.New
     os.Clearenv()
@@ -222,32 +222,34 @@ func startServer() (func(), error) {
     if err != nil {
         return nil, err
     }
-    rootCtx, rootCancel := context.WithCancel(context.Background())
-    shutdownChan := make(chan bool)
+    svcCtx, svcCancel := context.WithCancel(context.Background())
     store, err := storage.FromConfig(conf.Storage)
     if err != nil {
-        rootCancel()
+        svcCancel()
         return nil, err
     }
-    msgHub := msghub.New(rootCtx, conf.Web.MonitorHistory)
+
+    // TODO Test should not pass with unstarted msghub.
+    msgHub := msghub.New(conf.Web.MonitorHistory)
     addrPolicy := &policy.Addressing{Config: conf}
     mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub}

     // Start HTTP server.
     webui.SetupRoutes(web.Router.PathPrefix("/serve/").Subrouter())
     rest.SetupRoutes(web.Router.PathPrefix("/api/").Subrouter())
-    web.Initialize(conf, shutdownChan, mmanager, msgHub)
-    go web.Start(rootCtx)
+    webServer := web.NewServer(conf, mmanager, msgHub)
+    go webServer.Start(svcCtx, func() {})

     // Start SMTP server.
-    smtpServer := smtp.NewServer(conf.SMTP, shutdownChan, mmanager, addrPolicy)
-    go smtpServer.Start(rootCtx)
+    smtpServer := smtp.NewServer(conf.SMTP, mmanager, addrPolicy)
+    go smtpServer.Start(svcCtx, func() {})

-    // TODO Implmement an elegant way to determine server readiness.
+    // TODO Use a readyFunc to determine server readiness.
     time.Sleep(500 * time.Millisecond)

     return func() {
         // Shut everything down.
-        close(shutdownChan)
-        rootCancel()
+        svcCancel()
         smtpServer.Drain()
     }, nil
 }