mirror of
https://blitiri.com.ar/repos/chasquid
synced 2025-12-17 14:37:02 +00:00
test: Add "chamuyero", a tool to test line-oriented I/O
This patch adds "chamuyero", a a tool to test and validate line-oriented commands and servers. It can launch and communicate with other processes, and follow a script of line-oriented request-response, validating the dialog as it goes along. This can be used to test line-oriented network protocols (such as SMTP) or interactive command-line tools. It will be used in follow up patches to test new commands and functionality.
This commit is contained in:
225
test/util/chamuyero
Executable file
225
test/util/chamuyero
Executable file
@@ -0,0 +1,225 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
chamuyero is a tool to test and validate line-oriented commands and servers.
|
||||
|
||||
It can launch and communicate with other processes, and follow a script of
|
||||
line-oriented request-response, validating the dialog as it goes along.
|
||||
|
||||
This can be used to test line-oriented network protocols (such as SMTP) or
|
||||
interactive command-line tools.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
# Command-line flags.
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("script", type=argparse.FileType('r', encoding='UTF-8'))
|
||||
args = ap.parse_args()
|
||||
|
||||
|
||||
class Process (object):
|
||||
def __init__(self, cmd, **kwargs):
|
||||
self.cmd = subprocess.Popen(cmd, **kwargs)
|
||||
|
||||
def write(self, s):
|
||||
self.cmd.stdin.write(s)
|
||||
|
||||
def readline(self):
|
||||
return self.cmd.stdout.readline()
|
||||
|
||||
def wait(self):
|
||||
return self.cmd.wait()
|
||||
|
||||
|
||||
class Sock (object):
|
||||
"""A (generic) socket.
|
||||
|
||||
This class implements the common code for socket support.
|
||||
Subclasses will implement the behaviour specific to different socket
|
||||
types.
|
||||
"""
|
||||
def __init__(self, addr):
|
||||
self.addr = addr
|
||||
self.sock = NotImplemented
|
||||
self.connr = None
|
||||
self.connw = None
|
||||
self.has_conn = threading.Event()
|
||||
|
||||
def listen(self):
|
||||
self.sock.bind(self.addr)
|
||||
self.sock.listen(1)
|
||||
threading.Thread(target=self._accept).start()
|
||||
|
||||
def _accept(self):
|
||||
conn, _ = self.sock.accept()
|
||||
self.connr = conn.makefile(mode="r")
|
||||
self.connw = conn.makefile(mode="w")
|
||||
self.has_conn.set()
|
||||
|
||||
def write(self, s):
|
||||
self.has_conn.wait()
|
||||
self.connw.write(s)
|
||||
self.connw.flush()
|
||||
|
||||
def readline(self):
|
||||
self.has_conn.wait()
|
||||
return self.connr.readline()
|
||||
|
||||
|
||||
class UnixSock (Sock):
|
||||
def __init__(self, addr):
|
||||
Sock.__init__(self, addr)
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
|
||||
def listen(self):
|
||||
if os.path.exists(self.addr):
|
||||
os.remove(self.addr)
|
||||
Sock.listen(self)
|
||||
|
||||
|
||||
class TCPSock (Sock):
|
||||
def __init__(self, addr):
|
||||
host, port = addr.rsplit(":", 1)
|
||||
Sock.__init__(self, (host, int(port)))
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
def connect(self):
|
||||
self.sock = socket.create_connection(self.addr)
|
||||
self.connr = self.sock.makefile(mode="r")
|
||||
self.connw = self.sock.makefile(mode="w")
|
||||
self.has_conn.set()
|
||||
|
||||
|
||||
class Interpreter (object):
|
||||
"""Interpreter for chamuyero scripts."""
|
||||
def __init__(self):
|
||||
# Processes and sockets we have spawn. Indexed by the id provided by
|
||||
# the user.
|
||||
self.procs = {}
|
||||
|
||||
# Line number we are processing.
|
||||
self.nline = 0
|
||||
|
||||
def syntax_error(self, msg):
|
||||
raise SyntaxError("Error in line %d: %s" % (self.nline, msg))
|
||||
|
||||
def runtime_error(self, msg):
|
||||
raise RuntimeError("Error in line %d: %s" % (self.nline, msg))
|
||||
|
||||
def run(self, fd):
|
||||
"""Main processing loop."""
|
||||
cont_l = ""
|
||||
for l in fd:
|
||||
self.nline += 1
|
||||
|
||||
# Remove rightmost \n.
|
||||
l = l[:-1]
|
||||
|
||||
# Continuations with \.
|
||||
if cont_l:
|
||||
l = cont_l + " " + l.lstrip()
|
||||
if l.endswith("\\"):
|
||||
cont_l = l[:-1]
|
||||
continue
|
||||
else:
|
||||
cont_l = ""
|
||||
|
||||
# Comments start with a "#".
|
||||
if l.strip().startswith("#") or l.strip() == "":
|
||||
continue
|
||||
|
||||
print(l)
|
||||
|
||||
# Everything else is of the form:
|
||||
# <proc> <op> [params]
|
||||
sp = l.split(None, 2)
|
||||
if len(sp) == 3:
|
||||
proc, op, params = sp
|
||||
else:
|
||||
proc, op = sp
|
||||
params = ""
|
||||
|
||||
# = Launch a process.
|
||||
if op == "=":
|
||||
cmd = Process(params, shell=True, universal_newlines=True,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
self.procs[proc] = cmd
|
||||
|
||||
# |= Launch a process, do not capture stdout.
|
||||
elif op == "|=":
|
||||
cmd = Process(params, shell=True, stdin=subprocess.PIPE)
|
||||
self.procs[proc] = cmd
|
||||
|
||||
# unix_listen Listen on an UNIX socket.
|
||||
elif op == "unix_listen":
|
||||
sock = UnixSock(params)
|
||||
sock.listen()
|
||||
self.procs[proc] = sock
|
||||
|
||||
# tcp_listen Listen on a TCP socket.
|
||||
elif op == "tcp_listen":
|
||||
sock = TCPSock(params)
|
||||
sock.listen()
|
||||
self.procs[proc] = sock
|
||||
|
||||
elif op == "tcp_connect":
|
||||
sock = TCPSock(params)
|
||||
sock.connect()
|
||||
self.procs[proc] = sock
|
||||
|
||||
# -> Send to a process stdin, with a \n at the end.
|
||||
# .> Send to a process stdin, no \n at the end.
|
||||
elif op == "->":
|
||||
self.procs[proc].write(params + "\n")
|
||||
elif op == ".>":
|
||||
self.procs[proc].write(params)
|
||||
|
||||
# <- Read from the process, expect matching input.
|
||||
# <~ Read from the process, match input using regexp.
|
||||
# <... Read many lines until one matches.
|
||||
elif op == "<-":
|
||||
read = self.procs[proc].readline()
|
||||
if read != params + "\n":
|
||||
self.runtime_error("data different that expected:\n"
|
||||
+ " expected: %s\n" % repr(params)
|
||||
+ " got: %s" % repr(read))
|
||||
elif op == "<~":
|
||||
read = self.procs[proc].readline()
|
||||
m = re.match(params, read)
|
||||
if m is None:
|
||||
self.runtime_error("data did not match regexp:\n"
|
||||
+ " regexp: %s\n" % repr(params)
|
||||
+ " got: %s" % repr(read))
|
||||
elif op == "<...":
|
||||
while True:
|
||||
read = self.procs[proc].readline()
|
||||
m = re.match(params, read)
|
||||
if m:
|
||||
break
|
||||
|
||||
# sleep Sleep this number of seconds (process-independent).
|
||||
elif op == "sleep":
|
||||
time.sleep(float(params))
|
||||
|
||||
# wait Wait for the process to exit (with the given code).
|
||||
elif op == "wait":
|
||||
retcode = self.procs[proc].wait()
|
||||
if params and retcode != int(params):
|
||||
self.runtime_error("return code did not match:\n"
|
||||
+ " expected %s, got %d" % (params, retcode))
|
||||
|
||||
else:
|
||||
self.syntax_error("unknown syntax")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
i = Interpreter()
|
||||
i.run(args.script)
|
||||
@@ -5,11 +5,11 @@ function init() {
|
||||
set -v
|
||||
fi
|
||||
|
||||
export UTILDIR="$( realpath `dirname "${BASH_SOURCE[0]}"` )"
|
||||
|
||||
export TBASE="$(realpath `dirname ${0}`)"
|
||||
cd ${TBASE}
|
||||
|
||||
export UTILDIR="$(realpath ${TBASE}/../util/)"
|
||||
|
||||
if [ "${RACE}" == "1" ]; then
|
||||
RACE="-race"
|
||||
fi
|
||||
@@ -69,6 +69,10 @@ function mail_diff() {
|
||||
${UTILDIR}/mail_diff "$@"
|
||||
}
|
||||
|
||||
function chamuyero() {
|
||||
${UTILDIR}/chamuyero "$@"
|
||||
}
|
||||
|
||||
function success() {
|
||||
echo success
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user