1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-17 14:37:02 +00:00
Files
go-chasquid-smtp/internal/queue/queue_test.go
Alberto Bertogli d660f88f67 queue: Send DSN for messages that time out in the queue
The queue currently only considers failed recipients when deciding
whether to send a DSN or not. This is a bug, as recipients that time out
are not taken into account.

This patch fixes that issue by including both failed and pending
recipients in the DSN.

It also adds more comprehensive tests for this case, both in the queue
and in the dsn generation code.
2016-10-21 22:20:49 +01:00

260 lines
6.5 KiB
Go

package queue
import (
"bytes"
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
"blitiri.com.ar/go/chasquid/internal/aliases"
"blitiri.com.ar/go/chasquid/internal/set"
)
// Test courier. Delivery is done by sending on a channel, so users have fine
// grain control over the results.
type ChanCourier struct {
requests chan deliverRequest
results chan error
}
type deliverRequest struct {
from string
to string
data []byte
}
func (cc *ChanCourier) Deliver(from string, to string, data []byte) (error, bool) {
cc.requests <- deliverRequest{from, to, data}
return <-cc.results, false
}
func newChanCourier() *ChanCourier {
return &ChanCourier{
requests: make(chan deliverRequest),
results: make(chan error),
}
}
// Courier for test purposes. Never fails, and always remembers everything.
type TestCourier struct {
wg sync.WaitGroup
requests []*deliverRequest
reqFor map[string]*deliverRequest
sync.Mutex
}
func (tc *TestCourier) Deliver(from string, to string, data []byte) (error, bool) {
defer tc.wg.Done()
dr := &deliverRequest{from, to, data}
tc.Lock()
tc.requests = append(tc.requests, dr)
tc.reqFor[to] = dr
tc.Unlock()
return nil, false
}
func newTestCourier() *TestCourier {
return &TestCourier{
reqFor: map[string]*deliverRequest{},
}
}
func TestBasic(t *testing.T) {
localC := newTestCourier()
remoteC := newTestCourier()
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
localC, remoteC, "dsndomain")
localC.wg.Add(2)
remoteC.wg.Add(1)
id, err := q.Put("from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data"))
if err != nil {
t.Fatalf("Put: %v", err)
}
if len(id) < 6 {
t.Errorf("short ID: %v", id)
}
localC.wg.Wait()
remoteC.wg.Wait()
// Make sure the delivered items leave the queue.
for d := time.Now().Add(2 * time.Second); time.Now().Before(d); {
if q.Len() == 0 {
break
}
time.Sleep(20 * time.Millisecond)
}
if q.Len() != 0 {
t.Fatalf("%d items not removed from the queue after delivery", q.Len())
}
cases := []struct {
courier *TestCourier
expectedTo string
}{
{localC, "nodomain"},
{localC, "am@loco"},
{remoteC, "x@remote"},
}
for _, c := range cases {
req := c.courier.reqFor[c.expectedTo]
if req == nil {
t.Errorf("missing request for %q", c.expectedTo)
continue
}
if req.from != "from" || req.to != c.expectedTo ||
!bytes.Equal(req.data, []byte("data")) {
t.Errorf("wrong request for %q: %v", c.expectedTo, req)
}
}
}
func TestDSNOnTimeout(t *testing.T) {
localC := newTestCourier()
remoteC := newTestCourier()
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
localC, remoteC, "dsndomain")
// Insert an expired item in the queue.
item := &Item{
Message: Message{
ID: <-newID,
From: fmt.Sprintf("from@loco"),
Rcpt: []*Recipient{
{"to@to", Recipient_EMAIL, Recipient_PENDING, "err", "to@to"}},
Data: []byte("data"),
},
CreatedAt: time.Now().Add(-24 * time.Hour),
}
q.q[item.ID] = item
err := item.WriteTo(q.path)
if err != nil {
t.Errorf("failed to write item: %v", err)
}
// Launch the sending loop, expect 1 local delivery (the DSN).
localC.wg.Add(1)
go item.SendLoop(q)
localC.wg.Wait()
req := localC.reqFor["from@loco"]
if req == nil {
t.Fatal("missing DSN")
}
if req.from != "<>" || req.to != "from@loco" ||
!strings.Contains(string(req.data), "X-Failed-Recipients: to@to,") {
t.Errorf("wrong DSN: %q", string(req.data))
}
}
func TestFullQueue(t *testing.T) {
q := New("/tmp/queue_test", set.NewString(), aliases.NewResolver(),
dumbCourier, dumbCourier, "dsndomain")
// Force-insert maxQueueSize items in the queue.
oneID := ""
for i := 0; i < maxQueueSize; i++ {
item := &Item{
Message: Message{
ID: <-newID,
From: fmt.Sprintf("from-%d", i),
Rcpt: []*Recipient{
{"to", Recipient_EMAIL, Recipient_PENDING, "", ""}},
Data: []byte("data"),
},
CreatedAt: time.Now(),
}
q.q[item.ID] = item
oneID = item.ID
}
// This one should fail due to the queue being too big.
id, err := q.Put("from", []string{"to"}, []byte("data-qf"))
if err != errQueueFull {
t.Errorf("Not failed as expected: %v - %v", id, err)
}
// Remove one, and try again: it should succeed.
// Write it first so we don't get complaints about the file not existing
// (as we did not all the items properly).
q.q[oneID].WriteTo(q.path)
q.Remove(oneID)
id, err = q.Put("from", []string{"to"}, []byte("data"))
if err != nil {
t.Errorf("Put: %v", err)
}
q.Remove(id)
}
// Dumb courier, for when we just want to return directly.
type DumbCourier struct{}
func (c DumbCourier) Deliver(from string, to string, data []byte) (error, bool) {
return nil, false
}
var dumbCourier = DumbCourier{}
func TestAliases(t *testing.T) {
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
dumbCourier, dumbCourier, "dsndomain")
q.aliases.AddDomain("loco")
q.aliases.AddAliasForTesting("ab@loco", "pq@loco", aliases.EMAIL)
q.aliases.AddAliasForTesting("ab@loco", "rs@loco", aliases.EMAIL)
q.aliases.AddAliasForTesting("ab@loco", "command", aliases.PIPE)
q.aliases.AddAliasForTesting("cd@loco", "ata@hualpa", aliases.EMAIL)
cases := []struct {
to []string
expected []*Recipient
}{
{[]string{"ab@loco"}, []*Recipient{
{"pq@loco", Recipient_EMAIL, Recipient_PENDING, "", "ab@loco"},
{"rs@loco", Recipient_EMAIL, Recipient_PENDING, "", "ab@loco"},
{"command", Recipient_PIPE, Recipient_PENDING, "", "ab@loco"}}},
{[]string{"ab@loco", "cd@loco"}, []*Recipient{
{"pq@loco", Recipient_EMAIL, Recipient_PENDING, "", "ab@loco"},
{"rs@loco", Recipient_EMAIL, Recipient_PENDING, "", "ab@loco"},
{"command", Recipient_PIPE, Recipient_PENDING, "", "ab@loco"},
{"ata@hualpa", Recipient_EMAIL, Recipient_PENDING, "", "cd@loco"}}},
}
for _, c := range cases {
id, err := q.Put("from", c.to, []byte("data"))
if err != nil {
t.Errorf("Put: %v", err)
}
item := q.q[id]
if !reflect.DeepEqual(item.Rcpt, c.expected) {
t.Errorf("case %q, expected %v, got %v", c.to, c.expected, item.Rcpt)
}
q.Remove(id)
}
}
func TestPipes(t *testing.T) {
q := New("/tmp/queue_test", set.NewString("loco"), aliases.NewResolver(),
dumbCourier, dumbCourier, "dsndomain")
item := &Item{
Message: Message{
ID: <-newID,
From: "from",
Rcpt: []*Recipient{
{"true", Recipient_PIPE, Recipient_PENDING, "", ""}},
Data: []byte("data"),
},
CreatedAt: time.Now(),
}
if err, _ := item.deliver(q, item.Rcpt[0]); err != nil {
t.Errorf("pipe delivery failed: %v", err)
}
}