Source code for mr_utils.cs.ordinator

'''Combinatorial optimization to find permutation maximizing sparsity.
'''

from functools import partial
from time import time
from multiprocessing import Pool
from itertools import combinations
import logging

import numpy as np
from tqdm import tqdm
from scipy.special import comb
from scipy.optimize import basinhopping, linear_sum_assignment as lsa
from scipy.spatial.distance import cdist

[docs]def obj(ck, N, locs, inverse, pdf_ref, pdf, pdf_metric): '''Objective function for basinhopping.''' c = np.zeros(N) c[locs] = ck xhat = inverse(c) xhat /= np.max(np.abs(xhat)) + np.finfo('float').eps return pdf_metric(pdf_ref, pdf(xhat))
[docs]def get_xhat(locs, N, k, inverse, pdf_ref, pdf, pdf_metric): '''Compute xhat for given coefficient locs using basinhopping. Parameters ---------- locs : array_like Coefficient location indices. N : int Length of the desired signal (also number of coefficients in total). k : int Desired sparsity level. inverse : callable Inverse sparsifying transform. pdf_ref : array_like Reference pdf of the prior to compare against. pdf : callable Function that estimates pixel intensity distribution. pdf_metric : callable Function that returns the distance between pdfs. Returns ------- xhat : array_like Inverse transform of coeffs. locs : array_like Indices of non-zero coefficients. coeffs : array_like Coefficients of xhat. ''' c0 = np.zeros(N) ck = np.zeros(k) res = basinhopping( obj, ck, minimizer_kwargs={ 'args': (N, locs, inverse, pdf_ref, pdf, pdf_metric)}) c0[locs] = res['x'] xhat = inverse(c0) xhat /= np.max(np.abs(xhat)) + np.finfo('float').eps return(xhat, locs, res['x'])
[docs]def search_fun(locs, N, k, inverse, pdf_ref, pdf, pdf_metric): '''Return function for parallel loop. Parameters ---------- locs : array_like Coefficient location indices. N : int Length of the desired signal (also number of coefficients in total). k : int Desired sparsity level. inverse : callable Inverse sparsifying transform. pdf_ref : array_like Reference pdf of the prior to compare against. pdf : callable Function that estimates pixel intensity distribution. pdf_metric : callable Function that returns the distance between pdfs. Returns ------- locs : array_like Indices of non-zero coefficients. vals : array_like Values of coefficients at locations given by locs. float Measure of difference between pdf_ref and pdf(xhat). ''' xhat, locs, vals = get_xhat( [*locs], N, k, inverse, pdf_ref, pdf, pdf_metric) return(locs, vals, pdf_metric(pdf_ref, pdf(xhat)))
[docs]class pdf_default(object): '''Picklable object for computing pdfs. Uses histogram to estimate pdf. Attributes ---------- N : int Size of signal. lims : array_like or tuple Upper and lower bounds for range of histogram. pdf_ref : array_like pdf estimate of prior. Used to compare to pdf(xhat). bins : array_like bin locations used for construction of pdf_ref. ''' def __init__(self, prior): ''' Note that prior should be normalized between lims=(-1, 1). ''' N = prior.size self.lims = (-1, 1) self.pdf_ref, self.bins = np.histogram( prior, bins=N, range=self.lims)
[docs] def pdf(self, x): '''Estimate the pdf of x. Parameters ---------- x : array_like Signal to get pdf estimate of. Returns ------- array_like Histogram of x. Notes ----- Will report when xhat has a value outside of range of pdf_ref. ''' if np.min(x) < self.lims[0]: tqdm.write( 'XHAT MIN WAS LOWER THAN X MIN: %g' % np.min(x)) if np.max(x) > self.lims[1]: tqdm.write( 'XHAT MAX WAS HIGHER THAN X MAX: %g' % np.max(x)) return np.histogram(x, self.bins, self.lims)[0]
[docs]def pdf_metric_default(x, y): '''Default pdf metric, l2 norm. Parameters ---------- x : array_like First pdf. y : array_like Second pdf. Returns ------- float l2 norm between x and y. ''' return np.linalg.norm(x - y, ord=2)
[docs]def ordinator1d(prior, k, forward, inverse, chunksize=10, pdf=None, pdf_metric=None, sparse_metric=None, disp=False): '''Find permutation that maximizes sparsity of 1d signal. Parameters ---------- prior : array_like Prior signal estimate to base ordering. k : int Desired sparsity level. forward : callable Sparsifying transform. inverse : callable Inverse sparsifying transform. chunksize : int, optional Chunk size for parallel processing pool. pdf : callable, optional Function that estimates pixel intensity distribution. pdf_metric : callable, optional Function that returns the distance between pdfs. sparse_metric : callable, optional Metric to use to measure sparsity. Uses l1 norm by default. disp : bool, optional Whether or not to display coefficient plots at the end. Returns ------- array_like Reordering indices. Raises ------ ValueError If disp=True and forward function is not provided. Notes ----- pdf_method=None uses histogram. pdf_metric=None uses l2 norm. If disp=True then forward transform function must be provided. Otherwise, forward is not required, only inverse. pdf_method should assume the signal will be bounded between (-1, 1). We do this by always normalizing a signal before computing pdf or comparing. ''' # # Make sure we have the forward transform if we want to display # if disp and forward is None: # raise ValueError( # 'Must provide forward transform for display!') # Make sure we have a sparsity metric if sparse_metric is None: sparse_metric = lambda x0: np.linalg.norm(x0, ord=1) # Make sure we do in fact have a 1d signal if prior.ndim > 1: logging.warning('Prior is not 1d! Flattening!') prior = prior.flatten() N = prior.size # Go ahead and normalize the signal so we don't have to keep # track of the limits of the pdfs we want to compare, always # between (-1, 1). prior /= np.max(np.abs(prior)) + np.finfo('float').eps # Default to histogram if pdf is None: pdf_object = pdf_default(prior) pdf = pdf_object.pdf pdf_ref = pdf_object.pdf_ref else: # Get reference pdf pdf_ref = pdf(prior) # Default to l2 metric if pdf_metric is None: pdf_metric = pdf_metric_default # Let's try to do things in parallel -- more than twice as fast! search_fun_partial = partial( search_fun, N=N, k=k, inverse=inverse, pdf_ref=pdf_ref, pdf_metric=pdf_metric, pdf=pdf) t0 = time() # start the timer with Pool() as pool: res = list(tqdm(pool.imap( search_fun_partial, combinations(range(N), k), chunksize), total=comb(N, k, exact=True), leave=False)) res = np.array(res) # Choose the winner winner_idx = np.where(res[:, -1] == res[:, -1].min())[0] potentials = [] for idx0 in winner_idx: potentials.append(res[idx0, :]) print('potential:', potentials[-1][0]) print('Found %d out of %d (%%%g) potentials in %d seconds!' % ( len(potentials), res.shape[0], len(potentials)/res.shape[0]*100, time() - t0)) # Now solve the assignment problem, we only need one of the # potentials, so look at all of them and choose the one that is # most sparse if disp: import matplotlib.pyplot as plt winner_sparse = np.inf cur_win = None for potential in potentials: c = np.zeros(N) idx_proposed = potential[0] c[idx_proposed] = potential[1] xhat = inverse(c) xhat /= np.max(np.abs(xhat)) + np.finfo('float').eps C = cdist(xhat[:, None], prior[:, None]) _rows, cols = lsa(C) cur_sparse = sparse_metric(forward(prior[cols])) if cur_sparse < winner_sparse: winner_sparse = cur_sparse print('New win: %g' % cur_sparse) cur_win = cols if disp: tcoeffs = np.abs(forward(prior[cols])) # tcoeffs /= np.max(tcoeffs) plt.plot(-np.sort(-tcoeffs), '--', label='xpi') # Show reference coefficients if disp: # plt.plot(-np.sort(-np.abs(forward(xhat))), label='xhat') tcoeffs = np.abs(forward(np.sort(prior))) # tcoeffs /= np.max(tcoeffs) plt.plot(-np.sort(-tcoeffs), ':', label='sort(x)') plt.legend() plt.title('Sorted, Normalized Transform Coefficients') plt.show() return cur_win
if __name__ == '__main__': pass