Source code for inflation.lp.lp_utils

"""
This file contains functions to interact with LP solvers.

@authors: Erica Han, Elie Wolfe
"""

import sys
import mosek
import numpy as np

from typing import List, Dict, Union
from scipy.sparse import coo_array, issparse
from time import perf_counter
from gc import collect
from ..utils import partsextractor, expand_sparse_vec, vstack


def drop_zero_rows(coo_mat: coo_array):
    """Drops zero rows from a sparse matrix in place.

    Parameters
    ----------
    coo_mat : coo_array
        Sparse matrix to drop zero rows from.
    """
    if len(coo_mat.shape) == 1:
        coo_mat = coo_mat.reshape((1, coo_mat.shape[0]))
    nz_rows, new_row = np.unique(coo_mat.row, return_inverse=True)
    coo_mat.row = new_row
    coo_mat = coo_mat.reshape((len(nz_rows), coo_mat.shape[1]))
    return coo_mat


def canonical_order(coo_mat: coo_array):
    """Puts a sparse matrix in canonical order in place.

    Parameters
    ----------
    coo_mat : coo_array
        Sparse matrix to put in canonical order.
    """
    if len(coo_mat.shape) == 1:
        coo_mat = coo_mat.reshape((1, coo_mat.shape[0]))
    order = np.lexsort([coo_mat.col, coo_mat.row])
    coo_mat.row = np.asarray(coo_mat.row)[order]
    coo_mat.col = np.asarray(coo_mat.col)[order]
    coo_mat.data = np.asarray(coo_mat.data)[order]
    return coo_mat

def solveLP(objective: Union[coo_array, Dict] = None,
            known_vars: Union[coo_array, Dict] = None,
            semiknown_vars: Dict = None,
            inequalities: Union[coo_array, List[Dict]] = None,
            equalities: Union[coo_array, List[Dict]] = None,
            variables: List = None,
            **kwargs
            ) -> Dict:
    """Wrapper function that converts all dictionaries to sparse matrices to
    pass to the solver.

    Parameters
    ----------
    objective : Union[coo_array, Dict], optional
        Objective function
    known_vars : Union[coo_array, Dict], optional
        Known values of the monomials
    semiknown_vars : Dict, optional
        Semiknown variables
    inequalities : Union[coo_array, List[Dict]], optional
        Inequality constraints
    equalities : Union[coo_array, List[Dict]], optional
        Equality constraints
    variables : List
        Monomials by name in same order as column indices of all other solver
        arguments

    Returns
    -------
    dict
        Primal objective value, dual objective value, problem status, success
        status, dual certificate (as dictionary and sparse matrix), x values,
        and response code.
    """
    # Save solver arguments, unpacking kwargs
    solver_args = locals()
    del solver_args['kwargs']
    solver_args.update(kwargs)

    # Check type for arguments related to the problem
    problem_args = ("objective",
                    "known_vars",
                    "semiknown_vars",
                    "inequalities",
                    "equalities",
                    "lower_bounds",
                    "upper_bounds")
    used_args = {k: v for k, v in solver_args.items()
                 if k in problem_args and v is not None}
    if all(issparse(arg) for arg in used_args.values()):
        assert variables is not None, "Variables must be declared when all " \
                                      "arguments are in sparse matrix form."
    elif all(isinstance(arg, (dict, list)) for arg in used_args.values()):
        if variables is None:
            # Infer variables
            variables = set()
            if objective:
                variables.update(objective)
            if inequalities:
                for ineq in inequalities:
                    variables.update(ineq)
            if equalities:
                for eq in equalities:
                    variables.update(eq)
            if semiknown_vars:
                for x, (c, x2) in semiknown_vars.items():
                    variables.update([x, x2])
            if known_vars:
                variables.update(known_vars)
            variables = sorted(variables)
            solver_args["variables"] = variables
        solver_args.update(convert_dicts(**solver_args))
    else:
        assert variables is not None, "Variables must be declared when " \
                                      "arguments are of mixed form."
        solver_args.update(convert_dicts(**solver_args))
    solver_args.pop("semiknown_vars", None)
    return solveLP_sparse(**solver_args)


