Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions examples/scripts/write_radial_gre_rot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import numpy as np

import pypulseq as pp


def main(
plot: bool = False,
test_report: bool = False,
write_seq: bool = False,
seq_filename: str = 'gre_radial_pypulseq_rotext.seq',
*,
fov: float | tuple[float, float] = 260e-3,
n_x: int = 64,
flip_angle_deg: float = 10,
slice_thickness: float = 3e-3,
te: float = 8e-3,
tr: float = 20e-3,
n_spokes: int = 60,
n_dummy: int = 20,
):
"""Create a radial gradient echo (GRE) sequence using rotation extension.

Parameters
----------
plot : bool, optional
Plot the sequence diagram. Default is False.
test_report : bool, optional
Print a test report. Default is False.
write_seq : bool, optional
Write the sequence to a .seq file. Default is False.
seq_filename : str, optional
Output filename for the .seq file. Default is 'gre_radial_pypulseq.seq'.
fov : float or tuple of float, optional
Field of view in meters. If a single value, it is used for both x and y.
If a tuple, it is (fov_x, fov_y). Default is 260e-3.
n_x : int, optional
Number of readout samples. Default is 64.
flip_angle_deg : float, optional
Flip angle in degrees. Default is 10.
slice_thickness : float, optional
Slice thickness in meters. Default is 3e-3.
te : float, optional
Echo time in seconds. Default is 8e-3.
tr : float, optional
Repetition time in seconds. Default is 20e-3.
n_spokes : int, optional
Number of radial spokes. Default is 60.
n_dummy : int, optional
Number of dummy scans. Default is 20.

Returns
-------
seq : pypulseq.Sequence
The radial GRE sequence object.
"""
fov_x, fov_y = (fov, fov) if isinstance(fov, (int, float)) else fov
spoke_angle_increment = np.pi / n_spokes
rf_spoiling_inc = 117

# Set system limits
system = pp.Opts(
max_grad=28,
grad_unit='mT/m',
max_slew=120,
slew_unit='T/m/s',
rf_ringdown_time=20e-6,
rf_dead_time=100e-6,
adc_dead_time=10e-6,
)

seq = pp.Sequence(system)

# Create slice selection pulse and gradient
rf, gz, _ = pp.make_sinc_pulse(
apodization=0.5,
duration=4e-3,
flip_angle=np.deg2rad(flip_angle_deg),
slice_thickness=slice_thickness,
system=system,
time_bw_product=4,
return_gz=True,
delay=system.rf_dead_time,
use='excitation',
)

# Define other gradients and ADC events
delta_kx = 1 / fov_x
gx = pp.make_trapezoid(channel='x', flat_area=n_x * delta_kx, flat_time=6.4e-3 / 5, system=system)
adc = pp.make_adc(num_samples=n_x, duration=gx.flat_time, delay=gx.rise_time, system=system)
gx_pre = pp.make_trapezoid(channel='x', area=-gx.area / 2 - delta_kx / 2, duration=2e-3, system=system)
gz_reph = pp.make_trapezoid(channel='z', area=-gz.area / 2, duration=2e-3, system=system)

# Gradient spoiling
gx_spoil = pp.make_trapezoid(channel='x', area=0.5 * n_x * delta_kx, system=system)
gz_spoil = pp.make_trapezoid(channel='z', area=4 / slice_thickness, system=system)

# Calculate timing
te_delay = te - pp.calc_duration(gx_pre) - gz.fall_time - gz.flat_time / 2 - pp.calc_duration(gx) / 2
te_delay = np.ceil(te_delay / seq.grad_raster_time) * seq.grad_raster_time

tr_delay = tr - pp.calc_duration(gx_pre) - pp.calc_duration(gz) - pp.calc_duration(gx) - te_delay
tr_delay = np.ceil(tr_delay / seq.grad_raster_time) * seq.grad_raster_time
assert np.all(tr_delay) > pp.calc_duration(gx_spoil, gz_spoil)

rf_phase = 0
rf_inc = 0

for i_spoke in range(-n_dummy, n_spokes + 1):
rf.phase_offset = rf_phase / 180 * np.pi
adc.phase_offset = rf_phase / 180 * np.pi

