Source code for opentidalfarm.helpers

import random
import yaml
import os.path
import dolfin
import numpy
from dolfin import *

[docs]
def norm_approx(u, alpha=1e-4):
r""" A smooth approximation to :math:\|u\|:

.. math:: \|u\|_\alpha = \sqrt(u^2 + alpha^2)

:param u: The coefficient.
:param alpha: The approximation coefficient.
:returns: ufl expression -- the approximate norm of u.

"""
return sqrt(inner(u, u) + alpha**2)

[docs]
def smooth_uflmin(a, b, alpha=1e-8):
r""" A smooth approximation to :math:\min(a, b):

.. math:: \text{min}_\alpha(a, b) = \frac{1}{2} (a + b - \|a - b\|_\alpha)

:param a: First argument to :math:\min.
:param b: Second argument to :math:\min.
:param alpha: The approximation coefficient.
:returns: ufl expression -- the approximate :math:\min function.
"""
return (a + b - norm_approx(a - b, alpha=alpha)) / 2

[docs]
def get_rank():
""" The processor number.

:returns: int -- The processor number.
"""
rank = MPI.rank(mpi_comm_world())

return rank

[docs]
def test_gradient_array(J, dJ, x, seed=0.01, perturbation_direction=None,
number_of_tests=5, plot_file=None):
'''Checks the correctness of the derivative dJ.
x must be an array that specifies at which point in the parameter space
the gradient is to be checked. The functions J(x) and dJ(x) must return
the functional value and the functional derivative respectivaly.

This function returns the order of convergence of the Taylor
series remainder, which should be 2 if the gradient is correct.'''

