1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-17 14:37:02 +00:00
Files
go-chasquid-smtp/test/util/chamuyero
Alberto Bertogli 61d2961ee9 test: Add a new integration test with minor dialogs
This patch adds a new integration test, which executes various small
dialogs, to cover corner cases that are not well covered (according to
our coverage report).

For example, "EHLO" without domain, or invalid DATA.

While we could do them via Go tests, this way is more realistic, and the
tests are easier to write.
2018-03-02 19:37:37 +00:00

245 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
chamuyero is a tool to test and validate line-oriented commands and servers.
It can launch and communicate with other processes, and follow a script of
line-oriented request-response, validating the dialog as it goes along.
This can be used to test line-oriented network protocols (such as SMTP) or
interactive command-line tools.
"""
import argparse
import os
import re
import ssl
import socket
import subprocess
import sys
import threading
import time
# Command-line flags.
# The single positional argument is the script to run; argparse opens it for
# reading as UTF-8 text, so `args.script` is a file object, not a path.
ap = argparse.ArgumentParser()
ap.add_argument(
    "script",
    type=argparse.FileType(mode="r", encoding="UTF-8"),
)
args = ap.parse_args()
class Process (object):
    """A child process driven through its standard streams.

    Thin wrapper around subprocess.Popen that exposes write/readline/wait,
    which are the operations the interpreter invokes on its entries.
    """

    def __init__(self, cmd, **kwargs):
        """Launch `cmd`; all keyword arguments are passed on to Popen."""
        self.cmd = subprocess.Popen(cmd, **kwargs)

    def write(self, s):
        """Send `s` to the child's stdin and flush it right away."""
        self.cmd.stdin.write(s)
        # Popen's stdin sits on top of a buffered pipe writer; without an
        # explicit flush a short line can stay in our buffer, and a
        # subsequent readline() would wait forever for a reply to a request
        # the child never saw.
        self.cmd.stdin.flush()

    def readline(self):
        """Return the next line from the child's stdout."""
        return self.cmd.stdout.readline()

    def wait(self):
        """Wait for the child to exit and return its exit code."""
        return self.cmd.wait()
class Sock (object):
    """A (generic) socket.

    This class implements the common code for socket support.
    Subclasses will implement the behaviour specific to different socket
    types.
    """

    def __init__(self, addr):
        """Record the address; subclasses are expected to set `self.sock`."""
        self.addr = addr
        # Placeholder until a subclass assigns a real socket object.
        self.sock = NotImplemented
        # Text-mode file objects for reading from / writing to the peer.
        self.connr = None
        self.connw = None
        # Set once connr/connw are usable; write() and readline() block on it.
        self.has_conn = threading.Event()

    def listen(self):
        """Bind to the address and accept one connection in the background."""
        self.sock.bind(self.addr)
        self.sock.listen(1)
        # Daemon thread: if the script finishes (or fails) before any peer
        # connects, the thread is still blocked in accept() and must not
        # keep the interpreter from exiting.
        threading.Thread(target=self._accept, daemon=True).start()

    def _accept(self):
        """Accept a single connection and publish its file objects."""
        conn, _ = self.sock.accept()
        self.connr = conn.makefile(mode="r")
        self.connw = conn.makefile(mode="w")
        self.has_conn.set()

    def write(self, s):
        """Send `s` to the peer, waiting for the connection if needed."""
        self.has_conn.wait()
        self.connw.write(s)
        self.connw.flush()

    def readline(self):
        """Return the next line from the peer, waiting for the connection."""
        self.has_conn.wait()
        return self.connr.readline()
class UnixSock(Sock):
    """Stream socket over a UNIX-domain path."""

    def __init__(self, addr):
        super().__init__(addr)
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def listen(self):
        """Remove whatever already exists at the path, then start listening."""
        path = self.addr
        if os.path.exists(path):
            os.remove(path)
        super().listen()
class TCPSock (Sock):
    """TCP socket, usable either to listen or to connect.

    The address is given as a "host:port" string.
    """

    def __init__(self, addr):
        host, port = addr.rsplit(":", 1)
        Sock.__init__(self, (host, int(port)))
        # Unconnected socket, used as-is by listen().
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def connect(self):
        """Connect to the address and make the connection ready for use."""
        conn = socket.create_connection(self.addr)
        # create_connection() returns a brand new socket; close the one made
        # in __init__ instead of just dropping the reference, so its file
        # descriptor is not leaked.
        self.sock.close()
        self.sock = conn
        self.connr = self.sock.makefile(mode="r")
        self.connw = self.sock.makefile(mode="w")
        self.has_conn.set()
class TLSSock (Sock):
    """TLS client socket; the connection is established on construction.

    The address is given as a "host:port" string. The peer certificate is
    not verified.
    """

    def __init__(self, addr):
        host, port = addr.rsplit(":", 1)
        Sock.__init__(self, (host, int(port)))
        plain_sock = socket.create_connection(self.addr)

        # ssl.wrap_socket() was removed in Python 3.12. Build an equivalent
        # context explicitly: no certificate or hostname verification, as
        # was the default for the old module-level function.
        # check_hostname must be cleared before verify_mode can be relaxed.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        try:
            self.sock = ctx.wrap_socket(plain_sock)
        except Exception:
            # Do not leak the TCP connection if wrapping/handshake fails.
            plain_sock.close()
            raise

    def connect(self):
        """Make the (already established) connection ready for use."""
        self.connr = self.sock.makefile(mode="r")
        self.connw = self.sock.makefile(mode="w")
        self.has_conn.set()