rf_inc = divmod(rf_inc + rf_spoiling_inc, 360.0)[1]
rf_phase = divmod(rf_inc + rf_phase, 360.0)[1]

seq.add_block(rf, gz)
phi = spoke_angle_increment * (i_spoke - 1)
seq.add_block(gx_pre, gz_reph, pp.make_rotation('z', phi))
seq.add_block(pp.make_delay(te_delay))
if i_spoke > 0:
seq.add_block(gx, adc, pp.make_rotation('z', phi))
else:
seq.add_block(gx, pp.make_rotation('z', phi))
seq.add_block(gx_spoil, gz_spoil, pp.make_delay(tr_delay), pp.make_rotation('z', phi))

ok, error_report = seq.check_timing()
if ok:
print('Timing check passed successfully')
else:
print('Timing check failed. Error listing follows:')
[print(e) for e in error_report]

if test_report:
print(seq.test_report())

if plot:
seq.plot()

seq.set_definition(key='FOV', value=[fov_x, fov_y, slice_thickness])
seq.set_definition(key='Name', value='gre_rad')

if write_seq:
seq.write(seq_filename)

return seq


if __name__ == '__main__':
main(plot=True, write_seq=True)
23 changes: 23 additions & 0 deletions src/pypulseq/Sequence/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,29 @@ def register_label_event(self, event: SimpleNamespace) -> int:
return label_id


def register_rotation_event(self, event: EventLibrary) -> int:
"""
Parameters
----------
event : SimpleNamespace
Rotation event to be registered.

Returns
-------
int
ID of registered rotation event.
"""
data = tuple(event.rot_quaternion.as_quat(canonical=True, scalar_first=True).tolist())
rotation_id, found = self.rotation_library.find_or_insert(new_data=data)

# Clear block cache because Rotation event was overwritten
# TODO: Could find only the blocks that are affected by the changes
if self.use_block_cache and found:
self.block_cache.clear()

return rotation_id


