Source code for tofu.inpaint

import gi
import logging
import numpy as np
gi.require_version('Ufo', '0.0')
from gi.repository import Ufo
from tofu.tasks import get_memory_in, get_task, get_writer
from tofu.util import (
    determine_shape,
    make_subargs,
    run_scheduler,
    set_node_props,
    setup_read_task,
    setup_padding,
)


LOG = logging.getLogger(__name__)


SELECT_SRC = """
kernel void
select_simple (global float *image,
               global float *mask,
               global float *output)
{
    const size_t idx = get_global_id (1) * get_global_size (0) + get_global_id (0);
    output[idx] = mask[idx] > 0.0f ? 0.0f : image[idx];
}

kernel void
select_guidance (global float *image,
                 global float *mask,
                 global float *guidance,
                 global float *output)
{
    const size_t idx = get_global_id (1) * get_global_size (0) + get_global_id (0);
    output[idx] = mask[idx] > 0.0f ? guidance[idx] : image[idx];
}
"""

ADD_CONSTANT_SRC = """
kernel void
add_constant (global float *image,
              global float *value,
              global float *output)
{
    const size_t idx = get_global_id (1) * get_global_size (0) + get_global_id (0);

    output[idx] = image[idx] + value[0];
}
"""


def _make_discrete_inverse_laplace(width, height):
    """Make discrete Laplace deconvolution kernel special for this use case, where we do not care
    about the (0, 0) frequency becuase the kernel is going to be applied on Laplace-filtered data,
    which has zero mean.
    """
    f = np.fft.fftfreq(width)
    g = np.fft.fftfreq(height)
    f, g = np.meshgrid(f, g)
    # From discrete Laplace and time shift: F[f''(x, y)] = -4 F[f(x, y)]
    # + F[f(x + 1, y)] + F[f(x - 1, y)] + F[f(x, y + 1)] + F[f(x, y - 1)] = the result below when we
    # use the time shift property of the Fourier transform.
    kernel = 2 * (np.cos(2 * np.pi * f) + np.cos(2 * np.pi * g) - 2)
    # Make this invertible by simply setting the (0, 0) frequency to 1 instead of making sure that
    # after the inversion it is 0. We can afford this becuase we know the input to filtering will be
    # Laplace-filtered -> zero mean -> (0, 0) frequency = 0.
    kernel[0, 0] = 1

    return (1 / kernel).astype(np.float32)