# We will compute the gradient of the functional with respect to the
# initial condition, and check its correctness with the Taylor remainder
# convergence test.
log(INFO, "Running Taylor remainder convergence analysis to check the "

# First run the problem unperturbed
j_direct = J(x)

# Randomise the perturbation direction:
if perturbation_direction is None:
perturbation_direction = x.copy()
# Make sure that we use a consistent seed value accross all processors
random.seed(243)
for i in range(len(x)):
perturbation_direction[i] = random.random()

# Run the forward problem for various perturbed initial conditions
functional_values = []
perturbations = []
perturbation_sizes = [seed / (2 ** i) for i in range(number_of_tests)]
for perturbation_size in perturbation_sizes:
perturbation = perturbation_direction.copy() * perturbation_size
perturbations.append(perturbation)

perturbed_x = x.copy() + perturbation
functional_values.append(J(perturbed_x))

# First-order Taylor remainders (not using adjoint)
no_gradient = [abs(perturbed_j - j_direct) for perturbed_j in
functional_values]

dj = dJ(x, forget=True)
log(INFO, "Absolute functional evaluation differences: %s" % no_gradient)
log(INFO, "Convergence orders for Taylor remainder without adjoint \
information (should all be 1): %s" %

for i in range(len(perturbations)):
remainder = abs(functional_values[i] - j_direct - numpy.dot(perturbations[i],
dj))

log(INFO, "Absolute functional evaluation differences with adjoint: %s" %
log(INFO, "Convergence orders for Taylor remainder with adjoint \
information (should all be 2): %s" %

if plot_file:
first_order = [xx for xx in perturbation_sizes]
second_order = [xx ** 2 for xx in perturbation_sizes]

import pylab
pylab.figure()
pylab.loglog(perturbation_sizes, first_order, 'b--',
perturbation_sizes, second_order, 'g--',
pylab.legend(('First order convergence', 'Second order convergence',
pylab.xlabel("Perturbation size")
pylab.ylabel("Taylor remainder")
pylab.savefig(plot_file)

[docs]
class StateWriter:
def __init__(self, solver, callback=None):
self.timestep = 0
self.solver = solver
self.u_out, self.p_out = self.output_files(
solver.problem.parameters.finite_element.__name__)
self.M_u_out, self.v_out, self.u_out_state = self.u_output_projector(
solver.function_space.mesh())
self.M_p_out, self.q_out, self.p_out_state = self.p_output_projector(
solver.function_space.mesh())
self.callback = callback

[docs]
def write(self, state):
log(PROGRESS, "Projecting velocity and pressure to CG1 for visualisation")
rhs = assemble(inner(self.v_out, state.split()[0]) * dx)
solve(self.M_u_out, self.u_out_state.vector(), rhs, "cg", "sor",
annotate=False)
rhs = assemble(inner(self.q_out, state.split()[1]) * dx)
solve(self.M_p_out, self.p_out_state.vector(), rhs, "cg", "sor",
annotate=False)

self.u_out << self.u_out_state
self.p_out << self.p_out_state

if self.solver.parameters.output_abs_u_at_turbine_positions:
u_at_turbines = []
for position in self.solver.problem.parameters\
.tidal_farm.turbine_positions:
u_at_turbines.append((state[0](position)**2
+state[1](position)**2)**0.5)
dir = self.solver.get_optimisation_and_search_directory()
filename = os.path.join(dir, "abs_u_at_turb_pos_t_{}.txt"\
.format(self.timestep))
numpy.savetxt(filename, u_at_turbines)

if self.callback is not None:
self.callback(state, self.u_out_state, self.p_out_state,
self.timestep, self.solver.optimisation_iteration)

self.timestep += 1

[docs]
def u_output_projector(self, mesh):
# Projection operator for output.
Output_V = VectorFunctionSpace(mesh, 'CG', 1, dim=2)

u_out = TrialFunction(Output_V)
v_out = TestFunction(Output_V)

M_out = assemble(inner(v_out, u_out) * dx)
out_state = Function(Output_V)

return M_out, v_out, out_state

[docs]
def p_output_projector(self, mesh):
# Projection operator for output.
Output_V = FunctionSpace(mesh, 'CG', 1)

u_out = TrialFunction(Output_V)
v_out = TestFunction(Output_V)

M_out = assemble(inner(v_out, u_out) * dx)
out_state = Function(Output_V)

return M_out, v_out, out_state

[docs]
def output_files(self, basename):
dir = self.solver.get_optimisation_and_search_directory()
# Output file
u_out = File(os.path.join(dir, basename + "_u.pvd"), "compressed")
p_out = File(os.path.join(dir, basename + "_p.pvd"), "compressed")
return u_out, p_out

[docs]
def cpu0only(f):
''' A decorator class that only evaluates on the first CPU in a parallel
environment. '''
def decorator(self, *args, **kw):
myid = get_rank()
if myid == 0:
f(self, *args, **kw)

return decorator

[docs]
def function_eval(func, point):
''' A parallel safe evaluation of dolfin functions '''
try:
val = func(point)
except RuntimeError:
val = -numpy.inf

if dolfin.__version__ >= '1.3.0+':
maxval = MPI.max(mpi_comm_world(), val)
else:
maxval = MPI.max(val)

if maxval == -numpy.inf:
raise RuntimeError("Point is outside the domain")
else:
return maxval

[docs]
class FrozenClass(object):
""" A class which can be (un-)frozen. If the class is frozen, no attributes
can be added to the class. """

__isfrozen = True

def __setattr__(self, key, value):
if self.__isfrozen and not hasattr(self, key):
raise TypeError("%r is a frozen class. %r is not a valid \
attribute." % (self, key))

object.__setattr__(self, key, value)

def _freeze(self):
""" Disallows adding new attributes. """
self.__isfrozen = True

def _unfreeze(self):
""" Allows adding new attributes. """
self.__isfrozen = False

def _convert_type(self, k):
attr = getattr(self, k)

if isinstance(attr, bool):
return attr
try:
return float(attr)
except:
return str(attr)

def __str__(self):
attrs = dir(self)
attrs_dict = {}

for k in attrs:
if k.startswith("_"):
continue
val = self._convert_type(k)
attrs_dict[k] = val

return yaml.dump(attrs_dict, default_flow_style=False)

[docs]
class OutputWriter(object):
""" Suite of tools to write output to disk
"""

def __init__(self, functional):
self.functional = functional

[docs]
def individual_turbine_power(self, solver):
""" Print out the individual turbine's power or save it to file.
"""
log(INFO, "Computing individual turbine power extraction contribution.")
farm = solver.problem.parameters.tidal_farm
turbine_positions = farm.turbine_positions
individual_turbine_data = []
for i in range(len(turbine_positions)):
turbine_info = {}
turbine_info['location'] = turbine_positions[i]
turbine_info['power'] = self.functional.Jt_individual(solver.state, i)
turbine_info['force'] = self.functional.force_individual(solver.state, i)
turbine_info['friction'] = farm.turbine_cache._parameters['friction'][i]
turbine_info['friction_field'] = farm.turbine_cache['turbine_field_individual'][i]
info("Contribution of turbine %d at x=%.3f, "
"y=%.3f, is %.2f kW with a friction of %.2f." % (i,
turbine_info['location'][0], turbine_info['location'][1],
turbine_info['power']*0.001, turbine_info['friction']))