def register_soft_delay_event(self, event: SimpleNamespace) -> int:
"""
Parameters
Expand Down
10 changes: 10 additions & 0 deletions src/pypulseq/Sequence/read_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def read(self, path: str, detect_rf_use: Union[bool, None] = None, remove_duplic
self.rf_library = EventLibrary()
self.shape_library = EventLibrary()
self.trigger_library = EventLibrary()
self.rotation_library = EventLibrary()

# Raster times
self.grad_raster_time = self.system.grad_raster_time
Expand Down Expand Up @@ -224,6 +225,15 @@ def l2(s):
for d in self.soft_delay_library.data.values():
if d[3] not in self.soft_delay_hints:
self.soft_delay_hints[d[3]] = d[0]
elif section[:19] == 'extension ROTATIONS':
extension_id = int(section[19:])
self.set_extension_string_ID('ROTATIONS', extension_id)
self.rotation_library = __read_events(input_file, (1, 1, 1, 1), event_library=self.rotation_library)
for k, v in self.rotation_library.data.items():
v = np.asarray(v)
norm = np.linalg.norm(v)
v = np.divide(v, norm, where=norm > 0)
self.rotation_library.data[k] = tuple(v)
else:
raise ValueError(f'Unknown section code: {section}')

Expand Down
18 changes: 18 additions & 0 deletions src/pypulseq/Sequence/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pypulseq.decompress_shape import decompress_shape
from pypulseq.event_lib import EventLibrary
from pypulseq.opts import Opts
from pypulseq.rotate3D import rotate3D
from pypulseq.Sequence import block
from pypulseq.Sequence.calc_grad_spectrum import calculate_gradient_spectrum
from pypulseq.Sequence.calc_pns import calc_pns
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(self, system: Union[Opts, None] = None, use_block_cache: bool = Tru
self.shape_library = EventLibrary(numpy_data=True)
self.trigger_library = EventLibrary()
self.soft_delay_library = EventLibrary()
self.rotation_library = EventLibrary()

# =========
# OTHER
Expand Down Expand Up @@ -1143,6 +1145,9 @@ def register_label_event(self, event: SimpleNamespace) -> int:
def register_rf_event(self, event: SimpleNamespace) -> Tuple[int, List[int]]:
return block.register_rf_event(self, event)

def register_rotation_event(self, event: SimpleNamespace) -> int:
return block.register_rotation_event(self, event)

def register_soft_delay_event(self, event: SimpleNamespace) -> int:
return block.register_soft_delay_event(self, event)

Expand Down Expand Up @@ -1589,6 +1594,19 @@ def waveforms(self, append_RF: bool = False, time_range: Union[List[float], None
for block_counter in blocks:
block = self.get_block(block_counter)

if hasattr(block, 'rotation'):
block = deepcopy(block)

# Apply the rotation to the current block and restore the block structure
gradients = [getattr(block, ax) for ax in grad_channels if getattr(block, ax) is not None]
rotated_gradients = rotate3D(
*gradients, rotation_matrix=block.rotation.rot_quaternion.as_matrix(), system=self.system
)
for i in range(3):
setattr(block, grad_channels[i], None)
for i in range(len(rotated_gradients)):
setattr(block, f'g{rotated_gradients[i].channel}', rotated_gradients[i])

for j in range(len(grad_channels)):
grad = getattr(block, grad_channels[j])
if grad is not None: # Gradients
Expand Down
15 changes: 15 additions & 0 deletions src/pypulseq/Sequence/write_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ def write(self, file_name: Union[str, Path], create_signature, remove_duplicates
output_file.write(s)
output_file.write('\n')

if len(self.rotation_library.data) != 0:
output_file.write('# Extension specification for rotation events:\n')
output_file.write('# id RotQuat0 RotQuatX RotQuatY RotQuatZ\n')
output_file.write(f'extension ROTATIONS {self.get_extension_type_ID("ROTATIONS")}\n')
id_format_str = '{:.0f} {:12g} {:12g} {:12g} {:12g}\n' # Refer lines 20-21
for k in self.rotation_library.data:
s = id_format_str.format(k, *self.rotation_library.data[k])
output_file.write(s)
output_file.write('\n')

if len(self.shape_library.data) != 0:
output_file.write('# Sequence Shapes\n')
output_file.write('[SHAPES]\n\n')
Expand Down Expand Up @@ -500,6 +510,11 @@ def write_v141(self, file_name: Union[str, Path], create_signature, remove_dupli
'WARNING! The sequence in memory uses "soft delay" extension, which is incompatible with the file format v1.4.1. The produced Pulseq file is only partially valid and may fail to load or operate in some cases.'
)

if len(self.rotation_library.data) != 0:
raise RuntimeError(
'WARNING! The sequence in memory uses the "rotation" extension, which is incompatible with the file format v1.4.1. The produced Pulseq file is likely to be invalid and would probably fail to operate'
)

if len(self.shape_library.data) != 0:
output_file.write('# Sequence Shapes\n')
output_file.write('[SHAPES]\n\n')
Expand Down
1 change: 1 addition & 0 deletions src/pypulseq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def round_half_up(n, decimals=0):
from pypulseq.make_extended_trapezoid_area import make_extended_trapezoid_area
from pypulseq.make_gauss_pulse import make_gauss_pulse
from pypulseq.make_label import make_label
from pypulseq.make_rotation import make_rotation
from pypulseq.make_sinc_pulse import make_sinc_pulse
from pypulseq.make_trapezoid import make_trapezoid
from pypulseq.sigpy_pulse_opts import SigpyPulseOpts
Expand Down
66 changes: 66 additions & 0 deletions src/pypulseq/make_rotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from types import SimpleNamespace

import numpy as np
from scipy.spatial.transform import Rotation as R


def make_rotation(*args) -> SimpleNamespace:
"""
Create a rotation event to instruct the interpreter to rotate
the gx, gy and gz waveforms according to the given rotation matrix.

See also `pypulseq.Sequence.sequence.Sequence.add_block()`.

Parameters
----------
rot_quaternion : Rotation
Scipy rotation operator. Name of the attribute is 'rot_quaternion'
to align with MATLAB toolbox.

