"""
Tower template loader: parse OpenFAST ElastoDyn tower input files and
return distributed tower properties suitable for assembling an
OpenSeesPy stick model with byte-identical NREL section properties.
This is Phase 1 / Task 1.1 of the Op^3 Track C industry-grade plan:
replace the hand-tuned ``TOWER_TEMPLATES`` dict in
``op3/opensees_foundations/builder.py`` with values read directly from
the canonical NREL ``*_ElastoDyn*Tower.dat`` files. The expected
outcome is that examples 1, 2, 3, 7, 8 frequency errors fall from
10-30% to <2% on the fixed-base configuration.
Usage
-----
>>> from op3.opensees_foundations.tower_loader import load_elastodyn_tower
>>> tpl = load_elastodyn_tower(
... ed_main="nrel_reference/openfast_rtest/5MW_OC3Mnpl_DLL_WTurb_WavesIrr/"
... "NRELOffshrBsline5MW_OC3Monopile_ElastoDyn.dat",
... )
>>> tpl.tower_height_m, len(tpl.ht_fract)
(87.6, 11)
Schema
------
Returned ``TowerTemplate`` exposes:
- ``tower_height_m`` : TowerHt from ElastoDyn main file
- ``tower_base_z_m`` : TowerBsHt (offset from MSL or ground)
- ``ht_fract`` : array of fractional heights along tower (0..1)
- ``mass_density_kg_m`` : array of TMassDen at each station
- ``ei_fa_Nm2`` : array of TwFAStif (fore-aft EI)
- ``ei_ss_Nm2`` : array of TwSSStif (side-side EI)
- ``damping_ratio_fa`` : (TwrFADmp1, TwrFADmp2) percent of critical
- ``damping_ratio_ss`` : (TwrSSDmp1, TwrSSDmp2)
- ``adj_tower_mass`` : AdjTwMa scalar
- ``adj_fa_stiff`` : AdjFASt scalar
- ``adj_ss_stiff`` : AdjSSSt scalar
- ``source_files`` : provenance for V&V records
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
import numpy as np
# ---------------------------------------------------------------------------
# Data class
# ---------------------------------------------------------------------------
[docs]
@dataclass
class TowerTemplate:
"""Distributed tower properties parsed from an ElastoDyn deck."""
tower_height_m: float
tower_base_z_m: float
ht_fract: np.ndarray
mass_density_kg_m: np.ndarray
ei_fa_Nm2: np.ndarray
ei_ss_Nm2: np.ndarray
damping_ratio_fa: tuple[float, float] = (1.0, 1.0)
damping_ratio_ss: tuple[float, float] = (1.0, 1.0)
adj_tower_mass: float = 1.0
adj_fa_stiff: float = 1.0
adj_ss_stiff: float = 1.0
source_files: list[str] = field(default_factory=list)
@property
def n_stations(self) -> int:
return int(self.ht_fract.size)
[docs]
def station_elevations_m(self) -> np.ndarray:
"""Absolute z-coordinates of stations (base + ht_fract * height)."""
return self.tower_base_z_m + self.ht_fract * self.tower_height_m
[docs]
def section_at(self, ht_fract: float) -> dict:
"""Linearly interpolated section properties at a given height fraction."""
return {
"mass_kg_m": float(np.interp(ht_fract, self.ht_fract, self.mass_density_kg_m)),
"EI_fa_Nm2": float(np.interp(ht_fract, self.ht_fract, self.ei_fa_Nm2)),
"EI_ss_Nm2": float(np.interp(ht_fract, self.ht_fract, self.ei_ss_Nm2)),
}
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
_NUM = r"[-+]?\d+(?:\.\d*)?(?:[EeDd][-+]?\d+)?"
def _read_text(path: Path) -> str:
return path.read_text(errors="replace")
def _scan_scalar(text: str, key: str) -> float | None:
"""Find ``<value> <key>`` style scalar parameter."""
# Use a word-boundary only when the key ends in a word char, since
# OpenFAST uses keys like ``TwFAM1Sh(2)`` whose trailing ``)`` is
# non-word and would block \b from matching.
suffix = r"\b" if key[-1].isalnum() or key[-1] == "_" else r"(?=\s|$)"
m = re.search(rf"^\s*({_NUM})\s+{re.escape(key)}{suffix}", text, re.MULTILINE)
if not m:
return None
raw = m.group(1).replace("D", "E").replace("d", "e")
try:
return float(raw)
except ValueError:
return None
def _scan_distributed_block(text: str) -> np.ndarray:
"""
Locate the DISTRIBUTED TOWER PROPERTIES table and return an
(n, 4) array with columns [HtFract, TMassDen, TwFAStif, TwSSStif].
Some NREL files include extra columns (TwGJStif, TwEAStif, ...) — we
keep only the first four because that is what ElastoDyn requires
for FE bending mode synthesis.
"""
# Find header line that starts the block
m = re.search(
r"DISTRIBUTED\s+TOWER\s+PROPERTIES.*?\n"
r"\s*HtFract.*?\n" # column header
r"\s*\(-\).*?\n", # units line
text,
flags=re.IGNORECASE | re.DOTALL,
)
if not m:
raise ValueError("DISTRIBUTED TOWER PROPERTIES block not found")
tail = text[m.end():]
rows: list[list[float]] = []
for line in tail.splitlines():
s = line.strip()
if not s:
continue
if s.startswith("-") or s.startswith("="):
break
# require leading numeric token
first = s.split()[0].replace("D", "E").replace("d", "e")
try:
float(first)
except ValueError:
break
toks = [t.replace("D", "E").replace("d", "e") for t in s.split()]
try:
vals = [float(t) for t in toks[:4]]
except ValueError:
break
if len(vals) < 4:
break
rows.append(vals)
if not rows:
raise ValueError("No distributed tower stations parsed")
return np.asarray(rows, dtype=float)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def load_elastodyn_tower(
ed_main: str | Path,
ed_tower: str | Path | None = None,
) -> TowerTemplate:
"""
Parse an OpenFAST ElastoDyn deck and return a ``TowerTemplate``.
Parameters
----------
ed_main
Path to the ElastoDyn primary input file (the one referenced by
``EDFile`` in the top-level ``.fst``). Provides ``TowerHt`` and
``TowerBsHt`` plus the path to the tower file via ``TwrFile``.
ed_tower
Optional explicit override for the tower properties file. If
omitted, the loader resolves it from ``TwrFile`` in ``ed_main``.
Notes
-----
Some NREL decks (the OC3 monopile in particular) embed the
distributed properties directly in the main ElastoDyn file rather
than in a separate ``*_Tower.dat``. We handle both layouts.
"""
ed_main_path = Path(ed_main)
main_text = _read_text(ed_main_path)
tower_height = _scan_scalar(main_text, "TowerHt")
tower_base = _scan_scalar(main_text, "TowerBsHt") or 0.0
if tower_height is None:
raise ValueError(f"TowerHt not found in {ed_main_path}")
# Try to resolve a separate tower file
twr_path: Path | None = None
if ed_tower is not None:
twr_path = Path(ed_tower)
else:
m = re.search(r'^\s*"([^"]+)"\s+TwrFile\b', main_text, re.MULTILINE)
if m:
cand = (ed_main_path.parent / m.group(1)).resolve()
if cand.exists():
twr_path = cand
else:
# Fall back: search same directory for *tower*.dat
basename = Path(m.group(1)).name
local = ed_main_path.parent / basename
if local.exists():
twr_path = local
else:
matches = sorted(ed_main_path.parent.glob("*[Tt]ower*.dat"))
matches = [p for p in matches if p != ed_main_path]
if matches:
twr_path = matches[0]
if twr_path is not None and twr_path.exists():
twr_text = _read_text(twr_path)
block_source = twr_path
else:
twr_text = main_text
block_source = ed_main_path
table = _scan_distributed_block(twr_text)
fa_dmp1 = _scan_scalar(twr_text, "TwrFADmp(1)") or 1.0
fa_dmp2 = _scan_scalar(twr_text, "TwrFADmp(2)") or 1.0
ss_dmp1 = _scan_scalar(twr_text, "TwrSSDmp(1)") or 1.0
ss_dmp2 = _scan_scalar(twr_text, "TwrSSDmp(2)") or 1.0
adj_mass = _scan_scalar(twr_text, "AdjTwMa") or 1.0
adj_fa = _scan_scalar(twr_text, "AdjFASt") or 1.0
adj_ss = _scan_scalar(twr_text, "AdjSSSt") or 1.0
return TowerTemplate(
tower_height_m=float(tower_height),
tower_base_z_m=float(tower_base),
ht_fract=table[:, 0],
mass_density_kg_m=table[:, 1],
ei_fa_Nm2=table[:, 2],
ei_ss_Nm2=table[:, 3],
damping_ratio_fa=(float(fa_dmp1), float(fa_dmp2)),
damping_ratio_ss=(float(ss_dmp1), float(ss_dmp2)),
adj_tower_mass=float(adj_mass),
adj_fa_stiff=float(adj_fa),
adj_ss_stiff=float(adj_ss),
source_files=[str(ed_main_path), str(block_source)],
)
# ---------------------------------------------------------------------------
# RNA (rotor + nacelle assembly) loader
# ---------------------------------------------------------------------------
[docs]
@dataclass
class RNAProperties:
"""Mass + inertia of the rotor-nacelle assembly parsed from ElastoDyn."""
hub_mass_kg: float
nac_mass_kg: float
blade_mass_kg: float # per blade, integrated from blade .dat
n_blades: int
hub_iner_kgm2: float # rotor-axis inertia
nac_yiner_kgm2: float # nacelle yaw-axis inertia
nac_cm_xn_m: float # downwind offset tower-top -> nacelle CM
nac_cm_zn_m: float # vertical offset tower-top -> nacelle CM
twr2shft_m: float # tower-top to rotor shaft vertical offset
tip_rad_m: float
hub_rad_m: float
source_files: list[str] = field(default_factory=list)
@property
def total_rna_mass_kg(self) -> float:
return self.hub_mass_kg + self.nac_mass_kg + self.n_blades * self.blade_mass_kg
[docs]
def published_mode_shape(twr_text: str, mode: str = "TwFAM1Sh") -> np.ndarray | None:
"""
Return the 5 polynomial coefficients [c2, c3, c4, c5, c6] for one of
the NREL ElastoDyn tower mode shapes
(TwFAM1Sh, TwFAM2Sh, TwSSM1Sh, TwSSM2Sh).
Mode shape is psi(eta) = sum_{i=2..6} c_i * eta^i where eta in [0,1].
"""
coeffs = []
for i in range(2, 7):
v = _scan_scalar(twr_text, f"{mode}({i})")
if v is None:
return None
coeffs.append(v)
return np.asarray(coeffs, dtype=float)
[docs]
def evaluate_mode_shape(coeffs: np.ndarray, eta: np.ndarray) -> np.ndarray:
"""Sample psi(eta) given the [c2..c6] polynomial coefficients."""
eta = np.asarray(eta, dtype=float)
psi = np.zeros_like(eta)
for i, c in enumerate(coeffs, start=2):
psi += c * eta ** i
return psi
def _integrate_blade_mass(blade_path: Path, span_m: float) -> float:
"""Trapezoidal integration of BMassDen over the blade span."""
text = _read_text(blade_path)
hm = re.search(
r"DISTRIBUTED\s+BLADE\s+PROPERTIES.*?\n(\s*BlFract[^\n]*)\n",
text,
flags=re.IGNORECASE | re.DOTALL,
)
if not hm:
return 0.0
header_tokens = hm.group(1).split()
try:
bm_idx = header_tokens.index("BMassDen")
bf_idx = header_tokens.index("BlFract")
except ValueError:
return 0.0
# Skip units line after header
after = text[hm.end():]
units_match = re.match(r"\s*\([^\n]*\n", after)
tail = after[units_match.end():] if units_match else after
rows: list[list[float]] = []
for line in tail.splitlines():
s = line.strip()
if not s:
continue
if s.startswith("-") or s.startswith("="):
break
toks = [t.replace("D", "E").replace("d", "e") for t in s.split()]
try:
vals = [float(t) for t in toks]
except ValueError:
break
if len(vals) <= max(bf_idx, bm_idx):
break
rows.append(vals)
if not rows:
return 0.0
arr = np.asarray(rows)
bl_fract = arr[:, bf_idx]
bmass = arr[:, bm_idx]
s = bl_fract * span_m
return float(np.trapezoid(bmass, s))
[docs]
def load_elastodyn_rna(ed_main: str | Path) -> RNAProperties:
ed_main_path = Path(ed_main)
text = _read_text(ed_main_path)
def must(key: str) -> float:
v = _scan_scalar(text, key)
if v is None:
raise ValueError(f"{key} not found in {ed_main_path.name}")
return v
hub_mass = must("HubMass")
nac_mass = must("NacMass")
hub_iner = must("HubIner")
nac_yiner = must("NacYIner")
nac_cm_xn = _scan_scalar(text, "NacCMxn") or 0.0
nac_cm_zn = _scan_scalar(text, "NacCMzn") or 0.0
twr2shft = _scan_scalar(text, "Twr2Shft") or 0.0
tip_rad = must("TipRad")
hub_rad = must("HubRad")
n_blades = int(_scan_scalar(text, "NumBl") or 3)
# Locate blade file (BldFile(1)) and integrate BMassDen
blade_mass = 0.0
sources = [str(ed_main_path)]
bm = re.search(r'^\s*"([^"]+)"\s+BldFile[\(]?1[\)]?', text, re.MULTILINE)
if bm:
cand = (ed_main_path.parent / bm.group(1)).resolve()
if not cand.exists():
basename = Path(bm.group(1)).name
# Search recursively under ed parent and grandparent
for root in [ed_main_path.parent, ed_main_path.parent.parent]:
hits = list(root.rglob(basename))
if hits:
cand = hits[0]
break
if cand.exists():
blade_mass = _integrate_blade_mass(cand, tip_rad - hub_rad)
sources.append(str(cand))
return RNAProperties(
hub_mass_kg=float(hub_mass),
nac_mass_kg=float(nac_mass),
blade_mass_kg=float(blade_mass),
n_blades=n_blades,
hub_iner_kgm2=float(hub_iner),
nac_yiner_kgm2=float(nac_yiner),
nac_cm_xn_m=float(nac_cm_xn),
nac_cm_zn_m=float(nac_cm_zn),
twr2shft_m=float(twr2shft),
tip_rad_m=float(tip_rad),
hub_rad_m=float(hub_rad),
source_files=sources,
)
# ---------------------------------------------------------------------------
# Convenience: build OpenSeesPy stick segments from a template
# ---------------------------------------------------------------------------
[docs]
def discretise(
tpl: TowerTemplate,
n_segments: int = 20,
) -> list[dict]:
"""
Subdivide the tower into ``n_segments`` equal-length elements and
return per-element section properties (interpolated at the element
midpoint). The resulting list is what
``opensees_foundations.builder._build_tower_stick`` consumes.
Each entry has keys: ``z_bot``, ``z_top``, ``length``, ``mass_kg_m``,
``EI_fa_Nm2``, ``EI_ss_Nm2``, ``EI_avg_Nm2``.
"""
if n_segments < 1:
raise ValueError("n_segments must be >= 1")
z0 = tpl.tower_base_z_m
H = tpl.tower_height_m
edges = np.linspace(0.0, 1.0, n_segments + 1)
elements: list[dict] = []
for i in range(n_segments):
f_lo, f_hi = edges[i], edges[i + 1]
f_mid = 0.5 * (f_lo + f_hi)
sec = tpl.section_at(f_mid)
ei_avg = 0.5 * (sec["EI_fa_Nm2"] + sec["EI_ss_Nm2"])
elements.append(
{
"z_bot": z0 + f_lo * H,
"z_top": z0 + f_hi * H,
"length": (f_hi - f_lo) * H,
"mass_kg_m": sec["mass_kg_m"] * tpl.adj_tower_mass,
"EI_fa_Nm2": sec["EI_fa_Nm2"] * tpl.adj_fa_stiff,
"EI_ss_Nm2": sec["EI_ss_Nm2"] * tpl.adj_ss_stiff,
"EI_avg_Nm2": ei_avg * 0.5 * (tpl.adj_fa_stiff + tpl.adj_ss_stiff),
}
)
return elements
# ---------------------------------------------------------------------------
# Self-test entry point
# ---------------------------------------------------------------------------
def _selftest(paths: Iterable[str | Path]) -> None:
for p in paths:
try:
tpl = load_elastodyn_tower(p)
except Exception as exc: # noqa: BLE001
print(f"[FAIL] {p}: {exc}")
continue
print(
f"[OK] {Path(p).name}: H={tpl.tower_height_m:.1f} m, "
f"stations={tpl.n_stations}, "
f"m_base={tpl.mass_density_kg_m[0]:.0f} kg/m, "
f"EI_base={tpl.ei_fa_Nm2[0]:.2e} Nm^2"
)
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
_selftest(sys.argv[1:])
else:
repo = Path(__file__).resolve().parents[2]
candidates = [
repo / "nrel_reference/openfast_rtest/5MW_OC3Mnpl_DLL_WTurb_WavesIrr"
"/NRELOffshrBsline5MW_OC3Monopile_ElastoDyn.dat",
repo / "nrel_reference/iea_15mw/OpenFAST_monopile"
"/IEA-15-240-RWT-Monopile_ElastoDyn.dat",
]
_selftest([c for c in candidates if c.exists()])