mirror of
https://blitiri.com.ar/repos/chasquid
synced 2025-12-17 14:37:02 +00:00
ssl.wrap_socket() has been deprecated and is no longer functional in Python 3.12: https://docs.python.org/3/whatsnew/3.12.html#ssl. This patch replaces it with the equivalent (in this context) ssl.SSLContext.
266 lines
8.2 KiB
Python
Executable File
266 lines
8.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
chamuyero is a tool to test and validate line-oriented commands and servers.
|
|
|
|
It can launch and communicate with other processes, and follow a script of
|
|
line-oriented request-response, validating the dialog as it goes along.
|
|
|
|
This can be used to test line-oriented network protocols (such as SMTP) or
|
|
interactive command-line tools.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import ssl
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
# Command-line flags.
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("script", type=argparse.FileType('r', encoding='utf8'))
|
|
args = ap.parse_args()
|
|
|
|
# Make sure stdout is open in utf8 mode, as we will print our input, which is
|
|
# utf8, and want it to work regardless of the environment.
|
|
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
|
|
|
|
|
|
class Process (object):
|
|
def __init__(self, cmd, **kwargs):
|
|
self.cmd = subprocess.Popen(cmd, **kwargs)
|
|
|
|
def write(self, s):
|
|
self.cmd.stdin.write(s)
|
|
|
|
def readline(self):
|
|
return self.cmd.stdout.readline()
|
|
|
|
def wait(self):
|
|
return self.cmd.wait()
|
|
|
|
def close(self):
|
|
return self.cmd.stdin.close()
|
|
|
|
class Sock (object):
|
|
"""A (generic) socket.
|
|
|
|
This class implements the common code for socket support.
|
|
Subclasses will implement the behaviour specific to different socket
|
|
types.
|
|
"""
|
|
def __init__(self, addr):
|
|
self.addr = addr
|
|
self.sock = NotImplemented
|
|
self.connr = None
|
|
self.connw = None
|
|
self.has_conn = threading.Event()
|
|
|
|
def listen(self):
|
|
self.sock.bind(self.addr)
|
|
self.sock.listen(1)
|
|
threading.Thread(target=self._accept).start()
|
|
|
|
def _accept(self):
|
|
conn, _ = self.sock.accept()
|
|
self.connr = conn.makefile(mode="r", encoding="utf8")
|
|
self.connw = conn.makefile(mode="w", encoding="utf8")
|
|
self.has_conn.set()
|
|
|
|
def write(self, s):
|
|
self.has_conn.wait()
|
|
self.connw.write(s)
|
|
self.connw.flush()
|
|
|
|
def readline(self):
|
|
self.has_conn.wait()
|
|
return self.connr.readline()
|
|
|
|
def close(self):
|
|
self.connr.close()
|
|
self.connw.close()
|
|
self.sock.close()
|
|
|
|
class UnixSock (Sock):
|
|
def __init__(self, addr):
|
|
Sock.__init__(self, addr)
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
def listen(self):
|
|
if os.path.exists(self.addr):
|
|
os.remove(self.addr)
|
|
Sock.listen(self)
|
|
|
|
|
|
class TCPSock (Sock):
|
|
def __init__(self, addr):
|
|
host, port = addr.rsplit(":", 1)
|
|
Sock.__init__(self, (host, int(port)))
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
def connect(self):
|
|
self.sock = socket.create_connection(self.addr)
|
|
self.connr = self.sock.makefile(mode="r", encoding="utf8")
|
|
self.connw = self.sock.makefile(mode="w", encoding="utf8")
|
|
self.has_conn.set()
|
|
|
|
|
|
class TLSSock (Sock):
|
|
def __init__(self, addr):
|
|
host, port = addr.rsplit(":", 1)
|
|
Sock.__init__(self, (host, int(port)))
|
|
plain_sock = socket.create_connection(self.addr)
|
|
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
self.sock = context.wrap_socket(plain_sock)
|
|
|
|
def connect(self):
|
|
self.connr = self.sock.makefile(mode="r", encoding="utf8")
|
|
self.connw = self.sock.makefile(mode="w", encoding="utf8")
|
|
self.has_conn.set()
|
|
|
|
|
|
class Interpreter (object):
|
|
"""Interpreter for chamuyero scripts."""
|
|
def __init__(self):
|
|
# Processes and sockets we have spawn. Indexed by the id provided by
|
|
# the user.
|
|
self.procs = {}
|
|
|
|
# Line number we are processing.
|
|
self.nline = 0
|
|
|
|
def syntax_error(self, msg):
|
|
raise SyntaxError("Error in line %d: %s" % (self.nline, msg))
|
|
|
|
def runtime_error(self, msg):
|
|
raise RuntimeError("Error in line %d: %s" % (self.nline, msg))
|
|
|
|
def run(self, fd):
|
|
"""Main processing loop."""
|
|
cont_l = ""
|
|
for l in fd:
|
|
self.nline += 1
|
|
|
|
# Remove rightmost \n.
|
|
l = l[:-1]
|
|
|
|
# Continuations with \.
|
|
if cont_l:
|
|
l = cont_l + " " + l.lstrip()
|
|
if l.endswith("\\"):
|
|
cont_l = l[:-1]
|
|
continue
|
|
else:
|
|
cont_l = ""
|
|
|
|
# Comments start with a "#".
|
|
if l.strip().startswith("#") or l.strip() == "":
|
|
continue
|
|
|
|
print(l)
|
|
|
|
# Everything else is of the form:
|
|
# <proc> <op> [params]
|
|
sp = l.split(None, 2)
|
|
if len(sp) == 3:
|
|
proc, op, params = sp
|
|
else:
|
|
proc, op = sp
|
|
params = ""
|
|
|
|
# = Launch a process.
|
|
if op == "=":
|
|
cmd = Process(params, shell=True, universal_newlines=True,
|
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT)
|
|
self.procs[proc] = cmd
|
|
|
|
# |= Launch a process, do not capture stdout.
|
|
elif op == "|=":
|
|
cmd = Process(params, shell=True, stdin=subprocess.PIPE)
|
|
self.procs[proc] = cmd
|
|
|
|
# unix_listen Listen on an UNIX socket.
|
|
elif op == "unix_listen":
|
|
sock = UnixSock(params)
|
|
sock.listen()
|
|
self.procs[proc] = sock
|
|
|
|
# tcp_listen Listen on a TCP socket.
|
|
elif op == "tcp_listen":
|
|
sock = TCPSock(params)
|
|
sock.listen()
|
|
self.procs[proc] = sock
|
|
|
|
elif op == "tcp_connect":
|
|
sock = TCPSock(params)
|
|
sock.connect()
|
|
self.procs[proc] = sock
|
|
|
|
elif op == "tls_connect":
|
|
sock = TLSSock(params)
|
|
sock.connect()
|
|
self.procs[proc] = sock
|
|
|
|
# -> Send to a process stdin, with a \r\n at the end.
|
|
# .> Send to a process stdin, no \r\n at the end.
|
|
# ~> Send to a process stdin, string is python-evaluated.
|
|
elif op == "->":
|
|
self.procs[proc].write(params + "\r\n")
|
|
elif op == ".>":
|
|
self.procs[proc].write(params)
|
|
elif op == "~>":
|
|
self.procs[proc].write(eval(params))
|
|
|
|
# <- Read from the process, expect matching input.
|
|
# <~ Read from the process, match input using regexp.
|
|
# <... Read many lines until one matches.
|
|
elif op == "<-":
|
|
read = self.procs[proc].readline()
|
|
if read != params + "\n":
|
|
self.runtime_error("data different that expected:\n"
|
|
+ " expected: %s\n" % repr(params)
|
|
+ " got: %s" % repr(read))
|
|
elif op == "<~":
|
|
read = self.procs[proc].readline()
|
|
m = re.match(params, read)
|
|
if m is None:
|
|
self.runtime_error("data did not match regexp:\n"
|
|
+ " regexp: %s\n" % repr(params)
|
|
+ " got: %s" % repr(read))
|
|
elif op == "<...":
|
|
while True:
|
|
read = self.procs[proc].readline()
|
|
m = re.match(params, read)
|
|
if m:
|
|
break
|
|
|
|
# sleep Sleep this number of seconds (process-independent).
|
|
elif op == "sleep":
|
|
time.sleep(float(params))
|
|
|
|
# wait Wait for the process to exit (with the given code).
|
|
elif op == "wait":
|
|
retcode = self.procs[proc].wait()
|
|
if params and retcode != int(params):
|
|
self.runtime_error("return code did not match:\n"
|
|
+ " expected %s, got %d" % (params, retcode))
|
|
|
|
# close Close the process.
|
|
elif op == "close":
|
|
self.procs[proc].close()
|
|
|
|
else:
|
|
self.syntax_error("unknown syntax")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
i = Interpreter()
|
|
i.run(args.script)
|