#!/usr/bin/env python3
"""
chamuyero is a tool to test and validate line-oriented commands and servers.

It can launch and communicate with other processes, and follow a script of
line-oriented request-response, validating the dialog as it goes along.

This can be used to test line-oriented network protocols (such as SMTP) or
interactive command-line tools.
"""

import argparse
import os
import re
import ssl
import socket
import subprocess
import sys
import threading
import time


class Process:
    """A child process we talk to through its stdin/stdout pipes."""

    def __init__(self, cmd, **kwargs):
        """Launch `cmd`; `kwargs` are passed verbatim to subprocess.Popen."""
        self.cmd = subprocess.Popen(cmd, **kwargs)

    def write(self, s):
        """Send `s` to the process' stdin."""
        self.cmd.stdin.write(s)
        # The pipe is buffered; without an explicit flush the child may never
        # see the data and a following read would block forever.
        self.cmd.stdin.flush()

    def readline(self):
        """Return the next line of the process' stdout ("" on EOF)."""
        return self.cmd.stdout.readline()

    def wait(self):
        """Block until the process exits, and return its exit code."""
        return self.cmd.wait()

    def close(self):
        """Ask the process to terminate."""
        return self.cmd.terminate()


class Sock:
    """A (generic) socket.

    This class implements the common code for socket support.
    Subclasses will implement the behaviour specific to different socket
    types, and are responsible for setting `self.sock`.
    """

    def __init__(self, addr):
        self.addr = addr
        # Placeholder; subclasses replace it with a real socket object.
        self.sock = NotImplemented
        # Text-mode file objects for reading from / writing to the peer.
        # They stay None until a connection is established.
        self.connr = None
        self.connw = None
        # Set once connr/connw are usable.
        self.has_conn = threading.Event()

    def listen(self):
        """Bind to the address and accept one connection in the background."""
        self.sock.bind(self.addr)
        self.sock.listen(1)
        # Daemon thread: if no peer ever connects, a blocked accept() must not
        # keep the whole tool alive after the script ended or failed.
        threading.Thread(target=self._accept, daemon=True).start()

    def _accept(self):
        """Accept a single connection and publish its file objects."""
        conn, _ = self.sock.accept()
        self.connr = conn.makefile(mode="r", encoding="utf8")
        self.connw = conn.makefile(mode="w", encoding="utf8")
        self.has_conn.set()

    def write(self, s):
        """Send `s` to the peer, waiting for the connection if needed."""
        self.has_conn.wait()
        self.connw.write(s)
        self.connw.flush()

    def readline(self):
        """Read a line from the peer, waiting for the connection if needed."""
        self.has_conn.wait()
        return self.connr.readline()

    def close(self):
        """Close the connection (if there is one) and the socket."""
        # connr/connw are None when nobody connected to a listening socket.
        if self.connr is not None:
            self.connr.close()
        if self.connw is not None:
            self.connw.close()
        self.sock.close()


class UnixSock(Sock):
    """UNIX-domain stream socket; `addr` is a filesystem path."""

    def __init__(self, addr):
        Sock.__init__(self, addr)
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def listen(self):
        # Remove a stale socket file from a previous run, otherwise bind()
        # fails with "address already in use".
        try:
            os.remove(self.addr)
        except FileNotFoundError:
            pass
        Sock.listen(self)


class TCPSock(Sock):
    """TCP socket; `addr` is a "host:port" string."""

    def __init__(self, addr):
        host, port = addr.rsplit(":", 1)
        Sock.__init__(self, (host, int(port)))
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def connect(self):
        """Connect to the address as a client."""
        # create_connection() returns a brand new socket, so release the
        # unconnected one made in __init__ instead of leaking it.
        self.sock.close()
        self.sock = socket.create_connection(self.addr)
        self.connr = self.sock.makefile(mode="r", encoding="utf8")
        self.connw = self.sock.makefile(mode="w", encoding="utf8")
        self.has_conn.set()


class TLSSock(Sock):
    """TLS client socket; `addr` is a "host:port" string.

    The TCP connection and TLS handshake happen in the constructor.
    """

    def __init__(self, addr):
        host, port = addr.rsplit(":", 1)
        Sock.__init__(self, (host, int(port)))
        plain_sock = socket.create_connection(self.addr)

        # ssl.wrap_socket() was removed in Python 3.12. Use an SSLContext
        # configured like the old helper's defaults: no certificate or
        # hostname verification (test servers commonly use self-signed
        # certificates). check_hostname must be disabled before verify_mode.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.sock = ctx.wrap_socket(plain_sock)

    def connect(self):
        """Expose the (already established) connection for reads/writes."""
        self.connr = self.sock.makefile(mode="r", encoding="utf8")
        self.connw = self.sock.makefile(mode="w", encoding="utf8")
        self.has_conn.set()


