diff --git a/rig/geometry.py b/rig/geometry.py index 048029e..7b082a2 100644 --- a/rig/geometry.py +++ b/rig/geometry.py @@ -5,6 +5,8 @@ from math import sqrt +import numpy as np + def to_xyz(xy): """Convert a two-tuple (x, y) coordinate into an (x, y, 0) coordinate.""" @@ -214,3 +216,84 @@ def standard_system_dimensions(num_boards): # Convert the number of triads into numbers of chips (each triad of boards # contributes as 12x12 block of chips). return (w * 12, h * 12) + + +def spinn5_eth_coords(width, height): + """Generate a list of board coordinates with Ethernet connectivity in a + SpiNNaker machine. + + Specifically, generates the coordinates for the Ethernet connected chips of + SpiNN-5 boards arranged in a standard torus topology. + + Parameters + ---------- + width : int + Width of the system in chips. + height : int + Height of the system in chips. + """ + # Internally, work with the width and height rounded up to the next + # multiple of 12 + w = ((width + 11) // 12) * 12 + h = ((height + 11) // 12) * 12 + + for x in range(0, w, 12): + for y in range(0, h, 12): + for dx, dy in ((0, 0), (4, 8), (8, 4)): + nx = (x + dx) % w + ny = (y + dy) % h + # Skip points which are outside the range available + if nx < width and ny < height: + yield (nx, ny) + + +def spinn5_local_eth_coord(x, y, w, h): + """Get the coordinates of a chip's local ethernet connected chip. + + .. note:: + This function assumes the system is constructed from SpiNN-5 boards + returns the coordinates of the ethernet connected chip on the current + board. + + Parameters + ---------- + x : int + y : int + w : int + Width of the system in chips. + h : int + Height of the system in chips. + """ + dx, dy = SPINN5_ETH_OFFSET[y % 12][x % 12] + return ((x + dx) % w), ((y + dy) % h) + + +SPINN5_ETH_OFFSET = np.array([ + [(vx - x, vy - y) for x, (vx, vy) in enumerate(row)] + for y, row in enumerate([ + # Below is an enumeration of the absolute coordinates of the nearest + # ethernet connected chip. Note that the above list comprehension + # changes these into offsets to the nearest chip. + # X: 0 1 2 3 4 5 6 7 8 9 10 11 # noqa Y: + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 0 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 1 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 2 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 3 + [(-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 4 + [(-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 5 + [(-4, +4), (-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 6 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 7 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4), (+8, +4)], # noqa 8 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4)], # noqa 9 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4)], # noqa 10 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)] # noqa 11 + ]) +], dtype=int) +"""SpiNN-5 ethernet connected chip lookup. + +Used by :py:func:`.spinn5_local_eth_coord`. Given an x and y chip position +modulo 12, return the offset of the board's bottom-left chip from the chip's +position. + +Note: the order of indexes: ``SPINN5_ETH_OFFSET[y][x]``! +""" diff --git a/rig/machine_control/consts.py b/rig/machine_control/consts.py index 69da837..b7c61fd 100644 --- a/rig/machine_control/consts.py +++ b/rig/machine_control/consts.py @@ -18,14 +18,6 @@ produce (256 + 8 bytes). """ -SCP_RC_OK = 0x80 -"""SCP response code which indicates that everything was fine.""" - -SCP_RC_TIMEOUT = {0x8b, 0x8c, 0x8d, 0x8e} -"""SCP response codes which should be treated as if they were a packet timing -out. -""" - SPINNAKER_RTR_BASE = 0xE1000000 # Unbuffered """Base address of router hardware registers.""" @@ -93,6 +85,58 @@ class SCPCommands(enum.IntEnum): power = 57 # BMP main board power control +@add_int_enums_to_docstring +class SCPReturnCodes(enum.IntEnum): + """SCP return codes""" + ok = 0x80 # Command completed OK + len = 0x81 # Bad packet length (Fatal) + sum = 0x82 # Bad checksum (Retryable) + cmd = 0x83 # Bad/invalid command (Fatal) + arg = 0x84 # Invalid arguments (Fatal) + port = 0x85 # Bad port number (Fatal) + timeout = 0x86 # Monitor <-> app-core comms timeout (Fatal) + route = 0x87 # No P2P route (Fatal) + cpu = 0x88 # Bad CPU number (Fatal) + dead = 0x89 # SHM dest dead (Fatal) + buf = 0x8a # No free SHM buffers (Fatal) + p2p_noreply = 0x8b # No reply to open (Fatal) + p2p_reject = 0x8c # Open rejected (Fatal) + p2p_busy = 0x8d # Dest busy (Retryable) + p2p_timeout = 0x8e # Eth chip <--> destination comms timeout (Fatal) + pkt_tx = 0x8f # Pkt Tx failed (Fatal) + +RETRYABLE_SCP_RETURN_CODES = set([ + SCPReturnCodes.sum, + SCPReturnCodes.p2p_busy, +]) +"""The set of :py:class:`.SCPReturnCodes` values which indicate a non-fatal +retryable fault.""" + + +FATAL_SCP_RETURN_CODES = { + SCPReturnCodes.len: "Bad command length.", + SCPReturnCodes.cmd: "Bad/invalid command.", + SCPReturnCodes.arg: "Invalid command arguments.", + SCPReturnCodes.port: "Bad port number.", + SCPReturnCodes.timeout: + "Timeout waiting for the application core to respond to " + "the monitor core's request.", + SCPReturnCodes.route: "No P2P route to the target chip is available.", + SCPReturnCodes.cpu: "Bad CPU number.", + SCPReturnCodes.dead: "SHM dest dead.", + SCPReturnCodes.buf: "No free SHM buffers.", + SCPReturnCodes.p2p_noreply: + "No response packets from the target reached the " + "ethernet connected chip.", + SCPReturnCodes.p2p_reject: "The target chip rejected the packet.", + SCPReturnCodes.p2p_timeout: + "Communications between the ethernet connected chip and target chip " + "timedout.", + SCPReturnCodes.pkt_tx: "Packet transmission failed.", +} +"""The set of fatal SCP errors and a human-readable error.""" + + @add_int_enums_to_docstring class DataType(enum.IntEnum): """Used to specify the size of data being read to/from a SpiNNaker machine diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index 72b5a6f..c554751 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -1,5 +1,6 @@ """A high level interface for controlling a SpiNNaker system.""" +import sys import collections import functools import os @@ -9,14 +10,21 @@ import struct import time import pkg_resources +import traceback + +from threading import Thread, Lock, Event from .consts import SCPCommands, NNCommands, NNConstants, AppFlags, LEDAction from . import boot, consts, regions, struct_file from .scp_connection import SCPConnection +from rig.machine_control.scp_connection import SCPError + from rig import routing_table from rig.machine import Cores, SDRAM, SRAM, Links, Machine +from rig.geometry import spinn5_eth_coords, spinn5_local_eth_coord + from rig.utils.contexts import ContextMixin, Required from rig.utils.docstrings import add_signature_to_docstring @@ -107,10 +115,29 @@ def __init__(self, initial_host, scp_port=consts.SCP_PORT, "boot/sark.struct") self.structs = struct_file.read_struct_file(struct_data) - # Create the initial connection - self.connections = [ - SCPConnection(initial_host, scp_port, n_tries, timeout) - ] + # This dictionary contains a lookup from chip (x, y) to the + # SCPConnection associated with that chip. The special entry with the + # key None is reserved for the connection initially made to the + # machine and is special since it is always known to exist but its + # actual position in the network is unknown. + self.connections = { + None: SCPConnection(initial_host, scp_port, n_tries, timeout) + } + + # The dimensions of the system. This is set by discover_connections() + # and is used by _get_connection to determine which of the above + # connections to use. + self._width = None + self._height = None + + # Postponed operation queues for reads/writes issued with the postponed + # argument set. These are listed per-connection and are flushed by the + # flush_postponed_io method. Each queue contains a list of + # zero-argument function which carries out the required I/O operation. + # + # {connection: deque([f, ...]), ...} + self._postponed = collections.defaultdict(collections.deque) + self._postponed_lock = Lock() def __call__(self, **context_args): """For use with `with`: set default argument values. @@ -152,9 +179,9 @@ def send_scp(self, *args, **kwargs): This function is a thin wrapper around :py:meth:`rig.machine_control.scp_connection.SCPConnection.send_scp`. - Future versions of this command will automatically choose the most - appropriate connection to use for machines with more than one Ethernet - connection. + This function will attempt to use the SCP connection nearest the + destination of the SCP command if multiple connections have been + discovered using :py:meth:`.discover_connections`. Parameters ---------- @@ -173,7 +200,20 @@ def send_scp(self, *args, **kwargs): def _get_connection(self, x, y): """Get the appropriate connection for a chip.""" - return self.connections[0] + if self._width is None or self._height is None: + return self.connections[None] + else: + # If possible, use the local Ethernet connected chip + eth_chip = spinn5_local_eth_coord(x, y, self._width, self._height) + conn = self.connections.get(eth_chip) + if conn is not None: + return conn + else: + # If no connection was available to the local board, chose + # another arbitrarily. + # XXX: This choice will cause lots of contention in systems + # with many missing Ethernet connections. + return self.connections[None] def _send_scp(self, x, y, p, *args, **kwargs): """Determine the best connection to use to send an SCP packet and use @@ -249,6 +289,63 @@ def boot(self, width, height, **boot_kwargs): **boot_kwargs) assert len(self.structs) > 0 + @ContextMixin.use_contextual_arguments() + def discover_connections(self, x=0, y=0): + """Attempt to discover all available Ethernet connections to a machine. + + After calling this method, :py:class:`.MachineController` will attempt + to communicate via the Ethernet connection on the same board as the + destination chip for all commands. + + If called multiple times, existing connections will be retained in + preference to new ones. + + .. note:: + The system must be booted for this command to succeed. + + .. note:: + Currently, only systems comprised of multiple Ethernet-connected + SpiNN-5 boards are supported. + + Parameters + ---------- + x : int + y : int + (Optional) The coordinates of the chip to initially use to query + the system for the set of live chips. + + Returns + ------- + int + The number of new connections established. + """ + working_chips = set( + (x, y) + for (x, y), route in iteritems(self.get_p2p_routing_table(x, y)) + if route != consts.P2PTableEntry.none) + self._width = max(x for x, y in working_chips) + 1 + self._height = max(y for x, y in working_chips) + 1 + + num_new_connections = 0 + + for x, y in spinn5_eth_coords(self._width, self._height): + if (x, y) in working_chips and (x, y) not in self.connections: + ip = self.get_ip_address(x, y) + if ip is not None: + # Create a connection to the IP + self.connections[(x, y)] = \ + SCPConnection(ip, self.scp_port, + self.n_tries, self.timeout) + # Attempt to use the connection (and remove it if it + # doesn't work) + try: + self.get_software_version(x, y) + num_new_connections += 1 + except SCPError: + self.connections.pop((x, y)).close() + + return num_new_connections + @ContextMixin.use_contextual_arguments() def application(self, app_id): """Update the context to use the given application ID and stop the @@ -293,7 +390,25 @@ def get_software_version(self, x, y, processor=0): sver.arg3, sver.data.decode("utf-8")) @ContextMixin.use_contextual_arguments() - def write(self, address, data, x, y, p=0): + def get_ip_address(self, x, y): + """Get the IP address of a particular SpiNNaker chip's Ethernet link. + + Returns + ------- + str or None + The IPv4 address (as a string) of the chip's Ethernet link or None + if the chip does not have an Ethernet connection or the link is + currently down. + """ + if self.read_struct_field("sv", "eth_up", x=x, y=y): + ip = self.read_struct_field("sv", "ip_addr", x=x, y=y) + # Convert the IP address to the standard decimal string format + return ".".join(str((ip >> i) & 0xFF) for i in range(0, 32, 8)) + else: + return None + + @ContextMixin.use_contextual_arguments() + def write(self, address, data, x, y, p=0, postpone=False): """Write a bytestring to an address in memory. It is strongly encouraged to only read and write to blocks of memory @@ -308,17 +423,37 @@ def write(self, address, data, x, y, p=0): The address at which to start writing the data. Addresses are given within the address space of a SpiNNaker core. See the SpiNNaker datasheet for more information. - data : :py:class:`bytes` + data : :py:class:`bytes` or callable Data to write into memory. Writes are automatically broken into a sequence of SCP write commands. + + If a callable is given, it will be called when the write is about + to be carried out and must return a bytearray (or similar) to be + written. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. The data object must + remain valid until this function returns. If a callable is passed + as the data argument, the callable may be called from another + thread and with in specific order with respect to other calls to + this function. """ # Call the SCPConnection to perform the write on our behalf connection = self._get_connection(x, y) - return connection.write(self.scp_data_length, self.scp_window_size, - x, y, p, address, data) + f = (lambda: connection.write(self.scp_data_length, self.scp_window_size, + x, y, p, address, + data() if callable(data) else data)) + + if postpone: + self._postponed[connection].append(f) + else: + f() @ContextMixin.use_contextual_arguments() - def read(self, address, length_bytes, x, y, p=0): + def read(self, address, length_bytes, x, y, p=0, on_read=None, postpone=False): """Read a bytestring from an address in memory. Parameters @@ -328,16 +463,100 @@ def read(self, address, length_bytes, x, y, p=0): length_bytes : int The number of bytes to read from memory. Large reads are transparently broken into multiple SCP read commands. + on_read : callable + If supplied, this is called with the read data as an argument when + the read completes. Otherwise, the read data is returned directly. + postpone : bool + If False (the default), the read will occur immediately and the + read value returned or used as an argument to on_read. + + If True, the read will be performed when + :py:meth:`.flush_postponed_io` is called and an uninitialised + bytearray will be returned. When the read actually occurs, this + bytearray will be populated and the on_read method called with + a reference to the bytearray. Note that on_read may be called from + another thread and the order of calls to on_read is not guaranteed. Returns ------- - :py:class:`bytes` - The data is read back from memory as a bytestring. + :py:class:`bytearray` or None + If on_read is not given, the data read back from memory is returned + if postpone is False. If postpone is True, an uninitialised + bytearray will be returned. The bytearray will be populated when + :py:meth:`.flush_postponed_io` is called. + + If on_read is given, this method returns None. """ # Call the SCPConnection to perform the read on our behalf connection = self._get_connection(x, y) - return connection.read(self.scp_data_length, self.scp_window_size, - x, y, p, address, length_bytes) + if on_read is not None: + data = None + f = (lambda: on_read(connection.read(self.scp_data_length, + self.scp_window_size, + x, y, p, + address, length_bytes))) + else: + data = bytearray(length_bytes) + f = (lambda: connection.readinto(data, self.scp_data_length, + self.scp_window_size, + x, y, p, + address, length_bytes)) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() + + return data + + @ContextMixin.use_contextual_arguments() + def readinto(self, address, buffer, length_bytes, x, y, p=0, on_read=None, postpone=False): + """Read a from an address in memory into the supplied buffer. + + Parameters + ---------- + address : int + The address at which to start reading the data. + buffer : bytearray or callable + A bufferable object (e.g. bytearray) of at least length_bytes in + size into which the data will be read. + + If a callable is supplied, this will be called with no arguments + just before the read is carried out and the function must return a + bufferable object into which the result will be written. + length_bytes : int + The number of bytes to read from memory. Large reads are + transparently broken into multiple SCP read commands. + on_read : callable + If supplied, this is called with supplied buffer as an argument + when the read completes. + postpone : bool + If False (the default), the read will occur immediately and the + read value will be placed into the supplied buffer. + + If True, the read will be performed when + :py:meth:`.flush_postponed_io` is called. Note that on_read and + buffer may be called from another thread and the order of other + calls to buffer and on_read is not guaranteed. + """ + # Call the SCPConnection to perform the read on our behalf + connection = self._get_connection(x, y) + + def f(): + buf = buffer() if callable(buffer) else buffer + connection.readinto(buf, + self.scp_data_length, self.scp_window_size, + x, y, p, + address, length_bytes) + if on_read is not None: + on_read(buf) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() def _get_struct_field_and_address(self, struct_name, field_name): field = self.structs[six.b(struct_name)][six.b(field_name)] @@ -380,7 +599,8 @@ def read_struct_field(self, struct_name, field_name, x, y, p=0): length = struct.calcsize(pack_chars) # Perform the read - data = self.read(address, length, x, y, p) + data = self.read(address, length, x, y, p, + on_read=None, postpone=False) # Unpack the data unpacked = struct.unpack(pack_chars, data) @@ -391,7 +611,8 @@ def read_struct_field(self, struct_name, field_name, x, y, p=0): return unpacked @ContextMixin.use_contextual_arguments() - def write_struct_field(self, struct_name, field_name, values, x, y, p=0): + def write_struct_field(self, struct_name, field_name, values, x, y, p=0, + postpone=False): """Write a value into a struct. This method is particularly useful for writing values into the ``sv`` @@ -406,6 +627,12 @@ def write_struct_field(self, struct_name, field_name, values, x, y, p=0): Name of the field to write, e.g., `"random"` values : Value(s) to be written into the field. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. .. warning:: Fields which are arrays must currently be written in their @@ -422,7 +649,7 @@ def write_struct_field(self, struct_name, field_name, values, x, y, p=0): data = struct.pack(pack_chars, values) # Perform the write - self.write(address, data, x, y, p) + self.write(address, data, x, y, p, postpone=postpone) def _get_vcpu_field_and_address(self, field_name, x, y, p): """Get the field and address for a VCPU struct field.""" @@ -458,7 +685,7 @@ def read_vcpu_struct_field(self, field_name, x, y, p): # Perform the read length = struct.calcsize(pack_chars) - data = self.read(address, length, x, y) + data = self.read(address, length, x, y, on_read=None, postpone=False) # Unpack and return unpacked = struct.unpack(pack_chars, data) @@ -475,7 +702,8 @@ def read_vcpu_struct_field(self, field_name, x, y, p): return unpacked # pragma: no cover @ContextMixin.use_contextual_arguments() - def write_vcpu_struct_field(self, field_name, value, x, y, p): + def write_vcpu_struct_field(self, field_name, value, x, y, p, + postponse=True): """Write a value to the VCPU struct for a specific core. Parameters @@ -484,6 +712,12 @@ def write_vcpu_struct_field(self, field_name, value, x, y, p): Name of the field to write (e.g. `"user0"`) value : Value to write to this field. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. """ field, address, pack_chars = \ self._get_vcpu_field_and_address(field_name, x, y, p) @@ -499,7 +733,7 @@ def write_vcpu_struct_field(self, field_name, value, x, y, p): data = struct.pack(pack_chars, *value) # pragma: no cover # Perform the write - self.write(address, data, x, y) + self.write(address, data, x, y, postpone=postpone) @ContextMixin.use_contextual_arguments() def get_processor_status(self, p, x, y): @@ -515,7 +749,8 @@ def get_processor_status(self, p, x, y): self.structs[b"vcpu"].size * p) # Get the VCPU data - data = self.read(address, self.structs[b"vcpu"].size, x, y) + data = self.read(address, self.structs[b"vcpu"].size, x, y, + on_read=None, postpone=False) # Build the kwargs that describe the current state state = { @@ -554,7 +789,8 @@ def get_iobuf(self, p, x, y): while address: # The IOBUF data is proceeded by a header which gives the next # address and also the length of the string in the current buffer. - iobuf_data = self.read(address, iobuf_size + 16, x, y) + iobuf_data = self.read(address, iobuf_size + 16, x, y, + on_read=False, postpone=False) address, time, ms, length = struct.unpack("<4I", iobuf_data[:16]) iobuf += iobuf_data[16:16 + length].decode("utf-8") @@ -570,7 +806,8 @@ def get_router_diagnostics(self, x, y): Description of the state of the counters. """ # Read the block of memory - data = self.read(0xe1000300, 64, x=x, y=y) + data = self.read(0xe1000300, 64, x=x, y=y, + on_read=None, postpone=False) # Convert to 16 ints, then process that as the appropriate tuple type return RouterDiagnostics(*struct.unpack("<16I", data)) @@ -694,7 +931,8 @@ def sdram_alloc(self, size, tag=0, x=Required, y=Required, @ContextMixin.use_contextual_arguments() def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, - app_id=Required, buffer_size=0): + app_id=Required, buffer_size=0, + postpone=False): """Like :py:meth:`.sdram_alloc` but returns a file-like object which allows safe reading and writing to the block that is allocated. @@ -705,6 +943,12 @@ def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, If this is set to anything but `0` (the default) then :py:meth:`~.MemoryIO.flush` should be called to ensure that all writes are completed. + postpone : bool + If False (the default), reads and (flushed) writes to the returned + object are carried out immediately. + + If True, any reads/(flushed) writes will be queued and carried out + in parallel when :py:meth:`.flush_postponed_io` is called. Returns ------- @@ -722,7 +966,7 @@ def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, start_address = self.sdram_alloc(size, tag, x, y, app_id) return MemoryIO(self, x, y, start_address, start_address + size, - buffer_size=buffer_size) + buffer_size=buffer_size, postpone=postpone) def _get_next_nn_id(self): """Get the next nearest neighbour ID.""" @@ -1224,7 +1468,7 @@ def load_routing_table_entries(self, entries, x, y, app_id): struct.pack_into(consts.RTE_PACK_STRING, data, i*16, i, 0, route, entry.key, entry.mask) - self.write(buf, data, x, y) + self.write(buf, data, x, y, postpone=False) # Perform the load of the data into the router self._send_scp( @@ -1247,7 +1491,8 @@ def get_routing_table_entries(self, x, y): # Determine where to read from, perform the read rtr_addr = self.read_struct_field("sv", "rtr_copy", x, y) read_size = struct.calcsize(consts.RTE_PACK_STRING) - rtr_data = self.read(rtr_addr, consts.RTR_ENTRIES * read_size, x, y) + rtr_data = self.read(rtr_addr, consts.RTR_ENTRIES * read_size, x, y, + on_read=None, postpone=False) # Read each routing table entry in turn table = list() @@ -1294,7 +1539,8 @@ def get_p2p_routing_table(self, x, y): raw_table_col = self.read( consts.SPINNAKER_RTR_P2P + (((256 * col) // 8) * 4), col_words, - x, y + x, y, + on_read=None, postpone=False, ) row = 0 @@ -1328,7 +1574,8 @@ def get_num_working_cores(self, x, y): """Return the number of working cores, including the monitor.""" return self.read_struct_field("sv", "num_cpus", x, y) - def get_machine(self, default_num_cores=18): + @ContextMixin.use_contextual_arguments() + def get_machine(self, x=0, y=0, default_num_cores=18): """Probe the machine to discover which cores and links are working. .. note:: @@ -1342,7 +1589,12 @@ def get_machine(self, default_num_cores=18): .. note:: The size of the SDRAM and SysRAM heaps is assumed to be the same - for all chips and is only checked on chip (0, 0). + for all chips and is only checked on chip (x, y). + + .. note:: + The chip (x, y) supplied is the one which will be where the search + for working chips begins. Selecting anything other than (0, 0), the + default, may be useful when debugging very broken machines. Parameters ---------- @@ -1364,19 +1616,19 @@ def get_machine(self, default_num_cores=18): :py:data:`~rig.machine.SRAM` The size of the SysRAM heap. """ - p2p_tables = self.get_p2p_routing_table(0, 0) + p2p_tables = self.get_p2p_routing_table(x, y) # Calculate the extent of the system - max_x = max(x for (x, y), r in iteritems(p2p_tables) + max_x = max(x_ for (x_, y_), r in iteritems(p2p_tables) if r != consts.P2PTableEntry.none) - max_y = max(y for (x, y), r in iteritems(p2p_tables) + max_y = max(y_ for (x_, y_), r in iteritems(p2p_tables) if r != consts.P2PTableEntry.none) # Discover the heap sizes available for memory allocation - sdram_start = self.read_struct_field("sv", "sdram_heap", 0, 0) - sdram_end = self.read_struct_field("sv", "sdram_sys", 0, 0) - sysram_start = self.read_struct_field("sv", "sysram_heap", 0, 0) - sysram_end = self.read_struct_field("sv", "vcpu_base", 0, 0) + sdram_start = self.read_struct_field("sv", "sdram_heap", x, y) + sdram_end = self.read_struct_field("sv", "sdram_sys", x, y) + sysram_start = self.read_struct_field("sv", "sysram_heap", x, y) + sysram_end = self.read_struct_field("sv", "vcpu_base", x, y) chip_resources = {Cores: default_num_cores, SDRAM: sdram_end - sdram_start, @@ -1391,22 +1643,105 @@ def get_machine(self, default_num_cores=18): if p2p_route == consts.P2PTableEntry.none: dead_chips.add((x, y)) else: - num_working_cores = self.get_num_working_cores(x, y) - working_links = self.get_working_links(x, y) - - if num_working_cores < default_num_cores: - resource_exception = chip_resources.copy() - resource_exception[Cores] = min(default_num_cores, - num_working_cores) - chip_resource_exceptions[(x, y)] = resource_exception - - for link in set(Links) - working_links: - dead_links.add((x, y, link)) + try: + num_working_cores = self.get_num_working_cores(x, y) + working_links = self.get_working_links(x, y) + + if num_working_cores < default_num_cores: + resource_exception = chip_resources.copy() + resource_exception[Cores] = min(default_num_cores, + num_working_cores) + chip_resource_exceptions[(x, y)] = \ + resource_exception + + for link in set(Links) - working_links: + dead_links.add((x, y, link)) + except SCPError: + # The chip was listed in the P2P table but is not + # responding. Assume it is dead anyway. + dead_chips.add((x, y)) return Machine(max_x + 1, max_y + 1, chip_resources, chip_resource_exceptions, dead_chips, dead_links) + + @ContextMixin.use_contextual_arguments() + def flush_postponed_io(self, max_num_connections=24): + """Carry out all postponed I/O operations in parallel. + + Parameters + ---------- + max_num_connections : int + Gives the maximum number of simultaneous connections to use. + Setting this too high may result in this process becoming CPU bound + and thus not achieving high throughput. + """ + with self._postponed_lock: + num_threads = min(max_num_connections, len(self._postponed)) + + # A flag which is set if one of the threads encounter an error. + terminate_now = Event() + + # This list is populated with all exception objects raised in any of + # the worker threads + exceptions = [] + exceptions_lock = Lock() + + def queue_processor(): + """Attempts to process all postponed events for a particular + connection queue, deleting the queue when it empties.""" + while not terminate_now.is_set(): + # Get the next queue to be processed + try: + with self._postponed_lock: + connection, queue = self._postponed.popitem() + except KeyError: + # There are no more queues which need processing, terminate + # this thread. + return + + # Process that queue + while not terminate_now.is_set(): + try: + with self._postponed_lock: + f = queue.popleft() + except IndexError: + # The queue is empty, move on to the next one + break + + # Run the current queue entry handling failures sensibly + try: + f() + except Exception as e: + sys.stderr.write( + "Exception while processing a queued I/O " + "operation:\n") + traceback.print_exc() + terminate_now.set() + + with exceptions_lock: + exceptions.append(e) + + threads = [] + for _ in range(num_threads): + threads.append(Thread(target=queue_processor)) + + try: + for thread in threads: + thread.start() + + # Wait for the threads to complete + for thread in threads: + thread.join() + finally: + # If something goes wrong in the above, trigger the termination of + # all threads and attempt to wait for this + terminate_now.set() + for thread in threads: + thread.join() + + return exceptions class CoreInfo(collections.namedtuple( @@ -1606,7 +1941,7 @@ class MemoryIO(object): """ def __init__(self, machine_controller, x, y, start_address, end_address, - buffer_size=0, _write_buffer=None): + buffer_size=0, postpone=False, _write_buffer=None): """Create a file-like view onto a subset of the memory-space of a chip. Parameters @@ -1624,6 +1959,13 @@ def __init__(self, machine_controller, x, y, start_address, end_address, End address in memory. buffer_size : int Number of bytes to store in the write buffer. + postpone : bool + If False (the default), reads and (flushed) writes are performed + immediately and their result returned to the caller. + + If True, reads and (flushed) writes will be placed in a queue and + executed in parallel when + :py:meth:`.MachineController.flush_postponed_io` is called. _write_buffer : :py:class:`._WriteBufferChild` Internal use only, the write buffer to use to combine writes. @@ -1635,11 +1977,12 @@ def __init__(self, machine_controller, x, y, start_address, end_address, self._x = x self._y = y self._machine_controller = machine_controller + self._postpone = postpone # Get, or create, a write buffer if _write_buffer is None: _write_buffer = _WriteBuffer(x, y, 0, machine_controller, - buffer_size) + buffer_size, self._postpone) self._write_buffer = _write_buffer # Store and clip the addresses @@ -1704,6 +2047,7 @@ def __getitem__(self, sl): return type(self)( self._machine_controller, self._x, self._y, start_address, end_address, + self._postpone, _write_buffer=self._write_buffer ) else: @@ -1721,8 +2065,22 @@ def __exit__(self, exception_type, exception_value, traceback): """Exit a block and call :py:meth:`~.close`.""" self.close() + def _read_n_bytes(self, n_bytes): + """Return the number of bytes to actually read accounting for the + cursor position. + """ + # If n_bytes is negative then calculate it as the number of bytes left + if n_bytes < 0: + n_bytes = self._end_address - self.address + + # Determine how far to read, then read nothing beyond that point. + if self.address + n_bytes > self._end_address: + n_bytes = min(n_bytes, self._end_address - self.address) + + return n_bytes + @_if_not_closed - def read(self, n_bytes=-1): + def read(self, n_bytes=-1, on_read=None): """Read a number of bytes from the memory. .. note:: @@ -1733,34 +2091,93 @@ def read(self, n_bytes=-1): n_bytes : int A number of bytes to read. If the number of bytes is negative or omitted then read all data until the end of memory region. + on_read : callable + If supplied, this is called with the read data as an argument when + the read completes. Otherwise, the read data is returned directly. Returns ------- - :py:class:`bytes` - Data read from SpiNNaker as a bytestring. + :py:class:`bytes` or None + If on_read is not given and postpone is set to False for this + MemoryIO, the data read from SpiNNaker is returned. + + If on_read is not given and postpone is set to True for this + MemoryIO, an uninitialised bytearray is returned which will be + populated with the read data when + :py:meth:`.MachineController.flush_postponed_io` is called. + + If on_read is supplied, this function will return None and when the + read completes, the read data will be given as an argument to + on_read. Note that on_read may be called from another thread and + the order of calls to on_read is not guaranteed. """ # Flush this write buffer self.flush() - # If n_bytes is negative then calculate it as the number of bytes left - if n_bytes < 0: - n_bytes = self._end_address - self.address - - # Determine how far to read, then read nothing beyond that point. - if self.address + n_bytes > self._end_address: - n_bytes = min(n_bytes, self._end_address - self.address) - + n_bytes = self._read_n_bytes(n_bytes) if n_bytes <= 0: + if callable(on_read): + on_read(b'') return b'' + else: + # Note: the offset is updated before the read (and the read address + # compensated) such that if the on_read callback interacts with + # this MemoryIO, the addresses are kept consistent. + self._offset += n_bytes + return self._machine_controller.read( + self.address - n_bytes, n_bytes, self._x, self._y, 0, + on_read=on_read, postpone=self._postpone) - # Perform the read and increment the offset - data = self._machine_controller.read( - self.address, n_bytes, self._x, self._y, 0) - self._offset += n_bytes - return data + @_if_not_closed + def readinto(self, buffer, n_bytes=-1, on_read=None): + """Read a number of bytes from the memory into a supplied buffer. + + .. note:: + Reads beyond the specified memory range will be truncated. + + Parameters + ---------- + buffer : bytearray or callable + A bufferable object (e.g. bytearray) of at least n_bytes in size + into which the data will be read. + + If a callable is supplied, this will be called with no arguments + just before the read is carried out and the function must return a + bufferable object into which the result will be written. + + If postpone is set to False for this MemoryIO, the read will be + completed before this method returns. If set to True, the read will + actually occur when + :py:meth:`.MachineController.flush_postponed_io` is called. + n_bytes : int + A number of bytes to read. If the number of bytes is negative or + omitted then read all data until the end of memory region. + on_read : callable + If supplied, this is called with supplied buffer as an argument + when the read completes. + """ + # Flush this write buffer + self.flush() + + n_bytes = self._read_n_bytes(n_bytes) + + if n_bytes <= 0: + if callable(on_read): + on_read(buffer() if callable(buffer) else buffer) + elif callable(buffer): + buffer() + return + else: + # Note: the offset is updated before the read (and the read address + # compensated) such that if the on_read callback interacts with + # this MemoryIO, the addresses are kept consistent. + self._offset += n_bytes + self._machine_controller.readinto( + self.address - n_bytes, buffer, n_bytes, self._x, self._y, 0, + on_read=on_read, postpone=self._postpone) @_if_not_closed - def write(self, bytes): + def write(self, bytes, n_bytes=None): """Write data to the memory. .. warning:: @@ -1774,26 +2191,53 @@ def write(self, bytes): Parameters ---------- - bytes : :py:class:`bytes` - Data to write to the memory as a bytestring. + bytes : :py:class:`bytes` or callable + Data to write into memory. + + If a callable is given, it will be called when the write is about + to be carried out and must return a bytearray (or similar) to be + written. Note that at present write data supplied as a callable is + never buffered and writing a callable causes the write buffer to be + flushed. + + For non-callables, if the buffer size is non-zero, the data will be + buffered immediately. If the buffer size is zero, or the data + written is too large to fit in the buffer, the write will be passed + directly to :py:meth:`.MachineController.write`. This means that if + postpone is set to False for this MemoryIO, the write will be + complete before this method returns but if it is set to True, the + write will actually occur when + :py:meth:`.MachineController.flush_postponed_io` is called. + n_bytes : int + The number of bytes to write. This field is optional when bytes + supports the `len` operator but is mandatory when it does not. Returns ------- - int - Number of bytes written. + int or None + Number of bytes written or buffered. """ - if self.address + len(bytes) > self._end_address: - n_bytes = min(len(bytes), self._end_address - self.address) + n_bytes = n_bytes if n_bytes is not None else len(bytes) + + if self.address + n_bytes > self._end_address: + n_bytes = min(n_bytes, self._end_address - self.address) if n_bytes <= 0: + if callable(bytes): + bytes() return 0 - - bytes = bytes[:n_bytes] + + if callable(bytes): + bytes_ = (lambda: bytes()[:n_bytes]) + else: + bytes_ = bytes[:n_bytes] + else: + bytes_ = bytes # Perform the write and increment the offset - self._write_buffer.add_new_write(self.address, bytes) - self._offset += len(bytes) - return len(bytes) + self._offset += n_bytes + self._write_buffer.add_new_write(self.address - n_bytes, bytes_) + return n_bytes @_if_not_closed def flush(self): @@ -1801,6 +2245,11 @@ def flush(self): This must be called to ensure that all writes to SpiNNaker made using this file-like object (and its siblings, if any) are completed. + + If postpone is set to False for this MemoryIO, the writes will be + completed before this method returns. If set to True, the writes will + actually occur when :py:meth:`.MachineController.flush_postponed_io` is + called. """ self._write_buffer.flush() @@ -1860,11 +2309,12 @@ class _WriteBuffer(object): together. """ - def __init__(self, x, y, p, controller, buffer_size=0): + def __init__(self, x, y, p, controller, buffer_size=0, postpone=False): self.x = x self.y = y self.p = p self.controller = controller + self.postpone = postpone # A buffer of writes self.buffer = bytearray(buffer_size) @@ -1875,11 +2325,14 @@ def __init__(self, x, y, p, controller, buffer_size=0): def add_new_write(self, start_address, data): """Add a new write to the buffer.""" - if len(data) > self.buffer_size: - # Perform the write if we couldn't buffer it at all + if callable(data) or len(data) > self.buffer_size: + # Perform the write if we couldn't buffer it at all. Unbufferable + # writes are those too large to fit in the buffer and those + # provided via callback. self.flush() # Flush to ensure ordering is preserved self.controller.write(start_address, data, - self.x, self.y, self.p) + self.x, self.y, self.p, + postpone=self.postpone) return if self.start_address is None: @@ -1921,12 +2374,17 @@ def flush(self): # Write out all the values from the buffer self.controller.write( self.start_address, self.buffer[:self.current_end], - self.x, self.y, self.p + self.x, self.y, self.p, postpone=self.postpone ) # Reset the buffer self.start_address = None self.current_end = 0 + + # If postponed writes are in use, create a new buffer since the old + # one will be used at an undetermined point in the future. + if self.postpone: + self.buffer = bytearray(buffer_size) def unpack_routing_table_entry(packed): diff --git a/rig/machine_control/packets.py b/rig/machine_control/packets.py index a3a3a01..21c68e8 100644 --- a/rig/machine_control/packets.py +++ b/rig/machine_control/packets.py @@ -143,6 +143,17 @@ def packed_data(self): # Return the SCP header and the rest of the data return scp_header + self.data + def __repr__(self): + """Produce a human-redaable summary of (the most important parts of) + the packet.""" + return ("<{} x: {}, y: {}, cpu: {}, " + "cmd_rc: {}, arg1: {}, arg2: {}, arg3: {}, " + "data: {}>".format(self.__class__.__name__, + self.dest_x, self.dest_y, self.dest_cpu, + self.cmd_rc, + self.arg1, self.arg2, self.arg3, + repr(self.data))) + def _unpack_sdp_into_packet(packet, bytestring): """Unpack the SDP header from a bytestring into a packet. diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index f3f1106..a31a372 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -50,7 +50,6 @@ def __new__(cls, x, y, p, cmd, arg1=0, arg2=0, arg3=0, data=b'', class SCPConnection(object): """Implements the SCP protocol for communicating with a SpiNNaker chip. """ - error_codes = {} def __init__(self, spinnaker_host, port=consts.SCP_PORT, n_tries=5, timeout=0.5): @@ -73,7 +72,6 @@ def __init__(self, spinnaker_host, port=consts.SCP_PORT, # Create a socket to communicate with the SpiNNaker machine self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.settimeout(self.default_timeout) self.sock.connect((spinnaker_host, port)) # Store the number of tries that will be allowed @@ -82,15 +80,6 @@ def __init__(self, spinnaker_host, port=consts.SCP_PORT, # Sequence values self.seq = seqs() - @classmethod - def _register_error(cls, cmd_rc): - """Register an Exception class as belonging to a certain CMD_RC value. - """ - def err_(err): - cls.error_codes[cmd_rc] = err - return err - return err_ - def send_scp(self, buffer_size, x, y, p, cmd, arg1=0, arg2=0, arg3=0, data=b'', expected_args=3, timeout=0.0): """Transmit a packet to the SpiNNaker machine and block until an @@ -176,12 +165,13 @@ def send_scp_burst(self, buffer_size, window_size, class TransmittedPacket(object): """A packet which has been transmitted and still awaits a response. """ - __slots__ = ["callback", "packet", "n_tries", + __slots__ = ["callback", "packet", "bytestring", "n_tries", "timeout", "timeout_time"] def __init__(self, callback, packet, timeout): self.callback = callback self.packet = packet + self.bytestring = packet.bytestring self.n_tries = 1 self.timeout = timeout self.timeout_time = time.time() + self.timeout @@ -226,12 +216,12 @@ def __init__(self, callback, packet, timeout): # expecting a response for it and can retransmit it if # necessary. outstanding_packets[seq] = TransmittedPacket( - args.callback, packet.bytestring, + args.callback, packet, self.default_timeout + args.timeout ) # Actually send the packet - self.sock.send(outstanding_packets[seq].packet) + self.sock.send(outstanding_packets[seq].bytestring) # Listen on the socket for an acknowledgement packet, there may not # be one. @@ -260,18 +250,19 @@ def __init__(self, callback, packet, timeout): consts.SDP_HEADER_LENGTH + 2) # If the code is an error then we respond immediately - if rc != consts.SCP_RC_OK: - if rc in consts.SCP_RC_TIMEOUT: + if rc != consts.SCPReturnCodes.ok: + if rc in consts.RETRYABLE_SCP_RETURN_CODES: # If the error is timeout related then treat the packet # as though it timed out, just discard. This avoids us # hammering the board when it's most vulnerable. pass - elif rc in self.error_codes: - raise self.error_codes[rc] else: - raise SCPError( - "Unhandled exception code {:#2x}".format(rc) - ) + # For all other errors, we'll just fall over + # immediately. + packet = outstanding_packets.get(seq) + if packet is not None: + packet = packet.packet + raise FatalReturnCodeError(rc, packet) else: # Look up the sequence index of packet in the list of # outstanding packets. We may have already processed a @@ -294,10 +285,13 @@ def __init__(self, callback, packet, timeout): # the given number of times then raise a timeout error for # it. if outstanding.n_tries >= self.n_tries: - raise TimeoutError(self.n_tries) + raise TimeoutError( + "No response after {} attempts.".format( + self.n_tries), + outstanding.packet) # Otherwise we retransmit it - self.sock.send(outstanding.packet) + self.sock.send(outstanding.bytestring) outstanding.n_tries += 1 outstanding.timeout_time = (current_time + outstanding.timeout) @@ -333,6 +327,39 @@ def read(self, buffer_size, window_size, x, y, p, address, length_bytes): """ # Prepare the buffer to receive the incoming data data = bytearray(length_bytes) + self.readinto(data, buffer_size, window_size, x, y, p, address, + length_bytes) + return bytes(data) + + def readinto(self, data, buffer_size, window_size, x, y, p, address, + length_bytes): + """Read a bytestring from an address in memory into a supplied buffer. + + ..note:: + This method is included here to maintain API compatibility with an + `alternative implementation of SCP + `_. + + Parameters + ---------- + data : bytearray + An object into which supports the buffer protocol (e.g. bytearray) + into which the data will be read. + buffer_size : int + Number of bytes held in an SCP buffer by SARK, determines how many + bytes will be expected in a socket and how many bytes of data will + be read back in each packet. + window_size : int + x : int + y : int + p : int + address : int + The address at which to start reading the data. + length_bytes : int + The number of bytes to read from memory. Large reads are + transparently broken into multiple SCP read commands. + """ + # Prepare the buffer to receive the incoming data mem = memoryview(data) # Create a callback which will write the data from a packet into a @@ -366,7 +393,6 @@ def packets(length_bytes, data): # Run the event loop and then return the retrieved data self.send_scp_burst(buffer_size, window_size, packets(length_bytes, data)) - return bytes(data) def write(self, buffer_size, window_size, x, y, p, address, data): """Write a bytestring to an address in memory. @@ -416,6 +442,10 @@ def packets(address, data): # Run the event loop and then return the retrieved data self.send_scp_burst(buffer_size, window_size, packets(address, data)) + def close(self): + """Close the SCP connection.""" + self.sock.close() + def seqs(mask=0xffff): i = 0 @@ -425,34 +455,46 @@ def seqs(mask=0xffff): class SCPError(IOError): - """Base Error for SCP return codes.""" - pass - + """Base Error for SCP return codes. -class TimeoutError(SCPError): - """Raised when an SCP is not acknowledged within the given period of time. + Attributes + ---------- + packet : :py:class:`rig.machine_control.packets.SCPPacket` + The packet being processed when the error occurred. May be None if no + specific packet was involved. """ - pass + def __init__(self, message="", packet=None): + self.packet = packet + if self.packet is not None: + message = "{} (Packet: {})".format(message, packet) -@SCPConnection._register_error(0x81) -class BadPacketLengthError(SCPError): - """Raised when an SCP packet is an incorrect length.""" - pass + super(SCPError, self).__init__(message.lstrip()) -@SCPConnection._register_error(0x83) -class InvalidCommandError(SCPError): - """Raised when an SCP packet contains an invalid command code.""" +class TimeoutError(SCPError): + """Raised when an SCP is not acknowledged within the given period of time. + """ pass -@SCPConnection._register_error(0x84) -class InvalidArgsError(SCPError): - """Raised when an SCP packet has an invalid argument.""" - pass +class FatalReturnCodeError(SCPError): + """Raised when an SCP command returns with an error which is connsidered + fatal. + Attributes + ---------- + return_code : :py:class:`rig.machine_control.consts.SCPReturnCodes` or int + The return code (will be a raw integer if the code is unrecognised). + """ -@SCPConnection._register_error(0x87) -class NoRouteError(SCPError): - """Raised when there is no route to the requested core.""" + def __init__(self, return_code=None, packet=None): + try: + self.return_code = consts.SCPReturnCodes(return_code) + message = "RC_{}: {}".format( + self.return_code.name.upper(), + consts.FATAL_SCP_RETURN_CODES[self.return_code]) + except ValueError: + self.return_code = return_code + message = "Unrecognised return code {:#2X}." + super(FatalReturnCodeError, self).__init__(message, packet) diff --git a/rig/scripts/rig_ps.py b/rig/scripts/rig_ps.py index f2b89ee..897df8d 100644 --- a/rig/scripts/rig_ps.py +++ b/rig/scripts/rig_ps.py @@ -13,7 +13,7 @@ from rig.machine_control import MachineController -from rig.machine_control.scp_connection import TimeoutError +from rig.machine_control.scp_connection import SCPError, TimeoutError from rig.machine import Cores @@ -56,17 +56,25 @@ def get_process_list(mc, x_=None, y_=None, p_=None, if p_ is not None and p_ != p: continue - status = mc.get_processor_status(x=x, y=y, p=p) - keep = (match(str(status.app_id), app_ids) and - match(status.app_name, applications) and - match(status.cpu_state.name, states)) - - if keep: - yield (x, y, p, - status.cpu_state, - status.rt_code, - status.app_name, - status.app_id) + try: + status = mc.get_processor_status(x=x, y=y, p=p) + keep = (match(str(status.app_id), app_ids) and + match(status.app_name, applications) and + match(status.cpu_state.name, states)) + + if keep: + yield (x, y, p, + status.cpu_state, + status.rt_code, + status.app_name, + status.app_id) + except SCPError as e: + # If an error occurs while communicating with a chip, we bodge + # it into the "cpu_status" field and continue (note that it + # will never get filtered out). + class DeadStatus(object): + name = "{}: {}".format(e.__class__.__name__, str(e)) + yield (x, y, p, DeadStatus(), None, "", -1) def main(args=None): diff --git a/tests/machine_control/test_machine_controller.py b/tests/machine_control/test_machine_controller.py index 270fbe4..431da48 100644 --- a/tests/machine_control/test_machine_controller.py +++ b/tests/machine_control/test_machine_controller.py @@ -17,7 +17,8 @@ unpack_routing_table_entry ) from rig.machine_control.packets import SCPPacket -from rig.machine_control.scp_connection import SCPConnection +from rig.machine_control.scp_connection import \ + SCPConnection, SCPError, FatalReturnCodeError from rig.machine_control import regions, consts, struct_file from rig.machine import Cores, SDRAM, SRAM, Links, Machine @@ -98,6 +99,17 @@ def test_get_software_version(self, controller, spinnaker_width, assert sver.version >= 1.3 assert sver.position == (x, y) + def test_get_ip_address(self, controller): + """Test getting the IP address.""" + # Chip 0, 0 should report an IP address (since it is what we're + # connected via, though note that we can't check the IP since we may be + # connected via a proxy). + assert isinstance(controller.get_ip_address(0, 0), str) + + # Chip 1, 1 should not report an IP address (since in no existing + # hardware does it have an Ethernet connection).. + assert controller.get_ip_address(1, 1) is None + def test_write_and_read(self, controller): """Test write and read capabilities by writing a string to SDRAM and then reading back in a different order. @@ -137,7 +149,7 @@ def test_write_and_read_struct_values(self, controller): def test_set_get_clear_iptag(self, controller): # Get our address, then add a new IPTag pointing # **YUCK** - ip_addr = controller.connections[0].sock.getsockname()[0] + ip_addr = controller.connections[None].sock.getsockname()[0] port = 1234 iptag = 7 @@ -479,6 +491,95 @@ def test_get_software_version(self, mock_conn): # noqa assert sver.build_date == 888999 assert sver.version_string == "Hello, World!" + @pytest.mark.parametrize("has_ip", [True, False]) + def test_get_ip_address(self, has_ip): + cn = MachineController("localhost") + cn.read_struct_field = mock.Mock(side_effect=[has_ip, 0x11223344]) + + ip = cn.get_ip_address(1, 2) + + if has_ip: + assert ip == "68.51.34.17" + cn.read_struct_field.assert_has_calls([ + mock.call("sv", "eth_up", x=1, y=2), + mock.call("sv", "ip_addr", x=1, y=2), + ]) + else: + assert ip is None + cn.read_struct_field.assert_called_once_with("sv", "eth_up", + x=1, y=2) + + def test__get_connection(self): + cn = MachineController("localhost") + cn.connections = { + None: "default", + (0, 0): "0,0", + (4, 8): "4,8", + # 8, 4 is missing! + } + + # Until _width and _height are set, the default should be used at all + # times. + assert cn._get_connection(0, 0) == "default" + assert cn._get_connection(1, 0) == "default" + assert cn._get_connection(0, 1) == "default" + assert cn._get_connection(11, 0) == "default" + assert cn._get_connection(0, 11) == "default" + + # With width and height specified, the local connector should be used + # in all cases when possible + cn._width = 12 + cn._height = 12 + + assert cn._get_connection(0, 0) == "0,0" + assert cn._get_connection(1, 0) == "0,0" + assert cn._get_connection(0, 1) == "0,0" + + assert cn._get_connection(4, 8) == "4,8" + assert cn._get_connection(5, 8) == "4,8" + assert cn._get_connection(4, 9) == "4,8" + + # When a missing a connection, another connection should be used + assert cn._get_connection(8, 4) in ("default", "0,0", "4,8") + assert cn._get_connection(9, 4) in ("default", "0,0", "4,8") + assert cn._get_connection(8, 5) in ("default", "0,0", "4,8") + + def test_discover_connections(self): + # In this test, the discovered system is a 6-board system with the + # board with a dead chip on (16, 8), the Ethernet link at (4, 8) being + # down, the connection to (8, 4) resulting in timeouts and the + # connection to (20, 4) already present. + cn = MachineController("localhost") + w, h = 24, 12 + cn.get_p2p_routing_table = mock.Mock(return_value={ + (x, y): (consts.P2PTableEntry.north + if (x, y) != (16, 8) else + consts.P2PTableEntry.none) + for x in range(w) + for y in range(h) + }) + + def get_ip_address(x, y): + if (x, y) == (4, 8): + return None + else: + return "127.0.0.1" + cn.get_ip_address = mock.Mock(side_effect=get_ip_address) + + def get_software_version(x, y): + if (x, y) == (8, 4): + raise SCPError("Fail.") + cn.get_software_version = mock.Mock(side_effect=get_software_version) + + cn.connections[(20, 4)] = mock.Mock() + + assert cn.discover_connections() == 2 + assert cn._width == w + assert cn._height == h + assert set(cn.connections) == set([None, (0, 0), (12, 0), (20, 4)]) + assert isinstance(cn.connections[(0, 0)], SCPConnection) + assert isinstance(cn.connections[(12, 0)], SCPConnection) + @pytest.mark.parametrize("size", [128, 256]) def test_scp_data_length(self, size): cn = MachineController("localhost") @@ -502,13 +603,13 @@ def test_write(self, buffer_size, window_size, x, y, p, cn = MachineController("localhost") cn._scp_data_length = buffer_size cn._window_size = window_size - cn.connections[0] = mock.Mock(spec_set=SCPConnection) + cn.connections[None] = mock.Mock(spec_set=SCPConnection) # Perform the read and ensure that values are passed on as appropriate with cn(x=x, y=y, p=p): cn.write(start_address, data) - cn.connections[0].write.assert_called_once_with( + cn.connections[None].write.assert_called_once_with( buffer_size, window_size, x, y, p, start_address, data ) @@ -524,17 +625,45 @@ def test_read(self, buffer_size, window_size, x, y, p, cn = MachineController("localhost") cn._scp_data_length = buffer_size cn._window_size = window_size - cn.connections[0] = mock.Mock(spec_set=SCPConnection) - cn.connections[0].read.return_value = data + cn.connections[None] = mock.Mock(spec_set=SCPConnection) + cn.connections[None].read.return_value = data # Perform the read and ensure that values are passed on as appropriate with cn(x=x, y=y, p=p): assert data == cn.read(start_address, length) - cn.connections[0].read.assert_called_once_with( + cn.connections[None].read.assert_called_once_with( buffer_size, window_size, x, y, p, start_address, length ) + @pytest.mark.parametrize( + "buffer_size, window_size, x, y, p, start_address, length, data", + [(128, 1, 0, 1, 2, 0x67800000, 100, b"\x00" * 100), + (256, 5, 1, 4, 5, 0x67801000, 2, b"\x10\x23"), + ] + ) + def test_readinto(self, buffer_size, window_size, x, y, p, + start_address, length, data): + # Create the mock controller + cn = MachineController("localhost") + cn._scp_data_length = buffer_size + cn._window_size = window_size + cn.connections[None] = mock.Mock(spec_set=SCPConnection) + + def mock_readinto(buffer, *args, **kwargs): + buffer[:] = data + cn.connections[None].readinto.side_effect = mock_readinto + + # Perform the read and ensure that values are passed on as appropriate + with cn(x=x, y=y, p=p): + read_data = bytearray(length) + cn.readinto(start_address, read_data, length) + assert data == read_data + + assert len(cn.connections[None].readinto.mock_calls) == 1 + assert cn.connections[None].readinto.mock_calls[0][1][1:] == \ + (buffer_size, window_size, x, y, p, start_address, length) + @pytest.mark.parametrize( "iptag, addr, port", [(1, "localhost", 54321), @@ -1762,7 +1891,7 @@ def read_struct_field(struct_name, field_name, x, y, p=0): cn.read_struct_field.side_effect = read_struct_field # Return a set of p2p tables where an 8x8 set of chips is alive with - # all chips with (3,3) being dead. + # all except (3,3) being dead. cn.get_p2p_routing_table = mock.Mock() cn.get_p2p_routing_table.return_value = { (x, y): (consts.P2PTableEntry.north @@ -1773,10 +1902,13 @@ def read_struct_field(struct_name, field_name, x, y, p=0): } # Return 18 working cores except for (2, 2) which will have only 3 - # cores. + # cores and (5, 5) which will fail to respond. def get_num_working_cores(x, y): - return 18 if (x, y) != (2, 2) else 3 + if (x, y) == (5, 5): + raise FatalReturnCodeError(0) + else: + return 18 if (x, y) != (2, 2) else 3 cn.get_num_working_cores = mock.Mock() cn.get_num_working_cores.side_effect = get_num_working_cores @@ -1791,7 +1923,7 @@ def get_working_links(x, y): cn.get_working_links = mock.Mock() cn.get_working_links.side_effect = get_working_links - m = cn.get_machine() + m = cn.get_machine(x=3, y=2) # Check that the machine is correct assert isinstance(m, Machine) @@ -1809,24 +1941,24 @@ def get_working_links(x, y): SRAM: vcpu_base - sysram_heap, }, } - assert m.dead_chips == set([(3, 3)]) + assert m.dead_chips == set([(3, 3), (5, 5)]) assert m.dead_links == set([(4, 4, Links.north)]) # Check that only the expected calls were made to mocks cn.read_struct_field.assert_has_calls([ - mock.call("sv", "sdram_heap", 0, 0), - mock.call("sv", "sdram_sys", 0, 0), - mock.call("sv", "sysram_heap", 0, 0), - mock.call("sv", "vcpu_base", 0, 0), + mock.call("sv", "sdram_heap", 3, 2), + mock.call("sv", "sdram_sys", 3, 2), + mock.call("sv", "sysram_heap", 3, 2), + mock.call("sv", "vcpu_base", 3, 2), ], any_order=True) - cn.get_p2p_routing_table.assert_called_once_with(0, 0) + cn.get_p2p_routing_table.assert_called_once_with(3, 2) cn.get_num_working_cores.assert_has_calls([ mock.call(x, y) for x in range(8) for y in range(8) if (x, y) != (3, 3) ], any_order=True) cn.get_working_links.assert_has_calls([ mock.call(x, y) for x in range(8) for y in range(8) - if (x, y) != (3, 3) + if (x, y) != (3, 3) and (x, y) != (5, 5) ], any_order=True) @pytest.mark.parametrize("app_id", [66, 12]) @@ -1861,15 +1993,17 @@ def test_read(self, mock_controller, x, y, start_address, lengths): calls = [] offset = 0 for n_bytes in lengths: - sdram_file.read(n_bytes) + buf = bytearray(n_bytes) + sdram_file.readinto(buf, n_bytes) assert sdram_file.tell() == offset + n_bytes assert sdram_file.address == start_address + offset + n_bytes - calls.append(mock.call(start_address + offset, n_bytes, x, y, 0)) + calls.append(mock.call(start_address + offset, + buf, n_bytes, x, y, 0)) offset = offset + n_bytes # Check the reads caused the appropriate calls to the machine # controller. - mock_controller.read.assert_has_calls(calls) + mock_controller.readinto.assert_has_calls(calls) @pytest.mark.parametrize("x, y", [(1, 3), (3, 0)]) @pytest.mark.parametrize("start_address, length, offset", @@ -1881,18 +2015,15 @@ def test_read_no_parameter(self, mock_controller, x, y, start_address, # Assert that reading with no parameter reads the full number of bytes sdram_file.seek(offset) - sdram_file.read() - mock_controller.read.assert_called_one_with( - start_address + offset, length - offset, x, y, 0) + assert sdram_file._read_n_bytes(-1) == length - offset def test_read_beyond(self, mock_controller): sdram_file = MemoryIO(mock_controller, 0, 0, start_address=0, end_address=10) - sdram_file.read(100) - mock_controller.read.assert_called_with(0, 10, 0, 0, 0) + assert len(sdram_file.read(100)) == 10 assert sdram_file.read(1) == b'' - assert mock_controller.read.call_count == 1 + assert mock_controller.readinto.call_count == 1 @pytest.mark.parametrize("x, y", [(4, 2), (255, 1)]) @pytest.mark.parametrize("start_address", [0x60000004, 0x61000003]) @@ -2110,6 +2241,7 @@ def test_zero_length_filelike(self, mock_controller): "flush_event", [lambda filelike: filelike.flush(), lambda filelike: filelike.read(1), + lambda filelike: filelike.readinto(bytearray(1), 1), lambda filelike: filelike.close()] ) def test_coalescing_writes(self, get_node, flush_event): diff --git a/tests/machine_control/test_packets.py b/tests/machine_control/test_packets.py index 279a020..157754a 100644 --- a/tests/machine_control/test_packets.py +++ b/tests/machine_control/test_packets.py @@ -1,5 +1,7 @@ from rig.machine_control.packets import SDPPacket, SCPPacket +import sys + class TestSDPPacket(object): """Test SDPPacket representations.""" @@ -263,3 +265,15 @@ def test_from_bytestring_2_args_short(self): # Check that the bytestring this packet creates is the same as the one # we specified before. assert scp_packet.bytestring == packet + + def test_repr(self): + """Test the string representation of an SCP packet makes sense.""" + scp_packet = SCPPacket(dest_x=10, dest_y=20, dest_cpu=3, + cmd_rc=2, arg1=123, arg2=456, + data=b"foobar") + # Note: Python 2 does not have the "b" prefix on byte strings + assert repr(scp_packet) == ( + "".format( + "b" if sys.version_info >= (3, 0) else "")) diff --git a/tests/machine_control/test_scp_connection.py b/tests/machine_control/test_scp_connection.py index aab2b79..b92a2e9 100644 --- a/tests/machine_control/test_scp_connection.py +++ b/tests/machine_control/test_scp_connection.py @@ -4,9 +4,12 @@ import struct import time -from rig.machine_control.consts import SCPCommands, DataType, SDP_HEADER_LENGTH +from rig.machine_control.consts import \ + SCPCommands, DataType, SDP_HEADER_LENGTH, RETRYABLE_SCP_RETURN_CODES, \ + FATAL_SCP_RETURN_CODES from rig.machine_control.packets import SCPPacket -from rig.machine_control.scp_connection import SCPConnection, scpcall +from rig.machine_control.scp_connection import \ + SCPConnection, scpcall, FatalReturnCodeError from rig.machine_control import scp_connection @@ -40,6 +43,11 @@ def mock_conn(): return conn +def test_close(mock_conn): + mock_conn.close() + assert mock_conn.sock.close.called + + def test_scpcall(): """scpcall is a utility for specifying SCP packets and callbacks""" call = scpcall(0, 1, 2, 3) @@ -193,10 +201,11 @@ def mock_select(rlist, wlist, xlist, timeout): # times we specified. assert mock_conn.sock.send.call_count == mock_conn.n_tries - @pytest.mark.parametrize("err_code", [0x8b, 0x8c, 0x8d, 0x8e]) - def test_single_packet_fails_with_RC_P2P_ERROR(self, mock_conn, err_code): + @pytest.mark.parametrize("err_code", RETRYABLE_SCP_RETURN_CODES) + def test_single_packet_fails_with_retryable_error(self, mock_conn, + err_code): """Test correct operation for transmitting a single packet which is - always acknowledged with one of the RC_P2P error codes. + always acknowledged with one of the retryable error codes. """ # Create a packet to send packets = [scpcall(3, 5, 0, 12)] @@ -344,14 +353,9 @@ def recv(self, *args): mock.patch("select.select", new=mock_select): mock_conn.send_scp_burst(512, 8, packets()) - @pytest.mark.parametrize( - "rc, error", - [(0x81, scp_connection.BadPacketLengthError), - (0x83, scp_connection.InvalidCommandError), - (0x84, scp_connection.InvalidArgsError), - (0x87, scp_connection.NoRouteError), - (0x00, Exception)]) - def test_errors(self, mock_conn, rc, error): + @pytest.mark.parametrize("rc", + list(map(int, FATAL_SCP_RETURN_CODES)) + [0x00]) + def test_errors(self, mock_conn, rc): """Test that errors are raised when error RCs are returned.""" # Create an object which returns a packet with an error code class ReturnPacket(object): @@ -374,7 +378,7 @@ def __call__(self, packet): # Send an SCP command and check that the correct error is raised packets = [scpcall(3, 5, 0, 12)] - with pytest.raises(error), \ + with pytest.raises(FatalReturnCodeError), \ mock.patch("select.select", new=mock_select): mock_conn.send_scp_burst(256, 1, iter(packets)) @@ -439,6 +443,7 @@ def recv(self, *args, **kwargs): mock_conn.send_scp_burst(512, 8, packets) +@pytest.mark.parametrize("readinto", [True, False]) @pytest.mark.parametrize( "buffer_size, window_size, x, y, p", [(128, 1, 0, 0, 1), (256, 5, 1, 2, 3)] ) @@ -462,7 +467,7 @@ def recv(self, *args, **kwargs): (256, DataType.word, 0x60000004) ]) def test_read(buffer_size, window_size, x, y, p, n_bytes, - data_type, start_address): + data_type, start_address, readinto): mock_conn = SCPConnection("localhost") # Construct the expected calls, and hence the expected return packets @@ -503,8 +508,13 @@ def __call__(self, buffer_size, window_size, args): send_scp_burst.side_effect = ccs # Read an amount of memory specified by the size. - data = mock_conn.read(buffer_size, window_size, x, y, p, - start_address, n_bytes) + if readinto: + data = bytearray(n_bytes) + mock_conn.readinto(data, buffer_size, window_size, x, y, p, + start_address, n_bytes) + else: + data = mock_conn.read(buffer_size, window_size, x, y, p, + start_address, n_bytes) assert data == ccs.read_data # send_burst_scp should have been called once, each element in the iterator diff --git a/tests/scripts/test_rig_ps.py b/tests/scripts/test_rig_ps.py index 7315879..d772268 100644 --- a/tests/scripts/test_rig_ps.py +++ b/tests/scripts/test_rig_ps.py @@ -6,7 +6,8 @@ import rig.scripts.rig_ps as rig_ps -from rig.machine_control.scp_connection import TimeoutError +from rig.machine_control.scp_connection import \ + TimeoutError, FatalReturnCodeError from rig.machine import Machine, Cores @@ -82,6 +83,28 @@ def get_processor_status(x, y, p): ] +def test_get_process_list_dead_chip(): + mc = mock.Mock() + + machine = Machine(1, 1, chip_resources={Cores: 2}) + mc.get_machine.return_value = machine + + mc.get_processor_status.side_effect = FatalReturnCodeError(0x88) + + # Should list the failiure + ps = list(rig_ps.get_process_list(mc)) + assert len(ps) == 2 + for x, y, p, app_state, rte, name, app_id in ps: + assert x == 0 + assert y == 0 + assert 0 <= p < 2 + assert app_state.name == \ + "FatalReturnCodeError: RC_CPU: Bad CPU number." + assert bool(rte) is False + assert name == "" + assert app_id == -1 + + def test_bad_args(): # No hostname with pytest.raises(SystemExit): @@ -101,7 +124,7 @@ def test_bad_args(): def test_no_machine(monkeypatch): # Should fail if nothing responds mc = mock.Mock() - mc.get_software_version = mock.Mock(side_effect=TimeoutError) + mc.get_software_version = mock.Mock(side_effect=TimeoutError()) MC = mock.Mock() MC.return_value = mc diff --git a/tests/test_geometry.py b/tests/test_geometry.py index 37622eb..46f7293 100644 --- a/tests/test_geometry.py +++ b/tests/test_geometry.py @@ -3,7 +3,7 @@ from rig.geometry import concentric_hexagons, to_xyz, minimise_xyz, \ shortest_mesh_path_length, shortest_mesh_path, \ shortest_torus_path_length, shortest_torus_path, \ - standard_system_dimensions + standard_system_dimensions, spinn5_eth_coords, spinn5_local_eth_coord def test_concentric_hexagons(): @@ -337,3 +337,71 @@ def test_standard_system_dimensions(): assert standard_system_dimensions(3 * 1 * 3) == (36, 12) assert standard_system_dimensions(3 * 2 * 4) == (48, 24) assert standard_system_dimensions(3 * 1 * 17) == (204, 12) + + +def test_spinn5_eth_coords(): + # Minimal system + assert set(spinn5_eth_coords(12, 12)) == set([(0, 0), (4, 8), (8, 4)]) + + # Larger, non-square systems + assert set(spinn5_eth_coords(24, 12)) == set([ + (0, 0), (4, 8), (8, 4), (12, 0), (16, 8), (20, 4)]) + assert set(spinn5_eth_coords(12, 24)) == set([ + (0, 0), (4, 8), (8, 4), (0, 12), (4, 20), (8, 16)]) + + # Larger square system + assert set(spinn5_eth_coords(24, 24)) == set([ + (0, 0), (4, 8), (8, 4), + (12, 0), (16, 8), (20, 4), + (0, 12), (4, 20), (8, 16), + (12, 12), (16, 20), (20, 16) + ]) + + # Subsets for non multiples of 12 (i.e. non-spinn-5 based things) + assert set(spinn5_eth_coords(2, 2)) == set([(0, 0)]) + assert set(spinn5_eth_coords(8, 8)) == set([(0, 0)]) + + +def test_spinn5_local_eth_coord(): + # Points lie on actual eth chips + assert spinn5_local_eth_coord(0, 0, 12, 12) == (0, 0) + assert spinn5_local_eth_coord(4, 8, 12, 12) == (4, 8) + assert spinn5_local_eth_coord(8, 4, 12, 12) == (8, 4) + + assert spinn5_local_eth_coord(12, 0, 24, 12) == (12, 0) + assert spinn5_local_eth_coord(16, 8, 24, 12) == (16, 8) + assert spinn5_local_eth_coord(20, 4, 24, 12) == (20, 4) + + assert spinn5_local_eth_coord(0, 12, 12, 24) == (0, 12) + assert spinn5_local_eth_coord(8, 16, 12, 24) == (8, 16) + assert spinn5_local_eth_coord(4, 20, 12, 24) == (4, 20) + + assert spinn5_local_eth_coord(12, 12, 24, 24) == (12, 12) + assert spinn5_local_eth_coord(16, 20, 24, 24) == (16, 20) + assert spinn5_local_eth_coord(20, 16, 24, 24) == (20, 16) + + # Exhaustive check for a 12x12 system + cases = [ + # X: 0 1 2 3 4 5 6 7 8 9 10 11 # noqa Y: + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 0 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 1 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 2 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 3 + [(+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 4 + [(+8, +4), (+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 5 + [(+8, +4), (+8, +4), (+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 6 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 7 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4), (+8, +4)], # noqa 8 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4)], # noqa 9 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4)], # noqa 10 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)] # noqa 11 + ] + for y, row in enumerate(cases): + for x, eth_coord in enumerate(row): + assert spinn5_local_eth_coord(x, y, 12, 12) == eth_coord + + # Still works for non multiples of 12 + assert spinn5_local_eth_coord(0, 0, 2, 2) == (0, 0) + assert spinn5_local_eth_coord(0, 1, 2, 2) == (0, 0) + assert spinn5_local_eth_coord(1, 0, 2, 2) == (0, 0) + assert spinn5_local_eth_coord(1, 1, 2, 2) == (0, 0)