diff --git a/client.go b/client.go index bfc2d9e..7d8a34f 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" ) @@ -116,70 +117,70 @@ func (s *Client) SetUnixUser(user string) { } // Return a confirmation that spamd is alive. -func (s *Client) Ping() (r *SpamDOut, err error) { - return s.simpleCall(PING, []string{}) +func (s *Client) Ping(abort chan bool) (r *SpamDOut, err error) { + return s.simpleCall(abort, PING, []string{}) } // Just check if the passed message is spam or not and return score -func (s *Client) Check(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(CHECK, msgpars) +func (s *Client) Check(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, CHECK, msgpars) } //Ignore this message -- client opened connection then changed its mind -func (s *Client) Skip(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(SKIP, msgpars) +func (s *Client) Skip(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, SKIP, msgpars) } //Check if message is spam or not, and return score plus list of symbols hit -func (s *Client) Symbols(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(SYMBOLS, msgpars) +func (s *Client) Symbols(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, SYMBOLS, msgpars) } //Check if message is spam or not, and return score plus report -func (s *Client) Report(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(REPORT, msgpars) +func (s *Client) Report(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, REPORT, msgpars) } //Check if message is spam or not, and return score plus report -func (s *Client) ReportIgnoreWarning(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(REPORT_IGNOREWARNING, msgpars) +func (s *Client) ReportIgnoreWarning(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, REPORT_IGNOREWARNING, msgpars) } //Check if message is spam or not, and return score plus report if the message is spam -func (s *Client) ReportIfSpam(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(REPORT_IFSPAM, msgpars) +func (s *Client) ReportIfSpam(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, REPORT_IFSPAM, msgpars) } //Process this message and return a modified message - on deloy -func (s *Client) Process(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(PROCESS, msgpars) +func (s *Client) Process(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, PROCESS, msgpars) } //Same as PROCESS, but return only modified headers, not body (new in protocol 1.4) -func (s *Client) Headers(msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(HEADERS, msgpars) +func (s *Client) Headers(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, HEADERS, msgpars) } //Sign the message as spam -func (s *Client) ReportingSpam(msgpars ...string) (reply *SpamDOut, err error) { +func (s *Client) ReportingSpam(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { headers := map[string]string{ "Message-class": "spam", "Set": "local,remote", } - return s.Tell(msgpars, &headers) + return s.Tell(abort, msgpars, &headers) } //Sign the message as false-positive -func (s *Client) RevokeSpam(msgpars ...string) (reply *SpamDOut, err error) { +func (s *Client) RevokeSpam(abort chan bool, msgpars ...string) (reply *SpamDOut, err error) { headers := map[string]string{ "Message-class": "ham", "Set": "local,remote", } - return s.Tell(msgpars, &headers) + return s.Tell(abort, msgpars, &headers) } //Learn if a message is spam or not -func (s *Client) Learn(learnType string, msgpars ...string) (reply *SpamDOut, err error) { +func (s *Client) Learn(abort chan bool, learnType string, msgpars ...string) (reply *SpamDOut, err error) { headers := make(map[string]string) switch strings.ToUpper(learnType) { case LEARN_SPAM: @@ -194,12 +195,12 @@ func (s *Client) Learn(learnType string, msgpars ...string) (reply *SpamDOut, er err = errors.New("Learn Type Not Found") return } - return s.Tell(msgpars, &headers) + return s.Tell(abort, msgpars, &headers) } //wrapper to simple calls -func (s *Client) simpleCall(cmd string, msgpars []string) (reply *SpamDOut, err error) { - return s.call(cmd, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) { +func (s *Client) simpleCall(abort chan bool, cmd string, msgpars []string) (reply *SpamDOut, err error) { + return s.call(abort, cmd, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) { r, e = processResponse(cmd, data) if r.Code == EX_OK { e = nil @@ -209,15 +210,15 @@ func (s *Client) simpleCall(cmd string, msgpars []string) (reply *SpamDOut, err } //external wrapper to simple call -func (s *Client) SimpleCall(cmd string, msgpars ...string) (reply *SpamDOut, err error) { - return s.simpleCall(strings.ToUpper(cmd), msgpars) +func (s *Client) SimpleCall(abort chan bool, cmd string, msgpars ...string) (reply *SpamDOut, err error) { + return s.simpleCall(abort, strings.ToUpper(cmd), msgpars) } //Tell what type of we are to process and what should be done //with that message. This includes setting or removing a local //or a remote database (learning, reporting, forgetting, revoking) -func (s *Client) Tell(msgpars []string, headers *map[string]string) (reply *SpamDOut, err error) { - return s.call(TELL, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) { +func (s *Client) Tell(abort chan bool, msgpars []string, headers *map[string]string) (reply *SpamDOut, err error) { + return s.call(abort, TELL, msgpars, func(data *bufio.Reader) (r *SpamDOut, e error) { r, e = processResponse(TELL, data) if r.Code == EX_UNAVAILABLE { @@ -233,7 +234,7 @@ func (s *Client) Tell(msgpars []string, headers *map[string]string) (reply *Spam } //here a TCP socket is created to call SPAMD -func (s *Client) call(cmd string, msgpars []string, onData FnCallback, extraHeaders *map[string]string) (reply *SpamDOut, err error) { +func (s *Client) call(abort chan bool, cmd string, msgpars []string, onData FnCallback, extraHeaders *map[string]string) (reply *SpamDOut, err error) { if extraHeaders == nil { extraHeaders = &map[string]string{} @@ -277,6 +278,24 @@ func (s *Client) call(cmd string, msgpars []string, onData FnCallback, extraHead err = errors.New("Connection to spamd Timed Out:" + errTimeout.Error()) return } + + var aborted struct { + sync.Mutex + aborted bool + } + go func() { + for { + _, allowRunning := <-abort + if !allowRunning { + break + } + } + stream.Close() + + aborted.Lock() + defer aborted.Unlock() + aborted.aborted = true + }() defer stream.Close() // Create Command to Send to spamd @@ -298,6 +317,12 @@ func (s *Client) call(cmd string, msgpars []string, onData FnCallback, extraHead // Execute onData callback throwing the buffer like parameter reply, err = onData(bufio.NewReader(stream)) + + aborted.Lock() + defer aborted.Unlock() + if aborted.aborted { + err = errors.New("Command execution was aborted") + } return }