From b9003a9328ac7a939b2e044c55b9101d550f0dfb Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sat, 17 Mar 2018 12:39:09 -0700 Subject: [PATCH] smtp: Wire in policy.Recipient for #84 --- cmd/inbucket/main.go | 4 +- pkg/server/smtp/handler.go | 90 ++++++++++++--------------------- pkg/server/smtp/handler_test.go | 9 ++-- pkg/server/smtp/listener.go | 10 ++-- 4 files changed, 47 insertions(+), 66 deletions(-) diff --git a/cmd/inbucket/main.go b/cmd/inbucket/main.go index 7020676..ff5a313 100644 --- a/cmd/inbucket/main.go +++ b/cmd/inbucket/main.go @@ -16,6 +16,7 @@ import ( "github.com/jhillyerd/inbucket/pkg/log" "github.com/jhillyerd/inbucket/pkg/message" "github.com/jhillyerd/inbucket/pkg/msghub" + "github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/rest" "github.com/jhillyerd/inbucket/pkg/server/pop3" "github.com/jhillyerd/inbucket/pkg/server/smtp" @@ -135,7 +136,8 @@ func main() { go pop3Server.Start(rootCtx) // Startup SMTP server - smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, ds, msgHub) + apolicy := &policy.Addressing{Config: config.GetSMTPConfig()} + smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, ds, apolicy, msgHub) go smtpServer.Start(rootCtx) // Loop forever waiting for signals or shutdown channel diff --git a/pkg/server/smtp/handler.go b/pkg/server/smtp/handler.go index e123e9f..f15656b 100644 --- a/pkg/server/smtp/handler.go +++ b/pkg/server/smtp/handler.go @@ -3,7 +3,6 @@ package smtp import ( "bufio" "bytes" - "container/list" "fmt" "io" "net" @@ -72,11 +71,6 @@ var commands = map[string]bool{ "TURN": true, } -// recipientDetails for message delivery -type recipientDetails struct { - address, localPart, domainPart string -} - // Session holds the state of an SMTP session type Session struct { server *Server @@ -88,14 +82,22 @@ type Session struct { state State reader *bufio.Reader from string - recipients *list.List + recipients []*policy.Recipient } // NewSession creates a new Session for the given connection func NewSession(server *Server, id int, conn net.Conn) *Session { reader := bufio.NewReader(conn) host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - return &Session{server: server, id: id, conn: conn, state: GREET, reader: reader, remoteHost: host} + return &Session{ + server: server, + id: id, + conn: conn, + state: GREET, + reader: reader, + remoteHost: host, + recipients: make([]*policy.Recipient, 0), + } } func (ss *Session) String() string { @@ -297,7 +299,6 @@ func (ss *Session) readyHandler(cmd string, arg string) { } } ss.from = from - ss.recipients = list.New() ss.logInfo("Mail from: %v", from) ss.send(fmt.Sprintf("250 Roger, accepting mail from <%v>", from)) ss.enterState(MAIL) @@ -316,20 +317,21 @@ func (ss *Session) mailHandler(cmd string, arg string) { return } // This trim is probably too forgiving - recip := strings.Trim(arg[3:], "<> ") - if _, _, err := policy.ParseEmailAddress(recip); err != nil { + addr := strings.Trim(arg[3:], "<> ") + recip, err := ss.server.apolicy.NewRecipient(addr) + if err != nil { ss.send("501 Bad recipient address syntax") - ss.logWarn("Bad address as RCPT arg: %q, %s", recip, err) + ss.logWarn("Bad address as RCPT arg: %q, %s", addr, err) return } - if ss.recipients.Len() >= ss.server.maxRecips { + if len(ss.recipients) >= ss.server.maxRecips { ss.logWarn("Maximum limit of %v recipients reached", ss.server.maxRecips) ss.send(fmt.Sprintf("552 Maximum limit of %v recipients reached", ss.server.maxRecips)) return } - ss.recipients.PushBack(recip) - ss.logInfo("Recipient: %v", recip) - ss.send(fmt.Sprintf("250 I'll make sure <%v> gets this", recip)) + ss.recipients = append(ss.recipients, recip) + ss.logInfo("Recipient: %v", addr) + ss.send(fmt.Sprintf("250 I'll make sure <%v> gets this", addr)) return case "DATA": if arg != "" { @@ -337,7 +339,7 @@ func (ss *Session) mailHandler(cmd string, arg string) { ss.logWarn("Got unexpected args on DATA: %q", arg) return } - if ss.recipients.Len() > 0 { + if len(ss.recipients) > 0 { // We have recipients, go to accept data ss.enterState(DATA) return @@ -351,27 +353,6 @@ func (ss *Session) mailHandler(cmd string, arg string) { // DATA func (ss *Session) dataHandler() { - recipients := make([]recipientDetails, 0, ss.recipients.Len()) - // Get a Mailbox and a new Message for each recipient - if ss.server.storeMessages { - for e := ss.recipients.Front(); e != nil; e = e.Next() { - recip := e.Value.(string) - local, domain, err := policy.ParseEmailAddress(recip) - if err != nil { - ss.logError("Failed to parse address for %q", recip) - ss.send(fmt.Sprintf("451 Failed to open mailbox for %v", recip)) - ss.reset() - return - } - if strings.ToLower(domain) != ss.server.domainNoStore { - // Not our "no store" domain, so store the message - recipients = append(recipients, recipientDetails{recip, local, domain}) - } else { - log.Tracef("Not storing message for %q", recip) - } - } - } - ss.send("354 Start mail input; end with .") msgBuf := &bytes.Buffer{} for { @@ -388,31 +369,27 @@ func (ss *Session) dataHandler() { } if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) { // Mail data complete - if ss.server.storeMessages { - // Create a message for each valid recipient - for _, r := range recipients { + for _, recip := range ss.recipients { + if recip.ShouldStore() { // TODO temporary hack to fix #77 until datastore revamp - mu, err := ss.server.dataStore.LockFor(r.localPart) + mu, err := ss.server.dataStore.LockFor(recip.LocalPart) if err != nil { - ss.logError("Failed to get lock for %q: %s", r.localPart, err) + ss.logError("Failed to get lock for %q: %s", recip.LocalPart, err) // Delivery failure - ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart)) + ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) ss.reset() return } mu.Lock() - ok := ss.deliverMessage(r, msgBuf.Bytes()) + ok := ss.deliverMessage(recip, msgBuf.Bytes()) mu.Unlock() - if ok { - expReceivedTotal.Add(1) - } else { + if !ok { // Delivery failure - ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart)) + ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) ss.reset() return } } - } else { expReceivedTotal.Add(1) } ss.send("250 Mail accepted for delivery") @@ -436,8 +413,8 @@ func (ss *Session) dataHandler() { } // deliverMessage creates and populates a new Message for the specified recipient -func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool) { - name, err := policy.ParseMailboxName(r.localPart) +func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) { + name, err := policy.ParseMailboxName(recip.LocalPart) if err != nil { // This parse already succeeded when MailboxFor was called, shouldn't fail here. return false @@ -445,7 +422,7 @@ func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool) // TODO replace with something that only reads header? env, err := enmime.ReadEnvelope(bytes.NewReader(content)) if err != nil { - ss.logError("Failed to parse message for %q: %v", r.localPart, err) + ss.logError("Failed to parse message for %q: %v", recip.LocalPart, err) return false } from, err := env.AddressList("From") @@ -461,7 +438,7 @@ func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool) // Generate Received header. stamp := time.Now().Format(timeStampFormat) recd := strings.NewReader(fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", - ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp)) + ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp)) delivery := &message.Delivery{ Meta: message.Metadata{ Mailbox: name, @@ -474,7 +451,7 @@ func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool) } id, err := ss.server.dataStore.AddMessage(delivery) if err != nil { - ss.logError("Failed to store message for %q: %s", r.localPart, err) + ss.logError("Failed to store message for %q: %s", recip.LocalPart, err) return false } // Broadcast message information. @@ -519,8 +496,7 @@ func (ss *Session) send(msg string) { ss.logTrace(">> %v >>", msg) } -// readByteLine reads a line of input into the provided buffer. Does -// not reset the Buffer - please do so prior to calling. +// readByteLine reads a line of input, returns byte slice. func (ss *Session) readByteLine() ([]byte, error) { if err := ss.conn.SetReadDeadline(ss.nextDeadline()); err != nil { return nil, err diff --git a/pkg/server/smtp/handler_test.go b/pkg/server/smtp/handler_test.go index 94b0d48..6242b20 100644 --- a/pkg/server/smtp/handler_test.go +++ b/pkg/server/smtp/handler_test.go @@ -15,6 +15,7 @@ import ( "github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/msghub" + "github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/storage" "github.com/jhillyerd/inbucket/pkg/test" ) @@ -172,10 +173,7 @@ func TestMailState(t *testing.T) { {"RCPT TO: u4@gmail.com", 250}, {"RSET", 250}, {"MAIL FROM:", 250}, - {"RCPT TO:name@host.com>", 250}, - {"RCPT TO:<\"user>name\"@host.com>", 250}, + {`RCPT TO:<"first/last"@host.com`, 250}, } if err := playSession(t, server, script); err != nil { t.Error(err) @@ -360,7 +358,8 @@ func setupSMTPServer(ds storage.Store) (s *Server, buf *bytes.Buffer, teardown f close(shutdownChan) cancel() } - s = NewServer(cfg, shutdownChan, ds, msghub.New(ctx, 100)) + apolicy := &policy.Addressing{Config: cfg} + s = NewServer(cfg, shutdownChan, ds, apolicy, msghub.New(ctx, 100)) return s, buf, teardown } diff --git a/pkg/server/smtp/listener.go b/pkg/server/smtp/listener.go index 83d5697..0e795b7 100644 --- a/pkg/server/smtp/listener.go +++ b/pkg/server/smtp/listener.go @@ -13,6 +13,7 @@ import ( "github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/log" "github.com/jhillyerd/inbucket/pkg/msghub" + "github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/storage" ) @@ -48,9 +49,10 @@ type Server struct { storeMessages bool // Dependencies - dataStore storage.Store // Mailbox/message store - globalShutdown chan bool // Shuts down Inbucket - msgHub *msghub.Hub // Pub/sub for message info + dataStore storage.Store // Mailbox/message store + apolicy *policy.Addressing // Address policy + globalShutdown chan bool // Shuts down Inbucket + msgHub *msghub.Hub // Pub/sub for message info // State listener net.Listener // Incoming network connections @@ -83,6 +85,7 @@ func NewServer( cfg config.SMTPConfig, globalShutdown chan bool, ds storage.Store, + apolicy *policy.Addressing, msgHub *msghub.Hub) *Server { return &Server{ host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port), @@ -94,6 +97,7 @@ func NewServer( storeMessages: cfg.StoreMessages, globalShutdown: globalShutdown, dataStore: ds, + apolicy: apolicy, msgHub: msgHub, waitgroup: new(sync.WaitGroup), }