class Interpreter:
    """Interpreter for chamuyero scripts."""

    def __init__(self):
        # Processes and sockets we have spawned. Indexed by the id provided
        # by the user.
        self.procs = {}

        # Line number we are processing.
        self.nline = 0

    def syntax_error(self, msg):
        """Raise a SyntaxError for the current script line."""
        raise SyntaxError("Error in line %d: %s" % (self.nline, msg))

    def runtime_error(self, msg):
        """Raise a RuntimeError for the current script line."""
        raise RuntimeError("Error in line %d: %s" % (self.nline, msg))

    def _proc(self, name):
        """Return the process/socket registered as `name`, or fail clearly."""
        try:
            return self.procs[name]
        except KeyError:
            self.runtime_error("unknown process %s" % repr(name))

    def run(self, fd):
        """Main processing loop.

        `fd` is an iterable of script lines (typically an open text file).
        Each non-comment line is echoed to stdout and then executed.

        Raises SyntaxError for malformed lines, and RuntimeError when the
        dialog does not go as the script expects.
        """
        cont_l = ""
        for line in fd:
            self.nline += 1

            # Remove the trailing \n, if any; the last line of a file may not
            # have one, and we must not eat a real character in that case.
            if line.endswith("\n"):
                line = line[:-1]

            # Continuations with \.
            if cont_l:
                line = cont_l + " " + line.lstrip()

            if line.endswith("\\"):
                cont_l = line[:-1]
                continue
            cont_l = ""

            # Comments start with a "#"; blank lines are skipped too.
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue

            print(line)

            # Everything else is of the form:
            #   <proc> <op> [params]
            sp = line.split(None, 2)
            if len(sp) == 3:
                proc, op, params = sp
            elif len(sp) == 2:
                proc, op = sp
                params = ""
            else:
                self.syntax_error("expected '<proc> <op> [params]'")

            # <proc> = <command>: launch a process.
            if op == "=":
                cmd = Process(params, shell=True, universal_newlines=True,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT)
                self.procs[proc] = cmd

            # <proc> |= <command>: launch a process, do not capture stdout.
            elif op == "|=":
                # Text mode is required so that "->" / ".>" can write str to
                # the process' stdin.
                cmd = Process(params, shell=True, universal_newlines=True,
                              stdin=subprocess.PIPE)
                self.procs[proc] = cmd

            # <proc> unix_listen <path>: listen on an UNIX socket.
            elif op == "unix_listen":
                sock = UnixSock(params)
                sock.listen()
                self.procs[proc] = sock

            # <proc> tcp_listen <host:port>: listen on a TCP socket.
            elif op == "tcp_listen":
                sock = TCPSock(params)
                sock.listen()
                self.procs[proc] = sock

            # <proc> tcp_connect <host:port>: connect to a TCP socket.
            elif op == "tcp_connect":
                sock = TCPSock(params)
                sock.connect()
                self.procs[proc] = sock

            # <proc> tls_connect <host:port>: connect to a TCP socket, with
            # TLS.
            elif op == "tls_connect":
                sock = TLSSock(params)
                sock.connect()
                self.procs[proc] = sock

            # <proc> -> <data>: send to the process, with a \n at the end.
            # <proc> .> <data>: send to the process, no \n at the end.
            elif op == "->":
                self._proc(proc).write(params + "\n")
            elif op == ".>":
                self._proc(proc).write(params)

            # <proc> <- <data>: read from the process, expect matching input.
            # <proc> <~ <regexp>: read from the process, match using regexp.
            # <proc> <... <regexp>: read many lines until one matches.
            elif op == "<-":
                read = self._proc(proc).readline()
                if read != params + "\n":
                    self.runtime_error("data different that expected:\n"
                                       + " expected: %s\n" % repr(params)
                                       + " got: %s" % repr(read))
            elif op == "<~":
                read = self._proc(proc).readline()
                if re.match(params, read) is None:
                    self.runtime_error("data did not match regexp:\n"
                                       + " regexp: %s\n" % repr(params)
                                       + " got: %s" % repr(read))
            elif op == "<...":
                target = self._proc(proc)
                while True:
                    read = target.readline()
                    if re.match(params, read):
                        break
                    # readline() returns "" forever once the peer is gone;
                    # fail instead of spinning.
                    if read == "":
                        self.runtime_error(
                            "EOF while waiting for regexp:\n"
                            + " regexp: %s" % repr(params))

            # <proc> sleep <secs>: sleep this number of seconds
            # (process-independent).
            elif op == "sleep":
                time.sleep(float(params))

            # <proc> wait [code]: wait for the process to exit (with the
            # given code).
            elif op == "wait":
                retcode = self._proc(proc).wait()
                if params and retcode != int(params):
                    self.runtime_error("return code did not match:\n"
                                       + " expected %s, got %d"
                                       % (params, retcode))

            # <proc> close: close the process.
            elif op == "close":
                self._proc(proc).close()

            else:
                self.syntax_error("unknown syntax")


def main():
    """Parse the command line and run the given script."""
    # Command-line flags.
    ap = argparse.ArgumentParser()
    ap.add_argument("script", type=argparse.FileType('r', encoding='utf8'))
    args = ap.parse_args()

    # Make sure stdout is open in utf8 mode, as we will print our input,
    # which is utf8, and want it to work regardless of the environment.
    sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8',
                      buffering=1)

    Interpreter().run(args.script)


if __name__ == "__main__":
    main()