ベイズ最適化のfluentへの適用
By M.Sato
概要
Ansys Fluentを使った流体解析において得られた計算結果を、 ある目的関数を最大化(最小化)することで最適化したい場合がある。
fluentには標準でアジョイント法などの最適化ツールが実装されており、 一般的にはこの手法を使うことで、形状、境界条件、物性値などを効率よく最適化することが可能である。
ここでは、あえてfluentの解析をブラックボックス関数と見做し、ベイズ最適化を適用してみた。
流出境界における流速分布形状を目的関数とし、非ニュートン粘度のべき乗インデックス$n$を設計変数として最適化を行った。 解析の結果、最適なパラメータを求めることができた。
ベイズ最適化とは
ベイズ最適化は、高価な目的関数を効率的に最適化するための確率的手法である。
目的関数は評価するのに時間やコストがかかる場合が多いため、ベイズ最適化では可能な限り少ない評価回数で最適解を見つけることを目指している。
この手法の核となるのはガウス過程である。
ガウス過程は、関数の未知の部分を確率的にモデル化するための強力なツールである。 ガウス過程を用いることで、目的関数の予測分布を得ることができる。
新しい探索箇所を見つけるためには獲得関数が使われる。 獲得関数は、既存のモデルデータから探索と活用のバランスをとりつつ、次に評価すべき点を選ぶために使われる。
一般的な獲得関数には、期待改善量獲得関数(Expected Improvement)、 改善確率量獲得関数(Probability of Improvement)、 信頼下限獲得関数(Lower Confidence Bound)などがある。
新しいサンプル点での目的関数値から確率推定モデルを更新し、最適値を改善する処理を繰り返すことになる。
Fluentとscikit-optimizeを使った解析例
Fluentの解析モデル
今回の最適化解析で使用するfluentの解析モデルおよび計算結果の例を示す。
解析対象領域は、幅0.01[m]の曲線流路である。
左下の流入境界から流体が入り、上部の流出境界から流出する。
流入速度分布は、最大流速 0.01[m/s]の放物線分布とした。
粘度は、fluentに組み込みの非ニュートン粘度モデルを使用した。
$$ \eta = k\dot{\gamma}^{n-1} $$
ここで、$\eta[Pa.s]$は粘度、$\dot{\gamma}[1/sec]$はせん断速度であり、$k=100$と仮定した。
最適化の過程で、非ニュートン粘度のべき乗指数$n$は変化する。
以下は$n=0.5$の場合の計算結果である。
最適化解析のフローチャート
全体の最適化解析の概略流れ図を示す。
今回の最適化解析では、流出境界における流速分布形状をできるだけ円形分布に近くなるようにする。
解析では非ニュートン粘度のべき乗指数$n$をパラメータとして変化させる。 得られた流出境界における流速値を使い、次式で目的関数$f$を定義する。
$$ f=\left[ \frac{1}{N} \Sigma_{i=1}^N (u_i-\hat{u_i})^2 \right]^{1/2} $$
ここで $u_i$は(ある$n$値に対して)計算された流速値、$\hat{u_i}$は目標とする半円形の流速値、$N$はデータ点数である。 $n=0.05,0.50,0.95$における流出部流速分布と半円形の流速分布を比較した図を示す。
なお、ここではテストのために予め$n$値を$0.05$から$0.95$まで変化させた場合の計算を行い、目的関数の特性を調べておいた。 グラフで表示すると以下のようになる。
目的関数は$n=0.18$付近で最小値をとることがわかる。
なおこれは最適化の計算結果をわかりやすくするために使っており、 実際の最適化計算では、この目的関数の回帰式は使っていない(本来は未知のブラックボックス関数)。
解析手法
動作環境
ここでは、全ての処理をpythonから操作する。
pyfluentライブラリの必要動作環境に合わせて、python v3.11を使うようにした。
なお他のpythonライブラリと干渉するのを避けるため、venvモジュールを使い仮想環境を作成し、 その中で解析を実行するようにした(以下は仮想環境使用例。プロンプトの先頭に仮想環境名(ここではvenv311)が表示される)
使用するモジュール
以下のモジュールを仮想環境にインストールした。
pythonにはいくつかのベイズ最適化モジュールが利用可能であるが、ここではscikit-optimizeというモジュールを使った。
pip install ansys-fluent-core
pip install scikit-optimize
目的関数の評価
fluentの計算で得られた流出境界における流速分布は一度ファイルに書き出し、そのファイルを読み込むことで目的関数を評価するようにした。
fluentのケースデータ
予め雛形となるケースファイル(ここでは test.cas.h5)を作成した。
設計変数$n$を変更する際はpyfluentを使い、設定値を更新するようにした。
最適化の設定
設計変数$n$の探索範囲は、$0.05$から$0.95$までとした。
初期のサンプル点として$n=0.3, 0.8$を使用した。
ガウス過程のカーネル関数にはMaternカーネルを使用した。
獲得関数にはLCBを使用し$kappa=1.96$(95%信頼区間)を設定した。
解析結果
収束履歴を示す。
ここで横軸は$n$値の変更回数を示している(ベイズ更新)。
大体5回程度の更新で目的関数は最小値になっていることがわかる。
以下に各反復ごとの平均関数、信頼区間の変動する様子を示す。
上側の図中の赤破線は目的関数(真値)、緑破線はガウス過程により近似された平均関数、緑帯領域は信頼区間を表している。
また下側の図には獲得関数を示す。
探索の結果得られた次回候補点を丸印で示している。
横軸はパラメータ$n$値になっている。初期データ$n=0.3,0.8$を設定して計算を行っている。
計算が収束するのに伴い$n$値が$0.18$付近に近づく様子がわかる。
最終的に得られたパラメータの更新履歴図を示す。
最適値$n=0.1853$を使った場合の流出境界の流速分布を示す。
プログラムソースコード
以下に今回の解析で使用したpythonプログラム(run.py)を示す。 このプログラムの実行は、以下のようにする。
- 作業ディレクトリ内(例えばworking-v311)にfluentの基準ケースの データ(test.cas.h5およびtest.dat.h5)を保存。またpythonプログラム(run.py)も保存しておく。
- コマンドプロンプト内で python run.py でプログラムを実行する
import ansys.fluent.core as pyfluent
import os
import glob
import re
from math import sqrt
import numpy as np
np.random.seed(237)
import matplotlib.pyplot as plt
from skopt.plots import plot_gaussian_process
from skopt.plots import plot_convergence
from skopt import gp_minimize
count = 0
################################################################
def run_fluent(x):
global count
count += 1
solver = pyfluent.launch_fluent(
precision="double",
version="2d",
processor_count=1,
mode="solver",
)
tui = solver.tui
trnfile = "log-{:02d}-{:f}.trn".format(count, x)
solver.file.start_transcript(file_name=trnfile)
solver.file.read_case(file_name="test.cas.h5")
solver.file.read_data(file_name="test.dat.h5")
solver.setup.materials.fluid['air'] = {
"viscosity" : {
"option" : "non-newtonian-power-law",
"non_newtonian_power_law" : {
"option" : "shear-rate-dependent",
"maximum_viscosity" : 0.,
"minimum_viscosity" : 0.,
"power_law_index" : x,
"consistency_index" : 100.
}
}
}
solver.solution.initialization.initialization_type = "standard"
solver.solution.initialization.standard_initialize()
solver.solution.run_calculation.iterate(iter_count=100)
xyfile = "uy-{:02d}-{:f}.xy".format(count, x)
tui.plot.plot("yes",xyfile,"yes","no","no","y-velocity","yes","1","0","outlet","()")
# ident = "test-{:02d}-{:f}".format(count, x)
# solver.file.write_case(file_name=ident+".cas.h5")
# solver.file.write_data(file_name=ident+".dat.h5")
solver.file.stop_transcript()
################################################################
def extract_data_from_file(filename):
positions = []
velocities = []
with open(filename, 'r') as file:
lines = file.readlines()
# Regex to match data lines
data_pattern = re.compile(r'^\s*([\d\.]+)\s+([\d\.Ee+-]+)\s*$')
# Flag to indicate when we are inside the data section
inside_data = False
for line in lines:
if '((xy/key/label' in line:
inside_data = True
continue
if inside_data:
match = data_pattern.match(line)
if match:
position = float(match.group(1))
velocity = float(match.group(2))
positions.append(position)
velocities.append(velocity)
elif line.strip() == ')':
break
return velocities
################################################################
def cleanup_files():
# 削除したいファイルのパターンリスト
file_patterns = [
'log-*.trn',
'uy-*.xy',
'test-*.h5'
]
# 全てのパターンにマッチするファイルを格納するリスト
all_files = []
# 各パターンに対してファイルを取得し、all_filesリストに追加
for pattern in file_patterns:
all_files.extend(glob.glob(pattern))
# 重複を除外(必要に応じて)
all_files = list(set(all_files))
# ファイルを一括削除
for file_path in all_files:
try:
os.remove(file_path)
print(f"{file_path} は正常に削除されました。")
except FileNotFoundError:
print(f"{file_path} が見つかりません。")
except PermissionError:
print(f"{file_path} の削除に必要な権限がありません。")
except Exception as e:
print(f"{file_path} の削除中にエラーが発生しました: {e}")
################################################################
def calculate_objective(vel):
target = ( # <-- semi-circular profile (outlet)
0.0,
0.004178490302970204,
0.006395205576424277,
0.007981295678046266,
0.00858,
0.007981295678046266,
0.006395205576424275,
0.004178490302970204,
0.0,
)
n = len(target)-2 # exclude two end points where uy=0
err = 0
for i in range(1,n+1):
err += (target[i]-vel[i])**2
return sqrt(err/n)
#############################################################################
def polyfunc(x):
"""
approximated objective function. this function is used to plot only.
as a result, the calculated minimum value is not
exactly match with actual objective
"""
if x[0] <= 0.5:
val = \
-2.494e-3 * x[0]**6 \
+5.564e-2 * x[0]**5 \
-8.284e-2 * x[0]**4 \
+4.235e-2 * x[0]**3 \
-5.643e-3 * x[0]**2 \
-4.136e-4 * x[0] \
+5.241e-4
else:
val = \
-1.313e-3 * x[0]**6 \
+6.371e-3 * x[0]**5 \
-1.230e-2 * x[0]**4 \
+1.196e-2 * x[0]**3 \
-6.446e-3 * x[0]**2 \
+3.064e-3 * x[0] \
-1.028e-4
return val
#############################################################################
def objective_func(x):
run_fluent(x[0])
xyfile = "uy-{:02d}-{:f}.xy".format(count, x[0])
vel = extract_data_from_file(xyfile)
err = calculate_objective(vel)
print("calculate_objective has finished. count={} x={} err={}".format(count,x[0],err))
return err
#############################################################################
# main routine start
#############################################################################
noise_level = 0.0
cleanup_files()
n_calls = 10
res = gp_minimize(objective_func, # the function to minimize
[(0.05, 0.95)], # the bounds on each dimension of x
acq_func="LCB", # the acquisition function
kappa=1.96,
n_calls=n_calls, # the number of evaluations of f
n_initial_points=0, # the number of random initialization points
x0=[[0.3], [0.8]],
random_state=11) # the random seed
#############################################################################
# Accordingly, the approximated minimum is found to be:
"x^*=%.4f, f(x^*)=%.4f" % (res.x[0], res.fun)
#############################################################################
# For further inspection of the results, attributes of the `res` named tuple
# provide the following information:
#
# - `x` [float]: location of the minimum.
# - `fun` [float]: function value at the minimum.
# - `models`: surrogate models used for each iteration.
# - `x_iters` [array]:
# location of function evaluation for each iteration.
# - `func_vals` [array]: function value for each iteration.
# - `space` [Space]: the optimization space.
# - `specs` [dict]: parameters passed to the function.
print(res)
with open('result.dat', 'w') as f:
for key, value in res.items():
f.write(f"{key}: {value}\n")
#############################################################################
# Together these attributes can be used to visually inspect the results of the
# minimization, such as the convergence trace or the acquisition function at
# the last iteration:
ax = plot_convergence(res);
ax.plot()
#plt.show()
plotfile = "convergence.png"
plt.savefig(plotfile)
plt.close()
#############################################################################
plt.rcParams["figure.figsize"] = (8, 8)
for n_iter in range(n_calls-2):
# Plot objective function.
plt.subplot(2, 1, 1)
ax = plot_gaussian_process(res, n_calls=n_iter,
# objective=objective_func,
objective=polyfunc,
noise_level=noise_level,
show_legend=True, show_title=True,
show_mu=True,
show_observations=True,
show_next_point=False, show_acq_func=False)
ax.set_ylabel("Objective")
ax.set_ylim(0.0004, 0.0014)
ax.set_xlim(0.0, 1.0)
# Plot acquisition function.
plt.subplot(2, 1, 2)
ax = plot_gaussian_process(res, n_calls=n_iter,
show_legend=True, show_title=False,
show_mu=False, show_acq_func=True,
show_observations=False,
show_next_point=True)
ax.set_ylabel("LCB")
ax.set_xlabel("n-value")
ax.set_ylim(-0.0012, -0.0000)
ax.set_xlim(0.0, 1.0)
ax.plot()
# plt.show()
plotfile = "plot-{:02d}.png".format(n_iter)
plt.savefig(plotfile)
plt.close()
#############################################################################
# final result
plt.rcParams["figure.figsize"] = (6, 4)
ax = plot_gaussian_process(res,
# objective=objective_func,
objective=polyfunc,
noise_level=noise_level)
ax.plot()
#plt.show()
plotfile = "final.png"
plt.savefig(plotfile)
plt.close()
まとめ
fluentを使い、ベイズ最適化を実施した例を示した。
今回の解析は、設計変数は一個であり、また目的関数は単峰な形状となって極めて簡単なモデルになっているが、 ベイズ最適化を使うことで最適解が得られることが分かった。
弊社の解析事例
弊社の流体解析事例については、下記のリンクからご覧ください.