# import dependencies
from pyomo.environ import ConcreteModel, Set, Var, Objective, minimize, maximize, Constraint
import numpy as np
import pandas as pd
from .constant import CET_ADDI, ORIENT_IO, ORIENT_OO, RTS_VRS, RTS_CRS, OPT_DEFAULT, OPT_LOCAL
from .utils import tools
[docs]
class DEA:
"""Data Envelopment Analysis (DEA)
"""
[docs]
def __init__(self, y, x, orient, rts, yref=None, xref=None):
"""DEA: Envelopment problem
Args:
y (float): output variable.
x (float): input variables.
orient (String): ORIENT_IO (input orientation) or ORIENT_OO (output orientation)
rts (String): RTS_VRS (variable returns to scale) or RTS_CRS (constant returns to scale)
yref (String, optional): reference output. Defaults to None.
xref (String, optional): reference inputs. Defaults to None.
"""
# TODO(error/warning handling): Check the configuration of the model exist
self.y, self.x = tools.assert_valid_mupltiple_y_data(y, x)
self.orient, self.rts = orient, rts
if type(yref) != type(None):
self.yref, self.xref = tools.assert_valid_reference_data(
self.y, self.x, yref, xref)
else:
self.yref, self.xref = self.y, self.x
# Initialize DEA model
self.__model__ = ConcreteModel()
self.__model__.R = Set(initialize=range(len(self.yref)))
# Initialize sets
self.__model__.I = Set(initialize=range(len(self.y)))
self.__model__.J = Set(initialize=range(len(self.x[0])))
self.__model__.K = Set(initialize=range(len(self.y[0])))
# Initialize variable
self.__model__.theta = Var(self.__model__.I, doc='efficiency')
self.__model__.lamda = Var(self.__model__.I, self.__model__.R, bounds=(
0.0, None), doc='intensity variables')
# Setup the objective function and constraints
if self.orient == ORIENT_IO:
self.__model__.objective = Objective(
rule=self.__objective_rule(), sense=minimize, doc='objective function')
else:
self.__model__.objective = Objective(
rule=self.__objective_rule(), sense=maximize, doc='objective function')
self.__model__.input = Constraint(
self.__model__.I, self.__model__.J, rule=self.__input_rule(), doc='input constraint')
self.__model__.output = Constraint(
self.__model__.I, self.__model__.K, rule=self.__output_rule(), doc='output constraint')
if self.rts == RTS_VRS:
self.__model__.vrs = Constraint(
self.__model__.I, rule=self.__vrs_rule(), doc='variable return to scale rule')
# Optimize model
self.optimization_status, self.problem_status = 0, 0
def __objective_rule(self):
"""Return the proper objective function"""
def objective_rule(model):
return sum(model.theta[i] for i in model.I)
return objective_rule
def __input_rule(self):
"""Return the proper input constraint"""
if self.orient == ORIENT_IO:
def input_rule(model, o, j):
return model.theta[o]*self.x[o][j] >= sum(model.lamda[o, r]*self.xref[r][j] for r in model.R)
return input_rule
elif self.orient == ORIENT_OO:
def input_rule(model, o, j):
return sum(model.lamda[o, r] * self.xref[r][j] for r in model.R) <= self.x[o][j]
return input_rule
def __output_rule(self):
"""Return the proper output constraint"""
if self.orient == ORIENT_IO:
def output_rule(model, o, k):
return sum(model.lamda[o, r] * self.yref[r][k] for r in model.R) >= self.y[o][k]
return output_rule
elif self.orient == ORIENT_OO:
def output_rule(model, o, k):
return model.theta[o]*self.y[o][k] <= sum(model.lamda[o, r]*self.yref[r][k] for r in model.R)
return output_rule
def __vrs_rule(self):
def vrs_rule(model, o):
return sum(model.lamda[o, r] for r in model.R) == 1
return vrs_rule
[docs]
def optimize(self, email=OPT_LOCAL, solver=OPT_DEFAULT):
"""Optimize the function by requested method
Args:
email (string): The email address for remote optimization. It will optimize locally if OPT_LOCAL is given.
solver (string): The solver chosen for optimization. It will optimize with default solver if OPT_DEFAULT is given.
"""
# TODO(error/warning handling): Check problem status after optimization
self.problem_status, self.optimization_status = tools.optimize_model(
self.__model__, email, CET_ADDI, solver)
[docs]
def display_status(self):
"""Display the status of problem"""
print(self.optimization_status)
[docs]
def display_theta(self):
"""Display theta value"""
tools.assert_optimized(self.optimization_status)
self.__model__.theta.display()
[docs]
def display_lamda(self):
"""Display lamda value"""
tools.assert_optimized(self.optimization_status)
self.__model__.lamda.display()
[docs]
def get_status(self):
"""Return status"""
return self.optimization_status
[docs]
def get_theta(self):
"""Return theta value by array"""
tools.assert_optimized(self.optimization_status)
theta = list(self.__model__.theta[:].value)
return np.asarray(theta)
[docs]
def get_lamda(self):
"""Return lamda value by array"""
tools.assert_optimized(self.optimization_status)
lamda = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.lamda),
list(self.__model__.lamda[:, :].value))])
lamda = pd.DataFrame(lamda, columns=['Name', 'Key', 'Value'])
lamda = lamda.pivot(index='Name', columns='Key', values='Value')
return lamda.to_numpy()
[docs]
class DDF(DEA):
[docs]
def __init__(self, y, x, b=None, gy=[1], gx=[1], gb=None, rts=RTS_VRS, yref=None, xref=None, bref=None):
"""DEA: Directional distance function
Args:
y (float): output variable.
x (float): input variables.
b (float), optional): undesirable output variables. Defaults to None.
gy (list, optional): output directional vector. Defaults to [1].
gx (list, optional): input directional vector. Defaults to [1].
gb (list, optional): undesirable output directional vector. Defaults to None.
rts (String): RTS_VRS (variable returns to scale) or RTS_CRS (constant returns to scale)
yref (String, optional): reference output. Defaults to None.
xref (String, optional): reference inputs. Defaults to None.
bref (String, optional): reference undesirable output. Defaults to None.
"""
# Initialize DEA model
self.__model__ = ConcreteModel()
self.y, self.x, self.b, self.gy, self.gx, self.gb = tools.assert_valid_direciontal_data(
y, x, b, gy, gx, gb)
self.rts = rts
if type(yref) != type(None):
self.yref, self.xref, self.bref = tools.assert_valid_reference_data_with_bad_outputs(
self.y, self.x, self.b, yref, xref, bref)
else:
self.yref, self.xref, self.bref = self.y, self.x, self.b
self.__model__.R = Set(initialize=range(len(self.yref)))
# Initialize sets
self.__model__.I = Set(initialize=range(len(self.y)))
self.__model__.J = Set(initialize=range(len(self.x[0])))
self.__model__.K = Set(initialize=range(len(self.y[0])))
if type(b) != type(None):
self.__model__.L = Set(initialize=range(len(self.b[0])))
# Initialize variable
self.__model__.theta = Var(
self.__model__.I, doc='directional distance')
self.__model__.lamda = Var(self.__model__.I, self.__model__.R, bounds=(
0.0, None), doc='intensity variables')
# Setup the objective function and constraints
self.__model__.objective = Objective(
rule=self._DEA__objective_rule(), sense=maximize, doc='objective function')
self.__model__.input = Constraint(
self.__model__.I, self.__model__.J, rule=self.__input_rule(), doc='input constraint')
self.__model__.output = Constraint(
self.__model__.I, self.__model__.K, rule=self.__output_rule(), doc='output constraint')
if type(b) != type(None):
self.__model__.undesirable_output = Constraint(
self.__model__.I, self.__model__.L, rule=self.__undesirable_output_rule(), doc='undesirable output constraint')
if self.rts == RTS_VRS:
self.__model__.vrs = Constraint(
self.__model__.I, rule=self.__vrs_rule(), doc='various return to scale rule')
# Optimize model
self.optimization_status, self.problem_status = 0, 0
def __input_rule(self):
"""Return the proper input constraint"""
def input_rule(model, o, j):
return self.x[o][j] - model.theta[o]*self.gx[j] >= sum(model.lamda[o, r] * self.xref[r][j] for r in model.R)
return input_rule
def __output_rule(self):
"""Return the proper output constraint"""
def output_rule(model, o, k):
return self.y[o][k] + model.theta[o]*self.gy[k] <= sum(model.lamda[o, r] * self.yref[r][k] for r in model.R)
return output_rule
def __undesirable_output_rule(self):
"""Return the proper undesirable output constraint"""
def undesirable_output_rule(model, o, l):
return self.b[o][l] - model.theta[o]*self.gb[l] == sum(model.lamda[o, r] * self.bref[r][l] for r in model.R)
return undesirable_output_rule
def __vrs_rule(self):
"""Return the VRS constraint"""
def vrs_rule(model, o):
return sum(model.lamda[o, r] for r in model.R) == 1
return vrs_rule
[docs]
class DUAL(DEA):
[docs]
def __init__(self, y, x, orient, rts, yref=None, xref=None):
"""DEA: Multiplier problem
Args:
y (float): output variable.
x (float): input variables.
orient (String): ORIENT_IO (input orientation) or ORIENT_OO (output orientation)
rts (String): RTS_VRS (variable returns to scale) or RTS_CRS (constant returns to scale)
yref (String, optional): reference output. Defaults to None.
xref (String, optional): reference inputs. Defaults to None.
"""
# Initialize DEA model
self.__model__ = ConcreteModel()
self.y, self.x = tools.assert_valid_mupltiple_y_data(y, x)
self.orient, self.rts = orient, rts
if type(yref) != type(None):
self.yref, self.xref = tools.assert_valid_reference_data(
self.y, self.x, yref, xref)
else:
self.yref, self.xref = self.y, self.x
self.__model__.R = Set(initialize=range(len(self.yref)))
# Initialize sets
self.__model__.I = Set(initialize=range(len(self.y)))
self.__model__.J = Set(initialize=range(len(self.x[0])))
self.__model__.K = Set(initialize=range(len(self.y[0])))
# Initialize variable
self.__model__.nu = Var(self.__model__.I, self.__model__.J, bounds=(
0.0, None), doc='multiplier x')
self.__model__.mu = Var(self.__model__.I, self.__model__.K, bounds=(
0.0, None), doc='multiplier y')
if self.rts == RTS_VRS:
self.__model__.omega = Var(
self.__model__.I, doc='variable return to scale')
# Setup the objective function and constraints
if self.orient == ORIENT_IO:
self.__model__.objective = Objective(
rule=self.__objective_rule(), sense=maximize, doc='objective function')
else:
self.__model__.objective = Objective(
rule=self.__objective_rule(), sense=minimize, doc='objective function')
self.__model__.first = Constraint(
self.__model__.I, self.__model__.R, rule=self.__first_rule(), doc='technology constraint')
self.__model__.second = Constraint(
self.__model__.I, rule=self.__second_rule(), doc='normalization constraint')
# Optimize model
self.optimization_status, self.problem_status = 0, 0
def __objective_rule(self):
"""Return the proper objective function"""
if self.orient == ORIENT_IO:
def objective_rule(model):
if self.rts == RTS_VRS:
return sum(sum(model.mu[o, k] * self.y[o][k] for o in model.I) for k in model.K) + sum(model.omega[o] for o in model.I)
elif self.rts == RTS_CRS:
return sum(sum(model.mu[o, k] * self.y[o][k] for o in model.I) for k in model.K)
return objective_rule
elif self.orient == ORIENT_OO:
def objective_rule(model):
if self.rts == RTS_VRS:
return sum(sum(model.nu[o, j] * self.x[o][j] for o in model.I) for j in model.J) + sum(model.omega[o] for o in model.I)
elif self.rts == RTS_CRS:
return sum(sum(model.nu[o, j] * self.x[o][j] for o in model.I) for j in model.J)
return objective_rule
def __first_rule(self):
"""Return the proper technology constraint"""
if self.orient == ORIENT_IO:
if self.rts == RTS_VRS:
def first_rule(model, o, r):
return sum(model.mu[o, k] * self.yref[r][k] for k in model.K) - sum(model.nu[o, j] * self.xref[r][j] for j in model.J) + model.omega[o] <= 0
return first_rule
elif self.rts == RTS_CRS:
def first_rule(model, o, r):
return sum(model.mu[o, k] * self.yref[r][k] for k in model.K) - sum(model.nu[o, j] * self.xref[r][j] for j in model.J) <= 0
return first_rule
elif self.orient == ORIENT_OO:
if self.rts == RTS_VRS:
def first_rule(model, o, r):
return sum(model.nu[o, j] * self.xref[r][j] for j in model.J) - sum(model.mu[o, k] * self.yref[r][k] for k in model.K) + model.omega[o] >= 0
return first_rule
elif self.rts == RTS_CRS:
def first_rule(model, o, r):
return sum(model.nu[o, j] * self.xref[r][j] for j in model.J) - sum(model.mu[o, k] * self.yref[r][k] for k in model.K) >= 0
return first_rule
def __second_rule(self):
"""Return the proper normalization constraint"""
if self.orient == ORIENT_IO:
def second_rule(model, o):
return sum(model.nu[o, j] * self.x[o][j] for j in model.J) == 1
return second_rule
elif self.orient == ORIENT_OO:
def second_rule(model, o):
return sum(model.mu[o, k] * self.y[o][k] for k in model.K) == 1
return second_rule
[docs]
def display_mu(self):
"""Display mu value"""
tools.assert_optimized(self.optimization_status)
self.__model__.mu.display()
[docs]
def display_nu(self):
"""Display nu value"""
tools.assert_optimized(self.optimization_status)
self.__model__.nu.display()
[docs]
def display_omega(self):
"""Display omega value"""
tools.assert_optimized(self.optimization_status)
tools.assert_various_return_to_scale_omega(self.rts)
self.__model__.omega.display()
[docs]
def get_mu(self):
"""Return mu value by array"""
tools.assert_optimized(self.optimization_status)
mu = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.mu),
list(self.__model__.mu[:, :].value))])
mu = pd.DataFrame(mu, columns=['Name', 'Key', 'Value'])
mu = mu.pivot(index='Name', columns='Key', values='Value')
return mu.to_numpy()
[docs]
def get_nu(self):
"""Return nu value by array"""
tools.assert_optimized(self.optimization_status)
nu = np.asarray([i + tuple([j]) for i, j in zip(list(self.__model__.nu),
list(self.__model__.nu[:, :].value))])
nu = pd.DataFrame(nu, columns=['Name', 'Key', 'Value'])
nu = nu.pivot(index='Name', columns='Key', values='Value')
return nu.to_numpy()
[docs]
def get_omega(self):
"""Return omega value by array"""
tools.assert_optimized(self.optimization_status)
tools.assert_various_return_to_scale_omega(self.rts)
omega = list(self.__model__.omega[:].value)
return np.asarray(omega)
[docs]
def get_efficiency(self):
"""Return efficiency value by array"""
tools.assert_optimized(self.optimization_status)
if self.orient == ORIENT_IO:
if self.rts == RTS_CRS:
return (np.sum(self.get_mu()*self.y, axis=1)).reshape(len(self.y), 1)
elif self.rts == RTS_VRS:
return (np.sum(self.get_mu()*self.y, axis=1)).reshape(len(self.y), 1) + self.get_omega().reshape(len(self.y), 1)
elif self.orient == ORIENT_OO:
if self.rts == RTS_CRS:
return (np.sum(self.get_nu()*self.x, axis=1)).reshape(len(self.x), 1)
elif self.rts == RTS_VRS:
return (np.sum(self.get_nu()*self.x, axis=1)).reshape(len(self.x), 1) + self.get_omega().reshape(len(self.x), 1)