Compare revisions: a23marmo/nonnegative-factorization

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (2)
Showing with 3284 additions and 2683 deletions
__pycache__/
.ipynb_checkpoints/
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
Copyright (c) 2020 Marmoret Axel
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# nn-fac: Nonnegative Factorization techniques toolbox
Python code for computing some nonnegative factorizations, using either an accelerated version of the Hierarchical Alternating Least Squares (HALS) algorithm, which solves Nonnegative Least Squares (NNLS) subproblems [1], for factorizations minimizing the Euclidean/Frobenius norm, or Multiplicative Updates [2,3] for factorizations minimizing the $\beta$-divergence [3].
......
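As a quick orientation, here is a minimal usage sketch. It is not taken from the README itself; it simply mirrors the `nmf` call that appears in `multilayer_nmf.py` further down in this diff, so the argument list reflects that call rather than a documented API:

```python
import numpy as np
from nn_fac.nmf import nmf

data = np.random.rand(100, 200)  # nonnegative input matrix
W, H, costs, times = nmf(data, 5, init="nndsvd", U_0=None, V_0=None,
                         n_iter_max=100, tol=1e-8,
                         update_rule="mu", beta=1,  # beta = 1: KL divergence
                         sparsity_coefficients=[None, None], fixed_modes=[], normalize=[False, False],
                         verbose=False, return_costs=True, deterministic=False)
```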
from . import nmf
from . import ntf
from . import ntd
from . import parafac2
from . import multilayer_nmf
from . import deep_nmf
from .update_rules import nnls
from .update_rules import mu
from .update_rules import min_vol_mu
from .update_rules import deep_mu
from .utils import beta_divergence
from .utils import errors
from .utils import normalize_wh
import numpy as np
import warnings
import time
import nn_fac.update_rules.mu as mu
import nn_fac.update_rules.min_vol_mu as min_vol_mu
import nn_fac.update_rules.deep_mu as deep_mu
import nn_fac.multilayer_nmf as multi_nmf
import nn_fac.utils.beta_divergence as beta_div
from nn_fac.utils.normalize_wh import normalize_WH
def deep_KL_NMF(data, all_ranks, n_iter_max_each_nmf = 100, n_iter_max_deep_loop = 100, init = "multilayer_nmf", init_multi_layer = "random", W_0 = None, H_0 = None, delta = 1e-6, tol = 1e-6, verbose = False):
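# Summary (inferred from the code below, kept here as a comment for readability):
# - data: nonnegative m-by-n matrix to factorize.
# - all_ranks: decreasing list of ranks, one per layer (at least two layers).
# - init: "multilayer_nmf" (default) initializes every layer with multilayer_beta_NMF;
#   "custom" uses the user-provided lists W_0 and H_0.
# Returns the lists W and H (one factor pair per layer), the per-layer errors at each
# deep iteration, and the time spent per iteration.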
L = len(all_ranks)
assert L > 1, "The number of layers must be at least 2. Otherwise, you should just use NMF."
all_errors = np.zeros((n_iter_max_deep_loop + 1,L))
toc = []
global_errors = []
if sorted(all_ranks, reverse=True) != all_ranks:
raise ValueError("The ranks of deep NMF should be decreasing.")
#warnings.warn("Warning: The ranks of deep NMF should be decreasing.")
if init == "multilayer_nmf":
W, H, e = multi_nmf.multilayer_beta_NMF(data, all_ranks, n_iter_max_each_nmf = n_iter_max_each_nmf, init_each_nmf = init_multi_layer, delta = delta, verbose = verbose)
all_errors[0] = e
elif init == "custom":
W = W_0
H = H_0
all_errors[0,0] = beta_div.kl_divergence(data, W[0] @ H[0])
for i in range(1,L):
all_errors[0,i] = beta_div.kl_divergence(W[i-1], W[i] @ H[i])
lambda_ = 1 / np.array(all_errors[0])
global_errors.append(lambda_.T @ all_errors[0])
for deep_iteration in range(n_iter_max_deep_loop):
tic = time.time()
W, H, errors = one_step_deep_nmf(data, W, H, all_ranks, lambda_, delta)
toc.append(time.time() - tic)
all_errors[deep_iteration + 1] = errors
global_errors.append(lambda_.T @ errors)
if verbose:
if global_errors[-2] - global_errors[-1] > 0:
print(f'Sum of errors through layers={global_errors[-1]}, variation={global_errors[-2] - global_errors[-1]}.')
else:
# print in red when the summed error increases (which should not happen)
print(f'\033[91m Sum of errors through layers={global_errors[-1]}, variation={global_errors[-2] - global_errors[-1]}. \033[0m')
if deep_iteration > 1 and abs(global_errors[-2] - global_errors[-1]) < tol:
# Stop condition: the variation of the error between the last two iterations is below tol
if verbose:
print(f'Converged in {deep_iteration} iterations.')
break
return W, H, all_errors, toc
def one_step_deep_nmf(data, W, H, all_ranks, lambda_, delta):
L = len(all_ranks)
errors = []
for layer in range(L):
if layer == 0:
lam = lambda_[1] / lambda_[0]
H[0] = mu.switch_alternate_mu(data, W[0], H[0], beta=1, matrix="H")
W[0], H[0] = normalize_WH(W[0], H[0], matrix="H")
W[0] = deep_mu.deep_KL_mu(data, W[0], H[0], W[1] @ H[1], lam)
errors.append(beta_div.kl_divergence(data, W[0] @ H[0]))
elif layer == L - 1:
H[layer] = mu.switch_alternate_mu(W[layer-1], W[layer], H[layer], beta=1, matrix="H")
W[layer], H[layer] = normalize_WH(W[layer], H[layer], matrix="H")
W[layer] = mu.switch_alternate_mu(W[layer-1], W[layer], H[layer], beta=1, matrix="W")
errors.append(beta_div.kl_divergence(W[layer-1], W[layer] @ H[layer]))
else:
lam = lambda_[layer + 1] / lambda_[layer]
H[layer] = mu.switch_alternate_mu(W[layer-1], W[layer], H[layer], beta=1, matrix="H")
W[layer], H[layer] = normalize_WH(W[layer], H[layer], matrix="H")
W[layer] = deep_mu.deep_KL_mu(W[layer-1], W[layer], H[layer], W[layer+1] @ H[layer+1], lam)
errors.append(beta_div.kl_divergence(W[layer-1], W[layer] @ H[layer]))
return W, H, errors
if __name__ == "__main__":
np.random.seed(0)
m, n, all_ranks = 100, 200, [15,10,5]
data = np.random.rand(m, n) # Example input matrix
W, H, reconstruction_errors, toc = deep_KL_NMF(data, all_ranks, n_iter_max_each_nmf = 100, verbose = True)
##### CONSTRAINED NMF
## Will be integrated into nmf.py when stable.
import numpy as np
import time
from nimfa.methods import seeding
import nn_fac.update_rules.min_vol_mu as min_vol_mu
import nn_fac.update_rules.mu as mu
import nn_fac.utils.beta_divergence as beta_div
from nn_fac.utils.normalize_wh import normalize_WH
import nn_fac.utils.errors as err
eps = 1e-12 # np.finfo(np.float64).eps
# gamma_line_search:
# - "True": compute the update with respect to article: V. Leplat, N. Gillis and A.M.S. Ang, "Blind Audio Source Separation with Minimum-Volume Beta-Divergence NMF", IEEE Transactions on Signal Processing 68, pp. 3400-3410, 2020.
# - "False": compute the update with respect to article: V. Leplat, N. Gillis and J. Idier, "Multiplicative Updates for NMF with β-Divergences under Disjoint Equality Constraints", SIAM Journal on Matrix Analysis and Applications 42.2 (2021), pp. 730-752.
def minvol_beta_nmf(data, rank, beta, n_iter_max = 100, tol = 1e-8, delta = 0.01, lambda_init = 1, gamma_line_search = False, gamma = 1, tol_update_lagrangian = 1e-6, init = "random", W_0 = None, H_0 = None, verbose = False):
assert beta == 1, "This function is only implemented for beta = 1 (KL divergence)."
# Initialization for W and H
if init == 'random':
m,n = data.shape
np.random.seed(0)
W = np.random.rand(m, rank)
H = np.random.rand(rank, n)
# elif init.lower() == "nndsvd": #Doesn't work, to debug!
# W, H = seeding.Nndsvd().initialize(data, rank, {'flag': 0})
# W = np.maximum(W, eps)
# H = np.maximum(H, eps)
elif init == 'custom':
assert W_0 is not None and H_0 is not None, "You must provide W_0 and H_0 if you want to use the custom initialization."
W = W_0
H = H_0
else:
raise NotImplementedError(f"This initialization method is not implemented: {init}")
return compute_minvol_beta_nmf(data=data, W_0=W, H_0=H, rank=rank, beta=beta,n_iter_max=n_iter_max, tol=tol, delta=delta, lambda_init=lambda_init, gamma_line_search=gamma_line_search, gamma=gamma, tol_update_lagrangian=tol_update_lagrangian, verbose = verbose)
def compute_minvol_beta_nmf(data, W_0, H_0, rank, beta, n_iter_max = 100, tol = 1e-8, delta = 0.01, lambda_init = 1, gamma_line_search = False, gamma = 1, tol_update_lagrangian = 1e-6, verbose = False):
assert beta == 1, "This function is only implemented for beta = 1 (KL divergence)."
if gamma_line_search:
assert gamma is not None, "You must set gamma if you use the gamma line search strategy."
# Algorithm Beta-NMF with logdet(W^t @ W + delta*I) regularization
W, H = W_0.copy(), H_0.copy()
# Initialization for loop parameters
traceSave = []
logdetSave = []
condNumberSave = []
# Array to save the value of the loss function
cost_fct_vals = []
toc = []
# Initialization for lambda
log_det = min_vol_mu.compute_log_det(W, delta)
lambda_ = lambda_init * beta_div.beta_divergence(data, W @ H, beta) / (log_det + eps)
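# lambda_ balances the two terms of the objective: it is chosen so that, at initialization,
# the log-det penalty is comparable to the beta-divergence fitting term (scaled by lambda_init).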
# Optimization loop
for iteration in range(n_iter_max):
tic = time.time()
if gamma_line_search and iteration > 5: # No gamma line search for the first 5 iterations
W, H, err, log_det, gamma, trace, cond_number = one_step_minvol_beta_nmf(data=data, W=W, H=H, rank=rank, beta=beta, delta=delta, lambda_=lambda_, gamma=gamma, tol_update_lagrangian=tol_update_lagrangian, prev_error=cost_fct_vals[-1])
else:
W, H, err, log_det, _, trace, cond_number = one_step_minvol_beta_nmf(data=data, W=W, H=H, rank=rank, beta=beta, delta=delta, lambda_=lambda_, gamma=None, tol_update_lagrangian=tol_update_lagrangian, prev_error=None)
toc.append(time.time() - tic)
cost_fct_vals.append(err)
traceSave.append(trace)
logdetSave.append(log_det)
condNumberSave.append(cond_number)
if verbose:
if iteration == 0:
print(f'Normalized cost function value={err}')
else:
if cost_fct_vals[-2] - cost_fct_vals[-1] > 0:
print(f'Normalized cost function value={cost_fct_vals[-1]}, variation={cost_fct_vals[-2] - cost_fct_vals[-1]}.')
else:
# print in red when the cost function increases (which should not happen)
print('\033[91m' + 'Normalized cost function value={}, variation={}.'.format(
cost_fct_vals[-1], cost_fct_vals[-2] - cost_fct_vals[-1]) + '\033[0m')
if iteration > 0 and abs(cost_fct_vals[-2] - cost_fct_vals[-1]) < tol:
# Stop condition: the variation of the cost function between the last two iterations is below tol
if verbose:
print(f'Converged in {iteration} iterations.')
break
return W, H, cost_fct_vals, toc
def one_step_minvol_beta_nmf(data, W, H, rank, beta, delta, lambda_, gamma, tol_update_lagrangian, prev_error):
assert beta == 1, "This function is only implemented for beta = 1 (KL divergence)."
# Initialize W for gamma line search
W_prev = W.copy() if gamma is not None else None
H = mu.switch_alternate_mu(data, W, H, beta, "H") # np.transpose(multiplicative_updates.mu_betadivmin(H.T, W.T, data.T, beta))
if gamma is not None:
# Update W
W_update, Y = min_vol_mu.KL_mu_min_vol(data=data, W=W, H=H, delta=delta, lambda_=lambda_, gamma = gamma, tol_update_lagrangian = tol_update_lagrangian)
# Simplex projection wasn't working, so we use a standard normalization instead (TODO: check with Valentin that this is correct)
W_normalized, H_normalized = normalize_WH(W_update, H, "W")
W, H, gamma = min_vol_mu.gamma_line_search(data, W_update = W_update, W_gamma_init = W_normalized, W_prev=W_prev, H_gamma_init = H_normalized,
beta=beta, delta=delta, gamma_init=gamma, lambda_tilde=lambda_, prev_error=prev_error)
else:
W_update, Y = min_vol_mu.KL_mu_min_vol(data=data, W=W, H=H, delta=delta, lambda_=lambda_, gamma = None, tol_update_lagrangian = tol_update_lagrangian)
# Compute the loss function
log_det = min_vol_mu.compute_log_det(W, delta)
err = beta_div.beta_divergence(data, W @ H, beta) + lambda_ * log_det
trace = np.trace(Y @ (W.T @ W))
cond_number = np.linalg.cond(W.T @ W + delta * np.eye(rank))
return W, H, err, log_det, gamma, trace, cond_number
# def one_step_minvol_beta_nmf_lagrangian(data, W, H, rank, beta, delta, alpha, lambda_, tol_update_lagrangian):
# assert beta == 1, "This function is only implemented for beta = 1 (KL divergence)."
# H = multiplicative_updates.switch_alternate_mu(data, W, H, beta, "H") # np.transpose(multiplicative_updates.mu_betadivmin(H.T, W.T, data.T, beta))
# W, Y = min_vol_mu.KL_mu_min_vol_lagrangian(data, W, H, delta, lambda_, tol_update_lagrangian)
# # Compute the loss function
# log_det = min_vol_mu.compute_log_det(W, delta)
# err = beta_divergence(data, W @ H, beta) + lambda_ * log_det
# trace = np.trace(Y @ (W.T @ W))
# cond_number = np.linalg.cond(W.T @ W + delta * np.eye(rank))
# return W, H, err, log_det, trace, cond_number
if __name__ == "__main__":
np.random.seed(42)
m, n, rank = 100, 200, 5
W_0, H_0 = np.random.rand(m, rank), np.random.rand(rank, n) # Example input matrices
data = W_0@H_0 + 1e-2*np.random.rand(m,n) # Example input matrix
W, H, lossfun, t = minvol_beta_nmf(data, rank, beta = 1, n_iter_max = 100, gamma_line_search = True, gamma = 1, verbose = False)
print(f"Time for gamma line search: {t[-1]}")
W, H, lossfun, t = minvol_beta_nmf(data, rank, beta = 1, n_iter_max = 100, gamma_line_search = False, verbose = False)
print(f"Time without gamma line search, and with the normalization implemented inside the update rule: {t[-1]}")
# TO DEBUG
# W, H, lossfun, t = minvol_beta_nmf(data, rank, beta = 1, n_iter_max = 100, init = "nndsvd", gamma_line_search = True, gamma = 1, verbose = True)
# W, H, lossfun, t = minvol_beta_nmf(data, rank, beta = 1, n_iter_max = 100, init = "nndsvd", gamma_line_search = False, verbose = True)
import numpy as np
from nn_fac.nmf import nmf
from nn_fac.utils.normalize_wh import normalize_WH
def multilayer_beta_NMF(data, all_ranks, beta = 1, delta = 1e-6, n_iter_max_each_nmf = 100, init_each_nmf = "nndsvd", verbose = False):
L = len(all_ranks)
assert L > 1, "The number of layers must be at least 2. Otherwise, you should just use NMF."
if sorted(all_ranks, reverse=True) != all_ranks:
raise ValueError("The ranks of deep NMF should be decreasing.")
W = [None] * L
H = [None] * L
reconstruction_errors = [None] * L
W[0], H[0], reconstruction_errors[0] = one_layer_update(data=data, rank=all_ranks[0], beta=beta, delta=delta, init_each_nmf=init_each_nmf, n_iter_max_each_nmf=n_iter_max_each_nmf, verbose=verbose)
for i in range(1, L): # Layers
W_i, H_i, error_i = one_layer_update(data=W[i - 1], rank=all_ranks[i], beta=beta, delta=delta, init_each_nmf=init_each_nmf, n_iter_max_each_nmf=n_iter_max_each_nmf, verbose=verbose)
W[i], H[i], reconstruction_errors[i] = W_i, H_i, error_i
if verbose:
print(f'Layer {i} done.')
return W, H, reconstruction_errors
def one_layer_update(data, rank, beta, delta, init_each_nmf, n_iter_max_each_nmf, verbose):
W, H, cost_fct_vals, times = nmf(data, rank, init = init_each_nmf, U_0 = None, V_0 = None, n_iter_max=n_iter_max_each_nmf, tol=1e-8,
update_rule = "mu", beta = beta,
sparsity_coefficients = [None, None], fixed_modes = [], normalize = [False, False],
verbose=verbose, return_costs=True, deterministic=False)
W_normalized, H_normalized = normalize_WH(W, H, matrix="H")
reconstruction_error = cost_fct_vals[-1]
return W_normalized, H_normalized, reconstruction_error
if __name__ == "__main__":
np.random.seed(0)
m, n, all_ranks = 100, 200, [15,10,5]
data = np.random.rand(m, n) # Example input matrix
W, H, reconstruction_errors = multilayer_beta_NMF(data, all_ranks, n_iter_max_each_nmf = 100, verbose = True)
print(f"Losses: {reconstruction_errors}")
\ No newline at end of file
import numpy as np
import scipy
import time
import math
import tensorly as tl
from tensorly.decomposition import tucker as tl_tucker
import scipy.sparse as sci_sparse
import nn_fac.update_rules.nnls as nnls
import nn_fac.update_rules.mu as mu
import nn_fac.utils.beta_divergence as beta_div
import nn_fac.utils.errors as err
# %% HALS
def ntd(tensor, ranks, init = "random", core_0 = None, factors_0 = [], n_iter_max=100, tol=1e-6,
sparsity_coefficients = [], fixed_modes = [], normalize = [], mode_core_norm = None,
......
import numpy as np
from scipy.special import lambertw # real-branch Lambert W function, used in the closed-form update below
eps = 1e-12
def deep_KL_mu(W_Lm1, W_L, H_L, WH_Lp1, lambda_):
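# Closed-form multiplicative update for W_L in deep KL-NMF (summary inferred from the
# calling code in deep_nmf.py): W_L is updated so that W_Lm1 ≈ W_L @ H_L while being
# coupled, with weight lambda_, to the next layer's approximation WH_Lp1 = W_{L+1} @ H_{L+1}.
# The update is expressed through the real branch (k=0) of the Lambert W function.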
ONES = np.ones_like(W_Lm1)
a = ONES @ H_L.T - lambda_ * np.log(WH_Lp1)
b = W_L * ((W_Lm1 / (W_L @ H_L)) @ H_L.T)
W_L = np.maximum(eps, (1/lambda_ * b) / (lambertw(b * np.exp(a/lambda_) / lambda_, k=0).real))
return W_L
import warnings
import numpy as np
from nn_fac.utils.normalize_wh import normalize_WH
from nn_fac.utils.beta_divergence import beta_divergence
eps = 1e-12
def KL_mu_min_vol(data, W, H, delta, lambda_, gamma = None, tol_update_lagrangian = 1e-6):
m, n = data.shape
k = W.shape[1]
Jm1 = np.ones((m,1))
ONES = np.ones((m, n))
# Compute Y
Y = compute_Y(W, delta)
# Update W
Y_plus = np.maximum(0, Y)
Y_minus = np.maximum(0, -Y)
C = ONES @ H.T - 4 * lambda_ * W @ Y_minus
S = 8 * lambda_ * (W @ (Y_plus + Y_minus)) * ((data / ((W @ H) + eps)) @ H.T)
D = 4 * lambda_ * W @ (Y_plus + Y_minus)
if gamma is not None:
W = W * ((C ** 2 + S) ** 0.5 - C) / (D + eps)
else:
lagrangian_multipliers_0 = np.zeros((k, 1)) #(D[:,0] - C[:,0] * W[:,0]).T
lagrangian_multipliers = update_lagrangian_multipliers(C, S, D, W, lagrangian_multipliers_0, tol_update_lagrangian)
W = W * ((((C + Jm1 @ lagrangian_multipliers.T) ** 2 + S) ** 0.5 - (C + Jm1 @ lagrangian_multipliers.T)) / (D + eps))
W = np.maximum(W, eps)
return W, Y
def gamma_line_search(data, W_update, W_gamma_init, H_gamma_init, beta, delta, gamma_init, lambda_tilde, W_prev, prev_error):
W_gamma = W_gamma_init.copy()
H_gamma = H_gamma_init.copy()
gamma = gamma_init
cur_log_det = compute_log_det(W_gamma, delta)
cur_err = beta_divergence(data, W_gamma @ H_gamma, beta) + lambda_tilde * cur_log_det
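# Backtracking on gamma: as long as the regularized error is worse than at the previous
# iteration, shrink gamma (by a factor 0.8) and retry the interpolated update
# W_gamma = (1 - gamma) * W_prev + gamma * W_update. After the loop, gamma is slightly
# increased (times 1.2, capped at 1) to serve as the starting step at the next call.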
while cur_err > prev_error and gamma > 1e-16:
gamma *= 0.8
W_gamma = (1 - gamma) * W_prev + gamma * W_update
W_gamma, H_gamma = normalize_WH(W_gamma, H_gamma, "W")
cur_log_det = compute_log_det(W_gamma, delta)
cur_err = beta_divergence(data, W_gamma @ H_gamma, beta) + lambda_tilde * cur_log_det
gamma = min(gamma * 1.2, 1)
return W_gamma, H_gamma, gamma
def update_lagrangian_multipliers(C, S, D, W, lagrangian_multipliers_0, tol = 1e-6, n_iter_max = 1000):
# Comes from
m, k = W.shape
Jm1 = np.ones((m,1))
Jk1 = np.ones(k)
ONES = np.ones((m, k))
lagrangian_multipliers = lagrangian_multipliers_0.copy()
for iter in range(n_iter_max):
lagrangian_multipliers_prev = lagrangian_multipliers.copy()
Mat = W * ((((C + Jm1 @ lagrangian_multipliers.T) ** 2 + S) ** 0.5) - (C + Jm1 @ lagrangian_multipliers.T)) / (D + eps)
Matp = W * ((((C + Jm1 @ lagrangian_multipliers.T) ** 2 + S) **(-0.5)) - ONES) / (D + eps)
# Matp = (W / (D + eps)) * ((C + Jm1 @ lagrangian_multipliers.T) / (((C + Jm1 @ lagrangian_multipliers.T)**2 + S)**0.5) - ONES) # Was also in the code, may be more efficient due to less computation of matrix power.
xi = np.sum(Mat, axis=0) - Jk1
xip = np.sum(Matp, axis=0)
lagrangian_multipliers = lagrangian_multipliers - (xi / xip).reshape((k,1))
if np.max(np.abs(lagrangian_multipliers - lagrangian_multipliers_prev)) <= tol:
break
if iter == n_iter_max - 1:
warnings.warn('Maximum number of iterations reached in the update of the Lagrangian multipliers.')
return lagrangian_multipliers
def compute_Y(W, delta):
r = W.shape[1]
return np.linalg.inv((W.T @ W + delta * np.eye(r)))# + eps)
def compute_det(W, delta):
r = W.shape[1]
det = np.linalg.det(W.T @ W + delta * np.eye(r))
#det += eps ## TODO: Also ask Valentin whether adding this eps to the determinant is OK.
return det
def compute_log_det(W, delta):
det = compute_det(W, delta)
return np.log10(det, where=(det!=0))
# def KL_mu_min_vol_lagrangian(data, W, H, delta, lambda_, tol_update_lagrangian):
# m, n = data.shape
# k = W.shape[1]
# Jm1 = np.ones((m,1))
# ONES = np.ones((m, n))
# # Compute Y
# Y = compute_Y(W, delta)
# # Update mu (lagrangian multipliers)
# Y_plus = np.maximum(0, Y)
# Y_minus = np.maximum(0, -Y)
# C = ONES @ H.T - 4 * lambda_ * W @ Y_minus
# S = 8 * lambda_ * (W @ (Y_plus + Y_minus)) * ((data / ((W @ H) + eps)) @ H.T)
# D = 4 * lambda_ * W @ (Y_plus + Y_minus)
# lagrangian_multipliers_0 = np.zeros((k, 1)) #(D[:,0] - C[:,0] * W[:,0]).T
# lagrangian_multipliers = update_lagrangian_multipliers(C, S, D, W, lagrangian_multipliers_0, tol_update_lagrangian)
# # Update W
# W = W * ((((C + Jm1 @ lagrangian_multipliers.T) ** 2 + S) ** 0.5 - (C + Jm1 @ lagrangian_multipliers.T)) / (D + eps))
# W = np.maximum(W, eps)
# return W, Y
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 16 14:45:25 2021
@author: amarmore
## Author: Axel Marmoret, based on Florian Voorwinden's code during his internship.
"""
import numpy as np
import time
import tensorly as tl
import nn_fac.utils.errors as err
def switch_alternate_mu(data, U, V, beta, matrix):
"""
Encapsulates the switch between the two multiplicative update rules.
"""
if matrix in ["U", "W"]:
return mu_betadivmin(U, V, data, beta)
elif matrix in ["V", "H"]:
return np.transpose(mu_betadivmin(V.T, U.T, data.T, beta))
else:
raise err.InvalidArgumentValue(f"Invalid value for matrix: got {matrix}, but it must be 'U' or 'W' for the first matrix, and 'V' or 'H' for the second one.") from None
def mu_betadivmin(U, V, M, beta):
"""
=====================================================
Beta-Divergence NMF solved with Multiplicative Update
=====================================================
Computes an approximate solution of a beta-NMF
[3] with the Multiplicative Update rule [2,3].
M is m by n, U is m by r, V is r by n.
All matrices are nonnegative componentwise.
Unlike in [1], the problem is solved for the beta-divergence,
as studied in [3]:
min_{U >= 0} beta_div(M, UV)
The update rule of this algorithm is defined in [3].
Parameters
----------
U : m-by-r array
The first factor of the NNLS, the one which will be updated.
V : r-by-n array
The second factor of the NNLS, which won't be updated.
M : m-by-n array
The initial matrix, to approach.
beta : Nonnegative float
The beta coefficient for the beta-divergence.
Returns
-------
U: array
an m-by-r nonnegative matrix \approx argmin_{U >= 0} beta_div(M, UV)
References
----------
[1]: N. Gillis and F. Glineur, Accelerated Multiplicative Updates and
Hierarchical ALS Algorithms for Nonnegative Matrix Factorization,
Neural Computation 24 (4): 1085-1105, 2012.
[2] D. Lee and H. S. Seung, Learning the parts of objects by non-negative
matrix factorization., Nature, vol. 401, no. 6755, pp. 788–791, 1999.
[3] C. Févotte and J. Idier, Algorithms for nonnegative matrix
factorization with the beta-divergence, Neural Computation,
vol. 23, no. 9, pp. 2421–2456, 2011.
"""
if beta < 0:
raise err.InvalidArgumentValue("Invalid value for beta: beta must be nonnegative.") from None
epsilon = 1e-12
K = np.dot(U,V)
if beta == 1:
K_inverted = K**(-1)
line = np.sum(V.T,axis=0)
denom = np.array([line for i in range(np.shape(K)[0])])
return np.maximum(U * (np.dot((K_inverted*M),V.T) / denom),epsilon)
elif beta == 2:
denom = np.dot(K,V.T)
return np.maximum(U * (np.dot(M,V.T) / denom), epsilon)
elif beta == 3:
denom = np.dot(K**2,V.T)
return np.maximum(U * (np.dot((K * M),V.T) / denom) ** gamma(beta), epsilon)
else:
denom = np.dot(K**(beta-1),V.T)
return np.maximum(U * (np.dot((K**(beta-2) * M),V.T) / denom) ** gamma(beta), epsilon)
def mu_tensorial(G, factors, tensor, beta):
"""
This function is used to update the core G of a
nonnegative Tucker Decomposition (NTD) [1] with beta-divergence [3]
and Multiplicative Updates [2].
See ntd.py of this module for more details on the NTD (or [1])
TODO: expand this docstring.
Parameters
----------
G : tensorly tensor
Core tensor at this iteration.
factors : list of tensorly tensors
Factors for NTD at this iteration.
tensor : tensorly tensor
The tensor to estimate with NTD.
beta : Nonnegative float
The beta coefficient for the beta-divergence.
Returns
-------
G : tensorly tensor
Updated core of the NTD.
References
----------
[1] Tamara G Kolda and Brett W Bader. "Tensor decompositions and applications",
SIAM review 51.3 (2009), pp. 455-500.
[2] D. Lee and H. S. Seung, Learning the parts of objects by non-negative
matrix factorization., Nature, vol. 401, no. 6755, pp. 788–791, 1999.
[3] C. Févotte and J. Idier, Algorithms for nonnegative matrix
factorization with the beta-divergence, Neural Computation,
vol. 23, no. 9, pp. 2421–2456, 2011.
"""
if beta < 0:
raise err.InvalidArgumentValue("Invalid value for beta: beta must be nonnegative.") from None
epsilon = 1e-12
K = tl.tenalg.multi_mode_dot(G,factors)
if beta == 1:
L1 = np.ones(np.shape(K))
L2 = K**(-1) * tensor
elif beta == 2:
L1 = K
L2 = np.ones(np.shape(K)) * tensor
elif beta == 3:
L1 = K**2
L2 = K * tensor
else:
L1 = K**(beta-1)
L2 = K**(beta-2) * tensor
return np.maximum(G * (tl.tenalg.multi_mode_dot(L2, [fac.T for fac in factors]) / tl.tenalg.multi_mode_dot(L1, [fac.T for fac in factors])) ** gamma(beta) , epsilon)
def gamma(beta):
"""
Exponent from Févotte and Idier [1], which guarantees that the MU updates decrease the cost.
See [1] for details.
Parameters
----------
beta : Nonnegative float
The beta coefficient for the beta-divergence.
Returns
-------
float : the exponent value
References
----------
[1] C. Févotte and J. Idier, Algorithms for nonnegative matrix
factorization with the beta-divergence, Neural Computation,
vol. 23, no. 9, pp. 2421–2456, 2011.
"""
if beta<1:
return 1/(2-beta)
if beta>2:
return 1/(beta-1)
else:
return 1
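if __name__ == "__main__":
    # Hedged self-test, in the spirit of the __main__ blocks of the other modules:
    # one multiplicative sweep over both factors should not increase the KL divergence.
    from nn_fac.utils.beta_divergence import beta_divergence
    np.random.seed(0)
    M = np.random.rand(50, 40)
    U, V = np.random.rand(50, 4), np.random.rand(4, 40)
    before = beta_divergence(M, U @ V, beta=1)
    U = switch_alternate_mu(M, U, V, beta=1, matrix="U")
    V = switch_alternate_mu(M, U, V, beta=1, matrix="V")
    after = beta_divergence(M, U @ V, beta=1)
    print(f"KL divergence before={before:.4f}, after one MU sweep={after:.4f}")
    assert after <= before + 1e-8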
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 16 14:45:25 2021
@author: amarmore
## Author: Axel Marmoret, based on Florian Voorwinden's code during his internship.
"""
import numpy as np
import nn_fac.utils.errors as err
def kl_divergence(a, b):
return beta_divergence(a, b, beta=1)
def beta_divergence(a, b, beta):
"""
Compute the beta-divergence of two floats or arrays a and b,
as defined in [1].
Parameters
----------
a : float or array
First argument for the beta-divergence.
b : float or array
Second argument for the beta-divergence.
beta : float
the beta factor of the beta-divergence.
Returns
-------
float
Beta-divergence of a and b.
References
----------
[1] C. Févotte and J. Idier, Algorithms for nonnegative matrix
factorization with the beta-divergence, Neural Computation,
vol. 23, no. 9, pp. 2421–2456, 2011.
"""
if beta < 0:
raise err.InvalidArgumentValue("Invalid value for beta: beta must be nonnegative.") from None
if beta == 1:
#return np.sum(a * np.log(a/b, where=(a!=0)) - a + b)
a_div_b = np.divide(a,b, where=(b!=0))
return np.sum(a * np.log(a_div_b, where=(a_div_b!=0)) - a + b)
elif beta == 0:
return np.sum(a/b - np.log(a/b, where=(a!=0)) - 1)
else:
return np.sum(1/(beta*(beta -1)) * (a**beta + (beta - 1) * b**beta - beta * a * (b**(beta-1))))
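if __name__ == "__main__":
    # Hedged sanity checks: for beta = 2 the beta-divergence is half the squared
    # Frobenius distance, and the KL divergence of a matrix with itself is zero.
    np.random.seed(0)
    a, b = np.random.rand(30, 20), np.random.rand(30, 20)
    assert np.isclose(beta_divergence(a, b, beta=2), 0.5 * np.linalg.norm(a - b) ** 2)
    assert np.isclose(kl_divergence(a, a), 0.0)
    print("beta_divergence sanity checks passed.")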
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 25 16:01:35 2020
@author: amarmore
"""
class ArgumentException(BaseException): pass
class InvalidRanksException(ArgumentException): pass
class CustomNotEngouhFactors(ArgumentException): pass
class CustomNotValidFactors(ArgumentException): pass
class CustomNotValidCore(ArgumentException): pass
class InvalidInitializationType(ArgumentException): pass
class InvalidArgumentValue(ArgumentException): pass
class OptimException(BaseException): pass
class ZeroColumnWhenUnautorized(OptimException): pass
import numpy as np
eps = 1e-8 #np.finfo(float).eps
def normalize_WH(W, H, matrix):
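# Rescales the pair (W, H) without changing the product W @ H:
# - matrix == "H": each row of H is divided by its sum (rows of H sum to 1),
#   and the corresponding column of W is multiplied by that sum;
# - matrix == "W": each column of W is divided by its sum (columns of W sum to 1),
#   and the corresponding row of H is multiplied by that sum.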
if matrix == "H":
# Normalize so that He = e
scalH = np.sum(H, axis=1)
H = np.diag(1 / scalH) @ H
W = W @ np.diag(scalH)
elif matrix == "W":
# Normalize so that W^T e = e
scalW = np.sum(W, axis=0)
H = np.diag(scalW) @ H
W = W @ np.diag(1 / scalW)
else:
raise ValueError(f"Matrix must be either 'W' or 'H', but it is {matrix}")
return W, H
# %% Simplex projection tests (currently not working)
def normalize_W_and_H(W, H, iter):
# This doesn't work, and it's annoying
WH = W@H
simplexed_W = SimplexProjW(W)
print(f"Avg des simplex: {np.mean(np.amax(simplexed_W, axis = 0))}")
if np.mean(np.amax(simplexed_W, axis = 0)) > 0.9: # Projection abusive
columns_norm = W.sum(axis = 0)
print(columns_norm)
W = np.maximum(W / columns_norm, eps)
# Ht = H.T
# Ht = Ht * columns_norm
# H = Ht.T
else:
W = np.maximum(simplexed_W, eps)
#H = H / (W.T @ W @ H) # Not sure about this one?
print(f"Max difference: {np.amax(WH - W@H)}")
assert (W>0).all()
assert (np.sum(W, axis = 0) <= 1 + eps*W.shape[0]*W.shape[1]).all()
return W, H
def SimplexProjW(y):
"""
Project y onto the simplex Delta = { x | x >= 0 and sum(x) <= 1 }.
"""
x = np.zeros(y.shape)
for idx in range(y.shape[1]):
x[:,idx] = ProjectVectorSimplex(y[:,idx])
return x
def SimplexProjW_valentin(y):
"""
Project y onto the simplex Delta = { x | x >= 0 and sum(x) <= 1 }.
Does not work, and I do not know why (I have trouble understanding this block).
"""
r, m = y.shape
ys = -np.sort(-y, axis=0) # Sort in descending order
lambda_ = np.zeros(m)
S = np.zeros((r, m))
for i in range(1, r):
if i == 1:
S[i, :] = ys[:i, :] - ys[i, None]
else:
S[i, :] = np.sum(ys[:i, :] - ys[i, None], axis=0)
indi1 = np.where(S[i, :] >= 1)[0]
indi2 = np.where(S[i, :] < 1)[0]
if indi1.size > 0:
if i == 1:
lambda_[indi1] = -ys[0, indi1] + 1
else:
lambda_[indi1] = (1 - S[i - 1, indi1]) / i - ys[i - 1, indi1]
if i == r - 1:
lambda_[indi2] = (1 - S[r - 1, indi2]) / r - ys[r - 1, indi2]
x = np.maximum(y + lambda_, 0)
return x
def ProjectVectorSimplex(vY):
# Obtained from https://github.com/RoyiAvital/StackExchangeCodes/blob/master/Mathematics/Q2327504/ProjectSimplexExact.m
numElements = len(vY)
if abs(np.sum(vY) - 1) < 1e-9 and np.all(vY >= 0):
# The input is already within the Simplex.
vX = vY
return vX
vZ = np.sort(vY)
vParamMu = np.concatenate(([vZ[0] - 1], vZ, [vZ[-1] + 1]))
hObjFun = lambda paramMu: np.sum(np.maximum(vY - paramMu, 0)) - 1
vObjVal = np.zeros(numElements + 2)
for ii in range(numElements + 2):
vObjVal[ii] = hObjFun(vParamMu[ii])
if np.any(vObjVal == 0):
paramMu = vParamMu[vObjVal == 0]
else:
# Linear interpolation between the two breakpoints where the affine objective changes sign
valX1Idx = np.where(vObjVal > 0)[0][-1]
valX2Idx = np.where(vObjVal < 0)[0][0]
valX1 = vParamMu[valX1Idx]
valX2 = vParamMu[valX2Idx]
valY1 = vObjVal[valX1Idx]
valY2 = vObjVal[valX2Idx]
paramA = (valY2 - valY1) / (valX2 - valX1)
paramB = valY1 - (paramA * valX1)
paramMu = -paramB / paramA
vX = np.maximum(vY - paramMu, 0)
return vX
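if __name__ == "__main__":
    # Hedged sanity checks for the two utilities above.
    np.random.seed(0)
    W, H = np.random.rand(6, 3), np.random.rand(3, 8)
    Wn, Hn = normalize_WH(W, H, matrix="H")
    assert np.allclose(W @ H, Wn @ Hn)       # the product is unchanged
    assert np.allclose(Hn.sum(axis=1), 1.0)  # rows of H now sum to 1
    v = ProjectVectorSimplex(np.random.randn(5))
    assert v.min() >= 0 and abs(v.sum() - 1) < 1e-8  # valid point of the simplex
    print("normalize_WH and ProjectVectorSimplex sanity checks passed.")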
NMF:
- nimfa (methods.seeding specifically)
NTF:
- nimfa (methods.seeding specifically)
- tensorly
PARAFAC2:
- nimfa (methods.seeding specifically)
- tensorly
NTD:
- scipy
- tensorly