blank_coo_array = coo_array((0, 0), dtype=np.int8)
[docs] def solveLP_sparse(objective: coo_array = blank_coo_array, known_vars: coo_array = blank_coo_array, inequalities: coo_array = blank_coo_array, equalities: coo_array = blank_coo_array, lower_bounds: coo_array = blank_coo_array, upper_bounds: coo_array = blank_coo_array, solve_dual: bool = False, default_non_negative: bool = True, relax_known_vars: bool = False, relax_inequalities: bool = False, verbose: int = 0, solverparameters: Dict = None, variables: List = None ) -> Dict: """Internal function to solve an LP with the Mosek Optimizer API using sparse matrices. Columns of each matrix correspond to a fixed order of variables in the LP. Parameters ---------- objective : coo_array, optional Objective function with coefficients as matrix entries. known_vars : coo_array, optional Known values of the monomials with values as matrix entries. inequalities : coo_array, optional Inequality constraints in matrix form. equalities : coo_array, optional Equality constraints in matrix form. lower_bounds : coo_array, optional Lower bounds of variables with bounds as matrix entries. upper_bounds : coo_array, optional Upper bounds of variables with bounds as matrix entries. solve_dual : bool, optional Whether to solve the dual (``True``) or primal (``False``) formulation. By default, ``False``. default_non_negative : bool, optional Whether to set default primal variables as non-negative. By default, ``True``. relax_known_vars : bool, optional Do feasibility as optimization where each known value equality becomes two relaxed inequality constraints. E.g., P(A) = 0.7 becomes P(A) + lambda >= 0.7 and P(A) - lambda <= 0.7, where lambda is a slack variable. By default, ``False``. relax_inequalities : bool, optional Do feasibility as optimization where each inequality is relaxed by the non-negative slack variable lambda. By default, ``False``. verbose : int, optional Verbosity. Higher means more messages. By default, 0. solverparameters : dict, optional Parameters to pass to the MOSEK solver. For example, to control whether presolve is applied before optimization, set ``mosek.iparam.presolve_use`` to ``mosek.presolvemode.on`` or ``mosek.presolvemode.off``. Or, control which optimizer is used by setting an optimizer type to ``mosek.iparam.optimizer``. See `MOSEK's documentation`_ for more details. variables : list Monomials by name in same order as column indices of all other solver arguments Returns ------- dict Primal objective value, dual objective value, problem status, success status, dual certificate (as dictionary and sparse matrix), x values, and response code. """ inequalities=drop_zero_rows(inequalities) equalities=drop_zero_rows(equalities) known_vars=canonical_order(known_vars) upper_bounds=canonical_order(upper_bounds) lower_bounds=canonical_order(lower_bounds) objective=canonical_order(objective) if verbose > 1: t0 = perf_counter() t_total = perf_counter() print("Starting pre-processing for the LP solver...") # Since the value of infinity is ignored, define it for symbolic purposes inf = 0.0 if relax_known_vars or relax_inequalities: default_non_negative = False with mosek.Env() as env: with mosek.Task(env) as task: # Set parameters for the solver depending on value type if solverparameters: for param, val in solverparameters.items(): if isinstance(val, int): task.putintparam(param, val) elif isinstance(val, float): task.putdouparam(param, val) elif isinstance(val, str): task.putstrparam(param, val) if solve_dual: task.putintparam(mosek.iparam.sim_solve_form, mosek.solveform.dual) else: task.putintparam(mosek.iparam.sim_solve_form, mosek.solveform.primal) if verbose > 0: # Attach a log stream printer to the task task.set_Stream(mosek.streamtype.log, streamprinter) task.putintparam(mosek.iparam.log_include_summary, mosek.onoffkey.on) task.putintparam(mosek.iparam.log_storage, 1) if verbose < 2: task.putintparam(mosek.iparam.log_sim, 0) task.putintparam(mosek.iparam.log_intpnt, 0) # Initialize constraint matrix constraints = vstack((inequalities, equalities)) nof_primal_inequalities = inequalities.shape[0] nof_primal_equalities = equalities.shape[0] (nof_primal_constraints, nof_primal_variables) = constraints.shape nof_known_vars = known_vars.nnz # Initialize b vector (RHS of constraints) b = [0] * nof_primal_constraints if relax_inequalities: # Add slack variable lambda to each inequality cons_row = np.hstack( (constraints.row, np.arange(nof_primal_inequalities))) cons_col = np.hstack( (constraints.col, np.repeat(nof_primal_variables, nof_primal_inequalities))) cons_data = np.hstack( (constraints.data, np.repeat(1, nof_primal_inequalities))) constraints = coo_array((cons_data, (cons_row, cons_col)), shape=(nof_primal_constraints, nof_primal_variables + 1)) if relax_known_vars: # Each known value is replaced by two inequalities with slacks kv_row = np.tile( np.arange(nof_known_vars * 2), 2) kv_col = np.hstack(( np.tile(known_vars.col, 2), np.broadcast_to(nof_primal_variables, nof_known_vars * 2) )) kv_data = np.hstack(( np.broadcast_to(1, nof_known_vars * 3), np.broadcast_to(-1, nof_known_vars) )) kv_matrix = coo_array((kv_data, (kv_row, kv_col)), shape=(nof_known_vars * 2, nof_primal_variables + 1)) canonical_order(kv_matrix) constraints.resize(*(nof_primal_constraints, nof_primal_variables + 1)) b = np.hstack((b, np.tile(known_vars.data, 2))) else: # Add known values as equalities to the constraint matrix kv_matrix = expand_sparse_vec(known_vars) b = np.hstack((b, known_vars.data)) nof_primal_equalities += nof_known_vars constraints = vstack((constraints, kv_matrix)) (nof_primal_constraints, nof_primal_variables) = constraints.shape if objective.shape[-1] == 0: objective = coo_array((1, nof_primal_variables), dtype=np.int8) nof_lb = lower_bounds.nnz nof_ub = upper_bounds.nnz if verbose > 0: print(f"Size of constraint matrix: {constraints.shape}") if solve_dual: if verbose > 1: print("Proceeding with dual initialization...") nof_primal_nontriv_bounds = nof_lb + nof_ub nof_dual_constraints = nof_primal_variables nof_dual_variables = nof_primal_constraints + \ nof_primal_nontriv_bounds # Add variable bounds as inequality constraints to matrix if nof_primal_nontriv_bounds > 0: lb_mat = expand_sparse_vec(lower_bounds, conversion_style="eq") ub_mat = expand_sparse_vec(upper_bounds, conversion_style="eq") ub_mat.data[:] = -ub_mat.data # just a list of ones matrix = vstack((constraints, lb_mat, ub_mat), format='csr') b_extra = np.concatenate( (lower_bounds.data, -np.asarray(upper_bounds.data))) objective_vector = np.concatenate((b, b_extra)) else: matrix = constraints.tocsr(copy=False) objective_vector = b if verbose > 1: print("Sparse matrix reformat complete...") # Set bound keys and values for constraints (primal objective) if relax_known_vars or relax_inequalities: blc = buc = np.zeros(nof_primal_variables) blc[-1] = buc[-1] = -1 else: blc = buc = objective.toarray().ravel() # Set constraint bounds corresponding to primal variable bounds if default_non_negative: bkc = np.broadcast_to(mosek.boundkey.lo, nof_dual_constraints) else: bkc = np.broadcast_to(mosek.boundkey.fx, nof_dual_constraints) # Set bound keys and values for variables # Non-positivity for y corresponding to inequalities bkx = [mosek.boundkey.up] * nof_primal_inequalities + \ [mosek.boundkey.fr] * nof_primal_equalities bux = [0.0] * nof_dual_variables blx = [0.0] * nof_dual_variables if relax_known_vars: bkx.extend([mosek.boundkey.up] * nof_known_vars + [mosek.boundkey.lo] * nof_known_vars) bkx.extend([mosek.boundkey.up] * nof_primal_nontriv_bounds) # Set the objective sense task.putobjsense(mosek.objsense.minimize) else: if verbose > 1: print("Proceeding with primal initialization...") matrix = constraints.tocsc(copy=False) if verbose > 1: print("Sparse matrix reformat complete...") if relax_known_vars or relax_inequalities: # Maximize lambda # (If maximum slack is still negative, then the unrelaxed # LP would be infeasible, whereas positive slack solution # implies LP solution strictly interior in the polytope.) objective_vector = np.zeros(nof_primal_variables) objective_vector[-1] = -1 else: objective_vector = objective.toarray().ravel() # Set bound keys and values for constraints # Ax >= b where b is 0 bkc = np.hstack((np.broadcast_to(mosek.boundkey.lo, nof_primal_inequalities), np.broadcast_to(mosek.boundkey.fx, nof_primal_equalities) )) if relax_known_vars: bkc = np.hstack((bkc, np.repeat([mosek.boundkey.lo, mosek.boundkey.up], nof_known_vars) )) blc = buc = b ub_col = upper_bounds.col ub_data = np.zeros(nof_primal_variables) ub_data[ub_col] = upper_bounds.data lb_col = lower_bounds.col lb_data = np.zeros(nof_primal_variables) lb_data[lb_col] = lower_bounds.data # Set bound keys and bound values for variables blx = np.zeros(nof_primal_variables) bux = np.zeros(nof_primal_variables) ub_col = np.asarray(upper_bounds.col) lb_col = np.asarray(lower_bounds.col) lb_data = lower_bounds.data if default_non_negative: bkx = np.repeat(mosek.boundkey.lo, nof_primal_variables) bkx[ub_col] = mosek.boundkey.ra else: bkx = np.repeat(mosek.boundkey.fr, nof_primal_variables) bkx[np.setdiff1d(lb_col, ub_col)] = mosek.boundkey.lo bkx[np.setdiff1d(ub_col, lb_col)] = mosek.boundkey.up bkx[np.intersect1d(ub_col, lb_col)] = mosek.boundkey.ra blx[lb_col] = lb_data bux[ub_col] = upper_bounds.data if relax_known_vars or relax_inequalities: bkx[-1] = mosek.boundkey.fr # Set the objective sense task.putobjsense(mosek.objsense.maximize) # Add all the problem data to the task if solve_dual: numcon = nof_dual_constraints numvar = nof_dual_variables else: numcon = nof_primal_constraints numvar = nof_primal_variables if verbose > 1: print("Starting task.inputdata in Mosek...") task.inputdata(# maxnumcon= numcon, # maxnumvar= numvar, # c= objective_vector, # cfix= 0, # aptrb= matrix.indptr[:-1], # aptre= matrix.indptr[1:], # asub= matrix.indices, # aval= matrix.data, bkc, blc, buc, bkx, blx, bux) collect() if verbose > 1: print("Pre-processing took", format(perf_counter() - t0, ".4f"), "seconds.\n") t0 = perf_counter() if verbose > 1: print("Writing problem to debug_lp.ptf...") task.writedata("debug_lp.ptf") # Solve the problem if verbose > 0: print("\nSolving the problem...\n") trmcode = task.optimize() if verbose > 1: print("Solving took", format(perf_counter() - t0, ".4f"), "seconds.") basic = mosek.soltype.bas (problemsta, solutionsta, skc, skx, skn, xc, xx, yy, slc, suc, slx, sux, snx) = task.getsolution(basic) # Get objective values, solutions x, dual values y if solve_dual: primal = task.getdualobj(basic) dual = task.getprimalobj(basic) x_values = dict(zip(variables, yy)) y_values = np.asarray(xx) else: primal = task.getprimalobj(basic) dual = task.getdualobj(basic) x_values = dict(zip(variables, xx)) y_values = np.asarray(yy) if solutionsta == mosek.solsta.optimal: success = True else: success = False status_str = solutionsta.__repr__() term_tuple = mosek.Env.getcodedesc(trmcode) if solutionsta == mosek.solsta.unknown and verbose > 0: print("The solution status is unknown.") print(f" Termination code: {term_tuple}") # Extract the certificate as a sparse matrix: y.b - c.x <= 0 if relax_known_vars: y_values = y_values[nof_primal_constraints - nof_known_vars*2:] else: y_values = y_values[nof_primal_constraints - nof_known_vars:] cert_row = [0] * nof_primal_variables cert_col = [*range(nof_primal_variables)] cert_data = np.zeros((nof_primal_variables,)) obj_data = objective.toarray().ravel() objective_unknown_cols = np.setdiff1d(objective.col, known_vars.col) cert_data[objective_unknown_cols] = -obj_data[objective_unknown_cols] cert_data[known_vars.col] = y_values[:nof_known_vars] # Assumes known values coded as equalities if relax_known_vars: cert_data[known_vars.col] += y_values[nof_known_vars:(2*nof_known_vars)] sparse_certificate = coo_array((cert_data, (cert_row, cert_col)), shape=(1, nof_primal_variables)) # Certificate as a dictionary certificate = dict(zip(variables, sparse_certificate.toarray().ravel())) # Clean entries with coefficient zero for x in list(certificate): if np.isclose(certificate[x], 0): del certificate[x] if verbose > 1: print("\nTotal execution time:", format(perf_counter() - t_total, ".4f"), "seconds.") return { "primal_value": primal, "dual_value": dual, "status": status_str, "success": success, "dual_certificate": certificate, "sparse_certificate": sparse_certificate, "x": x_values, "term_code": term_tuple }
def streamprinter(text: str) -> None: """A stream printer to get output from Mosek. Parameters ---------- text : str Text to print. """ sys.stdout.write(text) sys.stdout.flush() ########################################################################### # ROUTINES RELATED TO SPARSE MATRIX CONVERSION # ########################################################################### def to_sparse(argument: Union[Dict, List[Dict]], variables: List) -> coo_array: """Convert a solver argument to a sparse matrix to pass to the solver. Parameters ---------- argument : Union[Dict, List[Dict]] Solver argument to convert to sparse matrix. variables : List Monomials by name in same order as column indices of all other solver arguments Returns ------- coo_array Sparse matrix representation of the solver argument. """ if type(argument) == dict: var_to_idx = {x: i for i, x in enumerate(variables)} data = list(argument.values()) keys = list(argument.keys()) col = partsextractor(var_to_idx, keys) row = np.zeros(len(col), dtype=int) return coo_array((data, (row, col)), shape=(1, len(variables))) else: # Argument is a list of constraints row = [] for i, cons in enumerate(argument): row.extend([i] * len(cons)) cols = [to_sparse(cons, variables).col for cons in argument] col = [c for vec_col in cols for c in vec_col] data = [to_sparse(cons, variables).data for cons in argument] data = [d for vec_data in data for d in vec_data] return coo_array((data, (row, col)), shape=(len(argument), len(variables))) def convert_dicts(semiknown_vars: Dict = None, variables: List = None, **kwargs) -> Dict: """Convert any dictionaries to sparse matrices to send to the solver. Semiknown variables are absorbed into the equality constraints. Parameters ---------- semiknown_vars : Dict, optional Semiknown variables variables : List Monomials by name in same order as column indices of all other solver arguments Returns ------- Dict Converted arguments (Unchanged arguments are not returned) """ assert variables is not None, "Variables must be passed." args = locals() del args['kwargs'] args.update(kwargs) # Arguments converted from dictionaries to sparse matrices sparse_args = {k: to_sparse(arg, variables) for k, arg in args.items() if isinstance(arg, (dict, list)) and k != "semiknown_vars" and k != "solverparameters" and k != "variables"} # Add semiknown variables to equality constraints if semiknown_vars: nof_semiknown = len(semiknown_vars) nof_variables = len(variables) var_to_idx = {x: i for i, x in enumerate(variables)} row = np.repeat(np.arange(nof_semiknown), 2) col = [(var_to_idx[x], var_to_idx[x2]) for x, (c, x2) in semiknown_vars.items()] col = list(sum(col, ())) data = [(1, -c) for x, (c, x2) in semiknown_vars.items()] data = list(sum(data, ())) semiknown_mat = coo_array((data, (row, col)), shape=(nof_semiknown, nof_variables)) if "equalities" in sparse_args: sparse_args["equalities"] = vstack( (sparse_args["equalities"], semiknown_mat)) else: sparse_args["equalities"] = semiknown_mat return sparse_args