1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-16 14:27:01 +00:00
Files
go-chasquid-smtp/test/util/chamuyero
Alberto Bertogli a996106eee smtpsrv: Strict CRLF enforcement in DATA contents
The RFCs are very clear that in DATA contents:

> CR and LF MUST only occur together as CRLF; they MUST NOT appear
> independently in the body.

https://www.rfc-editor.org/rfc/rfc5322#section-2.3
https://www.rfc-editor.org/rfc/rfc5321#section-2.3.8

Allowing "independent" CR and LF can cause a number of problems.

In particular, there is a new "SMTP smuggling attack" published recently
that involves the server incorrectly parsing the end of DATA marker
`\r\n.\r\n`, which an attacker can exploit to impersonate a server when
email is transmitted server-to-server.

https://www.postfix.org/smtp-smuggling.html
https://sec-consult.com/blog/detail/smtp-smuggling-spoofing-e-mails-worldwide/

Currently, chasquid is vulnerable to this attack, because Go's standard
libraries net/textproto and net/mail do not enforce CRLF strictly.

This patch fixes the problem by introducing a new "dot reader" function
that strictly enforces CRLF when reading dot-terminated data, used in
the DATA input processing.

When an invalid newline terminator is found, the connection is aborted
immediately because we cannot safely recover from that state.

We still keep the internal representation as LF-terminated for
convenience and simplicity.

However, the MDA courier is changed to pass CRLF-terminated lines, since
that is an external program which could be strict when receiving email
messages.

See https://github.com/albertito/chasquid/issues/47 for more details and
discussion.
2023-12-24 10:43:27 +00:00

262 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
chamuyero is a tool to test and validate line-oriented commands and servers.
It can launch and communicate with other processes, and follow a script of
line-oriented request-response, validating the dialog as it goes along.
This can be used to test line-oriented network protocols (such as SMTP) or
interactive command-line tools.
"""
import argparse
import os
import re
import ssl
import socket
import subprocess
import sys
import threading
import time
# Command-line flags.
# The single positional argument is the chamuyero script to run; argparse
# opens it for reading as utf8 text. Parsing happens at module import time.
ap = argparse.ArgumentParser()
ap.add_argument("script", type=argparse.FileType('r', encoding='utf8'))
args = ap.parse_args()
# Make sure stdout is open in utf8 mode, as we will print our input, which is
# utf8, and want it to work regardless of the environment.
# buffering=1 selects line buffering, so every echoed script line is flushed
# as soon as it is printed.
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
class Process (object):
    """A child process driven through its standard streams.

    Thin wrapper around subprocess.Popen that exposes the same small
    interface as the socket classes (write / readline / close), plus wait().
    """

    def __init__(self, cmd, **kwargs):
        """Launch cmd; all keyword arguments are passed on to Popen."""
        self.cmd = subprocess.Popen(cmd, **kwargs)

    def write(self, s):
        """Send s to the child's stdin, making sure it is delivered now."""
        stdin = self.cmd.stdin
        stdin.write(s)
        # The pipe is block-buffered by default, so without an explicit
        # flush the child would not see the data until the buffer fills up,
        # and a request-response dialog would stall waiting for the reply.
        stdin.flush()

    def readline(self):
        """Return the next line from the child's stdout."""
        return self.cmd.stdout.readline()

    def wait(self):
        """Block until the child exits, and return its exit code."""
        return self.cmd.wait()

    def close(self):
        """Ask the child to terminate."""
        return self.cmd.terminate()
