1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-17 14:37:02 +00:00

queue: Implement persistency

This patch makes the queue read and write items to disk.

It uses protobuf for serialization. We serialize to text format to make
manual troubleshooting easier, as the performance difference is not very
relevant for us.
This commit is contained in:
Alberto Bertogli
2016-09-18 06:13:42 +01:00
parent 9ed30a747b
commit aacf8ffea7
11 changed files with 457 additions and 64 deletions

View File

@@ -2,18 +2,26 @@
// Accepted envelopes get put in the queue, and processed asynchronously.
package queue
// Command to generate queue.pb.go from queue.proto.
//go:generate protoc --go_out=. -I=${GOPATH}/src -I. queue.proto
import (
"crypto/rand"
"encoding/base64"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"blitiri.com.ar/go/chasquid/internal/courier"
"blitiri.com.ar/go/chasquid/internal/envelope"
"blitiri.com.ar/go/chasquid/internal/protoio"
"blitiri.com.ar/go/chasquid/internal/set"
"github.com/golang/glog"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"golang.org/x/net/trace"
)
@@ -23,6 +31,13 @@ const (
// Give up sending attempts after this duration.
giveUpAfter = 12 * time.Hour
// Prefix for item file names.
// This is for convenience, versioning, and to be able to tell them apart
// temporary files and other cruft.
// It's important that it's outside the base64 space so it doesn't get
// generated accidentally.
itemFilePrefix = "m:"
)
var (
@@ -68,19 +83,47 @@ type Queue struct {
// Domains we consider local.
localDomains *set.String
// Path where we store the queue.
path string
}
// TODO: Store the queue on disk.
// Load the queue and launch the sending loops on startup.
func New(localC, remoteC courier.Courier, localDomains *set.String) *Queue {
func New(path string, localDomains *set.String) *Queue {
os.MkdirAll(path, 0700)
return &Queue{
q: map[string]*Item{},
localC: localC,
remoteC: remoteC,
localC: &courier.Procmail{},
remoteC: &courier.SMTP{},
localDomains: localDomains,
path: path,
}
}
func (q *Queue) Load() error {
files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
if err != nil {
return err
}
for _, fname := range files {
item, err := ItemFromFile(fname)
if err != nil {
glog.Errorf("error loading queue item from %q: %v", fname, err)
continue
}
q.mu.Lock()
q.q[item.ID] = item
q.mu.Unlock()
go item.SendLoop(q)
}
return nil
}
func (q *Queue) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
@@ -94,13 +137,27 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
}
item := &Item{
ID: <-newID,
From: from,
To: to,
Data: data,
Created: time.Now(),
Results: map[string]error{},
Message: Message{
ID: <-newID,
From: from,
Data: data,
},
CreatedAt: time.Now(),
}
for _, t := range to {
item.Rcpt = append(item.Rcpt, &Recipient{
Address: t,
Type: Recipient_EMAIL,
Status: Recipient_PENDING,
})
}
err := item.WriteTo(q.path)
if err != nil {
return "", fmt.Errorf("failed to write item: %v", err)
}
q.mu.Lock()
q.q[item.ID] = item
q.mu.Unlock()
@@ -115,6 +172,12 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
// Remove an item from the queue.
func (q *Queue) Remove(id string) {
path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
err := os.Remove(path)
if err != nil {
glog.Errorf("failed to remove queue file %q: %v", path, err)
}
q.mu.Lock()
delete(q.q, id)
q.mu.Unlock()
@@ -124,54 +187,70 @@ func (q *Queue) Remove(id string) {
// Register it in main().
// An item in the queue.
// This must be easily serializable, so no pointers.
type Item struct {
// Item ID. Uniquely identifies this item.
ID string
// Base the item on the protobuf message.
// We will use this for serialization, so any fields below are NOT
// serialized.
Message
// The envelope for this item.
From string
To []string
Data []byte
// Protect the entire item.
sync.Mutex
// Creation time.
Created time.Time
// Next attempt to send.
NextAttempt time.Time
// Map of recipient -> last result of sending it.
Results map[string]error
mu sync.Mutex
// Go-friendly version of Message.CreatedAtTs.
CreatedAt time.Time
}
func (item *Item) resultsFor(to string) (error, bool) {
item.mu.Lock()
defer item.mu.Unlock()
value, ok := item.Results[to]
return value, ok
func ItemFromFile(fname string) (*Item, error) {
item := &Item{}
err := protoio.ReadTextMessage(fname, &item.Message)
if err != nil {
return nil, err
}
item.CreatedAt, err = ptypes.Timestamp(item.CreatedAtTs)
return item, err
}
func (item *Item) WriteTo(dir string) error {
item.Lock()
defer item.Unlock()
var err error
item.CreatedAtTs, err = ptypes.TimestampProto(item.CreatedAt)
if err != nil {
return err
}
path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
return protoio.WriteTextMessage(path, &item.Message, 0600)
}
func (item *Item) SendLoop(q *Queue) {
defer q.Remove(item.ID)
tr := trace.New("Queue", item.ID)
defer tr.Finish()
tr.LazyPrintf("from: %s", item.From)
var delay time.Duration
for time.Since(item.Created) < giveUpAfter {
for time.Since(item.CreatedAt) < giveUpAfter {
// Send to all recipients that are still pending.
var wg sync.WaitGroup
for _, to := range item.To {
if err, ok := item.resultsFor(to); ok && err == nil {
// Successful send for this recipient, nothing to do.
for _, rcpt := range item.Rcpt {
item.Lock()
status := rcpt.Status
item.Unlock()
if status != Recipient_PENDING {
continue
}
wg.Add(1)
go func(to string) {
go func(rcpt *Recipient, oldStatus Recipient_Status) {
defer wg.Done()
// TODO: Different types of recipients.
to := rcpt.Address
tr.LazyPrintf("%s sending", to)
var err error
@@ -183,9 +262,6 @@ func (item *Item) SendLoop(q *Queue) {
} else {
err = q.remoteC.Deliver(item.From, to, item.Data)
}
item.mu.Lock()
item.Results[to] = err
item.mu.Unlock()
if err != nil {
// TODO: Local deliveries should not be retried, if they
@@ -197,21 +273,39 @@ func (item *Item) SendLoop(q *Queue) {
} else {
tr.LazyPrintf("%s successful", to)
glog.Infof("%s -> %q sent", item.ID, to)
status = Recipient_SENT
}
}(to)
// Update + write on status change.
if oldStatus != status {
item.Lock()
rcpt.Status = status
item.Unlock()
err = item.WriteTo(q.path)
if err != nil {
tr.LazyPrintf("failed to write: %v", err)
glog.Errorf("%s failed to write: %v", item.ID, err)
}
}
}(rcpt, status)
}
wg.Wait()
successful := 0
for _, to := range item.To {
if err, ok := item.resultsFor(to); ok && err == nil {
successful++
pending := 0
for _, rcpt := range item.Rcpt {
if rcpt.Status == Recipient_PENDING {
pending++
}
}
if successful == len(item.To) {
if pending == 0 {
// Successfully sent to all recipients.
tr.LazyPrintf("all successful")
glog.Infof("%s all successful", item.ID)
q.Remove(item.ID)
return
}
@@ -219,11 +313,13 @@ func (item *Item) SendLoop(q *Queue) {
// that some of the messages have been delayed.
delay = nextDelay(delay)
tr.LazyPrintf("waiting for %v", delay)
glog.Infof("%s waiting for %v", item.ID, delay)
time.Sleep(delay)
}
// TODO: Send a notification message for the recipients we failed to send.
// TODO: Send a notification message for the recipients we failed to send,
// remove item from the queue, and remove from disk.
}
func nextDelay(last time.Duration) time.Duration {
@@ -238,3 +334,9 @@ func nextDelay(last time.Duration) time.Duration {
return 20 * time.Minute
}
}
func timestampNow() *timestamp.Timestamp {
now := time.Now()
ts, _ := ptypes.TimestampProto(now)
return ts
}