Source code for magni.cs.reconstruction.it._algorithm

"""
..
    Copyright (c) 2015-2017, Magni developers.
    All rights reserved.
    See LICENSE.rst for further information.

Module providing the core Iterative Thresholding (IT) algorithm.

Routine listings
----------------
run(y, A)
    Run the IT reconstruction algorithm.

See Also
--------
magni.cs.reconstruction.it._config : Configuration options.
magni.cs.reconstruction.it._step_size : Available step-size strategies.
magni.cs.reconstruction.it._stop_criterion : Available stop criteria.
magni.cs.reconstruction.it._threshold : Available threshold estimators.
magni.cs.reconstruction.it._threshold_operators : Available threshold operators

Notes
-----
The default configuration of the IT algorithm provides the Iterative Hard
Thresholding (IHT) algorithm [1]_ using the False Alarm Rate (FAR) heuristic
from [2]_. By default a residual measurements ratio step criterion is used. The
algorithm may also be configured to act as the Iterative Soft Thresholding
(IST) [3]_ algorithm with the soft threshold from [4]_.

References
----------
.. [1] T. Blumensath and M.E. Davies, "Iterative Thresholding for Sparse
   Approximations", *Journal of Fourier Analysis and Applications*, vol. 14,
   pp. 629-654, Sep. 2008.
.. [2] A. Maleki and D.L. Donoho, "Optimally Tuned Iterative Reconstruction
   Algorithms for Compressed Sensing", *IEEE Journal Selected Topics in Signal
   Processing*, vol. 3, no. 2, pp. 330-341, Apr. 2010.
.. [3] I. Daubechies, M. Defrise, and C. D. Mol, "An Iterative Thresholding
   Algorithm for Linear Inverse Problems with a Sparsity Constraint",
   *Communications on Pure and Applied Mathematics*, vol. 57, no. 11,
   pp. 1413-1457, Nov. 2004.
.. [4] D.L. Donoho, "De-Noising by Soft-Thresholding", *IEEE Transactions on
   Information Theory*, vol. 41, no. 3, pp. 613-627, May. 1995.

"""

from __future__ import division

import numpy as np

from magni.cs.reconstruction.it import config as _conf
from magni.cs.reconstruction.it import _step_size
from magni.cs.reconstruction.it import _stop_criterion
from magni.cs.reconstruction.it import _threshold
from magni.cs.reconstruction.it import _threshold_operators
from magni.imaging.evaluation import calculate_mse as _calculate_mse
from magni.utils.validation import decorate_validation as _decorate_validation
from magni.utils.validation import validate_generic as _generic
from magni.utils.validation import validate_numeric as _numeric


