1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-18 14:47:03 +00:00

queue: Simplify sending loop

This patch simplifies the sending loop code:

 - Move the recipient sending function from a closure to a method.
 - Simplify the status update logic: we now update and write
   unconditionally (as we should have been doing).
 - Create a function for counting recipients in a given status.

It also adds a test for the removal of completed items from the queue,
which was not covered before and came up during development.
This commit is contained in:
Alberto Bertogli
2016-10-12 23:54:49 +01:00
parent c172894317
commit 09d3c73f6c
2 changed files with 66 additions and 68 deletions

View File

@@ -305,60 +305,17 @@ func (item *Item) SendLoop(q *Queue) {
// Send to all recipients that are still pending.
var wg sync.WaitGroup
for _, rcpt := range item.Rcpt {
item.Lock()
status := rcpt.Status
item.Unlock()
if status != Recipient_PENDING {
if rcpt.Status != Recipient_PENDING {
continue
}
wg.Add(1)
go func(rcpt *Recipient, oldStatus Recipient_Status) {
defer wg.Done()
to := rcpt.Address
tr.Debugf("%s sending", to)
err, permanent := item.deliver(q, rcpt)
if err != nil {
if permanent {
tr.Errorf("%s permanent error: %v", to, err)
status = Recipient_FAILED
} else {
tr.Printf("%s temporary error: %v", to, err)
}
} else {
tr.Printf("%s sent", to)
status = Recipient_SENT
}
// Update + write on status change.
if oldStatus != status {
item.Lock()
rcpt.Status = status
if err != nil {
rcpt.LastFailureMessage = err.Error()
}
item.Unlock()
err = item.WriteTo(q.path)
if err != nil {
tr.Errorf("failed to write: %v", err)
}
}
}(rcpt, status)
go item.sendOneRcpt(&wg, tr, q, rcpt)
}
wg.Wait()
// If they're all done, no need to wait.
pending := 0
for _, rcpt := range item.Rcpt {
if rcpt.Status == Recipient_PENDING {
pending++
}
}
if pending == 0 {
if item.countRcpt(Recipient_PENDING) == 0 {
break
}
@@ -371,41 +328,41 @@ func (item *Item) SendLoop(q *Queue) {
}
// Completed to all recipients (some may not have succeeded).
failed := 0
for _, rcpt := range item.Rcpt {
if rcpt.Status == Recipient_FAILED {
failed++
}
}
if failed > 0 && item.From != "<>" {
if item.countRcpt(Recipient_FAILED) > 0 && item.From != "<>" {
sendDSN(tr, q, item)
}
tr.Printf("all done")
q.Remove(item.ID)
return
}
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
tr.Debugf("sending DSN")
// sendOneRcpt, and update it with the results.
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
defer wg.Done()
to := rcpt.Address
tr.Debugf("%s sending", to)
msg, err := deliveryStatusNotification(item)
err, permanent := item.deliver(q, rcpt)
item.Lock()
if err != nil {
tr.Errorf("failed to build DSN: %v", err)
return
rcpt.LastFailureMessage = err.Error()
if permanent {
tr.Errorf("%s permanent error: %v", to, err)
rcpt.Status = Recipient_FAILED
} else {
tr.Printf("%s temporary error: %v", to, err)
}
} else {
tr.Printf("%s sent", to)
rcpt.Status = Recipient_SENT
}
item.Unlock()
id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg)
err = item.WriteTo(q.path)
if err != nil {
tr.Errorf("failed to queue DSN: %v", err)
return
tr.Errorf("failed to write: %v", err)
}
tr.Printf("queued DSN: %s", id)
dsnQueued.Add(1)
}
// deliver the item to the given recipient, using the couriers from the queue.
@@ -452,6 +409,36 @@ func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool)
}
}
// countRcpt counts how many recipients are in the given status.
func (item *Item) countRcpt(status Recipient_Status) int {
c := 0
for _, rcpt := range item.Rcpt {
if rcpt.Status == status {
c++
}
}
return c
}
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
tr.Debugf("sending DSN")
msg, err := deliveryStatusNotification(item)
if err != nil {
tr.Errorf("failed to build DSN: %v", err)
return
}
id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg)
if err != nil {
tr.Errorf("failed to queue DSN: %v", err)
return
}
tr.Printf("queued DSN: %s", id)
dsnQueued.Add(1)
}
func nextDelay(last time.Duration) time.Duration {
switch {
case last < 1*time.Minute:

View File

@@ -80,6 +80,17 @@ func TestBasic(t *testing.T) {
localC.wg.Wait()
remoteC.wg.Wait()
// Make sure the delivered items leave the queue.
for d := time.Now().Add(2 * time.Second); time.Now().Before(d); {
if q.Len() == 0 {
break
}
time.Sleep(20 * time.Millisecond)
}
if q.Len() != 0 {
t.Fatalf("%d items not removed from the queue after delivery", q.Len())
}
cases := []struct {
courier *TestCourier
expectedTo string