Source code for pystoned.utils.tools

# import dependencies
from re import compile
from os import environ
import numpy as np
from pyomo.opt import SolverFactory, SolverManagerFactory, check_available_solvers

from ..constant import CET_ADDI, CET_MULT, CET_Model_Categories, OPT_LOCAL, OPT_DEFAULT, RTS_CRS
__email_re = compile(r'([^@]+@[^@]+\.[a-zA-Z0-9]+)$')

def get_remote_solvers():
    import pyomo.neos.kestrel
    kestrel = pyomo.neos.kestrel.kestrelAMPL()
    return list(
        set(
            [
                name.split(":")[0].lower()
                for name in kestrel.solvers()
            ]
        )
    )

def check_remote_solver(solver="mosek"):
    solver_list = get_remote_solvers()
    return bool(solver in solver_list)

def check_local_solver(solver="mosek"):
    return bool(check_available_solvers(solver))

[docs] def set_neos_email(address): """pass email address to NEOS server Args: address (String): your own vaild email address. """ if address == OPT_LOCAL: print("Optimizing locally.") return False if not __email_re.match(address): raise ValueError("Invalid email address.") environ['NEOS_EMAIL'] = address return True
def optimize_model(model, email, cet, solver=OPT_DEFAULT): optimization_status = 0 if not set_neos_email(email): if solver is not OPT_DEFAULT: assert_solver_available_locally(solver) elif cet == CET_ADDI: solver = "mosek" elif cet == CET_MULT: raise ValueError( "Please specify the solver for optimizing multiplicative model locally.") solver_instance = SolverFactory(solver) print("Estimating the {} locally with {} solver.".format( CET_Model_Categories[cet], solver), flush=True) return solver_instance.solve(model, tee=True), 1 else: if solver is OPT_DEFAULT and cet is CET_ADDI: solvers = ["mosek"] elif solver is OPT_DEFAULT and cet == CET_MULT: solvers = ["knitro"] else: solvers = [solver] for solver in solvers: model, optimization_status = __try_remote_solver( model, cet, solver) if optimization_status == 1: return model, optimization_status raise Exception("Remote solvers are temporarily not available.") def __try_remote_solver(model, cet, solver): solver_instance = SolverManagerFactory('neos') try: print("Estimating the {} remotely with {} solver.".format( CET_Model_Categories[cet], solver), flush=True) return solver_instance.solve(model, tee=True, opt=solver), 1 except: print("Remote {} solver is not available now.".format(solver)) return model, 0 def trans_list(li): if type(li) == list: return li return li.tolist() def to_1d_list(li): if type(li) == int or type(li) == float: return [li] if type(li[0]) == list: rl = [] for i in range(len(li)): rl.append(li[i][0]) return rl return li def to_2d_list(li): if type(li[0]) != list: rl = [] for value in li: rl.append([value]) return rl return li def assert_valid_basic_data(y, x, z=None): y = trans_list(y) x = trans_list(x) y = to_1d_list(y) x = to_2d_list(x) y_shape = np.asarray(y).shape x_shape = np.asarray(x).shape if len(y_shape) == 2 and y_shape[1] != 1: raise ValueError( "The multidimensional output data is supported by direciontal based models.") if y_shape[0] != x_shape[0]: raise ValueError( "Number of DMUs must be the same in x and y.") if type(z) != type(None): z = trans_list(z) z = to_2d_list(z) z_shape = np.asarray(z).shape if y_shape[0] != z_shape[0]: raise ValueError( "Number of DMUs must be the same in y and z.") return y, x, z def assert_valid_mupltiple_y_data(y, x): y = trans_list(y) x = trans_list(x) y = to_2d_list(y) x = to_2d_list(x) y_shape = np.asarray(y).shape x_shape = np.asarray(x).shape if y_shape[0] != x_shape[0]: raise ValueError( "Number of DMUs must be the same in x and y.") return y, x def assert_valid_reference_data(y, x, yref, xref): yref = trans_list(yref) xref = trans_list(xref) yref = to_2d_list(yref) xref = to_2d_list(xref) yref_shape = np.asarray(yref).shape xref_shape = np.asarray(xref).shape if yref_shape[0] != xref_shape[0]: raise ValueError( "Number of DMUs must be the same in xref and yref.") if yref_shape[1] != np.asarray(y).shape[1]: raise ValueError( "Number of outputs must be the same in y and yref.") if xref_shape[1] != np.asarray(x).shape[1]: raise ValueError( "Number of inputs must be the same in x and xref.") return yref, xref def assert_valid_reference_data_with_bad_outputs(y, x, b, yref, xref, bref): yref, xref = assert_valid_reference_data(y, x, yref, xref) if type(b) == type(None): return yref, xref, None bref = to_2d_list(bref) bref_shape = np.asarray(bref).shape if bref_shape[0] != np.asarray(yref).shape[0]: raise ValueError( "Number of DMUs must be the same in yref and bref.") if bref_shape[1] != np.asarray(b).shape[1]: raise ValueError( "Number of undesirable outputs must be the same in b and bref.") return yref, xref, bref def assert_valid_direciontal_data(y, x, b=None, gy=[1], gx=[1], gb=None): y = trans_list(y) x = trans_list(x) y = to_2d_list(y) x = to_2d_list(x) gy = to_1d_list(gy) gx = to_1d_list(gx) y_shape = np.asarray(y).shape x_shape = np.asarray(x).shape if y_shape[0] != x_shape[0]: raise ValueError( "Number of DMUs must be the same in x and y.") if y_shape[1] != len(gy): raise ValueError("Number of outputs must be the same in y and gy.") if x_shape[1] != len(gx): raise ValueError("Number of inputs must be the same in x and gx.") if type(b) != type(None): b = trans_list(b) b = to_2d_list(b) gb = to_1d_list(gb) b_shape = np.asarray(b).shape if b_shape[0] != b_shape[0]: raise ValueError( "Number of DMUs must be the same in y and b.") if b_shape[1] != len(gb): raise ValueError( "Number of undesirable outputs must be the same in b and gb.") return y, x, b, gy, gx, gb def assert_optimized(optimization_status): if optimization_status == 0: raise Exception( "Model isn't optimized. Use optimize() method to estimate the model.") def assert_contextual_variable(z): if type(z) == type(None): raise Exception( "Estimated coefficient (lambda) cannot be retrieved due to no contextual variable (z variable) included in the model.") def assert_undesirable_output(b): if type(b) == type(None): raise Exception( "Estimated coefficient (delta) cannot be retrieved due to no undesirable output (b variable) included in the model.") def assert_various_return_to_scale(rts): if rts == RTS_CRS: raise Exception( "Estimated intercept (alpha) cannot be retrieved due to the constant returns-to-scale assumption.") def assert_various_return_to_scale_omega(rts): if rts == RTS_CRS: raise Exception( "Omega cannot be retrieved due to the constant returns-to-scale assumption.") def assert_solver_available_locally(solver): if not SolverFactory(solver).available(): raise ValueError("Solver {} is not available locally.".format(solver)) def assert_valid_wp_data(y, x, b, z=None): y = trans_list(y) x = trans_list(x) b = trans_list(b) y = to_1d_list(y) x = to_2d_list(x) b = to_2d_list(b) y_shape = np.asarray(y).shape x_shape = np.asarray(x).shape b_shape = np.asarray(b).shape if len(y_shape) == 2 and y_shape[1] != 1: raise ValueError( "The multidimensional output data is supported by direciontal based models.") if y_shape[0] != x_shape[0]: raise ValueError( "Number of DMUs must be the same in x and y.") if x_shape[0] != b_shape[0]: raise ValueError( "Number of DMUs must be the same in x and b.") if type(z) != type(None): z = trans_list(z) z = to_2d_list(z) z_shape = np.asarray(z).shape if y_shape[0] != z_shape[0]: raise ValueError( "Number of DMUs must be the same in y and z.") return y, x, b, z def assert_valid_mupltiple_x_y_data(y, x, z=None): y = trans_list(y) x = trans_list(x) y = to_2d_list(y) x = to_2d_list(x) y_shape = np.asarray(y).shape x_shape = np.asarray(x).shape if y_shape[0] != x_shape[0]: raise ValueError( "Number of DMUs must be the same in x and y.") if type(z) != type(None): z = trans_list(z) z = to_2d_list(z) z_shape = np.asarray(z).shape if y_shape[0] != z_shape[0]: raise ValueError( "Number of DMUs must be the same in y and z.") return y, x, z