mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-18 10:07:02 +00:00
More shutdown work, closes #11
- Drain SMTP connections - Force exit after 15 seconds of draining
This commit is contained in:
21
inbucket.go
21
inbucket.go
@@ -92,11 +92,14 @@ func main() {
|
|||||||
fmt.Fprintf(pidf, "%v\n", os.Getpid())
|
fmt.Fprintf(pidf, "%v\n", os.Getpid())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Startup SMTP server
|
// Start HTTP server
|
||||||
smtpServer = smtpd.New()
|
go web.Start()
|
||||||
go smtpServer.Start()
|
|
||||||
|
|
||||||
web.Start()
|
// Startup SMTP server, block until it exits
|
||||||
|
smtpServer = smtpd.New()
|
||||||
|
smtpServer.Start()
|
||||||
|
// Wait for active connections to finish
|
||||||
|
smtpServer.Drain()
|
||||||
}
|
}
|
||||||
|
|
||||||
// openLogFile creates or appends to the logfile passed on commandline
|
// openLogFile creates or appends to the logfile passed on commandline
|
||||||
@@ -135,16 +138,24 @@ func signalProcessor(c <-chan os.Signal) {
|
|||||||
case syscall.SIGTERM:
|
case syscall.SIGTERM:
|
||||||
// Initiate shutdown
|
// Initiate shutdown
|
||||||
log.Info("Received SIGTERM, shutting down")
|
log.Info("Received SIGTERM, shutting down")
|
||||||
|
go timedExit()
|
||||||
|
web.Stop()
|
||||||
if smtpServer != nil {
|
if smtpServer != nil {
|
||||||
smtpServer.Stop()
|
smtpServer.Stop()
|
||||||
} else {
|
} else {
|
||||||
log.Error("smtpServer was nil during shutdown")
|
log.Error("smtpServer was nil during shutdown")
|
||||||
}
|
}
|
||||||
web.Stop()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// timedExit is called as a goroutine during shutdown, it will force an exit after 15 seconds
|
||||||
|
func timedExit() {
|
||||||
|
time.Sleep(15 * time.Second)
|
||||||
|
log.Error("Inbucket clean shutdown timed out, forcing exit")
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.Usage = func() {
|
flag.Usage = func() {
|
||||||
fmt.Fprintln(os.Stderr, "Usage of inbucket [options] <conf file>:")
|
fmt.Fprintln(os.Stderr, "Usage of inbucket [options] <conf file>:")
|
||||||
|
|||||||
@@ -90,8 +90,11 @@ func (ss *Session) String() string {
|
|||||||
func (s *Server) startSession(id int, conn net.Conn) {
|
func (s *Server) startSession(id int, conn net.Conn) {
|
||||||
log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id)
|
log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id)
|
||||||
expConnectsCurrent.Add(1)
|
expConnectsCurrent.Add(1)
|
||||||
defer conn.Close()
|
defer func() {
|
||||||
defer expConnectsCurrent.Add(-1)
|
conn.Close()
|
||||||
|
s.waitgroup.Done()
|
||||||
|
expConnectsCurrent.Add(-1)
|
||||||
|
}()
|
||||||
|
|
||||||
ss := NewSession(s, id, conn)
|
ss := NewSession(s, id, conn)
|
||||||
ss.greet()
|
ss.greet()
|
||||||
@@ -300,7 +303,7 @@ func (ss *Session) dataHandler() {
|
|||||||
i := 0
|
i := 0
|
||||||
for e := ss.recipients.Front(); e != nil; e = e.Next() {
|
for e := ss.recipients.Front(); e != nil; e = e.Next() {
|
||||||
recip := e.Value.(string)
|
recip := e.Value.(string)
|
||||||
if !strings.HasSuffix(strings.ToLower(recip), "@" + ss.server.domainNoStore) {
|
if !strings.HasSuffix(strings.ToLower(recip), "@"+ss.server.domainNoStore) {
|
||||||
// Not our "no store" domain, so store the message
|
// Not our "no store" domain, so store the message
|
||||||
mb, err := ss.server.dataStore.MailboxFor(recip)
|
mb, err := ss.server.dataStore.MailboxFor(recip)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/jhillyerd/inbucket/log"
|
"github.com/jhillyerd/inbucket/log"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -22,6 +23,7 @@ type Server struct {
|
|||||||
storeMessages bool
|
storeMessages bool
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
shutdown bool
|
shutdown bool
|
||||||
|
waitgroup *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Raw stat collectors
|
// Raw stat collectors
|
||||||
@@ -49,7 +51,8 @@ func New() *Server {
|
|||||||
cfg := config.GetSmtpConfig()
|
cfg := config.GetSmtpConfig()
|
||||||
return &Server{dataStore: ds, domain: cfg.Domain, maxRecips: cfg.MaxRecipients,
|
return &Server{dataStore: ds, domain: cfg.Domain, maxRecips: cfg.MaxRecipients,
|
||||||
maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes,
|
maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes,
|
||||||
storeMessages: cfg.StoreMessages, domainNoStore: strings.ToLower(cfg.DomainNoStore)}
|
storeMessages: cfg.StoreMessages, domainNoStore: strings.ToLower(cfg.DomainNoStore),
|
||||||
|
waitgroup: new(sync.WaitGroup)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Main listener loop
|
// Main listener loop
|
||||||
@@ -109,17 +112,25 @@ func (s *Server) Start() {
|
|||||||
} else {
|
} else {
|
||||||
tempDelay = 0
|
tempDelay = 0
|
||||||
expConnectsTotal.Add(1)
|
expConnectsTotal.Add(1)
|
||||||
|
s.waitgroup.Add(1)
|
||||||
go s.startSession(sid, conn)
|
go s.startSession(sid, conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop requests the SMTP server closes it's listener
|
||||||
func (s *Server) Stop() {
|
func (s *Server) Stop() {
|
||||||
log.Trace("SMTP shutdown requested")
|
log.Trace("SMTP shutdown requested, connections will be drained")
|
||||||
s.shutdown = true
|
s.shutdown = true
|
||||||
s.listener.Close()
|
s.listener.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Drain causes the caller to block until all active SMTP sessions have finished
|
||||||
|
func (s *Server) Drain() {
|
||||||
|
s.waitgroup.Wait()
|
||||||
|
log.Trace("SMTP connections drained")
|
||||||
|
}
|
||||||
|
|
||||||
// When the provided Ticker ticks, we update our metrics history
|
// When the provided Ticker ticks, we update our metrics history
|
||||||
func metricsTicker(t *time.Ticker) {
|
func metricsTicker(t *time.Ticker) {
|
||||||
ok := true
|
ok := true
|
||||||
|
|||||||
Reference in New Issue
Block a user