From f953bcf4bbb7c9281f3b93f7323e74be8da4f500 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sat, 17 Mar 2018 16:54:29 -0700 Subject: [PATCH] smtp: Move delivery into message.Manager for #69 --- cmd/inbucket/main.go | 31 ++++++-------- pkg/message/manager.go | 68 +++++++++++++++++++++++++++++++ pkg/server/smtp/handler.go | 71 +++++---------------------------- pkg/server/smtp/handler_test.go | 8 ++-- pkg/server/smtp/listener.go | 17 ++++---- 5 files changed, 102 insertions(+), 93 deletions(-) diff --git a/cmd/inbucket/main.go b/cmd/inbucket/main.go index ff5a313..0de0b07 100644 --- a/cmd/inbucket/main.go +++ b/cmd/inbucket/main.go @@ -115,32 +115,27 @@ func main() { } } - // Create message hub + // Configure internal services. msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory) - - // Setup our datastore dscfg := config.GetDataStoreConfig() - ds := file.New(dscfg) - retentionScanner := storage.NewRetentionScanner(dscfg, ds, shutdownChan) + store := file.New(dscfg) + apolicy := &policy.Addressing{Config: config.GetSMTPConfig()} + mmanager := &message.StoreManager{Store: store, Hub: msgHub} + // Start Retention scanner. + retentionScanner := storage.NewRetentionScanner(dscfg, store, shutdownChan) retentionScanner.Start() - - // Start HTTP server - mm := &message.StoreManager{Store: ds} - web.Initialize(config.GetWebConfig(), shutdownChan, mm, msgHub) + // Start HTTP server. + web.Initialize(config.GetWebConfig(), shutdownChan, mmanager, msgHub) webui.SetupRoutes(web.Router) rest.SetupRoutes(web.Router) go web.Start(rootCtx) - - // Start POP3 server - pop3Server = pop3.New(config.GetPOP3Config(), shutdownChan, ds) + // Start POP3 server. + pop3Server = pop3.New(config.GetPOP3Config(), shutdownChan, store) go pop3Server.Start(rootCtx) - - // Startup SMTP server - apolicy := &policy.Addressing{Config: config.GetSMTPConfig()} - smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, ds, apolicy, msgHub) + // Start SMTP server. + smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, mmanager, apolicy) go smtpServer.Start(rootCtx) - - // Loop forever waiting for signals or shutdown channel + // Loop forever waiting for signals or shutdown channel. signalLoop: for { select { diff --git a/pkg/message/manager.go b/pkg/message/manager.go index 1c4e04e..c9b74f1 100644 --- a/pkg/message/manager.go +++ b/pkg/message/manager.go @@ -1,15 +1,28 @@ package message import ( + "bytes" "io" + "net/mail" + "strings" + "time" "github.com/jhillyerd/enmime" + "github.com/jhillyerd/inbucket/pkg/msghub" "github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/storage" + "github.com/jhillyerd/inbucket/pkg/stringutil" ) // Manager is the interface controllers use to interact with messages. type Manager interface { + Deliver( + to *policy.Recipient, + from string, + recipients []*policy.Recipient, + prefix string, + content []byte, + ) (id string, err error) GetMetadata(mailbox string) ([]*Metadata, error) GetMessage(mailbox, id string) (*Message, error) PurgeMessages(mailbox string) error @@ -21,6 +34,61 @@ type Manager interface { // StoreManager is a message Manager backed by the storage.Store. type StoreManager struct { Store storage.Store + Hub *msghub.Hub +} + +// Deliver submits a new message to the store. +func (s *StoreManager) Deliver( + to *policy.Recipient, + from string, + recipients []*policy.Recipient, + prefix string, + content []byte, +) (string, error) { + // TODO enmime is too heavy for this step, only need header + env, err := enmime.ReadEnvelope(bytes.NewReader(content)) + if err != nil { + return "", err + } + fromaddr, err := env.AddressList("From") + if err != nil || len(fromaddr) == 0 { + fromaddr = []*mail.Address{{Address: from}} + } + toaddr, err := env.AddressList("To") + if err != nil { + toaddr = make([]*mail.Address, len(recipients)) + for i, torecip := range recipients { + toaddr[i] = &torecip.Address + } + } + delivery := &Delivery{ + Meta: Metadata{ + Mailbox: to.Mailbox, + From: fromaddr[0], + To: toaddr, + Date: time.Now(), + Subject: env.GetHeader("Subject"), + }, + Reader: io.MultiReader(strings.NewReader(prefix), bytes.NewReader(content)), + } + id, err := s.Store.AddMessage(delivery) + if err != nil { + return "", err + } + if s.Hub != nil { + // Broadcast message information. + broadcast := msghub.Message{ + Mailbox: to.Mailbox, + ID: id, + From: delivery.From().String(), + To: stringutil.StringAddressList(delivery.To()), + Subject: delivery.Subject(), + Date: delivery.Date(), + Size: delivery.Size(), + } + s.Hub.Dispatch(broadcast) + } + return id, nil } // GetMetadata returns a slice of metadata for the specified mailbox. diff --git a/pkg/server/smtp/handler.go b/pkg/server/smtp/handler.go index 686f94a..4d6ee33 100644 --- a/pkg/server/smtp/handler.go +++ b/pkg/server/smtp/handler.go @@ -6,18 +6,13 @@ import ( "fmt" "io" "net" - "net/mail" "regexp" "strconv" "strings" "time" - "github.com/jhillyerd/enmime" "github.com/jhillyerd/inbucket/pkg/log" - "github.com/jhillyerd/inbucket/pkg/message" - "github.com/jhillyerd/inbucket/pkg/msghub" "github.com/jhillyerd/inbucket/pkg/policy" - "github.com/jhillyerd/inbucket/pkg/stringutil" ) // State tracks the current mode of our SMTP state machine @@ -370,9 +365,18 @@ func (ss *Session) dataHandler() { } if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) { // Mail data complete. + tstamp := time.Now().Format(timeStampFormat) for _, recip := range ss.recipients { if recip.ShouldStore() { - if ok := ss.deliverMessage(recip, msgBuf.Bytes()); !ok { + // Generate Received header. + prefix := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", + ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address.Address, + tstamp) + // Deliver message. + _, err := ss.server.manager.Deliver( + recip, ss.from, ss.recipients, prefix, msgBuf.Bytes()) + if err != nil { + ss.logError("delivery for %v: %v", recip.LocalPart, err) ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) ss.reset() return @@ -385,7 +389,7 @@ func (ss *Session) dataHandler() { ss.reset() return } - // RFC says remove leading periods from input. + // RFC: remove leading periods from DATA. if len(lineBuf) > 0 && lineBuf[0] == '.' { lineBuf = lineBuf[1:] } @@ -399,59 +403,6 @@ func (ss *Session) dataHandler() { } } -// deliverMessage creates and populates a new Message for the specified recipient -func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) { - // TODO replace with something that only reads header? - env, err := enmime.ReadEnvelope(bytes.NewReader(content)) - if err != nil { - ss.logError("Failed to parse message for %q: %v", recip.LocalPart, err) - return false - } - from, err := env.AddressList("From") - if err != nil { - from = []*mail.Address{{Address: ss.from}} - } - to, err := env.AddressList("To") - if err != nil { - to = make([]*mail.Address, len(ss.recipients)) - for i, torecip := range ss.recipients { - to[i] = &torecip.Address - } - } - // Generate Received header. - stamp := time.Now().Format(timeStampFormat) - recd := strings.NewReader(fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", - ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp)) - delivery := &message.Delivery{ - Meta: message.Metadata{ - Mailbox: recip.Mailbox, - From: from[0], - To: to, - Date: time.Now(), - Subject: env.GetHeader("Subject"), - }, - Reader: io.MultiReader(recd, bytes.NewReader(content)), - } - id, err := ss.server.dataStore.AddMessage(delivery) - if err != nil { - ss.logError("Failed to store message for %q: %s", recip.LocalPart, err) - return false - } - // Broadcast message information. - // TODO this belongs in message pkg. - broadcast := msghub.Message{ - Mailbox: recip.Mailbox, - ID: id, - From: delivery.From().String(), - To: stringutil.StringAddressList(delivery.To()), - Subject: delivery.Subject(), - Date: delivery.Date(), - Size: delivery.Size(), - } - ss.server.msgHub.Dispatch(broadcast) - return true -} - func (ss *Session) enterState(state State) { ss.state = state ss.logTrace("Entering state %v", state) diff --git a/pkg/server/smtp/handler_test.go b/pkg/server/smtp/handler_test.go index 49100d2..283c6d1 100644 --- a/pkg/server/smtp/handler_test.go +++ b/pkg/server/smtp/handler_test.go @@ -2,7 +2,6 @@ package smtp import ( "bytes" - "context" "fmt" "io" @@ -14,7 +13,7 @@ import ( "time" "github.com/jhillyerd/inbucket/pkg/config" - "github.com/jhillyerd/inbucket/pkg/msghub" + "github.com/jhillyerd/inbucket/pkg/message" "github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/storage" "github.com/jhillyerd/inbucket/pkg/test" @@ -379,13 +378,12 @@ func setupSMTPServer(ds storage.Store) (s *Server, buf *bytes.Buffer, teardown f // Create a server, don't start it shutdownChan := make(chan bool) - ctx, cancel := context.WithCancel(context.Background()) teardown = func() { close(shutdownChan) - cancel() } apolicy := &policy.Addressing{Config: cfg} - s = NewServer(cfg, shutdownChan, ds, apolicy, msghub.New(ctx, 100)) + manager := &message.StoreManager{Store: ds} + s = NewServer(cfg, shutdownChan, manager, apolicy) return s, buf, teardown } diff --git a/pkg/server/smtp/listener.go b/pkg/server/smtp/listener.go index 0e795b7..959b6f5 100644 --- a/pkg/server/smtp/listener.go +++ b/pkg/server/smtp/listener.go @@ -12,9 +12,8 @@ import ( "github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/log" - "github.com/jhillyerd/inbucket/pkg/msghub" + "github.com/jhillyerd/inbucket/pkg/message" "github.com/jhillyerd/inbucket/pkg/policy" - "github.com/jhillyerd/inbucket/pkg/storage" ) func init() { @@ -49,10 +48,9 @@ type Server struct { storeMessages bool // Dependencies - dataStore storage.Store // Mailbox/message store - apolicy *policy.Addressing // Address policy - globalShutdown chan bool // Shuts down Inbucket - msgHub *msghub.Hub // Pub/sub for message info + apolicy *policy.Addressing // Address policy. + globalShutdown chan bool // Shuts down Inbucket. + manager message.Manager // Used to deliver messages. // State listener net.Listener // Incoming network connections @@ -84,9 +82,9 @@ var ( func NewServer( cfg config.SMTPConfig, globalShutdown chan bool, - ds storage.Store, + manager message.Manager, apolicy *policy.Addressing, - msgHub *msghub.Hub) *Server { +) *Server { return &Server{ host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port), domain: cfg.Domain, @@ -96,9 +94,8 @@ func NewServer( maxMessageBytes: cfg.MaxMessageBytes, storeMessages: cfg.StoreMessages, globalShutdown: globalShutdown, - dataStore: ds, + manager: manager, apolicy: apolicy, - msgHub: msgHub, waitgroup: new(sync.WaitGroup), } }