1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-18 14:47:03 +00:00
Files
go-chasquid-smtp/internal/queue/queue_test.go
Alberto Bertogli fea808f8e3 queue: Get the DSN domain from the message
Picking the domain used in the DSN message "From" header is more
complicated than it needs to be, causing confusing code paths and having
different uses for the hostname, which should be purely aesthetic.

This patch makes the queue pick the DSN "From" domain from the message
itself, by looking for a local domain in either the sender or the
original recipients. We should find at least one, otherwise it'd be
relaying.

This allows the code to be simplified, and we can narrow the scope of
the hostname option even further.
2016-11-03 00:51:59 +00:00

288 lines
6.9 KiB
Go

package queue
import (
"bytes"
"fmt"
"strings"
"sync"
"testing"
"time"
"blitiri.com.ar/go/chasquid/internal/aliases"
"blitiri.com.ar/go/chasquid/internal/set"
)
// Test courier. Delivery is done by sending on a channel, so users have fine
// grain control over the results.
type ChanCourier struct {
requests chan deliverRequest
results chan error
}
type deliverRequest struct {
from string
to string
data []byte
}
func (cc *ChanCourier) Deliver(from string, to string, data []byte) (error, bool) {
cc.requests <- deliverRequest{from, to, data}
return <-cc.results, false
}
func newChanCourier() *ChanCourier {
return &ChanCourier{
requests: make(chan deliverRequest),
results: make(chan error),
}
}
// Courier for test purposes. Never fails, and always remembers everything.
type TestCourier struct {
wg sync.WaitGroup
requests []*deliverRequest
reqFor map[string]*deliverRequest
sync.Mutex
}
func (tc *TestCourier) Deliver(from string, to string, data []byte) (error, bool) {
defer tc.wg.Done()
dr := &deliverRequest{from, to, data}
tc.Lock()
tc.requests = append(tc.requests, dr)
tc.reqFor[to] = dr
tc.Unlock()
return nil, false
}
func newTestCourier() *TestCourier {
return &TestCourier{
reqFor: map[string]*deliverRequest{},
}
}
func TestBasic(t *testing.T) {
localC := newTestCourier()
remoteC := newTestCourier()
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
localC, remoteC)
localC.wg.Add(2)
remoteC.wg.Add(1)
id, err := q.Put("from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data"))
if err != nil {
t.Fatalf("Put: %v", err)
}
if len(id) < 6 {
t.Errorf("short ID: %v", id)
}
localC.wg.Wait()
remoteC.wg.Wait()
// Make sure the delivered items leave the queue.
for d := time.Now().Add(2 * time.Second); time.Now().Before(d); {
if q.Len() == 0 {
break
}
time.Sleep(20 * time.Millisecond)
}
if q.Len() != 0 {
t.Fatalf("%d items not removed from the queue after delivery", q.Len())
}
cases := []struct {
courier *TestCourier
expectedTo string
}{
{localC, "nodomain"},
{localC, "am@loco"},
{remoteC, "x@remote"},
}
for _, c := range cases {
req := c.courier.reqFor[c.expectedTo]
if req == nil {
t.Errorf("missing request for %q", c.expectedTo)
continue
}
if req.from != "from" || req.to != c.expectedTo ||
!bytes.Equal(req.data, []byte("data")) {
t.Errorf("wrong request for %q: %v", c.expectedTo, req)
}
}
}
func TestDSNOnTimeout(t *testing.T) {
localC := newTestCourier()
remoteC := newTestCourier()
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
localC, remoteC)
// Insert an expired item in the queue.
item := &Item{
Message: Message{
ID: <-newID,
From: fmt.Sprintf("from@loco"),
Rcpt: []*Recipient{
{"to@to", Recipient_EMAIL, Recipient_PENDING, "err", "to@to"}},
Data: []byte("data"),
},
CreatedAt: time.Now().Add(-24 * time.Hour),
}
q.q[item.ID] = item
err := item.WriteTo(q.path)
if err != nil {
t.Errorf("failed to write item: %v", err)
}
// Launch the sending loop, expect 1 local delivery (the DSN).
localC.wg.Add(1)
go item.SendLoop(q)
localC.wg.Wait()
req := localC.reqFor["from@loco"]
if req == nil {
t.Fatal("missing DSN")
}
if req.from != "<>" || req.to != "from@loco" ||
!strings.Contains(string(req.data), "X-Failed-Recipients: to@to,") {
t.Errorf("wrong DSN: %q", string(req.data))
}
}
func TestAliases(t *testing.T) {
localC := newTestCourier()
remoteC := newTestCourier()
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
localC, remoteC)
q.aliases.AddDomain("loco")
q.aliases.AddAliasForTesting("ab@loco", "pq@loco", aliases.EMAIL)
q.aliases.AddAliasForTesting("ab@loco", "rs@loco", aliases.EMAIL)
q.aliases.AddAliasForTesting("cd@loco", "ata@hualpa", aliases.EMAIL)
// Note the pipe aliases are tested below, as they don't use the couriers
// and it can be quite inconvenient to test them in this way.
localC.wg.Add(2)
remoteC.wg.Add(1)
_, err := q.Put("from", []string{"ab@loco", "cd@loco"}, []byte("data"))
if err != nil {
t.Fatalf("Put: %v", err)
}
localC.wg.Wait()
remoteC.wg.Wait()
cases := []struct {
courier *TestCourier
expectedTo string
}{
{localC, "pq@loco"},
{localC, "rs@loco"},
{remoteC, "ata@hualpa"},
}
for _, c := range cases {
req := c.courier.reqFor[c.expectedTo]
if req == nil {
t.Errorf("missing request for %q", c.expectedTo)
continue
}
if req.from != "from" || req.to != c.expectedTo ||
!bytes.Equal(req.data, []byte("data")) {
t.Errorf("wrong request for %q: %v", c.expectedTo, req)
}
}
}
// Dumb courier, for when we just want to return directly.
type DumbCourier struct{}
func (c DumbCourier) Deliver(from string, to string, data []byte) (error, bool) {
return nil, false
}
var dumbCourier = DumbCourier{}
func TestFullQueue(t *testing.T) {
q := New("/tmp/queue_test", set.NewString(), aliases.NewResolver(),
dumbCourier, dumbCourier)
// Force-insert maxQueueSize items in the queue.
oneID := ""
for i := 0; i < maxQueueSize; i++ {
item := &Item{
Message: Message{
ID: <-newID,
From: fmt.Sprintf("from-%d", i),
Rcpt: []*Recipient{
{"to", Recipient_EMAIL, Recipient_PENDING, "", ""}},
Data: []byte("data"),
},
CreatedAt: time.Now(),
}
q.q[item.ID] = item
oneID = item.ID
}
// This one should fail due to the queue being too big.
id, err := q.Put("from", []string{"to"}, []byte("data-qf"))
if err != errQueueFull {
t.Errorf("Not failed as expected: %v - %v", id, err)
}
// Remove one, and try again: it should succeed.
// Write it first so we don't get complaints about the file not existing
// (as we did not all the items properly).
q.q[oneID].WriteTo(q.path)
q.Remove(oneID)
id, err = q.Put("from", []string{"to"}, []byte("data"))
if err != nil {
t.Errorf("Put: %v", err)
}
q.Remove(id)
}
func TestPipes(t *testing.T) {
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
dumbCourier, dumbCourier)
item := &Item{
Message: Message{
ID: <-newID,
From: "from",
Rcpt: []*Recipient{
{"true", Recipient_PIPE, Recipient_PENDING, "", ""}},
Data: []byte("data"),
},
CreatedAt: time.Now(),
}
if err, _ := item.deliver(q, item.Rcpt[0]); err != nil {
t.Errorf("pipe delivery failed: %v", err)
}
}
func TestNextDelay(t *testing.T) {
cases := []struct{ since, min time.Duration }{
{10 * time.Second, 1 * time.Minute},
{3 * time.Minute, 5 * time.Minute},
{7 * time.Minute, 10 * time.Minute},
{15 * time.Minute, 20 * time.Minute},
{30 * time.Minute, 20 * time.Minute},
}
for _, c := range cases {
// Repeat each case a few times to exercise the perturbation a bit.
for i := 0; i < 10; i++ {
delay := nextDelay(time.Now().Add(-c.since))
max := c.min + 1*time.Minute
if delay < c.min || delay > max {
t.Errorf("since:%v expected [%v, %v], got %v",
c.since, c.min, max, delay)
}
}
}
}