Returns
-------
rotation : SimpleNamespace
Rotation event.
"""
if len(args) < 1:
raise ValueError('Must supply rotation angle(s)')
elif len(args) == 1: # make_rotation(phi), make_rotation(rot_mat) or make_rotation(quaternion)
if isinstance(args[0], float): # make_rotation(phi)
phi = float(args[0])
if not (0 <= abs(phi) < 2 * np.pi):
raise ValueError(f'Rotation angle phi ({phi:.2f}) is invalid. Should be in [0, 2π)')
rot = R.from_rotvec(args[0] * np.asarray([0, 0, 1]))
elif isinstance(args[0], (list, np.ndarray)) and args[0].shape == (3, 3): # make_rotation(rot_mat)
rot = R.from_matrix(np.asarray(args[0]))
elif isinstance(args[0], (list, np.ndarray)) and args[0].shape == (4,): # make_rotation(quaternion)
quat = np.asarray(args[0], dtype=float)
norm = np.linalg.norm(quat)
quat = np.divide(quat, norm, where=norm > 0)
rot = R.from_quat(quat, scalar_first=True) # ensure unit quaternion
else:
raise ValueError('Unexpected input to make_rotation')
elif len(args) == 2: # make_rotation(phi, theta) or make_rotation(axis, angle)
if isinstance(args[0], float): # make_rotation(phi, theta)
phi = float(args[0])
theta = float(args[1]) if len(args) > 1 else 0.0
if not (0 <= abs(phi) < 2 * np.pi):
raise ValueError(f'Rotation angle phi ({phi:.2f}) is invalid. Should be in [0, 2π)')
if not (0 <= abs(theta) <= np.pi):
raise ValueError(f'Rotation angle theta ({theta:.2f}) is invalid. Should be in [0, π]')
# First rotate about X (theta), then Z (phi)
q1 = R.from_rotvec(theta * np.asarray([1, 0, 0])) # theta: elevation
q2 = R.from_rotvec(phi * np.asarray([0, 0, 1])) # phi: azimuth
rot = q1 * q2 # Apply q2 after q1 (q2 rotates in q1's frame)
elif isinstance(args[0], (list, np.ndarray)) and len(args[0]) == 3: # make_rotation(axis, angle)
axis = np.asarray(args[0], dtype=float)
phi = float(args[1])
if not (0 <= abs(phi) <= np.pi):
raise ValueError(f'Rotation angle phi ({phi:.2f}) is invalid. Should be in [0, π]')
axis /= np.linalg.norm(axis)
rot = R.from_rotvec(phi * axis)
else:
raise ValueError('Unexpected input to make_rotation')
else:
raise ValueError('Unexpected input to make_rotation')

return SimpleNamespace(type='rot3D', rot_quaternion=rot)
14 changes: 14 additions & 0 deletions src/pypulseq/utils/seq_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import itertools
import math
import typing
from copy import deepcopy

import matplotlib as mpl
import numpy as np
from matplotlib import pyplot as plt

from pypulseq.calc_rf_center import calc_rf_center
from pypulseq.rotate3D import rotate3D
from pypulseq.Sequence import parula
from pypulseq.supported_labels_rf_use import get_supported_labels
from pypulseq.utils.cumsum import cumsum
Expand Down Expand Up @@ -598,6 +600,18 @@ def _seq_plot(
)

grad_channels = ['gx', 'gy', 'gz']
if hasattr(block, 'rotation'):
block = deepcopy(block)

# Apply the rotation to the current block and restore the block structure
gradients = [getattr(block, ax) for ax in grad_channels if getattr(block, ax) is not None]
rotated_gradients = rotate3D(
*gradients, rotation_matrix=block.rotation.rot_quaternion.as_matrix(), system=seq.system
)
for i in range(3):
setattr(block, grad_channels[i], None)
for i in range(len(rotated_gradients)):
setattr(block, f'g{rotated_gradients[i].channel}', rotated_gradients[i])
for x in range(len(grad_channels)): # Gradients
if getattr(block, grad_channels[x], None) is not None:
grad = getattr(block, grad_channels[x])
Expand Down
Loading
Loading