[docs]def prepare_border_smoothing(padded_width, padded_height): """ The use case here is mainly the removal of the cross at (0, 0) in the power spectrum by masking out the borders of the image, i.e. the gradients are forced to go to zeros at the borders and thus removing the sharp transitions when we consider the periodicity assumed by the DFT. *padded_width* and *padded_height* are the width and height of the FFT-padding, not the original image shape. One should use `mirrored_repeat' padding mode on the input images to get the FFT-padded image, compute the mask here and use it for inpainting. """ mask = np.ones((padded_width, padded_height), dtype=np.float32) mask[1:-1, 1:-1] = 0 mem_in_task = get_memory_in(mask) return mem_in_task
def _get_gradient_task(finite_difference_type, direction): return get_task( "gradient", finite_difference_type=finite_difference_type, direction=direction, addressing_mode="repeat", )
[docs]def create_inpaint_pipeline( args, graph, processing_node=None ): """ Create tasks needed for inpainting and connect them. The pipeline has three inputs and one output, which is the inpainted image. Based on :cite:`MOREL2012342`. """ determine_shape(args, path=args.projections, store=True, do_raise=True) if not args.inpaint_padded_width: args.inpaint_padded_width = args.width if not args.inpaint_padded_height: args.inpaint_padded_height = args.height do_pad = args.inpaint_padded_width != args.width and args.inpaint_padded_height != args.height use_guidance = not args.harmonize_borders and args.guidance_image LOG.debug("inpaint padding on: %s", do_pad) LOG.debug("inpaint using guidance image: %s", use_guidance) copy_projections = Ufo.CopyTask() copy_mask = Ufo.CopyTask() copy_guidance = Ufo.CopyTask() if use_guidance else None if do_pad: # Padding pad_projections = get_task("pad") pad_mask = get_task("pad") pad_guidance = get_task("pad") for pad_task in (pad_projections, pad_mask, pad_guidance): setup_padding( pad_task, args.width, args.height, args.inpaint_padding_mode, pad_width=args.inpaint_padded_width - args.width, pad_height=args.inpaint_padded_height - args.height, centered=False ) graph.connect_nodes(pad_projections, copy_projections) graph.connect_nodes(pad_mask, copy_mask) if use_guidance: graph.connect_nodes(pad_guidance, copy_guidance) else: pad_guidance = None inputs = (pad_projections, pad_mask, pad_guidance) else: inputs = (copy_projections, copy_mask, copy_guidance) # First gradient is forward and the second backward -> we get exactly the discrete Laplace after # the two passes. gx = _get_gradient_task("forward", "horizontal") gy = _get_gradient_task("forward", "vertical") ggx = _get_gradient_task("backward", "horizontal") ggy = _get_gradient_task("backward", "vertical") fft_task = get_task("fft", dimensions=2) ifft_task = get_task("ifft", dimensions=2) add_ggx_ggy = get_task("opencl", kernel="add", dimensions=2) mul_task = get_task("opencl", kernel="multiply", halve_width=False, dimensions=2) select_kernel = "select_guidance" if use_guidance else "select_simple" select_gx = get_task("opencl", source=SELECT_SRC, kernel=select_kernel, dimensions=2) select_gy = get_task("opencl", source=SELECT_SRC, kernel=select_kernel, dimensions=2) # We are computing discrete gradients -> Laplace must also be discrete lap_kernel = _make_discrete_inverse_laplace( args.inpaint_padded_width, args.inpaint_padded_height ) # Multiply interleaved complex array -> a * z = a * Re[z] + j * a * Im[z] mem_in_task = get_memory_in(lap_kernel + 1j * lap_kernel) if args.preserve_mean: mean_task = get_task("measure", axis=-1, metric="mean") add_constant = get_task( "opencl", source=ADD_CONSTANT_SRC, kernel="add_constant", dimensions=2 ) # First derivative graph.connect_nodes(copy_projections, gx) graph.connect_nodes(copy_projections, gy) # Select guidance or zeros where mask >= 0 graph.connect_nodes_full(gx, select_gx, 0) graph.connect_nodes_full(gy, select_gy, 0) graph.connect_nodes_full(copy_mask, select_gx, 1) graph.connect_nodes_full(copy_mask, select_gy, 1) if use_guidance: guidance_gx = _get_gradient_task("forward", "horizontal") guidance_gy = _get_gradient_task("forward", "vertical") graph.connect_nodes(copy_guidance, guidance_gx) graph.connect_nodes(copy_guidance, guidance_gy) graph.connect_nodes_full(guidance_gx, select_gx, 2) graph.connect_nodes_full(guidance_gy, select_gy, 2) # Second derivative graph.connect_nodes(select_gx, ggx) graph.connect_nodes(select_gy, ggy) # Sum -> Laplacian graph.connect_nodes_full(ggx, add_ggx_ggy, 0) graph.connect_nodes_full(ggy, add_ggx_ggy, 1) # Deconvolve with Laplacian graph.connect_nodes(add_ggx_ggy, fft_task) graph.connect_nodes_full(fft_task, mul_task, 0) graph.connect_nodes_full(mem_in_task, mul_task, 1) graph.connect_nodes(mul_task, ifft_task) if args.preserve_mean: # Get the mean back to the one of the input image graph.connect_nodes(copy_projections, mean_task) graph.connect_nodes_full(ifft_task, add_constant, 0) graph.connect_nodes_full(mean_task, add_constant, 1) last = add_constant else: last = ifft_task outputs = (last,) return (inputs, outputs)
[docs]def run(args): """Usage with tofu: create readers, the pipeline and run it.""" if args.harmonize_borders: if args.mask_image: LOG.warning( "--mask-image has no effect when --harmonize-borders is specified" ) if args.guidance_image: LOG.warning( "--guidance-image has no effect when --harmonize-borders is specified" ) if args.inpaint_padding_mode != "mirrored_repeat": LOG.warning( "Padding mode should be `mirrored_repeat' for smooth transitions between " "true image borders and padded borders" ) elif not args.mask_image: raise ValueError("One of --mask-image or --harmonize-borders must be specified") # Reading reader = get_task("read") roi_args = make_subargs(args, ['y', 'height', 'y_step']) set_node_props(reader, args) setup_read_task(reader, args.projections, args) out_task = get_writer(args) graph = Ufo.TaskGraph() ((input_projections, input_mask, input_guidance), (last,)) = create_inpaint_pipeline( args, graph, ) if args.harmonize_borders: mask_reader = prepare_border_smoothing( args.inpaint_padded_width, args.inpaint_padded_height ) else: mask_reader = get_task("read") set_node_props(mask_reader, roi_args) setup_read_task(mask_reader, args.mask_image, args) graph.connect_nodes(reader, input_projections) graph.connect_nodes(mask_reader, input_mask) if not args.harmonize_borders and args.guidance_image: guidance_reader = get_task("read") set_node_props(guidance_reader, roi_args) setup_read_task(guidance_reader, args.guidance_image, args) graph.connect_nodes(guidance_reader, input_guidance) graph.connect_nodes(last, out_task) # CopyTask works only with FixedScheduler sched = Ufo.FixedScheduler() run_scheduler(sched, graph)