class Sock (object):
    """A (generic) socket.

    This class implements the common code for socket support.
    Subclasses will implement the behaviour specific to different socket
    types, and are responsible for setting self.sock to a real socket.

    Reads and writes go through text-mode (utf8) file objects created once a
    connection exists; until then, write() and readline() block.
    """

    def __init__(self, addr):
        self.addr = addr

        # Placeholder; subclasses replace it with the actual socket.
        self.sock = NotImplemented

        # Text-mode reader and writer for the established connection.
        self.connr = None
        self.connw = None

        # Set once connr and connw are ready to be used.
        self.has_conn = threading.Event()

    def listen(self):
        """Bind to the address and accept one connection in the background."""
        self.sock.bind(self.addr)
        self.sock.listen(1)
        # Daemon thread: if no peer ever connects, the thread stays blocked
        # in accept(), and it must not keep the interpreter from exiting.
        threading.Thread(target=self._accept, daemon=True).start()

    def _accept(self):
        """Accept a single connection and publish its reader and writer."""
        conn, _ = self.sock.accept()
        self.connr = conn.makefile(mode="r", encoding="utf8")
        self.connw = conn.makefile(mode="w", encoding="utf8")
        self.has_conn.set()

    def write(self, s):
        """Wait for the connection, then send s and flush it."""
        self.has_conn.wait()
        self.connw.write(s)
        self.connw.flush()

    def readline(self):
        """Wait for the connection, then return the next line from it."""
        self.has_conn.wait()
        return self.connr.readline()

    def close(self):
        """Close the connection streams (if any) and the socket."""
        # The streams only exist after a connection was established; a
        # listener that never got a peer must still be closeable.
        for stream in (self.connr, self.connw):
            if stream is not None:
                stream.close()
        self.sock.close()
class UnixSock (Sock):
    """UNIX-domain stream socket; addr is the filesystem path to use."""

    def __init__(self, addr):
        Sock.__init__(self, addr)
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def listen(self):
        """Listen on the socket path, replacing any leftover file there."""
        # bind() fails if the path already exists, so drop whatever a
        # previous run left behind. Removing unconditionally (instead of
        # checking for existence first) avoids a check-then-act race and
        # also handles dangling symlinks.
        try:
            os.remove(self.addr)
        except FileNotFoundError:
            pass
        Sock.listen(self)
class TCPSock (Sock):
    """TCP socket; addr is a "host:port" string."""

    def __init__(self, addr):
        host, port = addr.rsplit(":", 1)
        Sock.__init__(self, (host, int(port)))

        # Socket used when listening; connect() replaces it.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def connect(self):
        """Connect to the address and set up the reader and writer."""
        # create_connection() builds its own socket, so release the unused
        # one made in __init__ instead of leaking its file descriptor.
        unused = self.sock
        self.sock = socket.create_connection(self.addr)
        unused.close()

        self.connr = self.sock.makefile(mode="r", encoding="utf8")
        self.connw = self.sock.makefile(mode="w", encoding="utf8")
        self.has_conn.set()
class TLSSock (Sock):
    """TLS client socket; addr is a "host:port" string.

    The TCP connection and TLS handshake happen at construction time;
    connect() only sets up the reader and writer on top of it.
    """

    def __init__(self, addr):
        host, port = addr.rsplit(":", 1)
        Sock.__init__(self, (host, int(port)))

        # ssl.wrap_socket() was removed in Python 3.12. Use an explicit
        # context configured like its defaults: no certificate or hostname
        # verification.
        # check_hostname must be disabled before verify_mode can be set to
        # CERT_NONE.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        plain_sock = socket.create_connection(self.addr)
        try:
            self.sock = ctx.wrap_socket(plain_sock)
        except Exception:
            # Do not leak the TCP connection if the handshake fails.
            plain_sock.close()
            raise

    def connect(self):
        """Set up the reader and writer over the TLS connection."""
        self.connr = self.sock.makefile(mode="r", encoding="utf8")
        self.connw = self.sock.makefile(mode="w", encoding="utf8")
        self.has_conn.set()