class Interpreter (object):
    """Interpreter for chamuyero scripts.

    A script is read line by line. Blank lines and lines starting with "#"
    are ignored, a trailing backslash continues the command on the next
    line, and everything else has the form:

        <proc> <op> [params]
    """

    def __init__(self):
        # Processes and sockets we have spawned. Indexed by the id provided
        # by the user.
        self.procs = {}

        # Line number we are processing.
        self.nline = 0

    def syntax_error(self, msg):
        """Raise SyntaxError for the current script line."""
        raise SyntaxError("Error in line %d: %s" % (self.nline, msg))

    def runtime_error(self, msg):
        """Raise RuntimeError for the current script line."""
        raise RuntimeError("Error in line %d: %s" % (self.nline, msg))

    def run(self, fd):
        """Main processing loop.

        Reads the script from `fd` (an iterable of text lines), echoes each
        command to stdout and executes it.

        Raises:
            SyntaxError: a line is not a valid command.
            RuntimeError: the dialog did not go as the script expected.
        """
        pending = ""
        for line in fd:
            self.nline += 1

            # Remove the rightmost \n, if any: the last line of a file may
            # not have one, and must not lose its final character.
            if line.endswith("\n"):
                line = line[:-1]

            # Continuations with \.
            if pending:
                line = pending + " " + line.lstrip()
            if line.endswith("\\"):
                pending = line[:-1]
                continue
            pending = ""

            # Comments start with a "#".
            stripped = line.strip()
            if stripped == "" or stripped.startswith("#"):
                continue

            print(line)

            # Everything else is of the form:
            #   <proc> <op> [params]
            fields = line.split(None, 2)
            if len(fields) < 2:
                self.syntax_error("expected '<proc> <op> [params]'")
            proc, op = fields[0], fields[1]
            params = fields[2] if len(fields) == 3 else ""

            self._execute(proc, op, params)

    def _execute(self, proc, op, params):
        """Execute one parsed command: apply `op` (with `params`) to `proc`."""
        # =   Launch a process.
        if op == "=":
            cmd = Process(params, shell=True, universal_newlines=True,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
            self.procs[proc] = cmd

        # |=  Launch a process, do not capture stdout.
        elif op == "|=":
            # Text mode is needed here too: "->" and ".>" write str, which
            # a binary stdin pipe would reject with TypeError.
            cmd = Process(params, shell=True, universal_newlines=True,
                          stdin=subprocess.PIPE)
            self.procs[proc] = cmd

        # unix_listen  Listen on an UNIX socket.
        elif op == "unix_listen":
            sock = UnixSock(params)
            sock.listen()
            self.procs[proc] = sock

        # tcp_listen  Listen on a TCP socket.
        elif op == "tcp_listen":
            sock = TCPSock(params)
            sock.listen()
            self.procs[proc] = sock

        # tcp_connect  Connect to a TCP address.
        elif op == "tcp_connect":
            sock = TCPSock(params)
            sock.connect()
            self.procs[proc] = sock

        # tls_connect  Connect to a TCP address, using TLS.
        elif op == "tls_connect":
            sock = TLSSock(params)
            sock.connect()
            self.procs[proc] = sock

        # ->  Send to a process stdin, with a \n at the end.
        # .>  Send to a process stdin, no \n at the end.
        elif op == "->":
            self.procs[proc].write(params + "\n")
        elif op == ".>":
            self.procs[proc].write(params)

        # <-   Read from the process, expect matching input.
        # <~   Read from the process, match input using regexp.
        # <... Read many lines until one matches.
        elif op == "<-":
            read = self.procs[proc].readline()
            if read != params + "\n":
                self.runtime_error("data different that expected:\n"
                                   + "  expected: %s\n" % repr(params)
                                   + "       got: %s" % repr(read))
        elif op == "<~":
            read = self.procs[proc].readline()
            m = re.match(params, read)
            if m is None:
                self.runtime_error("data did not match regexp:\n"
                                   + "  regexp: %s\n" % repr(params)
                                   + "     got: %s" % repr(read))
        elif op == "<...":
            while True:
                read = self.procs[proc].readline()
                if re.match(params, read):
                    break
                # readline() returns "" at end of stream; nothing else will
                # ever arrive, so fail instead of spinning forever.
                if read == "":
                    self.runtime_error("EOF while waiting for a match:\n"
                                       + "  regexp: %s" % repr(params))

        # sleep  Sleep this number of seconds (process-independent).
        elif op == "sleep":
            time.sleep(float(params))

        # wait  Wait for the process to exit (with the given code).
        elif op == "wait":
            retcode = self.procs[proc].wait()
            if params and retcode != int(params):
                self.runtime_error("return code did not match:\n"
                                   + "  expected %s, got %d"
                                   % (params, retcode))

        else:
            self.syntax_error("unknown syntax")
if __name__ == "__main__":
    # Execute the script named on the command line.
    Interpreter().run(args.script)