1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-17 14:37:02 +00:00

queue: Support sending to pipes

With the introduction of aliases, the queue may now be delivering mail to
pipes. This patch implements pipe delivery.

It uses a fixed 30s timeout for now, as these commands should really not take
much time, and we don't want to overly complicate the configuration for now.
This commit is contained in:
Alberto Bertogli
2016-09-22 02:18:35 +01:00
parent a531092f8b
commit bab8a8083c
4 changed files with 76 additions and 33 deletions

View File

@@ -6,14 +6,19 @@ package queue
//go:generate protoc --go_out=. -I=${GOPATH}/src -I. queue.proto
import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"bytes"
"blitiri.com.ar/go/chasquid/internal/courier"
"blitiri.com.ar/go/chasquid/internal/envelope"
"blitiri.com.ar/go/chasquid/internal/protoio"
@@ -248,20 +253,10 @@ func (item *Item) SendLoop(q *Queue) {
wg.Add(1)
go func(rcpt *Recipient, oldStatus Recipient_Status) {
defer wg.Done()
// TODO: Different types of recipients.
to := rcpt.Address
tr.LazyPrintf("%s sending", to)
var err error
// TODO: If this is all the difference we end up having
// between the two couriers, consider going back to using a
// routing courier.
if envelope.DomainIn(to, q.localDomains) {
err = q.localC.Deliver(item.From, to, item.Data)
} else {
err = q.remoteC.Deliver(item.From, to, item.Data)
}
err := item.deliver(q, rcpt)
if err != nil {
// TODO: Local deliveries should not be retried, if they
@@ -322,6 +317,26 @@ func (item *Item) SendLoop(q *Queue) {
// remove item from the queue, and remove from disk.
}
func (item *Item) deliver(q *Queue, rcpt *Recipient) error {
if rcpt.Type == Recipient_PIPE {
c := strings.Fields(rcpt.Address)
if len(c) == 0 {
return fmt.Errorf("empty pipe")
}
ctx, _ := context.WithDeadline(context.Background(),
time.Now().Add(30*time.Second))
cmd := exec.CommandContext(ctx, c[0], c[1:]...)
cmd.Stdin = bytes.NewReader(item.Data)
return cmd.Run()
} else {
if envelope.DomainIn(rcpt.Address, q.localDomains) {
return q.localC.Deliver(item.From, rcpt.Address, item.Data)
} else {
return q.remoteC.Deliver(item.From, rcpt.Address, item.Data)
}
}
}
func nextDelay(last time.Duration) time.Duration {
switch {
case last < 1*time.Minute:

View File

@@ -34,13 +34,16 @@ type Recipient_Type int32
const (
Recipient_EMAIL Recipient_Type = 0
Recipient_PIPE Recipient_Type = 1
)
var Recipient_Type_name = map[int32]string{
0: "EMAIL",
1: "PIPE",
}
var Recipient_Type_value = map[string]int32{
"EMAIL": 0,
"PIPE": 1,
}
func (x Recipient_Type) String() string {
@@ -124,26 +127,26 @@ func init() {
func init() { proto.RegisterFile("queue.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 329 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x8f, 0x41, 0x6b, 0xab, 0x40,
0x14, 0x85, 0xa3, 0x31, 0xe6, 0xe5, 0xfa, 0x5e, 0xf0, 0x0d, 0x94, 0x4a, 0x56, 0x41, 0xba, 0x48,
0x29, 0x28, 0xa4, 0xcb, 0x42, 0x21, 0xa0, 0x2d, 0x81, 0x26, 0x94, 0x89, 0xfb, 0x30, 0xd1, 0x89,
0x15, 0x62, 0xc6, 0x3a, 0xd7, 0x45, 0x7f, 0x51, 0x7f, 0x47, 0xff, 0x59, 0xc7, 0x89, 0x69, 0xa1,
0xdd, 0xdd, 0x3b, 0xe7, 0x9c, 0xb9, 0xdf, 0x01, 0xe7, 0xb5, 0xe1, 0x0d, 0x0f, 0xaa, 0x5a, 0xa0,
0x20, 0x03, 0xbd, 0x4c, 0xee, 0xf2, 0x02, 0x5f, 0x9a, 0x5d, 0x90, 0x8a, 0x32, 0xcc, 0xc5, 0x81,
0x1d, 0xf3, 0x50, 0xeb, 0xbb, 0x66, 0x1f, 0x56, 0xf8, 0x56, 0x71, 0x19, 0x62, 0x51, 0x72, 0x89,
0xac, 0xac, 0xbe, 0xa7, 0xd3, 0x1f, 0xfe, 0xbb, 0x01, 0xc3, 0x15, 0x97, 0x92, 0xe5, 0x9c, 0x8c,
0xc1, 0x5c, 0x46, 0x9e, 0x31, 0x35, 0x66, 0x23, 0x6a, 0x16, 0x11, 0x21, 0x60, 0xed, 0x6b, 0x51,
0x7a, 0xa6, 0x7e, 0xd1, 0x33, 0xb9, 0x02, 0xab, 0x4e, 0x2b, 0xf4, 0xfa, 0xd3, 0xfe, 0xcc, 0x99,
0xbb, 0xc1, 0x89, 0x87, 0xf2, 0xb4, 0xa8, 0x0a, 0x7e, 0x44, 0xaa, 0xd5, 0x36, 0x99, 0x31, 0x64,
0x9e, 0xa5, 0x92, 0x7f, 0xa9, 0x9e, 0xc9, 0x3d, 0xfc, 0x4b, 0x6b, 0xce, 0x90, 0x67, 0x5b, 0x86,
0x5b, 0x94, 0xde, 0x40, 0x89, 0xce, 0x7c, 0x12, 0xe4, 0x42, 0xe4, 0x87, 0xae, 0x93, 0x62, 0x0e,
0x92, 0x33, 0x22, 0x75, 0xba, 0xc0, 0x02, 0x13, 0xe9, 0x7f, 0x18, 0x30, 0xfa, 0xba, 0x43, 0x3c,
0x18, 0xb2, 0x2c, 0xab, 0x15, 0x79, 0x07, 0x7c, 0x5e, 0xc9, 0x35, 0x58, 0x6d, 0x69, 0x4d, 0x3d,
0x9e, 0x5f, 0xfc, 0x24, 0x0c, 0x12, 0x25, 0x52, 0x6d, 0x21, 0x21, 0xd8, 0xea, 0x10, 0x36, 0x52,
0xd5, 0x69, 0xcd, 0x97, 0xbf, 0xcc, 0x1b, 0x2d, 0xd3, 0xce, 0xe6, 0xff, 0x07, 0xab, 0x8d, 0x93,
0x11, 0x0c, 0xe2, 0xd5, 0x62, 0xf9, 0xe4, 0xf6, 0xfc, 0x1b, 0xb0, 0x4f, 0x26, 0xe2, 0xc0, 0xf0,
0x39, 0x5e, 0x47, 0xcb, 0xf5, 0xa3, 0xdb, 0x23, 0x7f, 0xc0, 0xda, 0xc4, 0xeb, 0xc4, 0x35, 0x08,
0x80, 0xfd, 0xa0, 0xac, 0x71, 0xe4, 0x9a, 0x3b, 0x5b, 0x97, 0xbc, 0xfd, 0x0c, 0x00, 0x00, 0xff,
0xff, 0x64, 0x3a, 0x92, 0xc1, 0xc7, 0x01, 0x00, 0x00,
// 332 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x8f, 0x51, 0x4b, 0xc3, 0x30,
0x14, 0x85, 0x6d, 0xd7, 0x75, 0xee, 0x56, 0x47, 0x09, 0x88, 0x65, 0xbe, 0x8c, 0xe2, 0xc3, 0x44,
0x68, 0x61, 0x3e, 0x0a, 0xc2, 0xa0, 0x55, 0x0a, 0x6e, 0x8c, 0xac, 0xef, 0x23, 0x6d, 0xb3, 0x5a,
0x58, 0x97, 0xda, 0xa4, 0x0f, 0xfe, 0x22, 0x7f, 0x8c, 0x7f, 0xca, 0x34, 0xed, 0x14, 0xf4, 0xed,
0xde, 0x9c, 0x73, 0x72, 0xbf, 0x03, 0xd6, 0x7b, 0x43, 0x1b, 0xea, 0x55, 0x35, 0x13, 0x0c, 0x0d,
0xd5, 0x32, 0x7d, 0xcc, 0x0b, 0xf1, 0xd6, 0x24, 0x5e, 0xca, 0x4a, 0x3f, 0x67, 0x07, 0x72, 0xcc,
0x7d, 0xa5, 0x27, 0xcd, 0xde, 0xaf, 0xc4, 0x47, 0x45, 0xb9, 0x2f, 0x8a, 0x92, 0x72, 0x41, 0xca,
0xea, 0x77, 0xea, 0xfe, 0x70, 0x3f, 0x35, 0x18, 0xad, 0x28, 0xe7, 0x24, 0xa7, 0x68, 0x02, 0x7a,
0x14, 0x38, 0xda, 0x4c, 0x9b, 0x8f, 0xb1, 0x5e, 0x04, 0x08, 0x81, 0xb1, 0xaf, 0x59, 0xe9, 0xe8,
0xea, 0x45, 0xcd, 0xe8, 0x16, 0x8c, 0x3a, 0xad, 0x84, 0x33, 0x98, 0x0d, 0xe6, 0xd6, 0xc2, 0xf6,
0x3a, 0x1e, 0x4c, 0xd3, 0xa2, 0x2a, 0xe8, 0x51, 0x60, 0xa5, 0xb6, 0xc9, 0x8c, 0x08, 0xe2, 0x18,
0x32, 0x79, 0x81, 0xd5, 0x8c, 0x9e, 0xe0, 0x32, 0xad, 0x29, 0x11, 0x34, 0xdb, 0x11, 0xb1, 0x13,
0xdc, 0x19, 0x4a, 0xd1, 0x5a, 0x4c, 0xbd, 0x9c, 0xb1, 0xfc, 0xd0, 0x77, 0x92, 0xcc, 0x5e, 0x7c,
0x42, 0xc4, 0x56, 0x1f, 0x58, 0x8a, 0x98, 0xbb, 0x5f, 0x1a, 0x8c, 0x7f, 0xee, 0x20, 0x07, 0x46,
0x24, 0xcb, 0x6a, 0x49, 0xde, 0x03, 0x9f, 0x56, 0x74, 0x07, 0x46, 0x5b, 0x5a, 0x51, 0x4f, 0x16,
0x57, 0x7f, 0x09, 0xbd, 0x58, 0x8a, 0x58, 0x59, 0x90, 0x0f, 0xa6, 0x3c, 0x24, 0x1a, 0x2e, 0xeb,
0xb4, 0xe6, 0xeb, 0x7f, 0xe6, 0xad, 0x92, 0x71, 0x6f, 0x73, 0x6f, 0xc0, 0x68, 0xe3, 0x68, 0x0c,
0xc3, 0x70, 0xb5, 0x8c, 0x5e, 0xed, 0x33, 0x74, 0x0e, 0xc6, 0x26, 0xda, 0x84, 0xb6, 0xe6, 0xde,
0x83, 0xd9, 0xd9, 0x91, 0x05, 0xa3, 0x4d, 0xb8, 0x0e, 0xa2, 0xf5, 0x4b, 0x67, 0xd8, 0x86, 0xeb,
0xd8, 0xd6, 0x10, 0x80, 0xf9, 0x2c, 0x43, 0x61, 0x60, 0xeb, 0x89, 0xa9, 0xea, 0x3e, 0x7c, 0x07,
0x00, 0x00, 0xff, 0xff, 0xd1, 0x22, 0xa1, 0x0c, 0xd1, 0x01, 0x00, 0x00,
}

View File

@@ -25,6 +25,7 @@ message Recipient {
enum Type {
EMAIL = 0;
PIPE = 1;
}
Type type = 2;

View File

@@ -135,3 +135,27 @@ func TestFullQueue(t *testing.T) {
}
q.Remove(id)
}
func TestPipes(t *testing.T) {
q := New("/tmp/queue_test", set.NewString("loco"))
item := &Item{
Message: Message{
ID: <-newID,
From: "from",
Rcpt: []*Recipient{
{"true", Recipient_PIPE, Recipient_PENDING}},
Data: []byte("data"),
},
CreatedAt: time.Now(),
}
if err := item.deliver(q, item.Rcpt[0]); err != nil {
t.Errorf("pipe delivery failed: %v", err)
}
// Make the command "false", should fail.
item.Rcpt[0].Address = "false"
if err := item.deliver(q, item.Rcpt[0]); err == nil {
t.Errorf("pipe delivery worked, expected failure")
}
}