Source code for magni.cs.reconstruction.amp._algorithm

"""
..
    Copyright (c) 2016-2017, Magni developers.
    All rights reserved.
    See LICENSE.rst for further information.

Module providing the core Approximate Message Passing (AMP) algorithm.

Routine listings
----------------
run(y, A)
    Run the AMP reconstruction algorithm.

See Also
--------
magni.cs.reconstruction.amp._config : Configuration options.
magni.cs.reconstruction.amp.threshold_operator : Threshold operators.
magni.cs.reconstruction.amp.stop_criterion : Stop criteria.
magni.cs.reconstruction.amp.util : Utilities.

Notes
-----
The default configuration of the AMP algorithm provides the Donoho, Maleki,
Montanari (DMM) AMP from [1]_ using the soft threshold with the "residual"
thresholding strategy from [2]_.

References
----------
.. [1] D. L. Donoho, A. Maleki, and A. Montanari, "Message-passing algorithms
   for compressed sensing", *Proceedings of the National Academy of Sciences of
   the United States of America*, vol. 106, no. 45, p. 18914-18919, Nov. 2009.
.. [2] A. Montanari, "Graphical models concepts in compressed sensing" *in
   Compressed Sensing: Theory and Applications*, Y. C. Eldar and G. Kutyniok
   (Ed.), Cambridge University Press, ch. 9, pp. 394-438, 2012.

"""

from __future__ import division
import copy

import numpy as np

from magni.cs.reconstruction.amp import config as _conf
from magni.utils.validation import decorate_validation as _decorate_validation
from magni.utils.validation import validate_generic as _generic
from magni.utils.validation import validate_numeric as _numeric


