1
0
mirror of https://blitiri.com.ar/repos/chasquid synced 2025-12-17 14:37:02 +00:00

Auto-format the Python scripts

We have a few Python scripts which over the years ended up with a
variety of formatting.

This patch auto-formats them using `black -l 79` to make them more
uniform, and easier to read and write.
This commit is contained in:
Alberto Bertogli
2025-04-11 14:05:06 +01:00
parent b65ec36916
commit 1cf24ba94a
3 changed files with 104 additions and 85 deletions

View File

@@ -21,15 +21,15 @@ import time
# Command-line flags.
ap = argparse.ArgumentParser()
ap.add_argument("script", type=argparse.FileType('r', encoding='utf8'))
ap.add_argument("script", type=argparse.FileType("r", encoding="utf8"))
args = ap.parse_args()
# Make sure stdout is open in utf8 mode, as we will print our input, which is
# utf8, and want it to work regardless of the environment.
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf8", buffering=1)
class Process (object):
class Process(object):
def __init__(self, cmd, **kwargs):
self.cmd = subprocess.Popen(cmd, **kwargs)
@@ -45,13 +45,15 @@ class Process (object):
def close(self):
return self.cmd.stdin.close()
class Sock (object):
class Sock(object):
"""A (generic) socket.
This class implements the common code for socket support.
Subclasses will implement the behaviour specific to different socket
types.
"""
def __init__(self, addr):
self.addr = addr
self.sock = NotImplemented
@@ -84,7 +86,8 @@ class Sock (object):
self.connw.close()
self.sock.close()
class UnixSock (Sock):
class UnixSock(Sock):
def __init__(self, addr):
Sock.__init__(self, addr)
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -95,7 +98,7 @@ class UnixSock (Sock):
Sock.listen(self)
class TCPSock (Sock):
class TCPSock(Sock):
def __init__(self, addr):
host, port = addr.rsplit(":", 1)
Sock.__init__(self, (host, int(port)))
@@ -108,7 +111,7 @@ class TCPSock (Sock):
self.has_conn.set()
class TLSSock (Sock):
class TLSSock(Sock):
def __init__(self, addr):
host, port = addr.rsplit(":", 1)
Sock.__init__(self, (host, int(port)))
@@ -125,8 +128,9 @@ class TLSSock (Sock):
self.has_conn.set()
class Interpreter (object):
class Interpreter(object):
"""Interpreter for chamuyero scripts."""
def __init__(self):
# Processes and sockets we have spawn. Indexed by the id provided by
# the user.
@@ -176,9 +180,14 @@ class Interpreter (object):
# = Launch a process.
if op == "=":
cmd = Process(params, shell=True, universal_newlines=True,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
cmd = Process(
params,
shell=True,
universal_newlines=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
self.procs[proc] = cmd
# |= Launch a process, do not capture stdout.
@@ -224,16 +233,20 @@ class Interpreter (object):
elif op == "<-":
read = self.procs[proc].readline()
if read != params + "\n":
self.runtime_error("data different that expected:\n"
+ " expected: %s\n" % repr(params)
+ " got: %s" % repr(read))
self.runtime_error(
"data different that expected:\n"
+ " expected: %s\n" % repr(params)
+ " got: %s" % repr(read)
)
elif op == "<~":
read = self.procs[proc].readline()
m = re.match(params, read)
if m is None:
self.runtime_error("data did not match regexp:\n"
+ " regexp: %s\n" % repr(params)
+ " got: %s" % repr(read))
self.runtime_error(
"data did not match regexp:\n"
+ " regexp: %s\n" % repr(params)
+ " got: %s" % repr(read)
)
elif op == "<...":
while True:
read = self.procs[proc].readline()
@@ -249,8 +262,10 @@ class Interpreter (object):
elif op == "wait":
retcode = self.procs[proc].wait()
if params and retcode != int(params):
self.runtime_error("return code did not match:\n"
+ " expected %s, got %d" % (params, retcode))
self.runtime_error(
"return code did not match:\n"
+ " expected %s, got %d" % (params, retcode)
)
# close Close the process.
elif op == "close":

View File

@@ -24,11 +24,11 @@ def flexible_eq(expected, got):
if posG >= len(got):
return False
if c == '?':
if c == "?":
posG += 1
continue
if c == '*':
while posG < len(got) and got[posG] != '\t':
if c == "*":
while posG < len(got) and got[posG] != "\t":
posG += 1
continue
continue
@@ -46,58 +46,60 @@ def flexible_eq(expected, got):
def msg_equals(expected, msg):
"""Compare two messages recursively, using flexible_eq()."""
diff = False
for h, val in expected.items():
if h not in msg:
print("Header missing: %r" % h)
diff = True
continue
"""Compare two messages recursively, using flexible_eq()."""
diff = False
for h, val in expected.items():
if h not in msg:
print("Header missing: %r" % h)
diff = True
continue
if expected[h] == '*':
continue
if expected[h] == "*":
continue
if not flexible_eq(val, msg[h]):
print("Header %r differs:" % h)
print("Exp: %r" % val)
print("Got: %r" % msg[h])
diff = True
if not flexible_eq(val, msg[h]):
print("Header %r differs:" % h)
print("Exp: %r" % val)
print("Got: %r" % msg[h])
diff = True
if diff:
return False
if diff:
return False
if expected.is_multipart() != msg.is_multipart():
print("Multipart differs, expected %s, got %s" % (
expected.is_multipart(), msg.is_multipart()))
return False
if expected.is_multipart() != msg.is_multipart():
print(
"Multipart differs, expected %s, got %s"
% (expected.is_multipart(), msg.is_multipart())
)
return False
if expected.is_multipart():
for exp, got in itertools.zip_longest(expected.get_payload(), msg.get_payload()):
if not msg_equals(exp, got):
return False
else:
if not flexible_eq(expected.get_payload(), msg.get_payload()):
exp = expected.get_payload().splitlines()
got = msg.get_payload().splitlines()
print("Payload differs:")
for l in difflib.ndiff(exp, got):
print(l)
return False
if expected.is_multipart():
for exp, got in itertools.zip_longest(
expected.get_payload(), msg.get_payload()
):
if not msg_equals(exp, got):
return False
else:
if not flexible_eq(expected.get_payload(), msg.get_payload()):
exp = expected.get_payload().splitlines()
got = msg.get_payload().splitlines()
print("Payload differs:")
for l in difflib.ndiff(exp, got):
print(l)
return False
return True
return True
if __name__ == "__main__":
f1, f2 = sys.argv[1:3]
f1, f2 = sys.argv[1:3]
# We use a custom strict policy to do more strict content validation.
policy = email.policy.EmailPolicy(
utf8=True,
linesep="\r\n",
refold_source='none',
raise_on_defect=True)
# We use a custom strict policy to do more strict content validation.
policy = email.policy.EmailPolicy(
utf8=True, linesep="\r\n", refold_source="none", raise_on_defect=True
)
expected = email.parser.Parser(policy=policy).parse(open(f1))
msg = email.parser.Parser(policy=policy).parse(open(f2))
expected = email.parser.Parser(policy=policy).parse(open(f1))
msg = email.parser.Parser(policy=policy).parse(open(f2))
sys.exit(0 if msg_equals(expected, msg) else 1)
sys.exit(0 if msg_equals(expected, msg) else 1)