mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-17 17:47:03 +00:00
message: migrate more delivery logic into manager.go (#414)
* message: migrate more delivery logic into manager.go Signed-off-by: James Hillyerd <james@hillyerd.com> * manager: tidy up a few things Signed-off-by: James Hillyerd <james@hillyerd.com> --------- Signed-off-by: James Hillyerd <james@hillyerd.com>
This commit is contained in:
@@ -2,6 +2,7 @@ package message
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/mail"
|
"net/mail"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -15,15 +16,17 @@ import (
|
|||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// recvdTimeFmt to use in generated Received header.
|
||||||
|
const recvdTimeFmt = "Mon, 02 Jan 2006 15:04:05 -0700 (MST)"
|
||||||
|
|
||||||
// Manager is the interface controllers use to interact with messages.
|
// Manager is the interface controllers use to interact with messages.
|
||||||
type Manager interface {
|
type Manager interface {
|
||||||
Deliver(
|
Deliver(
|
||||||
to *policy.Recipient,
|
|
||||||
from *policy.Origin,
|
from *policy.Origin,
|
||||||
recipients []*policy.Recipient,
|
recipients []*policy.Recipient,
|
||||||
prefix string,
|
recvdHeader string,
|
||||||
content []byte,
|
content []byte,
|
||||||
) (id string, err error)
|
) error
|
||||||
GetMetadata(mailbox string) ([]*event.MessageMetadata, error)
|
GetMetadata(mailbox string) ([]*event.MessageMetadata, error)
|
||||||
GetMessage(mailbox, id string) (*Message, error)
|
GetMessage(mailbox, id string) (*Message, error)
|
||||||
MarkSeen(mailbox, id string) error
|
MarkSeen(mailbox, id string) error
|
||||||
@@ -42,15 +45,17 @@ type StoreManager struct {
|
|||||||
|
|
||||||
// Deliver submits a new message to the store.
|
// Deliver submits a new message to the store.
|
||||||
func (s *StoreManager) Deliver(
|
func (s *StoreManager) Deliver(
|
||||||
to *policy.Recipient,
|
|
||||||
from *policy.Origin,
|
from *policy.Origin,
|
||||||
recipients []*policy.Recipient,
|
recipients []*policy.Recipient,
|
||||||
prefix string,
|
recvdHeader string,
|
||||||
source []byte,
|
source []byte,
|
||||||
) (string, error) {
|
) error {
|
||||||
|
logger := log.With().Str("module", "message").Logger()
|
||||||
|
|
||||||
|
// Parse envelope headers.
|
||||||
header, err := enmime.DecodeHeaders(source)
|
header, err := enmime.DecodeHeaders(source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return err
|
||||||
}
|
}
|
||||||
fromaddr, err := enmime.ParseAddressList(header.Get("From"))
|
fromaddr, err := enmime.ParseAddressList(header.Get("From"))
|
||||||
if err != nil || len(fromaddr) == 0 {
|
if err != nil || len(fromaddr) == 0 {
|
||||||
@@ -65,28 +70,41 @@ func (s *StoreManager) Deliver(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Str("module", "message").Str("mailbox", to.Mailbox).Msg("Delivering message")
|
now := time.Now()
|
||||||
delivery := &Delivery{
|
tstamp := now.UTC().Format(recvdTimeFmt)
|
||||||
Meta: event.MessageMetadata{
|
|
||||||
Mailbox: to.Mailbox,
|
// Deliver to mailboxes.
|
||||||
From: fromaddr[0],
|
for _, recip := range recipients {
|
||||||
To: toaddr,
|
if recip.ShouldStore() {
|
||||||
Date: time.Now(),
|
// Append recipient and timestamp to generated Recieved header.
|
||||||
Subject: header.Get("Subject"),
|
recvd := fmt.Sprintf("%s for <%s>; %s\r\n", recvdHeader, recip.Address.Address, tstamp)
|
||||||
},
|
|
||||||
Reader: io.MultiReader(strings.NewReader(prefix), bytes.NewReader(source)),
|
// Deliver message.
|
||||||
}
|
logger.Debug().Str("mailbox", recip.Mailbox).Msg("Delivering message")
|
||||||
id, err := s.Store.AddMessage(delivery)
|
delivery := &Delivery{
|
||||||
if err != nil {
|
Meta: event.MessageMetadata{
|
||||||
return "", err
|
Mailbox: recip.Mailbox,
|
||||||
|
From: fromaddr[0],
|
||||||
|
To: toaddr,
|
||||||
|
Date: now,
|
||||||
|
Subject: header.Get("Subject"),
|
||||||
|
},
|
||||||
|
Reader: io.MultiReader(strings.NewReader(recvd), bytes.NewReader(source)),
|
||||||
|
}
|
||||||
|
id, err := s.Store.AddMessage(delivery)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error().Str("mailbox", recip.Mailbox).Err(err).Msg("Delivery failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit message stored event.
|
||||||
|
event := delivery.Meta
|
||||||
|
event.ID = id
|
||||||
|
s.ExtHost.Events.AfterMessageStored.Emit(&event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emit message stored event.
|
return nil
|
||||||
event := delivery.Meta
|
|
||||||
event.ID = id
|
|
||||||
s.ExtHost.Events.AfterMessageStored.Emit(&event)
|
|
||||||
|
|
||||||
return id, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMetadata returns a slice of metadata for the specified mailbox.
|
// GetMetadata returns a slice of metadata for the specified mailbox.
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package message_test
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/inbucket/inbucket/v3/pkg/config"
|
||||||
"github.com/inbucket/inbucket/v3/pkg/extension"
|
"github.com/inbucket/inbucket/v3/pkg/extension"
|
||||||
"github.com/inbucket/inbucket/v3/pkg/message"
|
"github.com/inbucket/inbucket/v3/pkg/message"
|
||||||
"github.com/inbucket/inbucket/v3/pkg/policy"
|
"github.com/inbucket/inbucket/v3/pkg/policy"
|
||||||
@@ -11,23 +12,38 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestManagerEmitsMessageStoredEvent(t *testing.T) {
|
func TestDeliverStoresMessages(t *testing.T) {
|
||||||
extHost := extension.NewHost()
|
sm, _ := testStoreManager()
|
||||||
sm := &message.StoreManager{
|
|
||||||
AddrPolicy: &policy.Addressing{},
|
// Attempt to deliver a message to two mailboxes.
|
||||||
Store: test.NewStore(),
|
origin, _ := sm.AddrPolicy.ParseOrigin("from@example.com")
|
||||||
ExtHost: extHost,
|
recip1, _ := sm.AddrPolicy.NewRecipient("u1@example.com")
|
||||||
|
recip2, _ := sm.AddrPolicy.NewRecipient("u2@example.com")
|
||||||
|
if err := sm.Deliver(
|
||||||
|
origin,
|
||||||
|
[]*policy.Recipient{recip1, recip2},
|
||||||
|
"Received: xyz\n",
|
||||||
|
[]byte("From: from@example.com\nSubject: tsub\n\ntest email"),
|
||||||
|
); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertMessageCount(t, sm, "u1@example.com", 1)
|
||||||
|
assertMessageCount(t, sm, "u2@example.com", 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeliverEmitsAfterMessageStoredEvent(t *testing.T) {
|
||||||
|
sm, extHost := testStoreManager()
|
||||||
|
|
||||||
listener := extHost.Events.AfterMessageStored.AsyncTestListener("manager", 1)
|
listener := extHost.Events.AfterMessageStored.AsyncTestListener("manager", 1)
|
||||||
|
|
||||||
// Attempt to deliver a message to generate event.
|
// Attempt to deliver a message to generate event.
|
||||||
origin, _ := sm.AddrPolicy.ParseOrigin("from@example.com")
|
origin, _ := sm.AddrPolicy.ParseOrigin("from@example.com")
|
||||||
if _, err := sm.Deliver(
|
recip, _ := sm.AddrPolicy.NewRecipient("to@example.com")
|
||||||
&policy.Recipient{},
|
if err := sm.Deliver(
|
||||||
origin,
|
origin,
|
||||||
[]*policy.Recipient{},
|
[]*policy.Recipient{recip},
|
||||||
"prefix",
|
"Received: xyz\n",
|
||||||
[]byte("From: from@example.com\n\ntest email"),
|
[]byte("From: from@example.com\n\ntest email"),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -36,4 +52,36 @@ func TestManagerEmitsMessageStoredEvent(t *testing.T) {
|
|||||||
got, err := listener()
|
got, err := listener()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotNil(t, got, "No event received, or it was nil")
|
assert.NotNil(t, got, "No event received, or it was nil")
|
||||||
|
assertMessageCount(t, sm, "to@example.com", 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreManager() (*message.StoreManager, *extension.Host) {
|
||||||
|
extHost := extension.NewHost()
|
||||||
|
|
||||||
|
sm := &message.StoreManager{
|
||||||
|
AddrPolicy: &policy.Addressing{
|
||||||
|
Config: &config.Root{
|
||||||
|
MailboxNaming: config.FullNaming,
|
||||||
|
SMTP: config.SMTP{
|
||||||
|
DefaultStore: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Store: test.NewStore(),
|
||||||
|
ExtHost: extHost,
|
||||||
|
}
|
||||||
|
|
||||||
|
return sm, extHost
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertMessageCount(t *testing.T, sm *message.StoreManager, mailbox string, count int) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
metas, err := sm.GetMetadata(mailbox)
|
||||||
|
assert.NoError(t, err, "StoreManager GetMetadata failed")
|
||||||
|
|
||||||
|
got := len(metas)
|
||||||
|
if got != count {
|
||||||
|
t.Errorf("Mailbox %q got %v messages, wanted %v", mailbox, got, count)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,9 +22,6 @@ import (
|
|||||||
type State int
|
type State int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// timeStampFormat to use in Received header.
|
|
||||||
timeStampFormat = "Mon, 02 Jan 2006 15:04:05 -0700 (MST)"
|
|
||||||
|
|
||||||
// Messages sent to user during LOGIN auth procedure. Can vary, but values are taken directly
|
// Messages sent to user during LOGIN auth procedure. Can vary, but values are taken directly
|
||||||
// from spec https://tools.ietf.org/html/draft-murchison-sasl-login-00
|
// from spec https://tools.ietf.org/html/draft-murchison-sasl-login-00
|
||||||
|
|
||||||
@@ -532,27 +529,21 @@ func (s *Session) dataHandler() {
|
|||||||
}
|
}
|
||||||
mailData := bytes.NewBuffer(msgBuf)
|
mailData := bytes.NewBuffer(msgBuf)
|
||||||
|
|
||||||
// Mail data complete.
|
// Generate Received header; Deliver() will append recipient and timestamp to this.
|
||||||
tstamp := time.Now().UTC().Format(timeStampFormat)
|
recvdHeader := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n",
|
||||||
for _, recip := range s.recipients {
|
s.remoteDomain, s.remoteHost, s.config.Domain)
|
||||||
if recip.ShouldStore() {
|
|
||||||
// Generate Received header.
|
|
||||||
prefix := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
|
|
||||||
s.remoteDomain, s.remoteHost, s.config.Domain, recip.Address.Address,
|
|
||||||
tstamp)
|
|
||||||
|
|
||||||
// Deliver message.
|
// Deliver message.
|
||||||
_, err := s.manager.Deliver(
|
if err := s.manager.Deliver(s.from, s.recipients, recvdHeader, mailData.Bytes()); err != nil {
|
||||||
recip, s.from, s.recipients, prefix, mailData.Bytes())
|
// Deliver() logs failure details, and the effected mailbox.
|
||||||
if err != nil {
|
s.send("451 Failed to store message")
|
||||||
s.logger.Error().Msgf("delivery for %v: %v", recip.LocalPart, err)
|
s.reset()
|
||||||
s.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
|
return
|
||||||
s.reset()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
expReceivedTotal.Add(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO Consider changing this to just 1 regardless of # of recipents.
|
||||||
|
expReceivedTotal.Add(int64(len(s.recipients)))
|
||||||
|
|
||||||
s.send("250 Mail accepted for delivery")
|
s.send("250 Mail accepted for delivery")
|
||||||
s.logger.Info().Msgf("Message size %v bytes", mailData.Len())
|
s.logger.Info().Msgf("Message size %v bytes", mailData.Len())
|
||||||
s.reset()
|
s.reset()
|
||||||
|
|||||||
Reference in New Issue
Block a user