mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-17 09:37:02 +00:00
smtp: Wire in policy.Recipient for #84
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/jhillyerd/inbucket/pkg/log"
|
||||
"github.com/jhillyerd/inbucket/pkg/message"
|
||||
"github.com/jhillyerd/inbucket/pkg/msghub"
|
||||
"github.com/jhillyerd/inbucket/pkg/policy"
|
||||
"github.com/jhillyerd/inbucket/pkg/rest"
|
||||
"github.com/jhillyerd/inbucket/pkg/server/pop3"
|
||||
"github.com/jhillyerd/inbucket/pkg/server/smtp"
|
||||
@@ -135,7 +136,8 @@ func main() {
|
||||
go pop3Server.Start(rootCtx)
|
||||
|
||||
// Startup SMTP server
|
||||
smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, ds, msgHub)
|
||||
apolicy := &policy.Addressing{Config: config.GetSMTPConfig()}
|
||||
smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, ds, apolicy, msgHub)
|
||||
go smtpServer.Start(rootCtx)
|
||||
|
||||
// Loop forever waiting for signals or shutdown channel
|
||||
|
||||
@@ -3,7 +3,6 @@ package smtp
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"container/list"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -72,11 +71,6 @@ var commands = map[string]bool{
|
||||
"TURN": true,
|
||||
}
|
||||
|
||||
// recipientDetails for message delivery
|
||||
type recipientDetails struct {
|
||||
address, localPart, domainPart string
|
||||
}
|
||||
|
||||
// Session holds the state of an SMTP session
|
||||
type Session struct {
|
||||
server *Server
|
||||
@@ -88,14 +82,22 @@ type Session struct {
|
||||
state State
|
||||
reader *bufio.Reader
|
||||
from string
|
||||
recipients *list.List
|
||||
recipients []*policy.Recipient
|
||||
}
|
||||
|
||||
// NewSession creates a new Session for the given connection
|
||||
func NewSession(server *Server, id int, conn net.Conn) *Session {
|
||||
reader := bufio.NewReader(conn)
|
||||
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
|
||||
return &Session{server: server, id: id, conn: conn, state: GREET, reader: reader, remoteHost: host}
|
||||
return &Session{
|
||||
server: server,
|
||||
id: id,
|
||||
conn: conn,
|
||||
state: GREET,
|
||||
reader: reader,
|
||||
remoteHost: host,
|
||||
recipients: make([]*policy.Recipient, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *Session) String() string {
|
||||
@@ -297,7 +299,6 @@ func (ss *Session) readyHandler(cmd string, arg string) {
|
||||
}
|
||||
}
|
||||
ss.from = from
|
||||
ss.recipients = list.New()
|
||||
ss.logInfo("Mail from: %v", from)
|
||||
ss.send(fmt.Sprintf("250 Roger, accepting mail from <%v>", from))
|
||||
ss.enterState(MAIL)
|
||||
@@ -316,20 +317,21 @@ func (ss *Session) mailHandler(cmd string, arg string) {
|
||||
return
|
||||
}
|
||||
// This trim is probably too forgiving
|
||||
recip := strings.Trim(arg[3:], "<> ")
|
||||
if _, _, err := policy.ParseEmailAddress(recip); err != nil {
|
||||
addr := strings.Trim(arg[3:], "<> ")
|
||||
recip, err := ss.server.apolicy.NewRecipient(addr)
|
||||
if err != nil {
|
||||
ss.send("501 Bad recipient address syntax")
|
||||
ss.logWarn("Bad address as RCPT arg: %q, %s", recip, err)
|
||||
ss.logWarn("Bad address as RCPT arg: %q, %s", addr, err)
|
||||
return
|
||||
}
|
||||
if ss.recipients.Len() >= ss.server.maxRecips {
|
||||
if len(ss.recipients) >= ss.server.maxRecips {
|
||||
ss.logWarn("Maximum limit of %v recipients reached", ss.server.maxRecips)
|
||||
ss.send(fmt.Sprintf("552 Maximum limit of %v recipients reached", ss.server.maxRecips))
|
||||
return
|
||||
}
|
||||
ss.recipients.PushBack(recip)
|
||||
ss.logInfo("Recipient: %v", recip)
|
||||
ss.send(fmt.Sprintf("250 I'll make sure <%v> gets this", recip))
|
||||
ss.recipients = append(ss.recipients, recip)
|
||||
ss.logInfo("Recipient: %v", addr)
|
||||
ss.send(fmt.Sprintf("250 I'll make sure <%v> gets this", addr))
|
||||
return
|
||||
case "DATA":
|
||||
if arg != "" {
|
||||
@@ -337,7 +339,7 @@ func (ss *Session) mailHandler(cmd string, arg string) {
|
||||
ss.logWarn("Got unexpected args on DATA: %q", arg)
|
||||
return
|
||||
}
|
||||
if ss.recipients.Len() > 0 {
|
||||
if len(ss.recipients) > 0 {
|
||||
// We have recipients, go to accept data
|
||||
ss.enterState(DATA)
|
||||
return
|
||||
@@ -351,27 +353,6 @@ func (ss *Session) mailHandler(cmd string, arg string) {
|
||||
|
||||
// DATA
|
||||
func (ss *Session) dataHandler() {
|
||||
recipients := make([]recipientDetails, 0, ss.recipients.Len())
|
||||
// Get a Mailbox and a new Message for each recipient
|
||||
if ss.server.storeMessages {
|
||||
for e := ss.recipients.Front(); e != nil; e = e.Next() {
|
||||
recip := e.Value.(string)
|
||||
local, domain, err := policy.ParseEmailAddress(recip)
|
||||
if err != nil {
|
||||
ss.logError("Failed to parse address for %q", recip)
|
||||
ss.send(fmt.Sprintf("451 Failed to open mailbox for %v", recip))
|
||||
ss.reset()
|
||||
return
|
||||
}
|
||||
if strings.ToLower(domain) != ss.server.domainNoStore {
|
||||
// Not our "no store" domain, so store the message
|
||||
recipients = append(recipients, recipientDetails{recip, local, domain})
|
||||
} else {
|
||||
log.Tracef("Not storing message for %q", recip)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ss.send("354 Start mail input; end with <CRLF>.<CRLF>")
|
||||
msgBuf := &bytes.Buffer{}
|
||||
for {
|
||||
@@ -388,31 +369,27 @@ func (ss *Session) dataHandler() {
|
||||
}
|
||||
if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) {
|
||||
// Mail data complete
|
||||
if ss.server.storeMessages {
|
||||
// Create a message for each valid recipient
|
||||
for _, r := range recipients {
|
||||
for _, recip := range ss.recipients {
|
||||
if recip.ShouldStore() {
|
||||
// TODO temporary hack to fix #77 until datastore revamp
|
||||
mu, err := ss.server.dataStore.LockFor(r.localPart)
|
||||
mu, err := ss.server.dataStore.LockFor(recip.LocalPart)
|
||||
if err != nil {
|
||||
ss.logError("Failed to get lock for %q: %s", r.localPart, err)
|
||||
ss.logError("Failed to get lock for %q: %s", recip.LocalPart, err)
|
||||
// Delivery failure
|
||||
ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart))
|
||||
ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
|
||||
ss.reset()
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
ok := ss.deliverMessage(r, msgBuf.Bytes())
|
||||
ok := ss.deliverMessage(recip, msgBuf.Bytes())
|
||||
mu.Unlock()
|
||||
if ok {
|
||||
expReceivedTotal.Add(1)
|
||||
} else {
|
||||
if !ok {
|
||||
// Delivery failure
|
||||
ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart))
|
||||
ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
|
||||
ss.reset()
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
expReceivedTotal.Add(1)
|
||||
}
|
||||
ss.send("250 Mail accepted for delivery")
|
||||
@@ -436,8 +413,8 @@ func (ss *Session) dataHandler() {
|
||||
}
|
||||
|
||||
// deliverMessage creates and populates a new Message for the specified recipient
|
||||
func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool) {
|
||||
name, err := policy.ParseMailboxName(r.localPart)
|
||||
func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) {
|
||||
name, err := policy.ParseMailboxName(recip.LocalPart)
|
||||
if err != nil {
|
||||
// This parse already succeeded when MailboxFor was called, shouldn't fail here.
|
||||
return false
|
||||
@@ -445,7 +422,7 @@ func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool)
|
||||
// TODO replace with something that only reads header?
|
||||
env, err := enmime.ReadEnvelope(bytes.NewReader(content))
|
||||
if err != nil {
|
||||
ss.logError("Failed to parse message for %q: %v", r.localPart, err)
|
||||
ss.logError("Failed to parse message for %q: %v", recip.LocalPart, err)
|
||||
return false
|
||||
}
|
||||
from, err := env.AddressList("From")
|
||||
@@ -461,7 +438,7 @@ func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool)
|
||||
// Generate Received header.
|
||||
stamp := time.Now().Format(timeStampFormat)
|
||||
recd := strings.NewReader(fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
|
||||
ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp))
|
||||
ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp))
|
||||
delivery := &message.Delivery{
|
||||
Meta: message.Metadata{
|
||||
Mailbox: name,
|
||||
@@ -474,7 +451,7 @@ func (ss *Session) deliverMessage(r recipientDetails, content []byte) (ok bool)
|
||||
}
|
||||
id, err := ss.server.dataStore.AddMessage(delivery)
|
||||
if err != nil {
|
||||
ss.logError("Failed to store message for %q: %s", r.localPart, err)
|
||||
ss.logError("Failed to store message for %q: %s", recip.LocalPart, err)
|
||||
return false
|
||||
}
|
||||
// Broadcast message information.
|
||||
@@ -519,8 +496,7 @@ func (ss *Session) send(msg string) {
|
||||
ss.logTrace(">> %v >>", msg)
|
||||
}
|
||||
|
||||
// readByteLine reads a line of input into the provided buffer. Does
|
||||
// not reset the Buffer - please do so prior to calling.
|
||||
// readByteLine reads a line of input, returns byte slice.
|
||||
func (ss *Session) readByteLine() ([]byte, error) {
|
||||
if err := ss.conn.SetReadDeadline(ss.nextDeadline()); err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/jhillyerd/inbucket/pkg/config"
|
||||
"github.com/jhillyerd/inbucket/pkg/msghub"
|
||||
"github.com/jhillyerd/inbucket/pkg/policy"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
"github.com/jhillyerd/inbucket/pkg/test"
|
||||
)
|
||||
@@ -172,10 +173,7 @@ func TestMailState(t *testing.T) {
|
||||
{"RCPT TO: u4@gmail.com", 250},
|
||||
{"RSET", 250},
|
||||
{"MAIL FROM:<john@gmail.com>", 250},
|
||||
{"RCPT TO:<user\\@internal@external.com", 250},
|
||||
{"RCPT TO:<\"first last\"@host.com", 250},
|
||||
{"RCPT TO:<user\\>name@host.com>", 250},
|
||||
{"RCPT TO:<\"user>name\"@host.com>", 250},
|
||||
{`RCPT TO:<"first/last"@host.com`, 250},
|
||||
}
|
||||
if err := playSession(t, server, script); err != nil {
|
||||
t.Error(err)
|
||||
@@ -360,7 +358,8 @@ func setupSMTPServer(ds storage.Store) (s *Server, buf *bytes.Buffer, teardown f
|
||||
close(shutdownChan)
|
||||
cancel()
|
||||
}
|
||||
s = NewServer(cfg, shutdownChan, ds, msghub.New(ctx, 100))
|
||||
apolicy := &policy.Addressing{Config: cfg}
|
||||
s = NewServer(cfg, shutdownChan, ds, apolicy, msghub.New(ctx, 100))
|
||||
return s, buf, teardown
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/jhillyerd/inbucket/pkg/config"
|
||||
"github.com/jhillyerd/inbucket/pkg/log"
|
||||
"github.com/jhillyerd/inbucket/pkg/msghub"
|
||||
"github.com/jhillyerd/inbucket/pkg/policy"
|
||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||
)
|
||||
|
||||
@@ -49,6 +50,7 @@ type Server struct {
|
||||
|
||||
// Dependencies
|
||||
dataStore storage.Store // Mailbox/message store
|
||||
apolicy *policy.Addressing // Address policy
|
||||
globalShutdown chan bool // Shuts down Inbucket
|
||||
msgHub *msghub.Hub // Pub/sub for message info
|
||||
|
||||
@@ -83,6 +85,7 @@ func NewServer(
|
||||
cfg config.SMTPConfig,
|
||||
globalShutdown chan bool,
|
||||
ds storage.Store,
|
||||
apolicy *policy.Addressing,
|
||||
msgHub *msghub.Hub) *Server {
|
||||
return &Server{
|
||||
host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port),
|
||||
@@ -94,6 +97,7 @@ func NewServer(
|
||||
storeMessages: cfg.StoreMessages,
|
||||
globalShutdown: globalShutdown,
|
||||
dataStore: ds,
|
||||
apolicy: apolicy,
|
||||
msgHub: msgHub,
|
||||
waitgroup: new(sync.WaitGroup),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user