[docs]def run(y, A): """ Run the IT reconstruction algorithm. Parameters ---------- y : ndarray The m x 1 measurement vector. A : ndarray or magni.utils.matrices.{Matrix, MatrixCollection} The m x n matrix which is the product of the measurement matrix and the dictionary matrix. Returns ------- alpha : ndarray The n x 1 reconstructed coefficient vector. history : dict, optional The dictionary of various measures tracked in the IT iterations. See Also -------- magni.cs.reconstruction.it._config : Configuration options. magni.cs.reconstruction.it._step_size : Step-size calculation. magni.cs.reconstruction.it._stop_criterion : Stop criterion calculation. magni.cs.reconstruction.it._threshold : Threshold calculation. magni.cs.reconstruction.it._threshold_operators : Thresholding operators. Notes ----- Optionally, the algorithm may be configured to save and return the iteration history along with the reconstruction result. The returned history contains the following: * alpha : Reconstruction coefficient estimates * MSE : Solution mean squared error (if the true solution is known). * stop_criterion : The currently used stop criterion. * stop_criterion_value : The value of the stop criterion. * stop_iteration : The iteration at which the algorithm stopped. * stop_reason : The reason for termination of the algorithm. Examples -------- For example, recovering a vector from random measurements using IHT with the FAR heuristic >>> import numpy as np, magni >>> from magni.cs.reconstruction.it import run >>> np.random.seed(seed=6021) >>> A = 1 / np.sqrt(90) * np.random.randn(90, 200) >>> alpha = np.zeros((200, 1)) >>> alpha[:10] = 1 >>> y = A.dot(alpha) >>> alpha_hat = run(y, A) >>> alpha_hat[:12] array([[ 0.99748945], [ 1.00082074], [ 0.99726507], [ 0.99987834], [ 1.00025857], [ 1.00003266], [ 1.00021666], [ 0.99838884], [ 1.00018356], [ 0.99859105], [ 0. ], [ 0. ]]) >>> (np.abs(alpha_hat) > 1e-2).sum() 10 Or recover the same vector as above using IST with a fixed number of non-thresholded coefficients >>> it_config = {'threshold_operator': 'soft', 'threshold': 'fixed', ... 'threshold_fixed': 10} >>> magni.cs.reconstruction.it.config.update(it_config) >>> alpha_hat = run(y, A) >>> alpha_hat[:12] array([[ 0.99822443], [ 0.99888724], [ 0.9982493 ], [ 0.99928642], [ 0.99964131], [ 0.99947346], [ 0.9992809 ], [ 0.99872624], [ 0.99916842], [ 0.99903033], [ 0. ], [ 0. ]]) >>> (np.abs(alpha_hat) > 1e-2).sum() 10 Or use a weighted IHT method with a fixed number of non-thresholded coefficietns to recover the above signal >>> weights = np.linspace(1.0, 0.9, 200).reshape(200, 1) >>> it_config = {'threshold_operator': 'weighted_hard', ... 'threshold_weights': weights} >>> magni.cs.reconstruction.it.config.update(it_config) >>> alpha_hat = run(y, A) >>> alpha_hat[:12] array([[ 0.99853888], [ 0.99886104], [ 0.99843149], [ 1.0000333 ], [ 1.00060273], [ 1.00035539], [ 0.99966219], [ 0.99912209], [ 0.99961541], [ 0.99952029], [ 0. ], [ 0. ]]) >>> (np.abs(alpha_hat) > 1e-2).sum() 10 >>> magni.cs.reconstruction.it.config.reset() """ @_decorate_validation def validate_input(): _numeric('y', ('integer', 'floating', 'complex'), shape=(-1, 1)) _numeric('A', ('integer', 'floating', 'complex'), shape=(y.shape[0], -1)) @_decorate_validation def validate_output(): _numeric('alpha', ('integer', 'floating', 'complex'), shape=(A.shape[1], 1), precision=convert(0).nbytes * 8) _generic('history', 'mapping', keys_in=('alpha', 'MSE', 'stop_criterion', 'stop_criterion_value', 'stop_iteration', 'stop_reason')) validate_input() # Configured setup param = dict(_conf.items()) convert = param['precision_float'] iterations = param['iterations'] report_history = param['report_history'] tolerance = param['tolerance'] true_solution = param['true_solution'] threshold_weights = param['threshold_weights'] kappa = param['kappa_fixed'] threshold_alpha = _threshold_operators.get_function_handle( param['threshold_operator']) # Initialisation based on configuration if param['warm_start'] is not None: alpha = convert(param['warm_start']) else: alpha = np.zeros((A.shape[1], 1), dtype=convert) run.calculate_step_size = _step_size.get_function_handle( param['kappa'], locals()) run.calculate_threshold = _threshold.get_function_handle( param['threshold'], locals()) run.stop_criterion = _stop_criterion.get_function_handle( param['stop_criterion'], locals()) history = {'alpha': [alpha], 'MSE': [np.nan], 'stop_criterion': param['stop_criterion'].upper(), 'stop_criterion_value': [np.nan], 'stop_iteration': 0, 'stop_reason': 'MAX_ITERATIONS'} r = y.copy() # IT iterations for it in range(param['iterations']): alpha_prev = alpha c = A.conj().T.dot(r) # Step-size (relaxation parameter) kappa = run.calculate_step_size(locals()) alpha = alpha + kappa * c # Threshold threshold = run.calculate_threshold(locals()) threshold_alpha(locals()) # Residual r = y - A.dot(alpha) # Stop criterion stop, stop_criterion_value = run.stop_criterion(locals()) # History reporting if report_history: history['alpha'].append(alpha) history['stop_criterion_value'].append(stop_criterion_value) history['stop_iteration'] = it if true_solution is not None: history['MSE'].append(_calculate_mse(true_solution, alpha)) if stop: history['stop_reason'] = param['stop_criterion'].upper() break validate_output() if report_history: return alpha, history else: return alpha