diff --git a/auditwheel_emscripten/emscripten_tools/colored_logger.py b/auditwheel_emscripten/emscripten_tools/colored_logger.py new file mode 100644 index 0000000..29627eb --- /dev/null +++ b/auditwheel_emscripten/emscripten_tools/colored_logger.py @@ -0,0 +1,116 @@ +# Copyright 2018 The Emscripten Authors. All rights reserved. +# Emscripten is available under two separate licenses, the MIT license and the +# University of Illinois/NCSA Open Source License. Both these licenses can be +# found in the LICENSE file. + +"""Enables colored logger just by importing this module + +Also, provides utility functions to use ANSI colors in the terminal. +""" + +import ctypes +import logging +import sys +from functools import wraps + +# ANSI colors +RED = 1 +GREEN = 2 +YELLOW = 3 +BLUE = 4 +MAGENTA = 5 +CYAN = 6 +WHITE = 7 + +color_enabled = False + + +def output_color(color): + if color_enabled: + return '\033[3%sm' % color + return '' + + +def bold(): + if color_enabled: + return '\033[1m' + return '' + + +def reset_color(): + if color_enabled: + return '\033[0m' + return '' + + +def with_bold_color(color, string): + return output_color(color) + bold() + string + reset_color() + + +def with_color(color, string): + return output_color(color) + string + reset_color() + + +def with_bold(string): + return bold() + string + reset_color() + + +def ansi_color_available(): + if not sys.platform.startswith('win'): + return sys.stderr.isatty() + + # Constants from the Windows API + STD_OUTPUT_HANDLE = -11 + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + + kernel32 = ctypes.windll.kernel32 + stdout_handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) + + console_mode = ctypes.c_uint() + # Attempt to enable ANSI color processing (ENABLE_VIRTUAL_TERMINAL_PROCESSING). + # Assume that failure of either GetConsoleMode or SetConsoleMode means that stdout + # is not attached to a terminal or that the terminal does not support this mode. + if kernel32.GetConsoleMode(stdout_handle, ctypes.byref(console_mode)) == 0: + return False + if kernel32.SetConsoleMode(stdout_handle, console_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0: + return False + + return True + + +def add_coloring_to_emit_ansi(fn): + # add methods we need to the class + @wraps(fn) + def new(*args): + levelno = args[1].levelno + color = None + if levelno >= 40: + color = RED + elif levelno >= 30: + color = YELLOW + elif levelno >= 20: + color = GREEN + elif levelno >= 10: + color = MAGENTA + if color: + args[1].msg = with_color(color, args[1].msg) + return fn(*args) + + new.orig_func = fn + return new + + +def enable(force=False): + global color_enabled + if not color_enabled: + if ansi_color_available() or force: + logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit) + color_enabled = True + + +def disable(): + global color_enabled + if color_enabled: + if hasattr(logging.StreamHandler.emit, 'orig_func'): + logging.StreamHandler.emit = logging.StreamHandler.emit.orig_func + color_enabled = False diff --git a/auditwheel_emscripten/emscripten_tools/diagnostics.py b/auditwheel_emscripten/emscripten_tools/diagnostics.py index f9ae4e8..db4f1ed 100644 --- a/auditwheel_emscripten/emscripten_tools/diagnostics.py +++ b/auditwheel_emscripten/emscripten_tools/diagnostics.py @@ -3,257 +3,144 @@ # University of Illinois/NCSA Open Source License. Both these licenses can be # found in the LICENSE file. -"""Simple color-enabled diagnositics reporting functions. +"""Simple color-enabled diagnostics reporting functions. """ -import ctypes import logging import os import sys -from typing import Any -WINDOWS = sys.platform.startswith("win") +from . import colored_logger -logger = logging.getLogger("diagnostics") -color_enabled = sys.stderr.isatty() +logger = logging.getLogger('diagnostics') tool_name = os.path.splitext(os.path.basename(sys.argv[0]))[0] # diagnostic levels WARN = 1 ERROR = 2 -FATAL = 3 - -# available colors -RED = 1 -GREEN = 2 -YELLOW = 3 -BLUE = 4 -MAGENTA = 5 -CYAN = 6 -WHITE = 7 # color for use for each diagnostic level level_colors = { - WARN: MAGENTA, - ERROR: RED, + WARN: colored_logger.MAGENTA, + ERROR: colored_logger.RED, } level_prefixes = { - WARN: "warning: ", - ERROR: "error: ", + WARN: 'warning: ', + ERROR: 'error: ', } -# Constants from the Windows API -STD_OUTPUT_HANDLE = -11 - - -def output_color_windows(color): - # TODO(sbc): This code is duplicated in colored_logger.py. Refactor. - # wincon.h - FOREGROUND_BLACK = 0x0000 # noqa - FOREGROUND_BLUE = 0x0001 # noqa - FOREGROUND_GREEN = 0x0002 # noqa - FOREGROUND_CYAN = 0x0003 # noqa - FOREGROUND_RED = 0x0004 # noqa - FOREGROUND_MAGENTA = 0x0005 # noqa - FOREGROUND_YELLOW = 0x0006 # noqa - FOREGROUND_GREY = 0x0007 # noqa - - color_map = { - RED: FOREGROUND_RED, - GREEN: FOREGROUND_GREEN, - YELLOW: FOREGROUND_YELLOW, - BLUE: FOREGROUND_BLUE, - MAGENTA: FOREGROUND_MAGENTA, - CYAN: FOREGROUND_CYAN, - WHITE: FOREGROUND_BLUE | FOREGROUND_GREEN | FOREGROUND_RED, - } - - sys.stderr.flush() - hdl = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE) - ctypes.windll.kernel32.SetConsoleTextAttribute(hdl, color_map[color]) - - -def get_color_windows(): - SHORT = ctypes.c_short - WORD = ctypes.c_ushort - - class COORD(ctypes.Structure): - _fields_ = [("X", SHORT), ("Y", SHORT)] - - class SMALL_RECT(ctypes.Structure): - _fields_ = [ - ("Left", SHORT), - ("Top", SHORT), - ("Right", SHORT), - ("Bottom", SHORT), - ] - - class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure): - _fields_ = [ - ("dwSize", COORD), - ("dwCursorPosition", COORD), - ("wAttributes", WORD), - ("srWindow", SMALL_RECT), - ("dwMaximumWindowSize", COORD), - ] - - hdl = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE) - csbi = CONSOLE_SCREEN_BUFFER_INFO() - ctypes.windll.kernel32.GetConsoleScreenBufferInfo(hdl, ctypes.byref(csbi)) - return csbi.wAttributes - - -def reset_color_windows(): - sys.stderr.flush() - hdl = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE) - ctypes.windll.kernel32.SetConsoleTextAttribute(hdl, default_color) - - -def output_color(color): - if WINDOWS: - return output_color_windows(color) - return "\033[3%sm" % color - - -def reset_color(): - if WINDOWS: - return reset_color_windows() - return "\033[0m" - def diag(level, msg, *args): - # Format output message as: - # : : msg - # With the `:` part being colored accordingly. - sys.stderr.write(tool_name + ": ") - - if color_enabled: - output = output_color(level_colors[level]) - if output: - sys.stderr.write(output) - - sys.stderr.write(level_prefixes[level]) + # Format output message as: + # : : msg + # With the `:` part being colored accordingly, and the message itself in bold. + prefix = level_prefixes[level] + color = level_colors[level] + if args: + msg = msg % args - if color_enabled: - output = reset_color() - if output: - sys.stderr.write(output) - - if args: - msg = msg % args - sys.stderr.write(str(msg)) - sys.stderr.write("\n") + # Add colors + prefix = colored_logger.with_bold_color(color, prefix) + msg = colored_logger.with_bold(msg) + sys.stderr.write(f'{tool_name}: {prefix}{msg}\n') def error(msg, *args): - diag(ERROR, msg, *args) - sys.exit(1) + diag(ERROR, msg, *args) + sys.exit(1) def warn(msg, *args): - diag(WARN, msg, *args) + diag(WARN, msg, *args) class WarningManager: - warnings: Any = {} - - def add_warning( - self, name, enabled=True, part_of_all=True, shared=False, error=False - ): - self.warnings[name] = { - "enabled": enabled, - "part_of_all": part_of_all, - # True for flags that are shared with the underlying clang driver - "shared": shared, - "error": error, - } - - def capture_warnings(self, cmd_args): - for i in range(len(cmd_args)): - if cmd_args[i] == "-w": - for warning in self.warnings.values(): - warning["enabled"] = False - continue - - if not cmd_args[i].startswith("-W"): - continue - - if cmd_args[i] == "-Wall": - for warning in self.warnings.values(): - if warning["part_of_all"]: - warning["enabled"] = True - continue - - if cmd_args[i] == "-Werror": - for warning in self.warnings.values(): - warning["error"] = True - continue - - if cmd_args[i].startswith("-Werror=") or cmd_args[i].startswith( - "-Wno-error=" - ): - warning_name = cmd_args[i].split("=", 1)[1] - if warning_name in self.warnings: - enabled = not cmd_args[i].startswith("-Wno-") - self.warnings[warning_name]["error"] = enabled - if enabled: - self.warnings[warning_name]["enabled"] = True - cmd_args[i] = "" - continue - - warning_name = cmd_args[i].replace("-Wno-", "").replace("-W", "") - enabled = not cmd_args[i].startswith("-Wno-") - - # special case pre-existing warn-absolute-paths - if warning_name == "warn-absolute-paths": - self.warnings["absolute-paths"]["enabled"] = enabled - cmd_args[i] = "" - continue - - if warning_name in self.warnings: - self.warnings[warning_name]["enabled"] = enabled - if not self.warnings[warning_name]["shared"]: - cmd_args[i] = "" - continue - - return cmd_args - - def warning(self, warning_type, message, *args): - warning_info = self.warnings[warning_type] - msg = (message % args) + " [-W" + warning_type.lower().replace("_", "-") + "]" - if warning_info["enabled"]: - if warning_info["error"]: - error(msg + " [-Werror]") - else: - warn(msg) - else: - logger.debug("disabled warning: " + msg) - + warnings: dict[str, dict] = {} + + def add_warning(self, name, enabled=True, part_of_all=True, shared=False, error=False): + self.warnings[name] = { + 'enabled': enabled, + 'part_of_all': part_of_all, + # True for flags that are shared with the underlying clang driver + 'shared': shared, + 'error': error, + } -def add_warning(name, enabled=True, part_of_all=True, shared=False, error=False): - manager.add_warning(name, enabled, part_of_all, shared, error) + def capture_warnings(self, cmd_args): + for i in range(len(cmd_args)): + if cmd_args[i] == '-w': + for warning in self.warnings.values(): + warning['enabled'] = False + continue + + if not cmd_args[i].startswith('-W'): + continue + + if cmd_args[i] == '-Wall': + for warning in self.warnings.values(): + if warning['part_of_all']: + warning['enabled'] = True + continue + + if cmd_args[i] == '-Werror': + for warning in self.warnings.values(): + warning['error'] = True + continue + + if cmd_args[i].startswith('-Werror=') or cmd_args[i].startswith('-Wno-error='): + warning_name = cmd_args[i].split('=', 1)[1] + if warning_name in self.warnings: + enabled = not cmd_args[i].startswith('-Wno-') + self.warnings[warning_name]['error'] = enabled + if enabled: + self.warnings[warning_name]['enabled'] = True + cmd_args[i] = '' + continue + + warning_name = cmd_args[i].removeprefix('-Wno-').removeprefix('-W') + enabled = not cmd_args[i].startswith('-Wno-') + + # special case pre-existing warn-absolute-paths + if warning_name == 'warn-absolute-paths': + self.warnings['absolute-paths']['enabled'] = enabled + cmd_args[i] = '' + continue + + if warning_name in self.warnings: + self.warnings[warning_name]['enabled'] = enabled + if not self.warnings[warning_name]['shared']: + cmd_args[i] = '' + continue + + return cmd_args + + def warning(self, warning_type, message, *args): + warning_info = self.warnings[warning_type] + msg = (message % args) + ' [-W' + warning_type.lower().replace('_', '-') + ']' + if warning_info['enabled']: + if warning_info['error']: + error(msg + ' [-Werror]') + else: + warn(msg) + else: + logger.debug('disabled warning: ' + msg) -def enable_warning(name, as_error=False): - manager.warnings[name]["enabled"] = True - if as_error: - manager.warnings[name]["error"] = True +def add_warning(name, enabled=True, part_of_all=True, shared=False, error=False): + manager.add_warning(name, enabled, part_of_all, shared, error) -def disable_warning(name): - manager.warnings[name]["enabled"] = False +def is_enabled(name): + return manager.warnings[name]['enabled'] def warning(warning_type, message, *args): - manager.warning(warning_type, message, *args) + manager.warning(warning_type, message, *args) def capture_warnings(argv): - return manager.capture_warnings(argv) - + return manager.capture_warnings(argv) -if WINDOWS: - default_color = get_color_windows() manager = WarningManager() diff --git a/auditwheel_emscripten/emscripten_tools/utils.py b/auditwheel_emscripten/emscripten_tools/utils.py index 6ee9f6d..5eaf871 100644 --- a/auditwheel_emscripten/emscripten_tools/utils.py +++ b/auditwheel_emscripten/emscripten_tools/utils.py @@ -3,123 +3,232 @@ # University of Illinois/NCSA Open Source License. Both these licenses can be # found in the LICENSE file. -import contextlib +"""General purpose utility functions. The code in this file should mostly be +not emscripten-specific, but general purpose enough to be useful in any command +line utility.""" + +import functools +import logging import os +import shlex import shutil +import stat +import subprocess import sys from pathlib import Path from . import diagnostics __rootpath__ = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) -WINDOWS = sys.platform.startswith("win") -MACOS = sys.platform == "darwin" -LINUX = sys.platform.startswith("linux") +WINDOWS = sys.platform.startswith('win') +MACOS = sys.platform == 'darwin' +LINUX = sys.platform.startswith('linux') + +logger = logging.getLogger('utils') + + +def run_process(cmd, check=True, input=None, *args, **kw): + """Runs a subprocess returning the exit code. + + By default this function will raise an exception on failure. Therefore this should only be + used if you want to handle such failures. For most subprocesses, failures are not recoverable + and should be fatal. In those cases the `check_call` wrapper should be preferred. + """ + + # Flush standard streams otherwise the output of the subprocess may appear in the + # output before messages that we have already written. + sys.stdout.flush() + sys.stderr.flush() + kw.setdefault('text', True) + kw.setdefault('encoding', 'utf-8') + ret = subprocess.run(cmd, check=check, input=input, *args, **kw) + debug_text = '%sexecuted %s' % ('successfully ' if check else '', shlex.join(cmd)) + logger.debug(debug_text) + return ret + + +def exec(cmd): + if WINDOWS: + rtn = run_process(cmd, stdin=sys.stdin, check=False).returncode + sys.exit(rtn) + else: + sys.stdout.flush() + sys.stderr.flush() + os.execvp(cmd[0], cmd) def exit_with_error(msg, *args): - diagnostics.error(msg, *args) + diagnostics.error(msg, *args) def path_from_root(*pathelems): - return str(Path(__rootpath__, *pathelems)) + return str(Path(__rootpath__, *pathelems)) + + +def exe_path_from_root(*pathelems): + return find_exe(path_from_root(*pathelems)) + + +def suffix(name): + """Return the file extension""" + return os.path.splitext(name)[1] + + +def find_exe(*pathelems): + path = os.path.join(*pathelems) + + if WINDOWS: + # Should we use PATHEXT environment variable here? + # For now, specify only enough extensions to find llvm / binaryen / emscripten executables. + for ext in ['.exe', '.bat']: + if os.path.isfile(path + ext): + return path + ext + + return path + + +def replace_suffix(filename, new_suffix): + assert new_suffix[0] == '.' + return os.path.splitext(filename)[0] + new_suffix + + +def unsuffixed(name): + """Return the filename without the extension. + + If there are multiple extensions this strips only the final one. + """ + return os.path.splitext(name)[0] + + +def unsuffixed_basename(name): + return os.path.basename(unsuffixed(name)) + + +def get_file_suffix(filename): + """Parses the essential suffix of a filename, discarding Unix-style version + numbers in the name. For example for 'libz.so.1.2.8' returns '.so'""" + while filename: + filename, suffix = os.path.splitext(filename) + if not suffix[1:].isdigit(): + return suffix + return '' + + +def normalize_path(path): + """Normalize path separators to UNIX-style forward slashes. + + This can be useful when converting paths to URLs or JS strings, + or when trying to generate consistent output file contents + across all platforms. In most cases UNIX-style separators work + fine on windows. + """ + return path.replace('\\', '/').replace('//', '/') def safe_ensure_dirs(dirname): - os.makedirs(dirname, exist_ok=True) - - -@contextlib.contextmanager -def chdir(dir): - """A context manager that performs actions in the given directory.""" - orig_cwd = os.getcwd() - os.chdir(dir) - try: - yield - finally: - os.chdir(orig_cwd) - - -# Finds the given executable 'program' in PATH. Operates like the Unix tool 'which'. -def which(program): - def is_exe(fpath): - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) - - if os.path.isabs(program): - if os.path.isfile(program): - return program - - if WINDOWS: - for suffix in [".exe", ".cmd", ".bat"]: - if is_exe(program + suffix): - return program + suffix - - fpath, fname = os.path.split(program) - if fpath: - if is_exe(program): - return program - else: - for path in os.environ["PATH"].split(os.pathsep): - path = path.strip('"') - exe_file = os.path.join(path, program) - if is_exe(exe_file): - return exe_file - if WINDOWS: - for suffix in (".exe", ".cmd", ".bat"): - if is_exe(exe_file + suffix): - return exe_file + suffix + os.makedirs(dirname, exist_ok=True) + - return None +def make_writable(filename): + assert os.path.exists(filename) + old_mode = stat.S_IMODE(os.stat(filename).st_mode) + os.chmod(filename, old_mode | stat.S_IWUSR) + + +def safe_copy(src, dst): + logger.debug('copy: %s -> %s', src, dst) + src = os.path.abspath(src) + dst = os.path.abspath(dst) + if os.path.isdir(dst): + dst = os.path.join(dst, os.path.basename(src)) + if src == dst: + return + if dst == os.devnull: + return + # Copies data and permission bits, but not other metadata such as timestamp + shutil.copy(src, dst) + # We always want the target file to be writable even when copying from + # read-only source. (e.g. a read-only install of emscripten). + make_writable(dst) def read_file(file_path): - """Read from a file opened in text mode""" - with open(file_path, encoding="utf-8") as fh: - return fh.read() + """Read from a file opened in text mode""" + with open(file_path, encoding='utf-8') as fh: + return fh.read() def read_binary(file_path): - """Read from a file opened in binary mode""" - with open(file_path, "rb") as fh: - return fh.read() + """Read from a file opened in binary mode""" + with open(file_path, 'rb') as fh: + return fh.read() -def write_file(file_path, text): - """Write to a file opened in text mode""" - with open(file_path, "w", encoding="utf-8") as fh: - fh.write(text) +def write_file(file_path, text, line_endings=None): + """Write to a file opened in text mode""" + if line_endings and line_endings != os.linesep: + text = text.replace('\n', line_endings) + write_binary(file_path, text.encode('utf-8')) + else: + with open(file_path, 'w', encoding='utf-8') as fh: + fh.write(text) def write_binary(file_path, contents): - """Write to a file opened in binary mode""" - with open(file_path, "wb") as fh: - fh.write(contents) + """Write to a file opened in binary mode""" + with open(file_path, 'wb') as fh: + fh.write(contents) def delete_file(filename): - """Delete a file (if it exists).""" - if not os.path.exists(filename): - return + """Delete a file (if it exists).""" + if os.path.lexists(filename): os.remove(filename) def delete_dir(dirname): - """Delete a directory (if it exists).""" - if not os.path.exists(dirname): - return - shutil.rmtree(dirname) + """Delete a directory (if it exists).""" + if not os.path.exists(dirname): + return + shutil.rmtree(dirname) def delete_contents(dirname, exclude=None): - """Delete the contents of a directory without removing - the directory itself.""" - if not os.path.exists(dirname): - return - for entry in os.listdir(dirname): - if exclude and entry in exclude: - continue - entry = os.path.join(dirname, entry) - if os.path.isdir(entry): - delete_dir(entry) - else: - delete_file(entry) + """Delete the contents of a directory without removing + the directory itself.""" + if not os.path.exists(dirname): + return + for entry in os.listdir(dirname): + if exclude and entry in exclude: + continue + entry = os.path.join(dirname, entry) + if os.path.isdir(entry): + delete_dir(entry) + else: + delete_file(entry) + + +def get_num_cores(): + # Prefer `os.process_cpu_count` when available (3.13 and above) since + # it takes into account thread affinity. + # Fall back to `os.sched_getaffinity` where available and finally + # `os.cpu_count`, which should work everywhere. + if hasattr(os, 'process_cpu_count'): + cpu_count = os.process_cpu_count() + elif hasattr(os, 'sched_getaffinity'): + cpu_count = len(os.sched_getaffinity(0)) + else: + cpu_count = os.cpu_count() + return int(os.environ.get('EMCC_CORES', cpu_count)) + + +memoize = functools.cache + + +# TODO: Move this back to shared.py once importing that file becoming side effect free (i.e. it no longer requires a config). +def set_version_globals(): + global EMSCRIPTEN_VERSION, EMSCRIPTEN_VERSION_MAJOR, EMSCRIPTEN_VERSION_MINOR, EMSCRIPTEN_VERSION_TINY + filename = path_from_root('emscripten-version.txt') + EMSCRIPTEN_VERSION = read_file(filename).strip().strip('"') + parts = [int(x) for x in EMSCRIPTEN_VERSION.split('-')[0].split('.')] + EMSCRIPTEN_VERSION_MAJOR, EMSCRIPTEN_VERSION_MINOR, EMSCRIPTEN_VERSION_TINY = parts diff --git a/auditwheel_emscripten/emscripten_tools/webassembly.py b/auditwheel_emscripten/emscripten_tools/webassembly.py index 5edf3c9..12aa31d 100644 --- a/auditwheel_emscripten/emscripten_tools/webassembly.py +++ b/auditwheel_emscripten/emscripten_tools/webassembly.py @@ -3,7 +3,7 @@ # University of Illinois/NCSA Open Source License. Both these licenses can be # found in the LICENSE file. -"""Utilties for manipulating WebAssembly binaries from python. +"""Utilities for manipulating WebAssembly binaries from python. """ import logging @@ -14,18 +14,19 @@ from functools import wraps from . import utils +from .utils import memoize -sys.path.append(utils.path_from_root("third_party")) +sys.path.append(utils.path_from_root('third_party')) import leb128 -logger = logging.getLogger("webassembly") +logger = logging.getLogger('webassembly') WASM_PAGE_SIZE = 65536 -MAGIC = b"\0asm" +MAGIC = b'\0asm' -VERSION = b"\x01\0\0\0" +VERSION = b'\x01\0\0\0' HEADER_SIZE = 8 @@ -33,9 +34,9 @@ SEG_PASSIVE = 0x1 -PREFIX_MATH = 0xFC -PREFIX_THREADS = 0xFE -PREFIX_SIMD = 0xFD +PREFIX_MATH = 0xfc +PREFIX_THREADS = 0xfe +PREFIX_SIMD = 0xfd SYMBOL_BINDING_MASK = 0x3 SYMBOL_BINDING_GLOBAL = 0x0 @@ -44,562 +45,601 @@ def to_leb(num): - return leb128.u.encode(num) + return leb128.u.encode(num) def read_uleb(iobuf): - return leb128.u.decode_reader(iobuf)[0] + return leb128.u.decode_reader(iobuf)[0] def read_sleb(iobuf): - return leb128.i.decode_reader(iobuf)[0] - - -def memoize(method): - @wraps(method) - def wrapper(self, *args, **kwargs): - assert not kwargs - key = method - if key not in self._cache: - self._cache[key] = method(self, *args, **kwargs) - return self._cache[key] - - return wrapper + return leb128.i.decode_reader(iobuf)[0] def once(method): - @wraps(method) - def helper(self, *args, **kwargs): - key = method - if key not in self._cache: - self._cache[key] = method(self, *args, **kwargs) - return helper + @wraps(method) + def helper(self, *args, **kwargs): + key = method + if key not in self._cache: + self._cache[key] = method(self, *args, **kwargs) + + return helper class Type(IntEnum): - I32 = 0x7F # -0x1 - I64 = 0x7E # -0x2 - F32 = 0x7D # -0x3 - F64 = 0x7C # -0x4 - V128 = 0x7B # -0x5 - FUNCREF = 0x70 # -0x10 - EXTERNREF = 0x6F # -0x11 - VOID = 0x40 # -0x40 + I32 = 0x7f # -0x1 + I64 = 0x7e # -0x2 + F32 = 0x7d # -0x3 + F64 = 0x7c # -0x4 + V128 = 0x7b # -0x5 + FUNCREF = 0x70 # -0x10 + EXTERNREF = 0x6f # -0x11 + EXNREF = 0x69 # -0x17 + VOID = 0x40 # -0x40 class OpCode(IntEnum): - NOP = 0x01 - BLOCK = 0x02 - END = 0x0B - BR = 0x0C - BR_TABLE = 0x0E - CALL = 0x10 - DROP = 0x1A - LOCAL_GET = 0x20 - LOCAL_SET = 0x21 - LOCAL_TEE = 0x22 - GLOBAL_GET = 0x23 - GLOBAL_SET = 0x24 - RETURN = 0x0F - I32_CONST = 0x41 - I64_CONST = 0x42 - F32_CONST = 0x43 - F64_CONST = 0x44 - I32_ADD = 0x6A - REF_NULL = 0xD0 - ATOMIC_PREFIX = 0xFE - MEMORY_PREFIX = 0xFC + NOP = 0x01 + BLOCK = 0x02 + END = 0x0b + BR = 0x0c + BR_TABLE = 0x0e + CALL = 0x10 + DROP = 0x1a + LOCAL_GET = 0x20 + LOCAL_SET = 0x21 + LOCAL_TEE = 0x22 + GLOBAL_GET = 0x23 + GLOBAL_SET = 0x24 + RETURN = 0x0f + I32_CONST = 0x41 + I64_CONST = 0x42 + F32_CONST = 0x43 + F64_CONST = 0x44 + I32_ADD = 0x6a + I64_ADD = 0x7c + REF_NULL = 0xd0 + ATOMIC_PREFIX = 0xfe + MEMORY_PREFIX = 0xfc class MemoryOpCode(IntEnum): - MEMORY_INIT = 0x08 - MEMORY_DROP = 0x09 - MEMORY_COPY = 0x0A - MEMORY_FILL = 0x0B + MEMORY_INIT = 0x08 + MEMORY_DROP = 0x09 + MEMORY_COPY = 0x0a + MEMORY_FILL = 0x0b class AtomicOpCode(IntEnum): - ATOMIC_NOTIFY = 0x00 - ATOMIC_WAIT32 = 0x01 - ATOMIC_WAIT64 = 0x02 - ATOMIC_I32_STORE = 0x17 - ATOMIC_I32_RMW_CMPXCHG = 0x48 + ATOMIC_NOTIFY = 0x00 + ATOMIC_WAIT32 = 0x01 + ATOMIC_WAIT64 = 0x02 + ATOMIC_I32_STORE = 0x17 + ATOMIC_I32_RMW_CMPXCHG = 0x48 class SecType(IntEnum): - CUSTOM = 0 - TYPE = 1 - IMPORT = 2 - FUNCTION = 3 - TABLE = 4 - MEMORY = 5 - TAG = 13 - GLOBAL = 6 - EXPORT = 7 - START = 8 - ELEM = 9 - DATACOUNT = 12 - CODE = 10 - DATA = 11 + CUSTOM = 0 + TYPE = 1 + IMPORT = 2 + FUNCTION = 3 + TABLE = 4 + MEMORY = 5 + TAG = 13 + GLOBAL = 6 + EXPORT = 7 + START = 8 + ELEM = 9 + DATACOUNT = 12 + CODE = 10 + DATA = 11 class ExternType(IntEnum): - FUNC = 0 - TABLE = 1 - MEMORY = 2 - GLOBAL = 3 - TAG = 4 + FUNC = 0 + TABLE = 1 + MEMORY = 2 + GLOBAL = 3 + TAG = 4 class DylinkType(IntEnum): - MEM_INFO = 1 - NEEDED = 2 - EXPORT_INFO = 3 - IMPORT_INFO = 4 - RUNTIME_PATH = 5 + MEM_INFO = 1 + NEEDED = 2 + EXPORT_INFO = 3 + IMPORT_INFO = 4 + RUNTIME_PATH = 5 + + +class TargetFeaturePrefix(IntEnum): + USED = 0x2b + DISALLOWED = 0x2d + + +class NameType(IntEnum): + MODULE = 0 + FUNCTION = 1 + LOCAL = 2 + LABEL = 3 + TYPE = 4 + TABLE = 5 + MEMORY = 6 + GLOBAL = 7 + ELEMSEGMENT = 8 + DATASEGMENT = 9 + FIELD = 10 + TAG = 11 class InvalidWasmError(BaseException): - pass - - -Section = namedtuple("Section", ["type", "size", "offset", "name"]) -Limits = namedtuple("Limits", ["flags", "initial", "maximum"]) -Import = namedtuple("Import", ["kind", "module", "field", "type"]) -Export = namedtuple("Export", ["name", "kind", "index"]) -Global = namedtuple("Global", ["type", "mutable", "init"]) -Dylink = namedtuple( - "Dylink", - [ - "mem_size", - "mem_align", - "table_size", - "table_align", - "needed", - "export_info", - "import_info", - "runtime_paths", - ], -) -Table = namedtuple("Table", ["elem_type", "limits"]) -FunctionBody = namedtuple("FunctionBody", ["offset", "size"]) -DataSegment = namedtuple("DataSegment", ["flags", "init", "offset", "size"]) -FuncType = namedtuple("FuncType", ["params", "returns"]) + pass + + +Section = namedtuple('Section', ['type', 'size', 'offset', 'name']) +Limits = namedtuple('Limits', ['flags', 'initial', 'maximum']) +Import = namedtuple('Import', ['kind', 'module', 'field', 'type']) +Export = namedtuple('Export', ['name', 'kind', 'index']) +Global = namedtuple('Global', ['type', 'mutable', 'init']) +Dylink = namedtuple('Dylink', ['mem_size', 'mem_align', 'table_size', 'table_align', 'needed', 'export_info', 'import_info', 'runtime_paths']) +Table = namedtuple('Table', ['elem_type', 'limits']) +FunctionBody = namedtuple('FunctionBody', ['offset', 'size']) +DataSegment = namedtuple('DataSegment', ['flags', 'init', 'offset', 'size']) +FuncType = namedtuple('FuncType', ['params', 'returns']) class Module: - """Extremely minimal wasm module reader. Currently only used - for parsing the dylink section.""" - - def __init__(self, filename): - self.buf = None # Set this before FS calls below in case they throw. - self.filename = filename - self.size = os.path.getsize(filename) - self.buf = open(filename, "rb") - magic = self.buf.read(4) - version = self.buf.read(4) - if magic != MAGIC or version != VERSION: - raise InvalidWasmError(f"{filename} is not a valid wasm file") - self._cache = {} - - def __del__(self): - assert ( - not self.buf - ), "`__exit__` should have already been called, please use context manager" - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): # noqa - if self.buf: - self.buf.close() - self.buf = None - - def read_at(self, offset, count): - self.buf.seek(offset) - return self.buf.read(count) - - def read_byte(self): - return self.buf.read(1)[0] - - def read_uleb(self): - return read_uleb(self.buf) - - def read_sleb(self): - return read_sleb(self.buf) - - def read_string(self): - size = self.read_uleb() - return self.buf.read(size).decode("utf-8") - - def read_limits(self): - flags = self.read_byte() - initial = self.read_uleb() - maximum = 0 - if flags & LIMITS_HAS_MAX: - maximum = self.read_uleb() - return Limits(flags, initial, maximum) - - def read_type(self): - return Type(self.read_uleb()) - - def read_init(self): - code = [] - while 1: - opcode = OpCode(self.read_byte()) - args = [] - if opcode in (OpCode.GLOBAL_GET, OpCode.I32_CONST, OpCode.I64_CONST): - args.append(self.read_uleb()) - elif opcode in (OpCode.REF_NULL,): - args.append(self.read_type()) - elif opcode in (OpCode.END,): - pass - else: - raise Exception("unexpected opcode %s" % opcode) - code.append((opcode, args)) - if opcode == OpCode.END: - break - return code - - def seek(self, offset): - return self.buf.seek(offset) - - def tell(self): - return self.buf.tell() - - def skip(self, count): - self.buf.seek(count, os.SEEK_CUR) - - def sections(self): - """Generator that lazily returns sections from the wasm file.""" - offset = HEADER_SIZE - while offset < self.size: - self.seek(offset) - section_type = SecType(self.read_byte()) - section_size = self.read_uleb() - section_offset = self.buf.tell() - name = None - if section_type == SecType.CUSTOM: - name = self.read_string() - - yield Section(section_type, section_size, section_offset, name) - offset = section_offset + section_size - - @memoize - def get_types(self): - type_section = self.get_section(SecType.TYPE) - if not type_section: - return [] - self.seek(type_section.offset) - num_types = self.read_uleb() - types = [] - for _ in range(num_types): - type_form = self.read_byte() - assert type_form == 0x60 - - params = [] - num_params = self.read_uleb() - for _ in range(num_params): - params.append(self.read_type()) - - returns = [] - num_returns = self.read_uleb() - for _ in range(num_returns): - returns.append(self.read_type()) - - types.append(FuncType(params, returns)) - - return types - - def parse_features_section(self): - features = [] - sec = self.get_custom_section("target_features") - if sec: - self.seek(sec.offset) - self.read_string() # name - feature_count = self.read_uleb() - while feature_count: - prefix = self.read_byte() - features.append((chr(prefix), self.read_string())) - feature_count -= 1 - return features - - @memoize - def parse_dylink_section(self): - dylink_section = next(self.sections()) - assert dylink_section.type == SecType.CUSTOM - self.seek(dylink_section.offset) - # section name - needed = [] - export_info = {} - import_info = {} - runtime_paths = [] - self.read_string() # name - - if dylink_section.name == "dylink": + """Extremely minimal wasm module reader. Currently only used + for parsing the dylink section.""" + def __init__(self, filename): + self.buf = None # Set this before FS calls below in case they throw. + self.filename = filename + self.size = os.path.getsize(filename) + self.buf = open(filename, 'rb') + magic = self.buf.read(4) + version = self.buf.read(4) + if magic != MAGIC or version != VERSION: + raise InvalidWasmError(f'{filename} is not a valid wasm file') + self._cache = {} + + def __del__(self): + assert not self.buf, '`__exit__` should have already been called, please use context manager' + + def __enter__(self): + return self + + def __exit__(self, _exc_type, _exc_val, _exc_tb): + if self.buf: + self.buf.close() + self.buf = None + + def read_at(self, offset, count): + self.buf.seek(offset) + return self.buf.read(count) + + def read_byte(self): + return self.buf.read(1)[0] + + def read_uleb(self): + return read_uleb(self.buf) + + def read_sleb(self): + return read_sleb(self.buf) + + def read_string(self): + size = self.read_uleb() + return self.buf.read(size).decode('utf-8') + + def read_limits(self): + flags = self.read_byte() + initial = self.read_uleb() + maximum = 0 + if flags & LIMITS_HAS_MAX: + maximum = self.read_uleb() + return Limits(flags, initial, maximum) + + def read_type(self): + return Type(self.read_uleb()) + + def read_init(self): + code = [] + while 1: + opcode = OpCode(self.read_byte()) + args = [] + match opcode: + case OpCode.GLOBAL_GET: + args.append(self.read_uleb()) + case OpCode.I32_CONST | OpCode.I64_CONST: + args.append(self.read_sleb()) + case OpCode.REF_NULL: + args.append(self.read_type()) + case OpCode.END | OpCode.I32_ADD | OpCode.I64_ADD: + pass + case _: + raise Exception('unexpected opcode %s' % opcode) + code.append((opcode, args)) + if opcode == OpCode.END: + break + return code + + def seek(self, offset): + return self.buf.seek(offset) + + def tell(self): + return self.buf.tell() + + def skip(self, count): + self.buf.seek(count, os.SEEK_CUR) + + def sections(self): + """Generator that lazily returns sections from the wasm file.""" + offset = HEADER_SIZE + while offset < self.size: + self.seek(offset) + section_type = SecType(self.read_byte()) + section_size = self.read_uleb() + section_offset = self.buf.tell() + name = None + if section_type == SecType.CUSTOM: + name = self.read_string() + + yield Section(section_type, section_size, section_offset, name) + offset = section_offset + section_size + + @memoize + def get_types(self): + type_section = self.get_section(SecType.TYPE) + if not type_section: + return [] + self.seek(type_section.offset) + num_types = self.read_uleb() + types = [] + for _ in range(num_types): + type_form = self.read_byte() + assert type_form == 0x60 + + num_params = self.read_uleb() + params = [self.read_type() for _ in range(num_params)] + + num_returns = self.read_uleb() + returns = [self.read_type() for _ in range(num_returns)] + + types.append(FuncType(params, returns)) + + return types + + @memoize + def parse_dylink_section(self): + dylink_section = next(self.sections()) + assert dylink_section.type == SecType.CUSTOM + self.seek(dylink_section.offset) + # section name + needed = [] + export_info = {} + import_info = {} + runtime_paths = [] + self.read_string() # name + + if dylink_section.name == 'dylink': + mem_size = self.read_uleb() + mem_align = self.read_uleb() + table_size = self.read_uleb() + table_align = self.read_uleb() + + needed_count = self.read_uleb() + while needed_count: + libname = self.read_string() + needed.append(libname) + needed_count -= 1 + elif dylink_section.name == 'dylink.0': + section_end = dylink_section.offset + dylink_section.size + while self.tell() < section_end: + subsection_type = self.read_uleb() + subsection_size = self.read_uleb() + end = self.tell() + subsection_size + match subsection_type: + case DylinkType.MEM_INFO: mem_size = self.read_uleb() mem_align = self.read_uleb() table_size = self.read_uleb() table_align = self.read_uleb() - + case DylinkType.NEEDED: needed_count = self.read_uleb() while needed_count: - libname = self.read_string() - needed.append(libname) - needed_count -= 1 - elif dylink_section.name == "dylink.0": - section_end = dylink_section.offset + dylink_section.size - while self.tell() < section_end: - subsection_type = self.read_uleb() - subsection_size = self.read_uleb() - end = self.tell() + subsection_size - if subsection_type == DylinkType.MEM_INFO: - mem_size = self.read_uleb() - mem_align = self.read_uleb() - table_size = self.read_uleb() - table_align = self.read_uleb() - elif subsection_type == DylinkType.NEEDED: - needed_count = self.read_uleb() - while needed_count: - libname = self.read_string() - needed.append(libname) - needed_count -= 1 - elif subsection_type == DylinkType.EXPORT_INFO: - count = self.read_uleb() - while count: - sym = self.read_string() - flags = self.read_uleb() - export_info[sym] = flags - count -= 1 - elif subsection_type == DylinkType.IMPORT_INFO: - count = self.read_uleb() - while count: - module = self.read_string() - field = self.read_string() - flags = self.read_uleb() - import_info.setdefault(module, {}) - import_info[module][field] = flags - count -= 1 - elif subsection_type == DylinkType.RUNTIME_PATH: - count = self.read_uleb() - while count: - rpath = self.read_string() - runtime_paths.append(rpath) - count -= 1 - else: - print(f"unknown subsection: {subsection_type}") - # ignore unknown subsections - self.skip(subsection_size) - - assert self.tell() == end, f"({subsection_type}) {self.tell()} != {end}" - else: - utils.exit_with_error("error parsing shared library") - - return Dylink( - mem_size, - mem_align, - table_size, - table_align, - needed, - export_info, - import_info, - runtime_paths, - ) - - @memoize - def get_exports(self): - export_section = self.get_section(SecType.EXPORT) - if not export_section: - return [] - - self.seek(export_section.offset) - num_exports = self.read_uleb() - exports = [] - for _ in range(num_exports): - name = self.read_string() - kind = ExternType(self.read_byte()) - index = self.read_uleb() - exports.append(Export(name, kind, index)) - - return exports - - @memoize - def get_imports(self): - import_section = self.get_section(SecType.IMPORT) - if not import_section: - return [] - - self.seek(import_section.offset) - num_imports = self.read_uleb() - imports = [] - for _ in range(num_imports): - mod = self.read_string() - field = self.read_string() - kind = ExternType(self.read_byte()) - type_ = None - if kind == ExternType.FUNC: - type_ = self.read_uleb() - elif kind == ExternType.GLOBAL: - type_ = self.read_sleb() - self.read_byte() # mutable - elif kind == ExternType.MEMORY: - self.read_limits() # limits - elif kind == ExternType.TABLE: - type_ = self.read_sleb() - self.read_limits() # limits - elif kind == ExternType.TAG: - self.read_byte() # attribute - type_ = self.read_uleb() - else: - raise AssertionError() - imports.append(Import(kind, mod, field, type_)) - - return imports - - @memoize - def get_globals(self): - global_section = self.get_section(SecType.GLOBAL) - if not global_section: - return [] - globls = [] - self.seek(global_section.offset) - num_globals = self.read_uleb() - for _ in range(num_globals): - global_type = self.read_type() - mutable = self.read_byte() - init = self.read_init() - globls.append(Global(global_type, mutable, init)) - return globls - - @memoize - def get_start(self): - start_section = self.get_section(SecType.START) - if not start_section: - return None - self.seek(start_section.offset) - return self.read_uleb() - - @memoize - def get_functions(self): - code_section = self.get_section(SecType.CODE) - if not code_section: - return [] - functions = [] - self.seek(code_section.offset) - num_functions = self.read_uleb() - for _ in range(num_functions): - body_size = self.read_uleb() - start = self.tell() - functions.append(FunctionBody(start, body_size)) - self.seek(start + body_size) - return functions - - def get_section(self, section_code): - return next((s for s in self.sections() if s.type == section_code), None) - - @memoize - def get_custom_section(self, name): - for section in self.sections(): - if section.type == SecType.CUSTOM and section.name == name: - return section - return None - - @memoize - def get_segments(self): - segments = [] - data_section = self.get_section(SecType.DATA) - self.seek(data_section.offset) - num_segments = self.read_uleb() - for _ in range(num_segments): - flags = self.read_uleb() - if flags & SEG_PASSIVE: - init = None - else: - init = self.read_init() - size = self.read_uleb() - offset = self.tell() - segments.append(DataSegment(flags, init, offset, size)) - self.seek(offset + size) - return segments - - @memoize - def get_tables(self): - table_section = self.get_section(SecType.TABLE) - if not table_section: - return [] - - self.seek(table_section.offset) - num_tables = self.read_uleb() - tables = [] - for _ in range(num_tables): - elem_type = self.read_type() - limits = self.read_limits() - tables.append(Table(elem_type, limits)) - - return tables - - @memoize - def get_function_types(self): - function_section = self.get_section(SecType.FUNCTION) - if not function_section: - return [] - - self.seek(function_section.offset) - num_types = self.read_uleb() - func_types = [] - for _ in range(num_types): - func_types.append(self.read_uleb()) - return func_types - - def has_name_section(self): - return self.get_custom_section("name") is not None - - @once - def _calc_indexes(self): - self.imports_by_kind = {} - for i in self.get_imports(): - self.imports_by_kind.setdefault(i.kind, []) - self.imports_by_kind[i.kind].append(i) - - def num_imported_funcs(self): - self._calc_indexes() - return len(self.imports_by_kind.get(ExternType.FUNC, [])) - - def num_imported_globals(self): - self._calc_indexes() - return len(self.imports_by_kind.get(ExternType.GLOBAL, [])) - - def get_function(self, idx): - self._calc_indexes() - assert idx >= self.num_imported_funcs() - return self.get_functions()[idx - self.num_imported_funcs()] - - def get_global(self, idx): - self._calc_indexes() - assert idx >= self.num_imported_globals() - return self.get_globals()[idx - self.num_imported_globals()] - - def get_function_type(self, idx): - self._calc_indexes() - if idx < self.num_imported_funcs(): - imp = self.imports_by_kind[ExternType.FUNC][idx] - func_type = imp.type - else: - func_type = self.get_function_types()[idx - self.num_imported_funcs()] - return self.get_types()[func_type] + libname = self.read_string() + needed.append(libname) + needed_count -= 1 + case DylinkType.EXPORT_INFO: + count = self.read_uleb() + while count: + sym = self.read_string() + flags = self.read_uleb() + export_info[sym] = flags + count -= 1 + case DylinkType.IMPORT_INFO: + count = self.read_uleb() + while count: + module = self.read_string() + field = self.read_string() + flags = self.read_uleb() + import_info.setdefault(module, {}) + import_info[module][field] = flags + count -= 1 + case DylinkType.RUNTIME_PATH: + count = self.read_uleb() + while count: + rpath = self.read_string() + runtime_paths.append(rpath) + count -= 1 + case _: + print(f'unknown subsection: {subsection_type}') + # ignore unknown subsections + self.skip(subsection_size) + assert self.tell() == end + else: + utils.exit_with_error('error parsing shared library') + + return Dylink(mem_size, mem_align, table_size, table_align, needed, export_info, import_info, runtime_paths) + + @memoize + def get_exports(self): + export_section = self.get_section(SecType.EXPORT) + if not export_section: + return [] + + self.seek(export_section.offset) + num_exports = self.read_uleb() + exports = [] + for _ in range(num_exports): + name = self.read_string() + kind = ExternType(self.read_byte()) + index = self.read_uleb() + exports.append(Export(name, kind, index)) + + return exports + + @memoize + def get_imports(self): + import_section = self.get_section(SecType.IMPORT) + if not import_section: + return [] + + self.seek(import_section.offset) + num_imports = self.read_uleb() + imports = [] + for _ in range(num_imports): + mod = self.read_string() + field = self.read_string() + kind = ExternType(self.read_byte()) + type_ = None + match kind: + case ExternType.FUNC: + type_ = self.read_uleb() + case ExternType.GLOBAL: + type_ = self.read_sleb() + self.read_byte() # mutable + case ExternType.MEMORY: + self.read_limits() # limits + case ExternType.TABLE: + type_ = self.read_sleb() + self.read_limits() # limits + case ExternType.TAG: + self.read_byte() # attribute + type_ = self.read_uleb() + case _: + raise AssertionError() + imports.append(Import(kind, mod, field, type_)) + + return imports + + @memoize + def get_globals(self): + global_section = self.get_section(SecType.GLOBAL) + if not global_section: + return [] + globls = [] + self.seek(global_section.offset) + num_globals = self.read_uleb() + for _ in range(num_globals): + global_type = self.read_type() + mutable = self.read_byte() + init = self.read_init() + globls.append(Global(global_type, mutable, init)) + return globls + + @memoize + def get_start(self): + start_section = self.get_section(SecType.START) + if not start_section: + return None + self.seek(start_section.offset) + return self.read_uleb() + + @memoize + def get_functions(self): + code_section = self.get_section(SecType.CODE) + if not code_section: + return [] + functions = [] + self.seek(code_section.offset) + num_functions = self.read_uleb() + for _ in range(num_functions): + body_size = self.read_uleb() + start = self.tell() + functions.append(FunctionBody(start, body_size)) + self.seek(start + body_size) + return functions + + def get_section(self, section_code): + return next((s for s in self.sections() if s.type == section_code), None) + + @memoize + def get_custom_section(self, name): + for section in self.sections(): + if section.type == SecType.CUSTOM and section.name == name: + return section + return None + + @memoize + def get_segments(self): + segments = [] + data_section = self.get_section(SecType.DATA) + self.seek(data_section.offset) + num_segments = self.read_uleb() + for _ in range(num_segments): + flags = self.read_uleb() + if (flags & SEG_PASSIVE): + init = None + else: + init = self.read_init() + size = self.read_uleb() + offset = self.tell() + segments.append(DataSegment(flags, init, offset, size)) + self.seek(offset + size) + return segments + + @memoize + def get_tables(self): + table_section = self.get_section(SecType.TABLE) + if not table_section: + return [] + + self.seek(table_section.offset) + num_tables = self.read_uleb() + tables = [] + for _ in range(num_tables): + elem_type = self.read_type() + limits = self.read_limits() + tables.append(Table(elem_type, limits)) + + return tables + + @memoize + def get_function_types(self): + function_section = self.get_section(SecType.FUNCTION) + if not function_section: + return [] + + self.seek(function_section.offset) + num_types = self.read_uleb() + return [self.read_uleb() for _ in range(num_types)] + + @memoize + def get_function_names(self, remove_imports=True): + num_funcs = self.num_imported_funcs() + len(self.get_functions()) + names = [None] * num_funcs + + name_section = self.get_custom_section('name') + if not name_section: + return names + + self.seek(name_section.offset) + self.read_string() # section name + section_end = name_section.offset + name_section.size + + while self.tell() < section_end: + subsection_id = self.read_byte() + subsection_size = self.read_uleb() + if subsection_id == 1: # function names + count = self.read_uleb() + for _ in range(count): + func_idx = self.read_uleb() + func_name = self.read_string() + assert func_idx < len(names) + names[func_idx] = func_name + else: + self.skip(subsection_size) + + return names[self.num_imported_funcs():] if remove_imports else names + + def has_name_section(self): + return self.get_custom_section('name') is not None + + @once + def _calc_indexes(self): + self.imports_by_kind = {} + for i in self.get_imports(): + self.imports_by_kind.setdefault(i.kind, []) + self.imports_by_kind[i.kind].append(i) + + def num_imported_funcs(self): + self._calc_indexes() + return len(self.imports_by_kind.get(ExternType.FUNC, [])) + + def num_imported_globals(self): + self._calc_indexes() + return len(self.imports_by_kind.get(ExternType.GLOBAL, [])) + + def get_function(self, idx): + self._calc_indexes() + assert idx >= self.num_imported_funcs() + return self.get_functions()[idx - self.num_imported_funcs()] + + def iter_functions_by_index(self): + self._calc_indexes() + for idx in range(self.num_imported_funcs(), + self.num_imported_funcs() + len(self.get_functions())): + yield idx, self.get_function(idx) + + def get_global(self, idx): + self._calc_indexes() + assert idx >= self.num_imported_globals() + return self.get_globals()[idx - self.num_imported_globals()] + + def get_function_type(self, idx): + self._calc_indexes() + if idx < self.num_imported_funcs(): + imp = self.imports_by_kind[ExternType.FUNC][idx] + func_type = imp.type + else: + func_type = self.get_function_types()[idx - self.num_imported_funcs()] + return self.get_types()[func_type] + + @memoize + def get_target_features(self): + section = self.get_custom_section('target_features') + if not section: + return {} + self.seek(section.offset) + assert self.read_string() == 'target_features' + features = {} + self.read_byte() # ignore feature count + while self.tell() < section.offset + section.size: + prefix = TargetFeaturePrefix(self.read_byte()) + feature = self.read_string() + features[feature] = prefix + return features + + @memoize + def get_sourceMappingURL(self): + section = self.get_custom_section('sourceMappingURL') + if not section: + return '' + self.seek(section.offset) + self.read_string() # 'sourceMappingURL' + return self.read_string() def parse_dylink_section(wasm_file): - with Module(wasm_file) as module: - return module.parse_dylink_section() + with Module(wasm_file) as module: + return module.parse_dylink_section() def get_exports(wasm_file): - with Module(wasm_file) as module: - return module.get_exports() + with Module(wasm_file) as module: + return module.get_exports() def get_imports(wasm_file): - with Module(wasm_file) as module: - return module.get_imports() + with Module(wasm_file) as module: + return module.get_imports() + + +def get_weak_imports(wasm_file): + weak_imports = [] + dylink_sec = parse_dylink_section(wasm_file) + for symbols in dylink_sec.import_info.values(): + for symbol, flags in symbols.items(): + if flags & SYMBOL_BINDING_MASK == SYMBOL_BINDING_WEAK: + weak_imports.append(symbol) + return weak_imports