Allow to abort execution while checking by closing a channel
This commit is contained in:
87
client.go
87
client.go
@@ -9,6 +9,7 @@ import (
|
|||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -116,70 +117,70 @@ func (s *Client) SetUnixUser(user string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Return a confirmation that spamd is alive.
|
// Return a confirmation that spamd is alive.
|
||||||
func (s *Client) Ping() (r *SpamDOut, err error) {
|
func (s *Client) Ping(abort chan bool) (r *SpamDOut, err error) {
|
||||||
return s.simpleCall(PING, []string{})
|
return s.simpleCall(abort, PING, []string{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Just check if the passed message is spam or not and return score
|
// Just check if the passed message is spam or not and return score
|
||||||
func (s *Client) Check(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Check(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(CHECK, msgpars)
|
return s.simpleCall(abort, CHECK, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Ignore this message -- client opened connection then changed its mind
|
//Ignore this message -- client opened connection then changed its mind
|
||||||
func (s *Client) Skip(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Skip(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(SKIP, msgpars)
|
return s.simpleCall(abort, SKIP, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check if message is spam or not, and return score plus list of symbols hit
|
//Check if message is spam or not, and return score plus list of symbols hit
|
||||||
func (s *Client) Symbols(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Symbols(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(SYMBOLS, msgpars)
|
return s.simpleCall(abort, SYMBOLS, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check if message is spam or not, and return score plus report
|
//Check if message is spam or not, and return score plus report
|
||||||
func (s *Client) Report(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Report(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(REPORT, msgpars)
|
return s.simpleCall(abort, REPORT, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check if message is spam or not, and return score plus report
|
//Check if message is spam or not, and return score plus report
|
||||||
func (s *Client) ReportIgnoreWarning(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) ReportIgnoreWarning(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(REPORT_IGNOREWARNING, msgpars)
|
return s.simpleCall(abort, REPORT_IGNOREWARNING, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check if message is spam or not, and return score plus report if the message is spam
|
//Check if message is spam or not, and return score plus report if the message is spam
|
||||||
func (s *Client) ReportIfSpam(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) ReportIfSpam(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(REPORT_IFSPAM, msgpars)
|
return s.simpleCall(abort, REPORT_IFSPAM, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Process this message and return a modified message - on deloy
|
//Process this message and return a modified message - on deloy
|
||||||
func (s *Client) Process(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Process(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(PROCESS, msgpars)
|
return s.simpleCall(abort, PROCESS, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Same as PROCESS, but return only modified headers, not body (new in protocol 1.4)
|
//Same as PROCESS, but return only modified headers, not body (new in protocol 1.4)
|
||||||
func (s *Client) Headers(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Headers(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(HEADERS, msgpars)
|
return s.simpleCall(abort, HEADERS, msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Sign the message as spam
|
//Sign the message as spam
|
||||||
func (s *Client) ReportingSpam(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) ReportingSpam(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
headers := map[string]string{
|
headers := map[string]string{
|
||||||
"Message-class": "spam",
|
"Message-class": "spam",
|
||||||
"Set": "local,remote",
|
"Set": "local,remote",
|
||||||
}
|
}
|
||||||
return s.Tell(msgpars, &headers)
|
return s.Tell(abort, msgpars, &headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Sign the message as false-positive
|
//Sign the message as false-positive
|
||||||
func (s *Client) RevokeSpam(msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) RevokeSpam(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
headers := map[string]string{
|
headers := map[string]string{
|
||||||
"Message-class": "ham",
|
"Message-class": "ham",
|
||||||
"Set": "local,remote",
|
"Set": "local,remote",
|
||||||
}
|
}
|
||||||
return s.Tell(msgpars, &headers)
|
return s.Tell(abort, msgpars, &headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Learn if a message is spam or not
|
//Learn if a message is spam or not
|
||||||
func (s *Client) Learn(learnType string, msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) Learn(abort chan bool, learnType string, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
headers := make(map[string]string)
|
headers := make(map[string]string)
|
||||||
switch strings.ToUpper(learnType) {
|
switch strings.ToUpper(learnType) {
|
||||||
case LEARN_SPAM:
|
case LEARN_SPAM:
|
||||||
@@ -194,12 +195,12 @@ func (s *Client) Learn(learnType string, msgpars ...string) (reply *SpamDOut, er
|
|||||||
err = errors.New("Learn Type Not Found")
|
err = errors.New("Learn Type Not Found")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return s.Tell(msgpars, &headers)
|
return s.Tell(abort, msgpars, &headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
//wrapper to simple calls
|
//wrapper to simple calls
|
||||||
func (s *Client) simpleCall(cmd string, msgpars []string) (reply *SpamDOut, err error) {
|
func (s *Client) simpleCall(abort chan bool, cmd string, msgpars []string) (reply *SpamDOut, err error) {
|
||||||
return s.call(cmd, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) {
|
return s.call(abort, cmd, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) {
|
||||||
r, e = processResponse(cmd, data)
|
r, e = processResponse(cmd, data)
|
||||||
if r.Code == EX_OK {
|
if r.Code == EX_OK {
|
||||||
e = nil
|
e = nil
|
||||||
@@ -209,15 +210,15 @@ func (s *Client) simpleCall(cmd string, msgpars []string) (reply *SpamDOut, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
//external wrapper to simple call
|
//external wrapper to simple call
|
||||||
func (s *Client) SimpleCall(cmd string, msgpars ...string) (reply *SpamDOut, err error) {
|
func (s *Client) SimpleCall(abort chan bool, cmd string, msgpars ...string) (reply *SpamDOut, err error) {
|
||||||
return s.simpleCall(strings.ToUpper(cmd), msgpars)
|
return s.simpleCall(abort, strings.ToUpper(cmd), msgpars)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Tell what type of we are to process and what should be done
|
//Tell what type of we are to process and what should be done
|
||||||
//with that message. This includes setting or removing a local
|
//with that message. This includes setting or removing a local
|
||||||
//or a remote database (learning, reporting, forgetting, revoking)
|
//or a remote database (learning, reporting, forgetting, revoking)
|
||||||
func (s *Client) Tell(msgpars []string, headers *map[string]string) (reply *SpamDOut, err error) {
|
func (s *Client) Tell(abort chan bool, msgpars []string, headers *map[string]string) (reply *SpamDOut, err error) {
|
||||||
return s.call(TELL, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) {
|
return s.call(abort, TELL, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) {
|
||||||
r, e = processResponse(TELL, data)
|
r, e = processResponse(TELL, data)
|
||||||
|
|
||||||
if r.Code == EX_UNAVAILABLE {
|
if r.Code == EX_UNAVAILABLE {
|
||||||
@@ -233,7 +234,7 @@ func (s *Client) Tell(msgpars []string, headers *map[string]string) (reply *Spam
|
|||||||
}
|
}
|
||||||
|
|
||||||
//here a TCP socket is created to call SPAMD
|
//here a TCP socket is created to call SPAMD
|
||||||
func (s *Client) call(cmd string, msgpars []string, onData FnCallback, extraHeaders *map[string]string) (reply *SpamDOut, err error) {
|
func (s *Client) call(abort chan bool, cmd string, msgpars []string, onData FnCallback, extraHeaders *map[string]string) (reply *SpamDOut, err error) {
|
||||||
|
|
||||||
if extraHeaders == nil {
|
if extraHeaders == nil {
|
||||||
extraHeaders = &map[string]string{}
|
extraHeaders = &map[string]string{}
|
||||||
@@ -277,6 +278,24 @@ func (s *Client) call(cmd string, msgpars []string, onData FnCallback, extraHead
|
|||||||
err = errors.New("Connection to spamd Timed Out:" + errTimeout.Error())
|
err = errors.New("Connection to spamd Timed Out:" + errTimeout.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var aborted struct {
|
||||||
|
sync.Mutex
|
||||||
|
aborted bool
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
_, allowRunning := <-abort
|
||||||
|
if !allowRunning {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream.Close()
|
||||||
|
|
||||||
|
aborted.Lock()
|
||||||
|
defer aborted.Unlock()
|
||||||
|
aborted.aborted = true
|
||||||
|
}()
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
// Create Command to Send to spamd
|
// Create Command to Send to spamd
|
||||||
@@ -298,6 +317,12 @@ func (s *Client) call(cmd string, msgpars []string, onData FnCallback, extraHead
|
|||||||
|
|
||||||
// Execute onData callback throwing the buffer like parameter
|
// Execute onData callback throwing the buffer like parameter
|
||||||
reply, err = onData(bufio.NewReader(stream))
|
reply, err = onData(bufio.NewReader(stream))
|
||||||
|
|
||||||
|
aborted.Lock()
|
||||||
|
defer aborted.Unlock()
|
||||||
|
if aborted.aborted {
|
||||||
|
err = errors.New("Command execution was aborted")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user