1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

smtp: Move delivery into message.Manager for #69

This commit is contained in:
James Hillyerd
2018-03-17 16:54:29 -07:00
parent a22412f65e
commit f953bcf4bb
5 changed files with 102 additions and 93 deletions

View File

@@ -115,32 +115,27 @@ func main() {
} }
} }
// Create message hub // Configure internal services.
msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory) msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory)
// Setup our datastore
dscfg := config.GetDataStoreConfig() dscfg := config.GetDataStoreConfig()
ds := file.New(dscfg) store := file.New(dscfg)
retentionScanner := storage.NewRetentionScanner(dscfg, ds, shutdownChan) apolicy := &policy.Addressing{Config: config.GetSMTPConfig()}
mmanager := &message.StoreManager{Store: store, Hub: msgHub}
// Start Retention scanner.
retentionScanner := storage.NewRetentionScanner(dscfg, store, shutdownChan)
retentionScanner.Start() retentionScanner.Start()
// Start HTTP server.
// Start HTTP server web.Initialize(config.GetWebConfig(), shutdownChan, mmanager, msgHub)
mm := &message.StoreManager{Store: ds}
web.Initialize(config.GetWebConfig(), shutdownChan, mm, msgHub)
webui.SetupRoutes(web.Router) webui.SetupRoutes(web.Router)
rest.SetupRoutes(web.Router) rest.SetupRoutes(web.Router)
go web.Start(rootCtx) go web.Start(rootCtx)
// Start POP3 server.
// Start POP3 server pop3Server = pop3.New(config.GetPOP3Config(), shutdownChan, store)
pop3Server = pop3.New(config.GetPOP3Config(), shutdownChan, ds)
go pop3Server.Start(rootCtx) go pop3Server.Start(rootCtx)
// Start SMTP server.
// Startup SMTP server smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, mmanager, apolicy)
apolicy := &policy.Addressing{Config: config.GetSMTPConfig()}
smtpServer = smtp.NewServer(config.GetSMTPConfig(), shutdownChan, ds, apolicy, msgHub)
go smtpServer.Start(rootCtx) go smtpServer.Start(rootCtx)
// Loop forever waiting for signals or shutdown channel.
// Loop forever waiting for signals or shutdown channel
signalLoop: signalLoop:
for { for {
select { select {

View File

@@ -1,15 +1,28 @@
package message package message
import ( import (
"bytes"
"io" "io"
"net/mail"
"strings"
"time"
"github.com/jhillyerd/enmime" "github.com/jhillyerd/enmime"
"github.com/jhillyerd/inbucket/pkg/msghub"
"github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/policy"
"github.com/jhillyerd/inbucket/pkg/storage" "github.com/jhillyerd/inbucket/pkg/storage"
"github.com/jhillyerd/inbucket/pkg/stringutil"
) )
// Manager is the interface controllers use to interact with messages. // Manager is the interface controllers use to interact with messages.
type Manager interface { type Manager interface {
Deliver(
to *policy.Recipient,
from string,
recipients []*policy.Recipient,
prefix string,
content []byte,
) (id string, err error)
GetMetadata(mailbox string) ([]*Metadata, error) GetMetadata(mailbox string) ([]*Metadata, error)
GetMessage(mailbox, id string) (*Message, error) GetMessage(mailbox, id string) (*Message, error)
PurgeMessages(mailbox string) error PurgeMessages(mailbox string) error
@@ -21,6 +34,61 @@ type Manager interface {
// StoreManager is a message Manager backed by the storage.Store. // StoreManager is a message Manager backed by the storage.Store.
type StoreManager struct { type StoreManager struct {
Store storage.Store Store storage.Store
Hub *msghub.Hub
}
// Deliver submits a new message to the store.
func (s *StoreManager) Deliver(
to *policy.Recipient,
from string,
recipients []*policy.Recipient,
prefix string,
content []byte,
) (string, error) {
// TODO enmime is too heavy for this step, only need header
env, err := enmime.ReadEnvelope(bytes.NewReader(content))
if err != nil {
return "", err
}
fromaddr, err := env.AddressList("From")
if err != nil || len(fromaddr) == 0 {
fromaddr = []*mail.Address{{Address: from}}
}
toaddr, err := env.AddressList("To")
if err != nil {
toaddr = make([]*mail.Address, len(recipients))
for i, torecip := range recipients {
toaddr[i] = &torecip.Address
}
}
delivery := &Delivery{
Meta: Metadata{
Mailbox: to.Mailbox,
From: fromaddr[0],
To: toaddr,
Date: time.Now(),
Subject: env.GetHeader("Subject"),
},
Reader: io.MultiReader(strings.NewReader(prefix), bytes.NewReader(content)),
}
id, err := s.Store.AddMessage(delivery)
if err != nil {
return "", err
}
if s.Hub != nil {
// Broadcast message information.
broadcast := msghub.Message{
Mailbox: to.Mailbox,
ID: id,
From: delivery.From().String(),
To: stringutil.StringAddressList(delivery.To()),
Subject: delivery.Subject(),
Date: delivery.Date(),
Size: delivery.Size(),
}
s.Hub.Dispatch(broadcast)
}
return id, nil
} }
// GetMetadata returns a slice of metadata for the specified mailbox. // GetMetadata returns a slice of metadata for the specified mailbox.

View File

@@ -6,18 +6,13 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"net/mail"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/jhillyerd/enmime"
"github.com/jhillyerd/inbucket/pkg/log" "github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/message"
"github.com/jhillyerd/inbucket/pkg/msghub"
"github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/policy"
"github.com/jhillyerd/inbucket/pkg/stringutil"
) )
// State tracks the current mode of our SMTP state machine // State tracks the current mode of our SMTP state machine
@@ -370,9 +365,18 @@ func (ss *Session) dataHandler() {
} }
if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) { if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) {
// Mail data complete. // Mail data complete.
tstamp := time.Now().Format(timeStampFormat)
for _, recip := range ss.recipients { for _, recip := range ss.recipients {
if recip.ShouldStore() { if recip.ShouldStore() {
if ok := ss.deliverMessage(recip, msgBuf.Bytes()); !ok { // Generate Received header.
prefix := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address.Address,
tstamp)
// Deliver message.
_, err := ss.server.manager.Deliver(
recip, ss.from, ss.recipients, prefix, msgBuf.Bytes())
if err != nil {
ss.logError("delivery for %v: %v", recip.LocalPart, err)
ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
ss.reset() ss.reset()
return return
@@ -385,7 +389,7 @@ func (ss *Session) dataHandler() {
ss.reset() ss.reset()
return return
} }
// RFC says remove leading periods from input. // RFC: remove leading periods from DATA.
if len(lineBuf) > 0 && lineBuf[0] == '.' { if len(lineBuf) > 0 && lineBuf[0] == '.' {
lineBuf = lineBuf[1:] lineBuf = lineBuf[1:]
} }
@@ -399,59 +403,6 @@ func (ss *Session) dataHandler() {
} }
} }
// deliverMessage creates and populates a new Message for the specified recipient
func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) {
// TODO replace with something that only reads header?
env, err := enmime.ReadEnvelope(bytes.NewReader(content))
if err != nil {
ss.logError("Failed to parse message for %q: %v", recip.LocalPart, err)
return false
}
from, err := env.AddressList("From")
if err != nil {
from = []*mail.Address{{Address: ss.from}}
}
to, err := env.AddressList("To")
if err != nil {
to = make([]*mail.Address, len(ss.recipients))
for i, torecip := range ss.recipients {
to[i] = &torecip.Address
}
}
// Generate Received header.
stamp := time.Now().Format(timeStampFormat)
recd := strings.NewReader(fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp))
delivery := &message.Delivery{
Meta: message.Metadata{
Mailbox: recip.Mailbox,
From: from[0],
To: to,
Date: time.Now(),
Subject: env.GetHeader("Subject"),
},
Reader: io.MultiReader(recd, bytes.NewReader(content)),
}
id, err := ss.server.dataStore.AddMessage(delivery)
if err != nil {
ss.logError("Failed to store message for %q: %s", recip.LocalPart, err)
return false
}
// Broadcast message information.
// TODO this belongs in message pkg.
broadcast := msghub.Message{
Mailbox: recip.Mailbox,
ID: id,
From: delivery.From().String(),
To: stringutil.StringAddressList(delivery.To()),
Subject: delivery.Subject(),
Date: delivery.Date(),
Size: delivery.Size(),
}
ss.server.msgHub.Dispatch(broadcast)
return true
}
func (ss *Session) enterState(state State) { func (ss *Session) enterState(state State) {
ss.state = state ss.state = state
ss.logTrace("Entering state %v", state) ss.logTrace("Entering state %v", state)

View File

@@ -2,7 +2,6 @@ package smtp
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
@@ -14,7 +13,7 @@ import (
"time" "time"
"github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/msghub" "github.com/jhillyerd/inbucket/pkg/message"
"github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/policy"
"github.com/jhillyerd/inbucket/pkg/storage" "github.com/jhillyerd/inbucket/pkg/storage"
"github.com/jhillyerd/inbucket/pkg/test" "github.com/jhillyerd/inbucket/pkg/test"
@@ -379,13 +378,12 @@ func setupSMTPServer(ds storage.Store) (s *Server, buf *bytes.Buffer, teardown f
// Create a server, don't start it // Create a server, don't start it
shutdownChan := make(chan bool) shutdownChan := make(chan bool)
ctx, cancel := context.WithCancel(context.Background())
teardown = func() { teardown = func() {
close(shutdownChan) close(shutdownChan)
cancel()
} }
apolicy := &policy.Addressing{Config: cfg} apolicy := &policy.Addressing{Config: cfg}
s = NewServer(cfg, shutdownChan, ds, apolicy, msghub.New(ctx, 100)) manager := &message.StoreManager{Store: ds}
s = NewServer(cfg, shutdownChan, manager, apolicy)
return s, buf, teardown return s, buf, teardown
} }

View File

@@ -12,9 +12,8 @@ import (
"github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log" "github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/msghub" "github.com/jhillyerd/inbucket/pkg/message"
"github.com/jhillyerd/inbucket/pkg/policy" "github.com/jhillyerd/inbucket/pkg/policy"
"github.com/jhillyerd/inbucket/pkg/storage"
) )
func init() { func init() {
@@ -49,10 +48,9 @@ type Server struct {
storeMessages bool storeMessages bool
// Dependencies // Dependencies
dataStore storage.Store // Mailbox/message store apolicy *policy.Addressing // Address policy.
apolicy *policy.Addressing // Address policy globalShutdown chan bool // Shuts down Inbucket.
globalShutdown chan bool // Shuts down Inbucket manager message.Manager // Used to deliver messages.
msgHub *msghub.Hub // Pub/sub for message info
// State // State
listener net.Listener // Incoming network connections listener net.Listener // Incoming network connections
@@ -84,9 +82,9 @@ var (
func NewServer( func NewServer(
cfg config.SMTPConfig, cfg config.SMTPConfig,
globalShutdown chan bool, globalShutdown chan bool,
ds storage.Store, manager message.Manager,
apolicy *policy.Addressing, apolicy *policy.Addressing,
msgHub *msghub.Hub) *Server { ) *Server {
return &Server{ return &Server{
host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port), host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port),
domain: cfg.Domain, domain: cfg.Domain,
@@ -96,9 +94,8 @@ func NewServer(
maxMessageBytes: cfg.MaxMessageBytes, maxMessageBytes: cfg.MaxMessageBytes,
storeMessages: cfg.StoreMessages, storeMessages: cfg.StoreMessages,
globalShutdown: globalShutdown, globalShutdown: globalShutdown,
dataStore: ds, manager: manager,
apolicy: apolicy, apolicy: apolicy,
msgHub: msgHub,
waitgroup: new(sync.WaitGroup), waitgroup: new(sync.WaitGroup),
} }
} }