class Interpreter (object):
    """Interpreter for chamuyero scripts.

    A script is read line by line. Lines ending in a backslash continue on
    the next line, blank lines and lines starting with "#" are ignored, and
    everything else has the form:

        <proc> <op> [params]
    """

    def __init__(self):
        # Processes and sockets we have spawned. Indexed by the id provided
        # by the user.
        self.procs = {}

        # Line number we are processing.
        self.nline = 0

    def syntax_error(self, msg):
        """Raise SyntaxError for the current script line."""
        raise SyntaxError("Error in line %d: %s" % (self.nline, msg))

    def runtime_error(self, msg):
        """Raise RuntimeError for the current script line."""
        raise RuntimeError("Error in line %d: %s" % (self.nline, msg))

    def run(self, fd):
        """Main processing loop.

        Reads the script from fd (an iterable of lines), echoes each
        statement to stdout and executes it.

        Raises SyntaxError on malformed statements, and RuntimeError when
        the dialog does not go as the script expects.
        """
        cont_l = ""
        for l in fd:
            self.nline += 1

            # Remove rightmost \n. The last line of the file may not have
            # one, and in that case there is nothing to remove.
            if l.endswith("\n"):
                l = l[:-1]

            # Continuations with \.
            if cont_l:
                l = cont_l + " " + l.lstrip()
            if l.endswith("\\"):
                cont_l = l[:-1]
                continue
            cont_l = ""

            # Comments start with a "#".
            stripped = l.strip()
            if stripped.startswith("#") or stripped == "":
                continue

            print(l)

            # Everything else is of the form:
            #   <proc> <op> [params]
            sp = l.split(None, 2)
            if len(sp) < 2:
                self.syntax_error("expected '<proc> <op> [params]'")
            proc, op = sp[0], sp[1]
            params = sp[2] if len(sp) == 3 else ""

            self._execute(proc, op, params)

    def _execute(self, proc, op, params):
        """Run a single, already-parsed statement."""
        # =   Launch a process.
        if op == "=":
            self.procs[proc] = Process(
                params, shell=True, universal_newlines=True,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)

        # |=   Launch a process, do not capture stdout.
        elif op == "|=":
            # stdin is opened in text mode, like in "=", so the write
            # operations (which send str) also work on these processes.
            self.procs[proc] = Process(
                params, shell=True, universal_newlines=True,
                stdin=subprocess.PIPE)

        # unix_listen   Listen on an UNIX socket.
        elif op == "unix_listen":
            sock = UnixSock(params)
            sock.listen()
            self.procs[proc] = sock

        # tcp_listen   Listen on a TCP socket.
        elif op == "tcp_listen":
            sock = TCPSock(params)
            sock.listen()
            self.procs[proc] = sock

        # tcp_connect   Connect to a TCP address.
        elif op == "tcp_connect":
            sock = TCPSock(params)
            sock.connect()
            self.procs[proc] = sock

        # tls_connect   Connect to a TCP address, using TLS.
        elif op == "tls_connect":
            sock = TLSSock(params)
            sock.connect()
            self.procs[proc] = sock

        # ->   Send to a process stdin, with a \r\n at the end.
        # .>   Send to a process stdin, no \r\n at the end.
        # ~>   Send to a process stdin, string is python-evaluated.
        elif op == "->":
            self.procs[proc].write(params + "\r\n")
        elif op == ".>":
            self.procs[proc].write(params)
        elif op == "~>":
            # NOTE: eval() runs arbitrary Python taken from the script.
            # Scripts are test code and must be trusted; never run a
            # chamuyero script from an untrusted source.
            self.procs[proc].write(eval(params))

        # <-   Read from the process, expect matching input.
        # <~   Read from the process, match input using regexp.
        # <... Read many lines until one matches.
        elif op == "<-":
            read = self.procs[proc].readline()
            if read != params + "\n":
                self.runtime_error("data different that expected:\n"
                        + "  expected: %s\n" % repr(params)
                        + "       got: %s" % repr(read))
        elif op == "<~":
            read = self.procs[proc].readline()
            if re.match(params, read) is None:
                self.runtime_error("data did not match regexp:\n"
                        + "  regexp: %s\n" % repr(params)
                        + "     got: %s" % repr(read))
        elif op == "<...":
            while True:
                read = self.procs[proc].readline()
                if re.match(params, read):
                    break
                # An empty read means EOF: nothing else will ever arrive,
                # so fail instead of looping forever.
                if read == "":
                    self.runtime_error("EOF while waiting for regexp:\n"
                            + "  regexp: %s" % repr(params))

        # sleep   Sleep this number of seconds (process-independent).
        elif op == "sleep":
            time.sleep(float(params))

        # wait   Wait for the process to exit (with the given code).
        elif op == "wait":
            retcode = self.procs[proc].wait()
            if params and retcode != int(params):
                self.runtime_error("return code did not match:\n"
                        + "  expected %s, got %d" % (params, retcode))

        # close   Close the process.
        elif op == "close":
            self.procs[proc].close()

        else:
            self.syntax_error("unknown syntax")
if __name__ == "__main__":
    # Run the script given on the command line through a fresh interpreter.
    interpreter = Interpreter()
    interpreter.run(args.script)