Source code for tofu.genreco

"""General projection-based reconstruction for tomographic/laminographic cone/parallel beam data
sets.
"""
import copy
import itertools
import logging
import os
import time
import numpy as np
from multiprocessing.pool import ThreadPool
from threading import Event, Thread
from gi.repository import Ufo
from .preprocess import create_preprocessing_pipeline
from .util import (fbp_filtering_in_phase_retrieval, get_filtering_padding,
                   get_reconstructed_cube_shape, get_reconstruction_regions,
                   get_filenames, determine_shape, get_scarray_value, Vector)
from .tasks import get_task, get_writer


LOG = logging.getLogger(__name__)
DTYPE_CL_SIZE = {'float': 4,
                 'double': 8,
                 'half': 2,
                 'uchar': 1,
                 'ushort': 2,
                 'uint': 4}
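# Illustrative size estimate (not from the original module): a 2048 x 2048 x 2048 volume stored
# as 'float' occupies 2048 ** 3 * DTYPE_CL_SIZE['float'] bytes, i.e. roughly 34.4 GB (32 GiB),
# which is the quantity genreco() computes below as vol_nbytes.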


def genreco(args):
    st = time.time()
    if is_output_single_file(args):
        try:
            import ufo.numpy
        except ImportError:
            LOG.error('You must install ufo python support (in ufo-core/python) to be able to '
                      'write single-file output')
            return
    if (args.energy is not None and args.propagation_distance is not None and not
            (args.projection_margin or args.disable_projection_crop)):
        LOG.warning('Phase retrieval without --projection-margin specification or '
                    '--disable-projection-crop may cause convolution artifacts')
    _fill_missing_args(args)
    _convert_angles_to_rad(args)
    set_projection_filter_scale(args)
    x_region, y_region, z_region = get_reconstruction_regions(args, store=True, dtype=float)
    vol_shape = get_reconstructed_cube_shape(x_region, y_region, z_region)
    bpp = DTYPE_CL_SIZE[args.store_type]
    num_voxels = vol_shape[0] * vol_shape[1] * vol_shape[2]
    vol_nbytes = num_voxels * bpp

    resources = [Ufo.Resources()]
    gpus = np.array(resources[0].get_gpu_nodes())
    gpu_indices = np.array(args.gpus or list(range(len(gpus))))
    if min(gpu_indices) < 0 or max(gpu_indices) > len(gpus) - 1:
        raise ValueError('--gpus contains invalid indices')
    
    gpus = gpus[gpu_indices]
    duration = 0
    for i, gpu in enumerate(gpus):
        print('Max mem for {}: {:.2f} GB'.format(i, gpu.get_info(0) / 2. ** 30))

    runs = make_runs(gpus, gpu_indices, x_region, y_region, z_region, bpp,
                     slices_per_device=args.slices_per_device,
                     slice_memory_coeff=args.slice_memory_coeff,
                     data_splitting_policy=args.data_splitting_policy,
                     num_gpu_threads=args.num_gpu_threads)

    for i in range(len(runs[0]) - 1):
        resources.append(Ufo.Resources())

    LOG.info('Number of passes: %d', len(runs))
    LOG.debug('GPUs and regions:')
    for regions in runs:
        LOG.debug('%s', str(regions))

    for i, regions in enumerate(runs):
        duration += _run(resources, args, x_region, y_region, regions, i, vol_nbytes)

    num_gupdates = num_voxels * args.number * 1e-9
    total_duration = time.time() - st
    LOG.debug('UFO duration: %.2f s', duration)
    LOG.debug('Total duration: %.2f s', total_duration)
    LOG.debug('UFO performance: %.2f GUPS', num_gupdates / duration)
    LOG.debug('Total performance: %.2f GUPS', num_gupdates / total_duration)


def make_runs(gpus, gpu_indices, x_region, y_region, z_region, bpp, slices_per_device=None,
              slice_memory_coeff=0.8, data_splitting_policy='one', num_gpu_threads=1):
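    """Split the reconstruction along z into passes over *gpus* and return a list of passes,
    where every pass is a list of (gpu_index, [z_start, z_end, z_step]) tuples, one per GPU
    thread. Each GPU gets either the fixed *slices_per_device* or as many slices as fit into its
    memory (see get_num_slices_per_gpu). Illustrative example (assumed numbers): two equal GPUs
    holding 100 slices each, 250 slices to reconstruct and the default *data_splitting_policy*
    give roughly
    [[(0, [z0, z0 + 100, 1]), (1, [z0 + 100, z0 + 200, 1])], [(1, [z0 + 200, z0 + 250, 1])]].
    """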
    gpu_indices = np.array(gpu_indices)
    def _add_region(runs, gpu_index, current, to_process, z_start, z_step):
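        # Append *current* slices for GPU *gpu_index* to the last pass, splitting them between
        # num_gpu_threads threads; return the updated z_start, the last z_end and the number of
        # slices which remain to be processed.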
        current_per_thread = current // num_gpu_threads
        for i in range(num_gpu_threads):
            if i + 1 == num_gpu_threads:
                current_per_thread += current % num_gpu_threads
            z_end = z_start + current_per_thread * z_step
            runs[-1].append((gpu_indices[gpu_index], [z_start, z_end, z_step]))
            z_start = z_end

        return z_start, z_end, to_process - current

    z_start, z_stop, z_step = z_region
    y_start, y_stop, y_step = y_region
    x_start, x_stop, x_step = x_region
    slice_width, slice_height, num_slices = get_reconstructed_cube_shape(x_region, y_region,
                                                                         z_region)

    if slices_per_device:
        slices_per_device = [slices_per_device for i in range(len(gpus))]
    else:
        slices_per_device = get_num_slices_per_gpu(gpus, slice_width, slice_height, bpp,
                                                   slice_memory_coeff=slice_memory_coeff)

    max_slices_per_pass = sum(slices_per_device)
    if not max_slices_per_pass:
        raise RuntimeError('None of the available devices has enough memory to store any slices')
    num_full_passes = num_slices // max_slices_per_pass
    LOG.debug('Number of slices: %d', num_slices)
    LOG.debug('Slices per device %s', slices_per_device)
    LOG.debug('Maximum slices on all GPUs per pass: %d', max_slices_per_pass)
    LOG.debug('Number of passes with full workload: %d', num_slices // max_slices_per_pass)
    sorted_indices = np.argsort(slices_per_device)[-np.count_nonzero(slices_per_device):]
    runs = []
    z_start = z_region[0]
    to_process = num_slices

    # Create passes where all GPUs are fully loaded
    for j in range(num_full_passes):
        runs.append([])
        for i in sorted_indices:
            z_start, z_end, to_process = _add_region(runs, i, slices_per_device[i], to_process,
                                                     z_start, z_step)

    if to_process:
        if data_splitting_policy == 'one':
            # Fill the last pass by maximizing the workload per GPU
            runs.append([])
            for i in sorted_indices[::-1]:
                if not to_process:
                    break
                current = min(slices_per_device[i], to_process)
                z_start, z_end, to_process = _add_region(runs, i, current, to_process,
                                                         z_start, z_step)
        else:
            # Fill the last pass by maximizing the number of GPUs which will work
            num_gpus = len(sorted_indices)
            runs.append([])
            for j, i in enumerate(sorted_indices):
                # The current GPU processes either the maximum number of slices it can hold or,
                # if an even division of the remaining slices between the remaining GPUs cannot
                # saturate it, that smaller share. This way the work is split evenly between the
                # GPUs.
                current = max(min(slices_per_device[i], (to_process - 1) // (num_gpus - j) + 1), 1)
                z_start, z_end, to_process = _add_region(runs, i, current, to_process,
                                                         z_start, z_step)
                if not to_process:
                    break

    return runs


def get_num_slices_per_gpu(gpus, width, height, bpp, slice_memory_coeff=0.8):
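    """Return a list with the number of slices of *width* x *height* pixels and *bpp* bytes per
    pixel which fit into the global memory of each GPU in *gpus*, scaled by *slice_memory_coeff*.
    """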
    num_slices = []
    slice_size = width * height * bpp

    for i, gpu in enumerate(gpus):
        max_mem = gpu.get_info(Ufo.GpuNodeInfo.GLOBAL_MEM_SIZE)
        num_slices.append(int(np.floor(max_mem * slice_memory_coeff / slice_size)))

    return num_slices


def _run(resources, args, x_region, y_region, regions, run_number, vol_nbytes):
    """Execute one pass on all possible GPUs with slice ranges given by *regions*. Use separate
    thread per GPU and optimize the read projection regions.
    """
    executors = []
    writer = None
    last = None

    if is_output_single_file(args):
        import tifffile
        bigtiff = vol_nbytes > 2 ** 32 - 2 ** 25
        LOG.debug('Writing BigTiff: %s', bigtiff)
        dirname = os.path.dirname(args.output)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        writer = tifffile.TiffWriter(args.output, append=run_number != 0, bigtiff=bigtiff)

    for index in range(len(regions)):
        gpu_index, region = regions[index]
        region_index = run_number * len(resources) + index
        executors.append(
            Executor(
                resources[index],
                args,
                region,
                x_region,
                y_region,
                gpu_index,
                region_index,
                writer=writer
            )
        )
        if last:
            # Chain up waiting events of subsequent executors
            executors[-1].wait_event = last.finished
        last = executors[-1]

    def start_one(index):
        return executors[index].process()

    st = time.time()

    try:
        with ThreadPool(processes=len(regions)) as pool:
            try:
                pool.map(start_one, list(range(len(regions))))
            except KeyboardInterrupt:
                LOG.info('Processing interrupted')
                for executor in executors:
                    executor.abort()
    finally:
        if writer:
            writer.close()
            LOG.debug('Writer closed')

    return time.time() - st


def setup_graph(args, graph, x_region, y_region, region, source=None, gpu=None, do_output=True,
                index=0, make_reader=True):
    backproject = get_task('general-backproject', processing_node=gpu)

    if do_output:
        if args.dry_run:
            sink = get_task('null', processing_node=gpu, download=True)
        else:
            sink = get_writer(args)
            sink.props.filename = '{}-{:>03}-%04i.tif'.format(args.output, index)

    width = args.width
    height = args.height
    if args.transpose_input:
        tmp = width
        width = height
        height = tmp
    if args.projection_filter != 'none' and args.projection_crop_after == 'backprojection':
        # Take projection padding into account
        if fbp_filtering_in_phase_retrieval(args):
            padding = args.retrieval_padded_width - width
            padding_from = 'phase retrieval'
        else:
            padding = get_filtering_padding(width)
            padding_from = 'default backproject'
        args.center_position_x = [pos + padding / 2 for pos in args.center_position_x]
        if args.z_parameter == 'center-position-x':
            region = [region[0] + padding / 2, region[1] + padding / 2, region[2]]
        LOG.debug('center-position-x after padding: %g (from %s)',
                  args.center_position_x[0], padding_from)

    backproject.props.parameter = args.z_parameter
    if args.burst:
        backproject.props.burst = args.burst
    backproject.props.z = args.z
    backproject.props.region = region
    backproject.props.x_region = x_region
    backproject.props.y_region = y_region
    backproject.props.center_position_x = args.center_position_x
    backproject.props.center_position_z = args.center_position_z
    backproject.props.source_position_x = args.source_position_x
    backproject.props.source_position_y = args.source_position_y
    backproject.props.source_position_z = args.source_position_z
    backproject.props.detector_position_x = args.detector_position_x
    backproject.props.detector_position_y = args.detector_position_y
    backproject.props.detector_position_z = args.detector_position_z
    backproject.props.detector_angle_x = args.detector_angle_x
    backproject.props.detector_angle_y = args.detector_angle_y
    backproject.props.detector_angle_z = args.detector_angle_z
    backproject.props.axis_angle_x = args.axis_angle_x
    backproject.props.axis_angle_y = args.axis_angle_y
    backproject.props.axis_angle_z = args.axis_angle_z
    backproject.props.volume_angle_x = args.volume_angle_x
    backproject.props.volume_angle_y = args.volume_angle_y
    backproject.props.volume_angle_z = args.volume_angle_z
    backproject.props.num_projections = args.number
    backproject.props.compute_type = args.compute_type
    backproject.props.result_type = args.result_type
    backproject.props.store_type = args.store_type
    backproject.props.overall_angle = args.overall_angle
    backproject.props.addressing_mode = args.genreco_padding_mode
    backproject.props.gray_map_min = args.slice_gray_map[0]
    backproject.props.gray_map_max = args.slice_gray_map[1]

    source = create_preprocessing_pipeline(args, graph, source=source,
                                           processing_node=gpu,
                                           cone_beam_weight=not args.disable_cone_beam_weight,
                                           make_reader=make_reader)
    if source:
        graph.connect_nodes(source, backproject)
    else:
        source = backproject

    if do_output:
        graph.connect_nodes(backproject, sink)
        last = sink
    else:
        last = backproject

    return (source, last)


def is_output_single_file(args):
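    """Return True if the reconstruction is written into one multi-page TIFF file, i.e. this is
    not a dry run and *args.output* ends with '.tif' or '.tiff'.
    """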
    filename = args.output.lower()

    return not args.dry_run and (filename.endswith('.tif') or filename.endswith('.tiff'))


def set_projection_filter_scale(args):
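    """Set *args.projection_filter_scale* according to the beam geometry: 1 for parallel beam
    tomography, 0.5 * cos(axis_angle_x) for parallel beam laminography, 0.5 / magnification ** 2
    for cone beam laminography and additionally divided by the magnification for cone beam
    tomography.
    """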
    is_parallel = np.all(np.isinf(args.source_position_y))
    magnification = (args.source_position_y[0] - args.detector_position_y[0]) / \
        args.source_position_y[0]

    args.projection_filter_scale = 1.
    if is_parallel:
        if np.any(np.array(args.axis_angle_x)):
            LOG.debug('Adjusting filter for parallel beam laminography')
            args.projection_filter_scale = 0.5 * np.cos(args.axis_angle_x[0])
    else:
        args.projection_filter_scale = 0.5
        args.projection_filter_scale /= magnification ** 2
        if np.all(np.array(args.axis_angle_x) == 0):
            LOG.debug('Adjusting filter for cone beam tomography')
            args.projection_filter_scale /= magnification


def _fill_missing_args(args):
    (width, height) = determine_shape(args, args.projections, store=False)
    if args.transpose_input:
        tmp = width
        width = height
        height = tmp
    args.center_position_x = (args.center_position_x or [width / 2.])
    args.center_position_z = (args.center_position_z or [height / 2.])

    if not args.overall_angle:
        args.overall_angle = 360.
        LOG.info('Overall angle not specified, using 360 deg')

    if not args.number:
        if len(args.axis_angle_z) > 1:
            LOG.debug("--number not specified, using length of --axis-angle-z: %d",
                      len(args.axis_angle_z))
            args.number = len(args.axis_angle_z)
        else:
            num_files = len(get_filenames(args.projections))
            if not num_files:
                raise RuntimeError("No files found in `{}'".format(args.projections))
            LOG.debug("--number not specified, using number of files matching "
                      "--projections pattern: %d", num_files)
            args.number = num_files

    if args.dry_run:
        if not args.number:
            raise ValueError('--number must be specified with --dry-run')
        determine_shape(args, args.projections, store=True)
        LOG.info('Dummy data W x H x N: {} x {} x {}'.format(args.width,
                                                             args.height,
                                                             args.number))

    return args


def _convert_angles_to_rad(args):
    names = ['detector_angle', 'axis_angle', 'volume_angle']
    coords = ['x', 'y', 'z']
    angular_z_params = [x[0].replace('_', '-') + '-' + x[1]
                        for x in itertools.product(names, coords)]
    args.overall_angle = np.deg2rad(args.overall_angle)
    if args.z_parameter in angular_z_params:
        LOG.debug('Converting z parameter values to radians')
        args.region = _convert_list_to_rad(args.region)

    for name in names:
        for coord in coords:
            full_name = name + '_' + coord
            values = getattr(args, full_name)
            setattr(args, full_name, _convert_list_to_rad(values))


def _convert_list_to_rad(values):
    return np.deg2rad(np.array(values)).tolist()



def _are_values_equal(values):
    return np.all(np.array(values) == values[0])


class Executor(object):
    """Reconstructs one region.

    :param writer: if not None, we'll be writing to a file shared with other executors and need
        to use *wait_event* to make sure we write our region when the previous executors are
        finished.
    """

    def __init__(self, resources, args, region, x_region, y_region, gpu_index, region_index,
                 writer=None):
        self.resources = resources
        self.args = args
        self.region = region
        self.gpu_index = gpu_index
        self.x_region = x_region
        self.y_region = y_region
        self.region_index = region_index
        self.writer = writer
        self.output = Ufo.OutputTask() if self.writer else None
        self.scheduler = None
        self.wait_event = None
        self.finished = Event()
        self.abort_requested = False

    def process(self):
        self.scheduler = Ufo.FixedScheduler()
        if hasattr(self.scheduler.props, 'enable_tracing'):
            LOG.debug("Use tracing: {}".format(self.args.enable_tracing))
            self.scheduler.props.enable_tracing = self.args.enable_tracing
        self.scheduler.set_resources(self.resources)
        graph = Ufo.TaskGraph()
        gpu = self.scheduler.get_resources().get_gpu_nodes()[self.gpu_index]
        geometry = CTGeometry(self.args)
        if (len(self.args.center_position_z) == 1
                and np.modf(self.args.center_position_z[0])[0] == 0
                and geometry.is_simple_parallel_tomo):
            LOG.info('Simple tomography with integer z center, changing to center_position_z + 0.5 '
                     'to avoid interpolation')
            geometry.args.center_position_z = (geometry.args.center_position_z[0] + 0.5,)
        if not self.args.disable_projection_crop:
            if not self.args.dry_run and (self.args.y or self.args.height
                                          or self.args.transpose_input):
                LOG.debug('--y or --height or --transpose-input specified, '
                          'not optimizing projection region')
            else:
                geometry.optimize_args(region=self.region)
        opt_args = geometry.args
        if self.args.dry_run:
            source = get_task('dummy-data', number=self.args.number, width=self.args.width,
                              height=self.args.height)
        else:
            source = None

        last = setup_graph(opt_args, graph, self.x_region, self.y_region, self.region,
                           source=source, gpu=gpu, index=self.region_index, make_reader=True,
                           do_output=self.writer is None)[-1]
        if self.writer:
            graph.connect_nodes(last, self.output)

        LOG.debug('Device: %d, region: %s', self.gpu_index, self.region)
        thread = Thread(target=self.scheduler.run, args=(graph,))
        thread.setDaemon(True)
        thread.start()
        if self.writer:
            self.consume()
        thread.join()

        return self.scheduler.props.time

    def consume(self):
        import ufo.numpy
        if self.wait_event:
            LOG.debug('Executor of region %s waiting for writing', self.region)
            self.wait_event.wait()
        for i in np.arange(*self.region):
            if self.abort_requested:
                LOG.debug('Abort requested in writing of region %s', self.region)
                return
            buf = self.output.get_output_buffer()
            self.writer.save(ufo.numpy.asarray(buf))
            self.output.release_output_buffer(buf)
        self.finished.set()
        LOG.debug('Executor of region %s finished writing', self.region)

    def abort(self):
        self.abort_requested = True
        if self.scheduler:
            self.scheduler.abort()


class CTGeometry(object):
    def __init__(self, args):
        self.args = copy.deepcopy(args)
        determine_shape(self.args, self.args.projections, store=True)
        get_reconstruction_regions(self.args, store=True, dtype=float)
        self.args.center_position_x = (self.args.center_position_x or [self.args.width / 2.])
        self.args.center_position_z = (self.args.center_position_z or [self.args.height / 2.])

    @property
    def is_parallel(self):
        return np.all(np.isinf(self.args.source_position_y))

    @property
    def is_detector_rotated(self):
        return (np.any(self.args.detector_angle_x) or np.any(self.args.detector_angle_y)
                or np.any(self.args.detector_angle_z))

    @property
    def is_axis_rotated(self):
        return (np.any(self.args.axis_angle_x) or np.any(self.args.axis_angle_y)
                or np.any(self.args.axis_angle_z))

    @property
    def is_volume_rotated(self):
        return (np.any(self.args.volume_angle_x) or np.any(self.args.volume_angle_y)
                or np.any(self.args.volume_angle_z))

    @property
    def is_center_position_x_constant(self):
        return _are_values_equal(self.args.center_position_x)

    @property
    def is_center_position_z_constant(self):
        return _are_values_equal(self.args.center_position_z)

    @property
    def is_center_constant(self):
        return self.is_center_position_x_constant and self.is_center_position_z_constant

    @property
    def is_simple_parallel_tomo(self):
        return (not (self.is_axis_rotated or self.is_detector_rotated or self.is_volume_rotated)
                and self.is_parallel and self.is_center_constant)

    def optimize_args(self, region=None):
        xmin, ymin, xmax, ymax = self.compute_height(region=region)
        center_position_z = np.array(self.args.center_position_z) - ymin
        self.args.center_position_z = center_position_z.tolist()
        self.args.y = int(ymin)
        self.args.height = int(ymax - ymin)
        LOG.debug('Optimization for region: %s', region or self.args.region)
        LOG.debug('Optimized X: %d - %d, Z: %d - %d', xmin, xmax, ymin, ymax)
        LOG.debug('Optimized Z: %d', self.args.y)
        LOG.debug('Optimized height: %d', self.args.height)
        LOG.debug('Optimized center_position_z: %g - %g',
                  self.args.center_position_z[0], self.args.center_position_z[-1])

    def compute_height(self, region=None):
        extrema = []
        if not region:
            region = self.args.region

        if self.is_simple_parallel_tomo:
            # Simple parallel beam tomography, thus compute only the horizontal crop at rotations
            # which are multiples of 45 degrees
            LOG.debug('Computing optimal projection region from 4 angles')
            projs_per_45 = self.args.number / self.args.overall_angle * np.pi / 4
            stop = 4 if self.args.overall_angle <= np.pi else 8
            indices = projs_per_45 * np.arange(1, stop, 2)
            indices = np.round(indices).astype(int).tolist()
        else:
            LOG.debug('Computing optimal projection region from all angles')
            indices = list(range(self.args.number))

        for i in indices:
            extrema_0 = self._compute_one_parameter(region[0], i)
            extrema_1 = self._compute_one_parameter(region[1], i)
            extrema.append(extrema_0)
            extrema.append(extrema_1)

        minima = np.min(extrema, axis=0)
        maxima = np.max(extrema, axis=0)
        if maxima[-1] == minima[2]:
            # Don't let height be 0
            maxima[-1] += 1
        result = tuple(minima[::2]) + tuple(maxima[1::2])

        return result

    def _compute_one_parameter(self, param_value, index):
        source_position = np.array([get_scarray_value(self.args.source_position_x, index),
                                    get_scarray_value(self.args.source_position_y, index),
                                    get_scarray_value(self.args.source_position_z, index)])
        axis = Vector(x_angle=get_scarray_value(self.args.axis_angle_x, index),
                      y_angle=get_scarray_value(self.args.axis_angle_y, index),
                      z_angle=get_scarray_value(self.args.axis_angle_z, index),
                      position=[get_scarray_value(self.args.center_position_x, index), 0,
                                get_scarray_value(self.args.center_position_z, index)])
        detector = Vector(x_angle=get_scarray_value(self.args.detector_angle_x, index),
                          y_angle=get_scarray_value(self.args.detector_angle_y, index),
                          z_angle=get_scarray_value(self.args.detector_angle_z, index),
                          position=[get_scarray_value(self.args.detector_position_x, index),
                                    get_scarray_value(self.args.detector_position_y, index),
                                    get_scarray_value(self.args.detector_position_z, index)])
        volume_angle = Vector(x_angle=get_scarray_value(self.args.volume_angle_x, index),
                              y_angle=get_scarray_value(self.args.volume_angle_y, index),
                              z_angle=get_scarray_value(self.args.volume_angle_z, index))

        z = self.args.z
        if self.args.z_parameter == 'z':
            z = param_value
        elif self.args.z_parameter == 'axis-angle-x':
            axis.x_angle = param_value
        elif self.args.z_parameter == 'axis-angle-y':
            axis.y_angle = param_value
        elif self.args.z_parameter == 'axis-angle-z':
            axis.z_angle = param_value
        elif self.args.z_parameter == 'volume-angle-x':
            volume_angle.x_angle = param_value
        elif self.args.z_parameter == 'volume-angle-y':
            volume_angle.y_angle = param_value
        elif self.args.z_parameter == 'volume-angle-z':
            volume_angle.z_angle = param_value
        elif self.args.z_parameter == 'detector-angle-x':
            detector.x_angle = param_value
        elif self.args.z_parameter == 'detector-angle-y':
            detector.y_angle = param_value
        elif self.args.z_parameter == 'detector-angle-z':
            detector.z_angle = param_value
        elif self.args.z_parameter == 'detector-position-x':
            detector.position[0] = param_value
        elif self.args.z_parameter == 'detector-position-y':
            detector.position[1] = param_value
        elif self.args.z_parameter == 'detector-position-z':
            detector.position[2] = param_value
        elif self.args.z_parameter == 'source-position-x':
            source_position[0] = param_value
        elif self.args.z_parameter == 'source-position-y':
            source_position[1] = param_value
        elif self.args.z_parameter == 'source-position-z':
            source_position[2] = param_value
        elif self.args.z_parameter == 'center-position-x':
            axis.position[0] = param_value
        elif self.args.z_parameter == 'center-position-z':
            axis.position[2] = param_value
        else:
            raise RuntimeError("Unknown z parameter '{}'".format(self.args.z_parameter))

        points = get_extrema(self.args.x_region, self.args.y_region, z)
        if self.args.z_parameter != 'z':
            points_upper = get_extrema(self.args.x_region, self.args.y_region, z + 1)
            points = np.hstack((points, points_upper))

        tomo_angle = float(index) / self.args.number * self.args.overall_angle
        xe, ye = compute_detector_pixels(points, source_position, axis, volume_angle, detector,
                                         tomo_angle)

        return compute_detector_region(xe, ye, (self.args.height, self.args.width),
                                       overhead=self.args.projection_margin)


def project(points, source, detector_normal, detector_offset):
    """Project *points* onto a detector."""
    x, y, z = points
    source_extended = np.tile(source[:, np.newaxis], [1, points.shape[1]])
    detector_normal_extended = np.tile(detector_normal[:, np.newaxis], [1, points.shape[1]])
    denom = np.sum((points - source_extended) * detector_normal_extended, axis=0)

    if np.isinf(source[1]):
        # Parallel beam
        if np.any(detector_normal != np.array([0., -1, 0])):
            # Detector is not perpendicular, compute translation along the beam direction,
            # otherwise don't compute anything because voxels are mapped directly
            # to detector coordinates
            points[1, :] = - (detector_offset + detector_normal[0] * points[0, :]
                              + detector_normal[2] * points[2, :]) / detector_normal[1]
        projected = points
    else:
        # Cone beam
        u = -(detector_offset + np.dot(source, detector_normal)) / denom
        u = np.tile(u, [3, 1])
        projected = source_extended + (points - source_extended) * u

    return projected
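
# Note on the cone beam branch above (added explanation, not in the original module): u is the
# ray parameter at which the line from the source through each point meets the detector plane
# detector_normal . x + detector_offset = 0, obtained by solving
# detector_normal . (source + u * (point - source)) + detector_offset = 0.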


def compute_detector_pixels(points, source_position, axis, volume_rotation, detector, tomo_angle):
    """*points* are a list of points along x-direction, thus the array has height 3.
    *source_position* is a 3-vector, *axis*, *volume_rotation* and *detector* are util.Vector
    instances.
    """
    # Rotate the detector normal
    detector_normal = np.array((0, -1, 0), dtype=float)
    detector_normal = rotate_z(detector.z_angle, detector_normal)
    detector_normal = rotate_y(detector.y_angle, detector_normal)
    detector_normal = rotate_x(detector.x_angle, detector_normal)
    # Compute d from ax + by + cz + d = 0
    detector_offset = -np.dot(detector.position, detector_normal)

    if np.isinf(source_position[1]):
        # Parallel beam
        voxels = points
    else:
        # Apply magnification
        voxels = -points * source_position[1] / (detector.position[1] - source_position[1])

    # Rotate the volume
    voxels = rotate_z(volume_rotation.z_angle, voxels)
    voxels = rotate_y(volume_rotation.y_angle, voxels)
    voxels = rotate_x(volume_rotation.x_angle, voxels)

    # Rotate around the axis
    voxels = rotate_z(tomo_angle, voxels)

    # Rotate the volume
    voxels = rotate_z(axis.z_angle, voxels)
    voxels = rotate_y(axis.y_angle, voxels)
    voxels = rotate_x(axis.x_angle, voxels)

    # Get the projected pixel
    projected = project(voxels, source_position, detector_normal, detector_offset)

    if np.any(detector_normal != np.array([0., -1, 0])):
        # Detector is not perpendicular
        projected -= np.array([detector.position]).T
        # Reverse rotation => reverse order of transformation matrices and negative angles
        projected = rotate_x(-detector.x_angle, projected)
        projected = rotate_y(-detector.y_angle, projected)
        projected = rotate_z(-detector.z_angle, projected)

    x = projected[0, :] + axis.position[0] - 0.5
    y = projected[2, :] + axis.position[2] - 0.5

    return x, y


def compute_detector_region(x, y, shape, overhead=2):
    """*overhead* specifies how much margin is taken into account around the computed area."""
    def _compute_outlier(extremum_func, values):
        if extremum_func == min:
            round_func = np.floor
            sgn = -1
        else:
            round_func = np.ceil
            sgn = +1

        return int(round_func(extremum_func(values)) + sgn * overhead)

    x_min = min(shape[1], max(0, _compute_outlier(min, x)))
    y_min = min(shape[0], max(0, _compute_outlier(min, y)))
    x_max = max(0, min(shape[1], _compute_outlier(max, x)))
    y_max = max(0, min(shape[0], _compute_outlier(max, y)))

    return (x_min, x_max, y_min, y_max)
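
# Illustrative example (assumed numbers, not from the original module): x spanning 10.3..500.7
# and y spanning 100.2..300.9 on a 2048 x 2048 detector with the default overhead of 2 give
# compute_detector_region(x, y, (2048, 2048)) == (8, 503, 98, 303).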


def get_extrema(x_region, y_region, z):
    def get_extrema(region):
        return (region[0], region[1])

    product = itertools.product(get_extrema(x_region), get_extrema(y_region), [z])

    return np.array(list(product), dtype=float).T.copy()


def rotate_x(angle, point):
    cos = np.cos(angle)
    sin = np.sin(angle)
    matrix = np.identity(3)
    matrix[1, 1] = cos
    matrix[1, 2] = -sin
    matrix[2, 1] = sin
    matrix[2, 2] = cos

    return np.dot(matrix, point)


def rotate_y(angle, point):
    cos = np.cos(angle)
    sin = np.sin(angle)
    matrix = np.identity(3)
    matrix[0, 0] = cos
    matrix[0, 2] = sin
    matrix[2, 0] = -sin
    matrix[2, 2] = cos

    return np.dot(matrix, point)


def rotate_z(angle, point):
    cos = np.cos(angle)
    sin = np.sin(angle)
    matrix = np.identity(3)
    matrix[0, 0] = cos
    matrix[0, 1] = -sin
    matrix[1, 0] = sin
    matrix[1, 1] = cos

    return np.dot(matrix, point)
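

# Illustrative sanity check (not part of the original module): rotate_z rotates about the z axis,
# so rotate_z(np.pi / 2, np.array([1., 0., 0.])) is approximately [0., 1., 0.]; rotate_x and
# rotate_y behave analogously for the x and y axes.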