[docs]def run(y, A): """ Run the AMP reconstruction algorithm. Parameters ---------- y : ndarray The m x 1 measurement vector. A : ndarray or magni.utils.matrices.{Matrix, MatrixCollection} The m x n matrix which is the product of the measurement matrix and the dictionary matrix. Returns ------- alpha : ndarray The n x 1 reconstructed coefficient vector. history : dict, optional The dictionary of various measures tracked in the GAMP iterations. See Also -------- magni.cs.reconstruction.amp._config : Configuration options. magni.cs.reconstruction.amp.threshold_operator : Threshold operators. magni.cs.reconstruction.amp.stop_criterion : Stop criteria. magni.cs.reconstruction.amp.util : Utilities. Notes ----- Optionally, the algorithm may be configured to save and return the iteration history along with the reconstruction result. The returned history contains the following: * alpha_bar : Coefficient estimates (the reconstruction coefficients). * MSE : solution mean squared error (if the true solution is known). * threshold_parameters : The state of the threshold parameters. * stop_criterion : The currently used stop criterion. * stop_criterion_value : The value of the stop criterion. * stop_iteration : The iteration at which the algorithm stopped. * stop_reason : The reason for termination of the algorithm. Examples -------- For example, recovering a vector from noiseless measurements using AMP with soft thresholding >>> import numpy as np >>> from magni.cs.reconstruction.amp import run, config >>> from magni.cs.reconstruction.amp.util import theta_mm >>> np.random.seed(seed=6028) >>> k, m, n = 10, 200, 400 >>> A = 1 / np.sqrt(m) * np.random.randn(m, n) >>> alpha = np.zeros((n, 1)) >>> alpha[:k] = np.random.normal(scale=2, size=(k, 1)) >>> np_printoptions = np.get_printoptions() >>> np.set_printoptions(suppress=True, threshold=k+2) >>> alpha[:k + 2] array([[ 1.92709461], [ 0.74378508], [-3.2418159 ], [-1.32277347], [ 0.90118 ], [-0.19157262], [ 0.82855712], [ 0.24817994], [-1.43034777], [-0.21232344], [ 0. ], [ 0. ]]) >>> y = A.dot(alpha) >>> threshold_params = {'threshold_level_update_method': 'residual', ... 'theta': theta_mm(float(m) / n), 'tau_hat_sq': 1.0} >>> config['threshold_parameters'] = threshold_params >>> alpha_hat = run(y, A) >>> alpha_hat[:k + 2] array([[ 1.92612707], [ 0.74185284], [-3.24179313], [-1.32200929], [ 0.90089208], [-0.19097308], [ 0.82658038], [ 0.24515825], [-1.42980997], [-0.2111469 ], [ 0.00002815], [ 0.00047293]]) >>> np.sum(np.abs(alpha - alpha_hat) > 1e-2) 0 or recover the same vector returning a history comparing the pr. iteration solution to the true vector >>> config['report_history'] = True >>> config['true_solution'] = alpha >>> alpha_hat, history = run(y, A) >>> alpha_hat[:k + 2] array([[ 1.92612707], [ 0.74185284], [-3.24179313], [-1.32200929], [ 0.90089208], [-0.19097308], [ 0.82658038], [ 0.24515825], [-1.42980997], [-0.2111469 ], [ 0.00002815], [ 0.00047293]]) >>> np.array(history['MSE']).reshape(-1, 1)[1:11] array([[ 0.01273323], [ 0.0053925 ], [ 0.00276334], [ 0.00093472], [ 0.0004473 ], [ 0.00021142], [ 0.00009618], [ 0.00004371], [ 0.00002275], [ 0.00001053]]) or recover the same vector using the median based update method >>> config['report_history'] = False >>> threshold_params['threshold_level_update_method'] = 'median' >>> config['threshold_parameters'] = threshold_params >>> alpha_hat = run(y, A) >>> alpha_hat[:k + 2] array([[ 1.92577079], [ 0.7413161 ], [-3.24131125], [-1.32044146], [ 0.89998407], [-0.18981235], [ 0.82584016], [ 0.24445332], [-1.42934708], [-0.21043221], [ 0. ], [ 0. ]]) >>> np.sum(np.abs(alpha - alpha_hat) > 1e-2) 0 >>> np.set_printoptions(**np_printoptions) """ @_decorate_validation def validate_input(): _numeric('y', ('integer', 'floating', 'complex'), shape=(-1, 1)) _numeric('A', ('integer', 'floating', 'complex'), shape=( y.shape[0], _conf['true_solution'].shape[0] if _conf['true_solution'] is not None else -1)) @_decorate_validation def validate_output(): _numeric('alpha', ('integer', 'floating'), shape=(A.shape[1], 1)) _generic('history', 'mapping', keys_in=('alpha_bar', 'MSE', 'threshold_parameters', 'stop_criterion', 'stop_criterion_value', 'stop_iteration', 'stop_reason')) validate_input() # Initialisation init = _get_amp_initialisation(y, A) AH = init['AH'] chi = init['chi'] delta = init['delta'] alpha_bar = init['alpha_bar'] AH_dot_chi = init['AH_dot_chi'] threshold = init['threshold'] threshold_parameters = init['threshold_parameters'] stop_criterion = init['stop_criterion'] stop_criterion_name = init['stop_criterion_name'] iterations = init['iterations'] tolerance = init['tolerance'] convert = init['convert'] report_history = init['report_history'] history = init['history'] true_solution = init['true_solution'] # AMP iterations for it in range(iterations): # Save previous state alpha_bar_prev = alpha_bar # AMP state updates # Basic AMP treshold operand: alpha_bar + AH_dot_chi alpha_bar = threshold.compute_threshold(locals()) A_dot_alpha_bar = A.dot(alpha_bar) # Basic AMP deriv threshold operand: alpha_bar_prev + AH_dot_chi chi = y - A_dot_alpha_bar + 1 / delta * np.mean( threshold.compute_deriv_threshold(locals())) * chi AH_dot_chi = AH.dot(chi) # Threshold level update threshold.update_threshold_level(locals()) # Stop criterion stop, stop_criterion_value = stop_criterion.compute(locals()) # History reporting if report_history: history['alpha_bar'].append(alpha_bar) history['threshold_parameters'].append( copy.deepcopy(threshold.__dict__)) history['stop_criterion_value'].append(stop_criterion_value) history['stop_iteration'] = it if true_solution is not None: history['MSE'].append( 1/A.shape[1] * np.linalg.norm( true_solution - alpha_bar)**2) if stop: history['stop_reason'] = stop_criterion_name.upper() break alpha = alpha_bar validate_output() if report_history: return alpha, history else: return alpha
[docs]def _get_amp_initialisation(y, A): """ Return an initialisation of the AMP algorithm. Parameters ---------- y : ndarray The m x 1 measurement vector. A : ndarray or magni.utils.matrices.{Matrix, MatrixCollection} The m x n matrix which is the product of the measurement matrix and the dictionary matrix. Returns ------- init : dict The initialisation of the the AMP algorithm. """ init = dict() param = dict(_conf.items()) # Configured setup init['convert'] = param['precision_float'] init['report_history'] = param['report_history'] init['tolerance'] = param['tolerance'] init['iterations'] = param['iterations'] init['true_solution'] = param['true_solution'] init['threshold_parameters'] = param['threshold_parameters'] init['stop_criterion_name'] = param['stop_criterion'].__name__ # Arguments based configuration init['AH'] = A.conj().T init['delta'] = A.shape[0] / A.shape[1] # threshold and stop criterion configuration threshold_init = copy.copy(init) threshold_init['A'] = A threshold_init['y'] = y init['threshold'] = param['threshold'](threshold_init) init['stop_criterion'] = param['stop_criterion'](threshold_init) # Configuration of remaining states if param['warm_start'] is not None: init['alpha_bar'] = init['convert'](param['warm_start']) else: init['alpha_bar'] = np.zeros((A.shape[1], 1), dtype=init['convert']) init['history'] = {'alpha_bar': [init['alpha_bar']], 'MSE': [np.nan], 'threshold_parameters': [ init['threshold_parameters']], 'stop_criterion': init['stop_criterion_name'].upper(), 'stop_criterion_value': [np.nan], 'stop_iteration': 0, 'stop_reason': 'MAX_ITERATIONS'} init['chi'] = np.zeros_like(y) # by convention init['AH_dot_chi'] = init['AH'].dot(y - A.dot(init['alpha_bar'])) return init