Source code for pystoned.sCQER

# import dependencies
from pyomo.environ import ConcreteModel, Set, Var, Objective, minimize, Constraint
import numpy as np
import pandas as pd

from .constant import CET_ADDI, OPT_LOCAL, OPT_DEFAULT
from .utils import tools


class sCQR:
    """Simultaneous estimation of convex quantile regression (CQR)"""

    def __init__(self, y, x, tau, C):
        """sCQR model

        Args:
            y (float): output variable.
            x (float): input variables.
            tau (list): vector of quantiles.
            C (float): non-crossing interval (a small positive value).
        """
        # TODO(error/warning handling): Check the configuration of the model exist
        self.y, self.x = tools.to_1d_list(tools.trans_list(y)), \
            tools.to_2d_list(tools.trans_list(x))
        self.tau = tools.to_1d_list(tools.trans_list(tau))
        self.C, self.cet = C, CET_ADDI

        # Initialize the CQR model
        self.__model__ = ConcreteModel()

        # Initialize the sets
        self.__model__.I = Set(initialize=range(len(self.y)))
        self.__model__.J = Set(initialize=range(len(self.x[0])))
        self.__model__.K = Set(initialize=range(len(self.tau)))
        self.__model__.W = Set(initialize=range(len(self.tau) - 1))

        # Initialize the variables
        self.__model__.alpha = Var(
            self.__model__.K, self.__model__.I, doc='alpha')
        self.__model__.beta = Var(
            self.__model__.K, self.__model__.I, self.__model__.J,
            bounds=(0.0, None), doc='beta')
        self.__model__.epsilon_plus = Var(
            self.__model__.K, self.__model__.I,
            bounds=(0.0, None), doc='positive error term')
        self.__model__.epsilon_minus = Var(
            self.__model__.K, self.__model__.I,
            bounds=(0.0, None), doc='negative error term')

        # Setup the objective function and constraints
        self.__model__.objective = Objective(
            rule=self.__objective_rule(), sense=minimize, doc='objective function')
        self.__model__.regression_rule = Constraint(
            self.__model__.K, self.__model__.I,
            rule=self.__regression_rule(), doc='regression equation')
        self.__model__.afriat_rule = Constraint(
            self.__model__.K, self.__model__.I, self.__model__.I,
            rule=self.__afriat_rule(), doc='afriat inequality')
        self.__model__.noncrossing_rule = Constraint(
            self.__model__.W, self.__model__.I,
            rule=self.__noncrossing_rule(), doc='non-crossing constraint')

        # Optimize model
        self.optimization_status, self.problem_status = 0, 0

    def optimize(self, email=OPT_LOCAL, solver=OPT_DEFAULT):
        """Optimize the function by requested method

        Args:
            email (string): The email address for remote optimization. It will optimize locally if OPT_LOCAL is given.
            solver (string): The solver chosen for optimization. It will optimize with default solver if OPT_DEFAULT is given.
        """
        # TODO(error/warning handling): Check problem status after optimization
        self.problem_status, self.optimization_status = tools.optimize_model(
            self.__model__, email, self.cet, solver)

    def __objective_rule(self):
        """Return the proper objective function"""
        def objective_rule(model):
            return sum(self.tau[k] * sum(model.epsilon_plus[k, i] for i in model.I)
                       + (1 - self.tau[k]) * sum(model.epsilon_minus[k, i] for i in model.I)
                       for k in model.K)
        return objective_rule

    def __regression_rule(self):
        """Return the proper regression constraint"""
        def regression_rule(model, k, i):
            return self.y[i] == model.alpha[k, i] \
                + sum(model.beta[k, i, j] * self.x[i][j] for j in model.J) \
                + model.epsilon_plus[k, i] - model.epsilon_minus[k, i]
        return regression_rule

    def __afriat_rule(self):
        """Return the proper afriat inequality constraint"""
        def afriat_rule(model, k, i, h):
            if i == h:
                return Constraint.Skip
            return model.alpha[k, i] + sum(model.beta[k, i, j] * self.x[i][j] for j in model.J) \
                <= model.alpha[k, h] + sum(model.beta[k, h, j] * self.x[i][j] for j in model.J)
        return afriat_rule

    def __noncrossing_rule(self):
        """Return the proper non-crossing constraint"""
        def noncrossing_rule(model, w, i):
            return model.alpha[w, i] + sum(model.beta[w, i, j] * self.x[i][j] for j in model.J) + self.C \
                <= model.alpha[w + 1, i] + sum(model.beta[w + 1, i, j] * self.x[i][j] for j in model.J)
        return noncrossing_rule
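
    # For each quantile index k, the rules above encode the linear-programming
    # form of convex quantile regression:
    #   min  sum_k [ tau_k * sum_i eps+_{k,i} + (1 - tau_k) * sum_i eps-_{k,i} ]
    #   s.t. y_i = alpha_{k,i} + beta_{k,i}' x_i + eps+_{k,i} - eps-_{k,i}
    #        alpha_{k,i} + beta_{k,i}' x_i <= alpha_{k,h} + beta_{k,h}' x_i,  h != i
    #        eps+_{k,i} >= 0, eps-_{k,i} >= 0, beta_{k,i} >= 0
    # The non-crossing rule additionally keeps the fitted function for tau_{w+1}
    # at least C above the one for tau_w at every observed x_i, which presumes
    # tau is supplied in ascending order.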

    def get_alpha(self):
        """Return alpha value by array"""
        tools.assert_optimized(self.optimization_status)
        alpha = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.alpha),
                                                           list(self.__model__.alpha[:, :].value))])
        alpha = pd.DataFrame(alpha, columns=['Name', 'Key', 'Value'])
        alpha = alpha.pivot(index='Name', columns='Key', values='Value')
        return alpha.to_numpy().T

    def get_beta(self):
        """Return beta value by array"""
        tools.assert_optimized(self.optimization_status)
        beta = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.beta),
                                                          list(self.__model__.beta[:, :, :].value))])
        beta = pd.DataFrame(beta, columns=['tau', 'n', 'd', 'beta'])
        betanew = []
        for i in range(len(self.tau)):
            beta_new = beta.loc[beta['tau'] == i]
            del beta_new["tau"]
            betanew.append(beta_new.pivot(
                index='n', columns='d', values='beta').to_numpy())
        return np.array(betanew).reshape(len(self.tau) * len(self.y), len(self.x[0]))
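
    # Note: get_beta stacks the per-quantile coefficient matrices vertically,
    # so the returned array has len(tau) * len(y) rows and len(x[0]) columns;
    # rows [k*n, (k+1)*n) hold the beta estimates for the k-th quantile.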

    def get_positive_residual(self):
        """Return positive residual value by array"""
        tools.assert_optimized(self.optimization_status)
        residual_plus = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.epsilon_plus),
                                                                   list(self.__model__.epsilon_plus[:, :].value))])
        residual_plus = pd.DataFrame(residual_plus, columns=['Name', 'Key', 'Value'])
        residual_plus = residual_plus.pivot(
            index='Name', columns='Key', values='Value')
        return residual_plus.to_numpy().T

    def get_negative_residual(self):
        """Return negative residual value by array"""
        tools.assert_optimized(self.optimization_status)
        residual_minus = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.epsilon_minus),
                                                                    list(self.__model__.epsilon_minus[:, :].value))])
        residual_minus = pd.DataFrame(residual_minus, columns=['Name', 'Key', 'Value'])
        residual_minus = residual_minus.pivot(
            index='Name', columns='Key', values='Value')
        return residual_minus.to_numpy().T

    def get_frontier(self):
        """Return estimated frontier value by array"""
        tools.assert_optimized(self.optimization_status)
        frontier = np.tile(np.asarray(self.y).reshape(len(self.y), 1), len(self.tau)) \
            - (self.get_positive_residual() - self.get_negative_residual())
        return np.asarray(frontier)
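

# A minimal usage sketch (illustrative only): the data-generating process, the
# quantile grid, and the availability of a local MOSEK installation below are
# assumptions, not part of this module.
#
#   import numpy as np
#   from pystoned.sCQER import sCQR
#   from pystoned.constant import OPT_LOCAL
#
#   x = np.random.uniform(1, 10, size=(50, 2))               # 50 units, 2 inputs
#   y = x[:, 0] ** 0.3 * x[:, 1] ** 0.4 + np.random.normal(0, 0.5, 50)
#   model = sCQR(y=y, x=x, tau=[0.25, 0.5, 0.75], C=0.001)   # three quantiles
#   model.optimize(OPT_LOCAL, solver='mosek')                 # solve locally
#   print(model.get_alpha())                                  # intercepts, n x K
#   print(model.get_frontier())                               # fitted quantile functions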


class sCER(sCQR):
    """Simultaneous estimation of convex expectile regression (CER)"""

    def __init__(self, y, x, tau, C):
        """sCER model

        Args:
            y (float): output variable.
            x (float): input variables.
            tau (list): vector of expectiles.
            C (float): non-crossing interval (a small positive value).
        """
        super().__init__(y, x, tau, C)
        self.__model__.objective.deactivate()
        self.__model__.squared_objective = Objective(
            rule=self.__squared_objective_rule(), sense=minimize, doc='squared objective rule')

    def __squared_objective_rule(self):
        """Return the proper squared objective function"""
        def squared_objective_rule(model):
            return sum(self.tau[k] * sum(model.epsilon_plus[k, i] ** 2 for i in model.I)
                       + (1 - self.tau[k]) * sum(model.epsilon_minus[k, i] ** 2 for i in model.I)
                       for k in model.K)
        return squared_objective_rule
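

# sCER reuses the sCQR constraint set and only swaps the objective: the linear
# quantile loss is replaced by its squared (expectile) counterpart,
#   min  sum_k [ tau_k * sum_i eps+_{k,i}**2 + (1 - tau_k) * sum_i eps-_{k,i}**2 ],
# so the resulting problem is a quadratic program. Usage mirrors the sCQR sketch
# above with tau read as expectile levels; a QP-capable solver (e.g. MOSEK) is
# assumed for local optimization.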