diff --git a/src/modules/python/picoweb/__init__.py b/src/modules/python/picoweb/__init__.py index 315de6e..9c23e46 100644 --- a/src/modules/python/picoweb/__init__.py +++ b/src/modules/python/picoweb/__init__.py @@ -28,10 +28,10 @@ def get_mime_type(fname): def sendstream(writer, f): buf = bytearray(64) while True: - l = f.readinto(buf) - if not l: + if l := f.readinto(buf): + yield from writer.awrite(buf, 0, l) + else: break - yield from writer.awrite(buf, 0, l) def jsonify(writer, dict): @@ -47,7 +47,7 @@ def start_response(writer, content_type="text/html", status="200", headers=None) yield from writer.awrite("\r\n\r\n") return yield from writer.awrite("\r\n") - if isinstance(headers, bytes) or isinstance(headers, str): + if isinstance(headers, (bytes, str)): yield from writer.awrite(headers) else: for k, v in headers.items(): @@ -81,14 +81,8 @@ def parse_qs(self): class WebApp: def __init__(self, pkg, routes=None, serve_static=True): - if routes: - self.url_map = routes - else: - self.url_map = [] - if pkg and pkg != "__main__": - self.pkg = pkg.split(".", 1)[0] - else: - self.pkg = None + self.url_map = routes or [] + self.pkg = pkg.split(".", 1)[0] if pkg and pkg != "__main__" else None if serve_static: self.url_map.append((re.compile("^/(static/.+)"), self.handle_static)) self.mounts = [] @@ -117,7 +111,7 @@ def _handle(self, reader, writer): request_line = yield from reader.readline() if request_line == b"": if self.debug >= 0: - self.log.error("%s: EOF on request start" % reader) + self.log.error(f"{reader}: EOF on request start") yield from writer.aclose() return req = HTTPRequest() @@ -127,9 +121,7 @@ def _handle(self, reader, writer): if self.debug >= 0: self.log.info('%.3f %s %s "%s %s"' % (utime.time(), req, writer, method, path)) path = path.split("?", 1) - qs = "" - if len(path) > 1: - qs = path[1] + qs = path[1] if len(path) > 1 else "" path = path[0] #print("================") @@ -148,7 +140,7 @@ def _handle(self, reader, writer): found = True path = path[len(root):] if not path.startswith("/"): - path = "/" + path + path = f"/{path}" break if not found: break @@ -170,22 +162,12 @@ def _handle(self, reader, writer): found = True break elif not isinstance(pattern, str): - # Anything which is non-string assumed to be a ducktype - # pattern matcher, whose .match() method is called. (Note: - # Django uses .search() instead, but .match() is more - # efficient and we're not exactly compatible with Django - # URL matching anyway.) - m = pattern.match(path) - if m: + if m := pattern.match(path): req.url_match = m found = True break - if not found: - headers_mode = "skip" - else: - headers_mode = extra.get("headers", self.headers_mode) - + headers_mode = extra.get("headers", self.headers_mode) if found else "skip" if headers_mode == "skip": while True: l = yield from reader.readline() @@ -205,7 +187,7 @@ def _handle(self, reader, writer): else: yield from start_response(writer, status="404") yield from writer.awrite("404\r\n") - #print(req, "After response write") + #print(req, "After response write") except Exception as e: if self.debug >= 0: self.log.exc(e, "%.3f %s %s %r" % (utime.time(), req, writer, e)) @@ -294,7 +276,7 @@ def run(self, host="127.0.0.1", port=8081, debug=False, lazy_init=False, log=Non app.init() loop = asyncio.get_event_loop() if debug > 0: - print("* Running on http://%s:%s/" % (host, port)) + print(f"* Running on http://{host}:{port}/") loop.create_task(asyncio.start_server(self._handle, host, port)) loop.run_forever() loop.close() diff --git a/src/modules/python/pkg_resources.py b/src/modules/python/pkg_resources.py index 9ab28e9..1e2f0ea 100644 --- a/src/modules/python/pkg_resources.py +++ b/src/modules/python/pkg_resources.py @@ -6,7 +6,7 @@ def resource_stream(package, resource): if package not in c: try: if package: - p = __import__(package + ".R", None, None, True) + p = __import__(f"{package}.R", None, None, True) else: p = __import__("R") c[package] = p.R @@ -19,7 +19,7 @@ def resource_stream(package, resource): # if d[0] != "/": # import uos # d = uos.getcwd() + "/" + d - c[package] = d + "/" + c[package] = f"{d}/" p = c[package] if isinstance(p, dict): diff --git a/src/modules/python/uasyncio/__init__.py b/src/modules/python/uasyncio/__init__.py index 41fa572..d83aa4c 100644 --- a/src/modules/python/uasyncio/__init__.py +++ b/src/modules/python/uasyncio/__init__.py @@ -65,12 +65,7 @@ def remove_writer(self, sock): def wait(self, delay): if DEBUG and __debug__: log.debug("poll.wait(%d)", delay) - # We need one-shot behavior (second arg of 1 to .poll()) - res = self.poller.ipoll(delay, 1) - #log.debug("poll result: %s", res) - # Remove "if res" workaround after - # https://github.com/micropython/micropython/issues/2716 fixed. - if res: + if res := self.poller.ipoll(delay, 1): for sock, ev in res: cb = self.objmap[id(sock)] if ev & (select.POLLHUP | select.POLLERR): diff --git a/src/modules/python/uasyncio/core.py b/src/modules/python/uasyncio/core.py index 77fdb7a..ebebbf9 100644 --- a/src/modules/python/uasyncio/core.py +++ b/src/modules/python/uasyncio/core.py @@ -105,10 +105,7 @@ def run_forever(self): self.cur_task = cb delay = 0 try: - if args is (): - ret = next(cb) - else: - ret = cb.send(*args) + ret = next(cb) if args is () else cb.send(*args) if __debug__ and DEBUG: log.info("Coroutine %s yield result: %s", cb, ret) if isinstance(ret, SysCall1): @@ -168,8 +165,7 @@ def run_forever(self): tnow = self.time() t = self.waitq.peektime() delay = time.ticks_diff(t, tnow) - if delay < 0: - delay = 0 + delay = max(delay, 0) self.wait(delay) def run_until_complete(self, coro): diff --git a/src/modules/python/ulogging.py b/src/modules/python/ulogging.py index cea2de0..98cbaf1 100644 --- a/src/modules/python/ulogging.py +++ b/src/modules/python/ulogging.py @@ -26,9 +26,7 @@ def __init__(self, name): def _level_str(self, level): l = _level_dict.get(level) - if l is not None: - return l - return "LVL%s" % level + return l if l is not None else f"LVL{level}" def setLevel(self, level): self.level = level @@ -38,7 +36,7 @@ def isEnabledFor(self, level): def log(self, level, msg, *args): if level >= (self.level or _level): - _stream.write("%s:%s:" % (self._level_str(level), self.name)) + _stream.write(f"{self._level_str(level)}:{self.name}:") if not args: print(msg, file=_stream) else: diff --git a/src/modules/python/utemplate/compiled.py b/src/modules/python/utemplate/compiled.py index 82237a4..7465e34 100644 --- a/src/modules/python/utemplate/compiled.py +++ b/src/modules/python/utemplate/compiled.py @@ -1,12 +1,9 @@ class Loader: def __init__(self, pkg, dir): - if dir == ".": - dir = "" - else: - dir = dir.replace("/", ".") + "." + dir = "" if dir == "." else dir.replace("/", ".") + "." if pkg and pkg != "__main__": - dir = pkg + "." + dir + dir = f"{pkg}.{dir}" self.p = dir def load(self, name): diff --git a/src/modules/python/utemplate/source.py b/src/modules/python/utemplate/source.py index a9948a1..a0761be 100644 --- a/src/modules/python/utemplate/source.py +++ b/src/modules/python/utemplate/source.py @@ -47,15 +47,12 @@ def close_literal(self): def render_expr(self, e): self.indent() - self.file_out.write('yield str(' + e + ')\n') + self.file_out.write(f'yield str({e}' + ')\n') def parse_statement(self, stmt): tokens = stmt.split(None, 1) if tokens[0] == "args": - if len(tokens) > 1: - self.args = tokens[1] - else: - self.args = "" + self.args = tokens[1] if len(tokens) > 1 else "" elif tokens[0] == "set": self.indent() self.file_out.write(stmt[3:].strip() + "\n") @@ -64,9 +61,7 @@ def parse_statement(self, stmt): # If there was no other output, we still need a header now self.indent() tokens = tokens[1].split(None, 1) - args = "" - if len(tokens) > 1: - args = tokens[1] + args = tokens[1] if len(tokens) > 1 else "" if tokens[0][0] == "{": self.indent() # "1" as fromlist param is uPy hack @@ -91,16 +86,15 @@ def parse_statement(self, stmt): self.indent() self.file_out.write(stmt + ":\n") self.stack.append(tokens[0]) + elif stmt.startswith("end"): + assert self.stack[-1] == stmt[3:] + self.stack.pop(-1) + elif stmt == "else": + assert self.stack[-1] == "if" + self.indent(-1) + self.file_out.write("else:\n") else: - if stmt.startswith("end"): - assert self.stack[-1] == stmt[3:] - self.stack.pop(-1) - elif stmt == "else": - assert self.stack[-1] == "if" - self.indent(-1) - self.file_out.write("else:\n") - else: - assert False + assert False def parse_line(self, l): while l: @@ -158,12 +152,7 @@ def __init__(self, pkg, dir): self.pkg_path = "" if pkg: p = __import__(pkg) - if isinstance(p.__path__, str): - # uPy - self.pkg_path = p.__path__ - else: - # CPy - self.pkg_path = p.__path__[0] + self.pkg_path = p.__path__ if isinstance(p.__path__, str) else p.__path__[0] self.pkg_path += "/" def input_open(self, template): @@ -171,7 +160,7 @@ def input_open(self, template): return open(path) def compiled_path(self, template): - return self.dir + "/" + template.replace(".", "_") + ".py" + return f"{self.dir}/" + template.replace(".", "_") + ".py" def load(self, name): try: @@ -182,9 +171,8 @@ def load(self, name): compiled_path = self.pkg_path + self.compiled_path(name) f_in = self.input_open(name) - f_out = open(compiled_path, "w") - c = Compiler(f_in, f_out, loader=self) - c.compile() - f_in.close() - f_out.close() + with open(compiled_path, "w") as f_out: + c = Compiler(f_in, f_out, loader=self) + c.compile() + f_in.close() return super().load(name)