from abc import abstractmethod
from rtctools.optimization.optimization_problem import OptimizationProblem
from rtctools.optimization.collocated_integrated_optimization_problem \
import CollocatedIntegratedOptimizationProblem
from casadi import fmax, sqrt, MX
from numpy import inf
import logging
import os
from .util import _ObjectParameterWrapper
logger = logging.getLogger("rtc-hydraulic-structures")
logging.basicConfig(level=logging.INFO)
[docs]class Weir(_ObjectParameterWrapper):
"""
Python Weir object as an interface to the Weir object in the model.
In the optimization, the weir flow is implemented as constraints. It means
that the optimization calculated a flow (not weir height!), that is forced
by the constraints to be a physically possible weir flow.
"""
def __init__(self, optimization_problem, name):
super(Weir, self).__init__(optimization_problem)
self.optimization_problem = optimization_problem
self.symbol = name
[docs] def discharge(self):
"""
Get the state corresponding to the weir discharge.
:returns: `MX` expression of the weir discharge.
"""
return self.optimization_problem.state(self.symbol + '.Q')
def _head(self):
return self.optimization_problem.state(self.symbol + '.HQUp.H')
@property
def c_weir(self):
# coefficient part of the equation
return (2 * 9.81)**0.5 * 2.0 / 3.0 * self.weir_coef * self.width
@property
def q_nom(self):
# half of the possible dischage
return (self.q_max + self.q_min) / 2.0
@property
def h_nom(self):
# H corresponding to half of the possible dischage
return (self.q_nom / self.c_weir)**(2.0 / 3.0) + self.hw_min
@property
def slope(self):
return self.c_weir * 3.0 / 2.0 * (self.h_nom - self.hw_min)**0.5
[docs]class WeirMixin(OptimizationProblem):
"""
Adds handling of Weir objects in your model to your optimization problem.
"""
def __init__(self, *args, **kwargs):
super(WeirMixin, self).__init__(*args, **kwargs)
# We also want to output a bunch of variables that are calculated in
# post(), which we return in the extract_results() call. We store them
# in this dictionary.
self.__additional_results = {}
@abstractmethod
[docs] def weirs(self):
"""
User problem returns list of :class:`Weir` objects.
:returns: A list of weirs.
"""
return []
def pre(self):
super(WeirMixin, self).pre()
self._weir_discrete_symbols = set()
self._weir_mx_path_variables = set()
self._weir_status_bounds = {}
self._weir_status_pairs = {}
for w in self.weirs():
# Define symbol names
status_sym = '{}__status'.format(w.symbol)
# Store variable names
self._weir_discrete_symbols.add(status_sym)
# Store bounds
self._weir_status_bounds[status_sym] = (0, 1)
# Generate and store MX variables
status_mx = MX.sym(status_sym)
self._weir_mx_path_variables.add(status_mx)
# Store all symbols together
self._weir_status_pairs[w.symbol] = (status_sym)
def path_constraints(self, ensemble_member):
constraints = super(WeirMixin, self).path_constraints(ensemble_member)
for w in self.weirs():
status_sym = self._weir_status_pairs[w.symbol]
status = self.state(status_sym)
# calculate minimum possible discharge (weir @ max)
import sys
q_min_h = sqrt(fmax(sys.float_info.epsilon, w.c_weir**2 *
(w._head() - w.hw_max)**3.0))
# calculate maximum possible discharge (weir @ min), using
# linearized curve
q_max_h = w.slope*(w._head() - w.h_nom-1 * (status-1)) + w.q_nom
# flow should be lower than physical maximum and bigger then zero
constraints.append((w.discharge() - w.q_max*(status), -inf, 0))
epsilon = 0.00001
# Here a small value, epsilon is used, write down the equation
# status * epsilon < Q, with other words if the weir is "on",
# this is the minimum flow. If the weir is "off" this flow can still be
# present. So it should be small...
constraints.append((-w.discharge() + epsilon*(status), -inf, 0))
# flow should be lower than max related to water level and weir
# height
constraints.append((w.discharge() - q_max_h, -inf, 0))
# flow should be higher than min related to water level and weir
# height
constraints.append((w.discharge() - q_min_h, 0, inf))
# flow should higher than physical minimum
constraints.append((-w.discharge(), -inf, -w.q_min))
return constraints
@property
def path_variables(self):
variables = super(WeirMixin, self).path_variables
variables.extend(self._weir_mx_path_variables)
return variables
def bounds(self):
bounds = super(WeirMixin, self).bounds()
bounds.update(self._weir_status_bounds)
return bounds
def variable_is_discrete(self, variable):
if variable in self._weir_discrete_symbols:
return True
else:
return super(WeirMixin, self).variable_is_discrete(variable)
def post(self):
results = self.extract_results()
times = self.times()
# Calculate the weir heights
path_expressions = []
# Calculating the weir height
for w in self.weirs():
weir_wl_up = results[w.symbol + ".HQUp.H"]
weir_q = results[w.symbol + ".Q"]
self.__additional_results[w.symbol + "_height"] = weir_wl_up - (weir_q / w.c_weir)**(2.0/3.0)
# NOTE: If we call super() first, adding output time series with
# set_time series has no effect, as e.g. PIMIxin/CSVMixin have already
# written their export file. That is why we do it at the end instead.
super(WeirMixin, self).post()
def extract_results(self, *args, **kwargs):
results = super(WeirMixin, self).extract_results(*args, **kwargs)
results.update(self.__additional_results)
return results
def plot_operating_points(optimization_problem, output_folder, results):
# This is for post processing, for plotting the flow and height of a weir
import matplotlib.pyplot as plt
import numpy as np
for w in optimization_problem.weirs():
weir_name = w.symbol
weir_flow_results = results[weir_name +'.Q']
water_level = results[weir_name + '.HQUp.H']
# Calculating the working area of the weir
hmargin = 0.25 * (w.hw_max - w.hw_min)
hs = np.linspace(w.hw_min - hmargin, w.hw_max + hmargin, 100)
q_max_h_plot = w.slope * (hs - w.h_nom) + w.q_nom
q_min_h_plot = w.c_weir * np.fmax(0, (hs - w.hw_max))**1.5
q_max_th_plot = w.c_weir * np.fmax(0, (hs - w.hw_min))**1.5
q_min_plot = np.full((100), w.q_min)
q_max_plot = np.full((100), w.q_max)
# Plotting the working area of the weir
plt.clf()
plt.plot(hs, q_min_plot, 'k-')
plt.plot(hs, q_max_h_plot, 'b-')
plt.plot(hs, q_max_plot, 'k-')
plt.plot(hs, q_min_h_plot, 'g-')
plt.plot(hs, q_max_th_plot, 'k-')
plt.plot(water_level, weir_flow_results, 'r+', markeredgewidth=2)
plt.ylim(w.q_min - 0.5, w.q_max * 1.2)
plt.title('Working area of the weir')
plt.xlabel('Head (m)')
plt.ylabel('Flow [$m^3\,s^{-1}$]')
save_name = weir_name + "_working_area" + ".png"
fname = os.path.join(output_folder, save_name)
plt.savefig(fname, bbox_inches='tight', pad_inches=0.1)
if w.q_max > 2 * max(weir_flow_results[1:]):
logger.warning('The given maximum weir flow ({}) is much higher than the actual maximum flow ({}). '
'This might lead to an unncessarily big linearization error.'.format(
w.q_max, max(weir_flow_results)))