Source code for tofu.preprocess

"""Flat field correction."""
import logging
from gi.repository import Ufo
from tofu.util import (fbp_filtering_in_phase_retrieval, get_filenames,
                       set_node_props, make_subargs, determine_shape, setup_read_task,
                       setup_padding, next_power_of_two, run_scheduler)
from tofu.tasks import get_task, get_writer


LOG = logging.getLogger(__name__)


def create_flat_correct_pipeline(args, graph, processing_node=None):
    """
    Create flat field correction pipeline. All the settings are provided in
    *args*. *graph* is used for making the connections. Returns the flat field
    correction task which can be used for further pipelining.
    """
    pm = Ufo.PluginManager()

    if args.projections is None or args.flats is None or args.darks is None:
        raise RuntimeError("You must specify --projections, --flats and --darks.")

    reader = get_task('read')
    dark_reader = get_task('read')
    flat_before_reader = get_task('read')
    ffc = get_task('flat-field-correct', processing_node=processing_node,
                   dark_scale=args.dark_scale,
                   flat_scale=args.flat_scale,
                   absorption_correct=args.absorptivity,
                   fix_nan_and_inf=args.fix_nan_and_inf)
    mode = args.reduction_mode.lower()
    roi_args = make_subargs(args, ['y', 'height', 'y_step'])
    set_node_props(reader, args)
    set_node_props(dark_reader, roi_args)
    set_node_props(flat_before_reader, roi_args)

    for r, path in ((reader, args.projections),
                    (dark_reader, args.darks),
                    (flat_before_reader, args.flats)):
        setup_read_task(r, path, args)

    LOG.debug("Doing flat field correction using reduction mode `{}'".format(mode))

    if args.flats2:
        flat_after_reader = get_task('read')
        setup_read_task(flat_after_reader, args.flats2, args)
        set_node_props(flat_after_reader, roi_args)
        num_files = len(get_filenames(args.projections))
        can_read = len(list(range(args.start, num_files, args.step)))
        number = args.number if args.number else num_files
        num_read = min(can_read, number)
        flat_interpolate = get_task('interpolate', processing_node=processing_node,
                                    number=num_read)

    if args.resize:
        LOG.debug("Resize input data by factor of {}".format(args.resize))
        proj_bin = get_task('bin', processing_node=processing_node, size=args.resize)
        dark_bin = get_task('bin', processing_node=processing_node, size=args.resize)
        flat_bin = get_task('bin', processing_node=processing_node, size=args.resize)
        graph.connect_nodes(reader, proj_bin)
        graph.connect_nodes(dark_reader, dark_bin)
        graph.connect_nodes(flat_before_reader, flat_bin)
        reader, dark_reader, flat_before_reader = proj_bin, dark_bin, flat_bin

        if args.flats2:
            flat_bin = get_task('bin', processing_node=processing_node, size=args.resize)
            graph.connect_nodes(flat_after_reader, flat_bin)
            flat_after_reader = flat_bin

    if mode == 'median':
        dark_stack = get_task('stack', processing_node=processing_node,
                              number=len(get_filenames(args.darks)))
        dark_reduced = get_task('flatten', processing_node=processing_node, mode='median')
        flat_before_stack = get_task('stack', processing_node=processing_node,
                                     number=len(get_filenames(args.flats)))
        flat_before_reduced = get_task('flatten', processing_node=processing_node, mode='median')

        graph.connect_nodes(dark_reader, dark_stack)
        graph.connect_nodes(dark_stack, dark_reduced)
        graph.connect_nodes(flat_before_reader, flat_before_stack)
        graph.connect_nodes(flat_before_stack, flat_before_reduced)

        if args.flats2:
            flat_after_stack = get_task('stack', processing_node=processing_node,
                                        number=len(get_filenames(args.flats2)))
            flat_after_reduced = get_task('flatten', processing_node=processing_node,
                                          mode='median')
            graph.connect_nodes(flat_after_reader, flat_after_stack)
            graph.connect_nodes(flat_after_stack, flat_after_reduced)
    elif mode == 'average':
        dark_reduced = get_task('average', processing_node=processing_node)
        flat_before_reduced = get_task('average', processing_node=processing_node)
        graph.connect_nodes(dark_reader, dark_reduced)
        graph.connect_nodes(flat_before_reader, flat_before_reduced)

        if args.flats2:
            flat_after_reduced = get_task('average', processing_node=processing_node)
            graph.connect_nodes(flat_after_reader, flat_after_reduced)
    else:
        raise ValueError('Invalid reduction mode')

    graph.connect_nodes_full(reader, ffc, 0)
    graph.connect_nodes_full(dark_reduced, ffc, 1)

    if args.flats2:
        graph.connect_nodes_full(flat_before_reduced, flat_interpolate, 0)
        graph.connect_nodes_full(flat_after_reduced, flat_interpolate, 1)
        graph.connect_nodes_full(flat_interpolate, ffc, 2)
    else:
        graph.connect_nodes_full(flat_before_reduced, ffc, 2)

    return ffc
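

# Illustrative NumPy sketch (not part of the original pipeline): the arithmetic behind
# flat-field correction on NumPy arrays, assuming already-reduced dark and flat frames and
# ignoring the dark/flat scale factors.  The actual computation is done on the GPU by the
# ufo-filters `flat-field-correct' task wired up above.
def _flat_correct_reference(projection, dark, flat, absorptivity=False):
    import numpy as np
    corrected = (projection - dark) / (flat - dark)
    if absorptivity:
        # Corresponds to the absorptivity conversion requested via --absorptivity
        corrected = -np.log(corrected)
    return corrected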


def create_phase_retrieval_pipeline(args, graph, processing_node=None):
    LOG.debug('Creating phase retrieval pipeline')
    pm = Ufo.PluginManager()

    # Retrieve phase
    phase_retrieve = get_task('retrieve-phase', processing_node=processing_node)
    pad_phase_retrieve = get_task('pad', processing_node=processing_node)
    crop_phase_retrieve = get_task('crop', processing_node=processing_node)
    fft_phase_retrieve = get_task('fft', processing_node=processing_node)
    ifft_phase_retrieve = get_task('ifft', processing_node=processing_node)
    calculate = get_task('calculate', processing_node=processing_node)

    width = args.width
    height = args.height
    default_padded_width = next_power_of_two(width + 64)
    default_padded_height = next_power_of_two(height + 64)

    if not args.retrieval_padded_width:
        args.retrieval_padded_width = default_padded_width
    if not args.retrieval_padded_height:
        args.retrieval_padded_height = default_padded_height

    fmt = 'Phase retrieval padding: {}x{} -> {}x{}'
    LOG.debug(fmt.format(width, height, args.retrieval_padded_width,
                         args.retrieval_padded_height))
    x = (args.retrieval_padded_width - width) // 2
    y = (args.retrieval_padded_height - height) // 2
    pad_phase_retrieve.props.x = x
    pad_phase_retrieve.props.y = y
    pad_phase_retrieve.props.width = args.retrieval_padded_width
    pad_phase_retrieve.props.height = args.retrieval_padded_height
    pad_phase_retrieve.props.addressing_mode = args.retrieval_padding_mode
    crop_phase_retrieve.props.y = y
    crop_phase_retrieve.props.height = height
    if (
        args.projection_crop_after == 'filter'
        or not fbp_filtering_in_phase_retrieval(args)
    ):
        crop_phase_retrieve.props.x = x
        crop_phase_retrieve.props.width = width
    phase_retrieve.props.method = args.retrieval_method
    phase_retrieve.props.energy = args.energy
    if len(args.propagation_distance) == 1:
        phase_retrieve.props.distance = [args.propagation_distance[0]]
    else:
        phase_retrieve.props.distance_x = args.propagation_distance[0]
        phase_retrieve.props.distance_y = args.propagation_distance[1]
    phase_retrieve.props.pixel_size = args.pixel_size
    phase_retrieve.props.regularization_rate = args.regularization_rate
    phase_retrieve.props.thresholding_rate = args.thresholding_rate
    phase_retrieve.props.frequency_cutoff = args.frequency_cutoff
    fft_phase_retrieve.props.dimensions = 2
    ifft_phase_retrieve.props.dimensions = 2

    if args.delta is not None:
        import numpy as np
        lam = 6.62606896e-34 * 299792458 / (args.energy * 1.60217733e-16)
        thickness_conversion = -lam / (2 * np.pi * args.delta)
    else:
        thickness_conversion = 1

    if fbp_filtering_in_phase_retrieval(args):
        LOG.debug('Fusing phase retrieval and FBP filtering')
        fltr = get_task('filter', processing_node=processing_node)
        fltr.props.filter = args.projection_filter
        fltr.props.scale = args.projection_filter_scale
        fltr.props.cutoff = args.projection_filter_cutoff
        graph.connect_nodes(phase_retrieve, fltr)
        graph.connect_nodes(fltr, ifft_phase_retrieve)
    else:
        graph.connect_nodes(phase_retrieve, ifft_phase_retrieve)

    if args.retrieval_method == 'tie' and args.tie_approximate_logarithm:
        # a = 2 / 10^R, b = -10^R / 2, c = thickness_conversion
        # t = Taylor series point, T = t * (1 - ln(t))
        # a * b = -1 (from above)
        # -----------------------------------------------------
        # We will use the Taylor expansion to the 1st order of the logarithm which TIE needs:
        # -ln (a * retrieved) * b * c ~ b * c / t * [T - a * retrieved]
        # T - a * retrieved = T - a * F^-1{F(I) * kernel} = T - F^-1{aF(I) * kernel}
        # for u, v = 0: aF(I) * kernel = F(I)(0, 0) because kernel(0, 0) = 1 / a, thus:
        # T - F^-1{aF(I) * kernel} = -F^-1{aF(I - T) * kernel}, because for u, v = 0 we have:
        # -aF(I - T) * kernel = - F(I - T) = T - F(I)(0, 0) (and the rest of the frequencies is
        # unaffected by "T").
        # further: -aF(I - T) * kernel = F[a(T - I)] * kernel
        # bring in b, c and t and we have F[abc/t(T - I)] * kernel, with ab=-1 we end up with:
        # F[c/t(I - T)] * kernel, so the approximation of the logarithm does not need any change in
        # the TIE kernel itself, we may just transform the input image and use the rest of the
        # pipeline as usual.
        if args.delta is None:
            # Do not multiply by one
            expression = "v - 1"
        else:
            expression = "{} * (v - {})".format(
                # c / t
                thickness_conversion / args.tie_approximate_point,
                # t * (1 - ln(t))
                args.tie_approximate_point * (1 - np.log(args.tie_approximate_point))
            )
        calculate.props.expression = expression
        LOG.debug("Phase contrast conversion expression (log approximation): `%s'", expression)
        graph.connect_nodes(calculate, pad_phase_retrieve)
        first = calculate
        last = crop_phase_retrieve
    else:
        if args.retrieval_method == 'tie':
            expression = '(isinf (v) || isnan (v) || (v <= 0)) ? 0.0f : '
            if args.tie_approximate_logarithm:
                # ln(x) ~ x - 1 at a=1
                expression += '(1.0f - {} * v) * {}'
            else:
                expression += '-log ({} * v) * {}'
            # first term: 2 for 0.5 factor in ufo-filters and alpha = 10^-R, so divide by 10^R
            # second term: The following converts the TIE result to the actual phase, which when
            # multiplied by the thickness_conversion gives the projected thickness
            thickness_conversion *= -10 ** args.regularization_rate / 2
            expression = expression.format(2 / 10 ** args.regularization_rate,
                                           thickness_conversion)
        else:
            expression = '(isinf (v) || isnan (v)) ? 0.0f : v * {}'.format(thickness_conversion)
        LOG.debug("Phase contrast conversion expression: `%s'", expression)
        calculate.props.expression = expression
        graph.connect_nodes(crop_phase_retrieve, calculate)
        first = pad_phase_retrieve
        last = calculate

    graph.connect_nodes(pad_phase_retrieve, fft_phase_retrieve)
    graph.connect_nodes(fft_phase_retrieve, phase_retrieve)
    graph.connect_nodes(ifft_phase_retrieve, crop_phase_retrieve)

    return (first, last)


def run_flat_correct(args):
    graph = Ufo.TaskGraph()
    sched = Ufo.Scheduler()
    pm = Ufo.PluginManager()
    out_task = get_writer(args)
    flat_task = create_flat_correct_pipeline(args, graph)
    graph.connect_nodes(flat_task, out_task)

    run_scheduler(sched, graph)
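

# Hedged numeric sketch of the thickness conversion used in create_phase_retrieval_pipeline
# above: the X-ray wavelength follows from the photon energy in keV via lambda = h * c / E,
# and the retrieved phase is scaled by -lambda / (2 * pi * delta) to yield projected
# thickness.  The argument values below are purely illustrative.
def _example_thickness_conversion(energy_kev=20.0, delta=1e-7):
    import numpy as np
    lam = 6.62606896e-34 * 299792458 / (energy_kev * 1.60217733e-16)  # wavelength [m]
    return -lam / (2 * np.pi * delta)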


def create_sinogram_pipeline(args, graph):
    """Create sinogram generating pipeline based on arguments from *args*."""
    pm = Ufo.PluginManager()
    sinos = pm.get_task('transpose-projections')

    if args.number:
        region = (args.start, args.start + args.number, args.step)
        num_projections = len(list(range(*region)))
    else:
        num_projections = len(get_filenames(args.projections))

    sinos.props.number = num_projections

    if args.darks and args.flats:
        start = create_flat_correct_pipeline(args, graph)
    else:
        start = get_task('read')
        start.props.path = args.projections
        set_node_props(start, args)

    graph.connect_nodes(start, sinos)

    return sinos
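

# Minimal NumPy illustration of the data layout produced by the `transpose-projections' task
# above: a stack indexed as (projection, row, column) becomes one indexed as
# (row, projection, column), i.e. one sinogram per detector row.  This is only a reference
# for the layout, not how the UFO task is implemented.
def _sinograms_reference(projections):
    import numpy as np
    return np.transpose(np.asarray(projections), (1, 0, 2))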


def run_sinogram_generation(args):
    """Make the sinograms with arguments provided by *args*."""
    if not args.height:
        args.height = determine_shape(args, args.projections)[1] - args.y

    step = args.y_step * args.pass_size if args.pass_size else args.height
    starts = list(range(args.y, args.y + args.height, step)) + [args.y + args.height]

    def generate_partial(append=False):
        graph = Ufo.TaskGraph()
        sched = Ufo.Scheduler()

        args.output_append = append
        writer = get_writer(args)

        sinos = create_sinogram_pipeline(args, graph)
        graph.connect_nodes(sinos, writer)

        return run_scheduler(sched, graph)

    for i in range(len(starts) - 1):
        args.y = starts[i]
        args.height = starts[i + 1] - starts[i]
        if not generate_partial(append=i != 0):
            # We were interrupted
            break
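

# Hypothetical illustration of the chunking performed by run_sinogram_generation above: with
# y=0, height=100, y_step=1 and pass_size=40 the detector rows are processed in (start, height)
# slabs (0, 40), (40, 40) and (80, 20), each written by a separate scheduler run.  The example
# values are made up.
def _example_vertical_slabs(y=0, height=100, y_step=1, pass_size=40):
    step = y_step * pass_size if pass_size else height
    starts = list(range(y, y + height, step)) + [y + height]
    return [(starts[i], starts[i + 1] - starts[i]) for i in range(len(starts) - 1)]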


def create_projection_filtering_pipeline(args, graph, processing_node=None):
    pm = Ufo.PluginManager()
    pad = get_task('pad', processing_node=processing_node)
    fft = get_task('fft', processing_node=processing_node)
    ifft = get_task('ifft', processing_node=processing_node)
    fltr = get_task('filter', processing_node=processing_node)

    if args.projection_crop_after == 'filter':
        crop = get_task('crop', processing_node=processing_node)
    else:
        crop = None

    padding_width = setup_padding(pad, args.width, args.height,
                                  args.projection_padding_mode, crop=crop)[0]

    fft.props.dimensions = 1
    ifft.props.dimensions = 1
    fltr.props.filter = args.projection_filter
    fltr.props.scale = args.projection_filter_scale
    fltr.props.cutoff = args.projection_filter_cutoff

    graph.connect_nodes(pad, fft)
    graph.connect_nodes(fft, fltr)
    graph.connect_nodes(fltr, ifft)
    if crop:
        graph.connect_nodes(ifft, crop)
        last = crop
    else:
        last = ifft

    return (pad, last)
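

# Rough NumPy counterpart of the pad -> fft -> filter -> ifft chain above for a single
# (already padded) projection row, using a plain ramp filter.  The actual filter shape, scale
# and cutoff handling live in the ufo-filters `filter' task; this sketch only shows where the
# filtering takes place (the 1D Fourier domain).
def _fbp_filter_row_reference(row):
    import numpy as np
    spectrum = np.fft.fft(row)
    ramp = np.abs(np.fft.fftfreq(len(row)))  # simple ramp filter
    return np.fft.ifft(spectrum * ramp).real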


def create_preprocessing_pipeline(args, graph, source=None, processing_node=None,
                                  cone_beam_weight=True, make_reader=True):
    """If *make_reader* is True, create a read task if *source* is None and no dark and flat
    fields are given.
    """
    import numpy as np
    if not (args.width and args.height):
        width, height = determine_shape(args, args.projections)
        if not width:
            raise RuntimeError("Could not determine width from the input")
        if not args.width:
            args.width = width
        if not args.height:
            args.height = height - args.y

    LOG.debug('Image width x height: %d x %d', args.width, args.height)
    current = None

    if source:
        current = source
    elif args.darks and args.flats:
        current = create_flat_correct_pipeline(args, graph, processing_node=processing_node)
    else:
        if make_reader:
            current = get_task('read')
            set_node_props(current, args)
            if not args.projections:
                raise RuntimeError('--projections not set')
            setup_read_task(current, args.projections, args)
        if args.absorptivity:
            absorptivity = get_task('calculate', processing_node=processing_node)
            absorptivity.props.expression = 'v <= 0 ? 0.0f : -log(v)'
            if current:
                graph.connect_nodes(current, absorptivity)
            current = absorptivity

    if args.transpose_input:
        transpose = get_task('transpose')
        if current:
            graph.connect_nodes(current, transpose)
        current = transpose
        tmp = args.width
        args.width = args.height
        args.height = tmp

    if cone_beam_weight and not np.all(np.isinf(args.source_position_y)):
        # Cone beam projection weight
        LOG.debug('Enabling cone beam weighting')
        weight = get_task('cone-beam-projection-weight', processing_node=processing_node)
        weight.props.source_distance = (-np.array(args.source_position_y)).tolist()
        weight.props.detector_distance = args.detector_position_y
        weight.props.center_position_x = (args.center_position_x or
                                          [args.width / 2. + (args.width % 2) * 0.5])
        weight.props.center_position_z = (args.center_position_z or
                                          [args.height / 2. + (args.height % 2) * 0.5])
        weight.props.axis_angle_x = args.axis_angle_x
        if current:
            graph.connect_nodes(current, weight)
        current = weight

    if args.energy is not None and args.propagation_distance is not None:
        pr_first, pr_last = create_phase_retrieval_pipeline(args, graph,
                                                            processing_node=processing_node)
        if current:
            graph.connect_nodes(current, pr_first)
        current = pr_last

    if args.projection_filter != 'none' and not fbp_filtering_in_phase_retrieval(args):
        pf_first, pf_last = create_projection_filtering_pipeline(args, graph,
                                                                 processing_node=processing_node)
        if current:
            graph.connect_nodes(current, pf_first)
        current = pf_last

    return current
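

# Hedged helper mirroring the default centering convention used above when
# --center-position-x/z are not given: the center is size / 2 for even sizes and the middle of
# the central pixel (size / 2 + 0.5) for odd sizes.
def _default_center(size):
    return size / 2. + (size % 2) * 0.5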


def run_preprocessing(args):
    graph = Ufo.TaskGraph()
    sched = Ufo.Scheduler()
    pm = Ufo.PluginManager()
    out_task = get_writer(args)
    current = create_preprocessing_pipeline(args, graph)
    graph.connect_nodes(current, out_task)

    run_scheduler(sched, graph)