Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 115 additions & 9 deletions src/foamai-core/foamai_core/boundary_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ def map_boundary_conditions_to_patches(
"walls": ["walls", "top", "bottom"],
"sides": ["sides", "front", "back"]
},
GeometryType.NOZZLE: {
"inlet": ["inlet"],
"outlet": ["outlet"],
"nozzle": ["nozzle"],
"top": ["top"],
"bottom": ["bottom"],
"front": ["front"],
"back": ["back"],
"walls": ["top", "bottom", "front", "back"] # Keep for backward compatibility
},
GeometryType.CUSTOM: {
# STL geometries - external flow around custom geometry
"inlet": ["inlet"],
Expand Down Expand Up @@ -403,8 +413,12 @@ def generate_boundary_conditions_with_mapping(
) -> Dict[str, Any]:
"""Generate boundary conditions with intelligent patch mapping."""

# Get solver type for compressible flow detection
solver_info = state.get("solver_settings", {})
solver_type = solver_info.get("solver_type")

# First, generate standard boundary conditions
boundary_conditions = generate_boundary_conditions(parsed_params, geometry_info, mesh_config)
boundary_conditions = generate_boundary_conditions(parsed_params, geometry_info, mesh_config, solver_type)

# Check if we can read actual mesh patches (post-mesh generation)
case_directory = state.get("case_directory")
Expand Down Expand Up @@ -471,7 +485,7 @@ def merge_ai_boundary_conditions(
return existing_conditions


def generate_boundary_conditions(parsed_params: Dict[str, Any], geometry_info: Dict[str, Any], mesh_config: Dict[str, Any]) -> Dict[str, Any]:
def generate_boundary_conditions(parsed_params: Dict[str, Any], geometry_info: Dict[str, Any], mesh_config: Dict[str, Any], solver_type: SolverType = None) -> Dict[str, Any]:
"""Generate complete boundary condition configuration."""
flow_type = parsed_params.get("flow_type", FlowType.LAMINAR)
velocity = parsed_params.get("velocity", 1.0)
Expand All @@ -481,7 +495,7 @@ def generate_boundary_conditions(parsed_params: Dict[str, Any], geometry_info: D
# Generate field files
boundary_conditions = {
"U": generate_velocity_field(parsed_params, geometry_info, mesh_config),
"p": generate_pressure_field(parsed_params, geometry_info, mesh_config),
"p": generate_pressure_field(parsed_params, geometry_info, mesh_config, solver_type),
}

# Add turbulence fields if needed
Expand Down Expand Up @@ -834,21 +848,61 @@ def generate_velocity_field(parsed_params: Dict[str, Any], geometry_info: Dict[s
velocity_field["boundaryField"][stl_name] = {
"type": "noSlip" # Wall boundary on STL surface
}
elif geometry_type == GeometryType.NOZZLE:
# Internal flow through nozzle - using snappyHexMesh
velocity_field["boundaryField"] = {
"inlet": {
"type": "fixedValue",
"value": f"uniform {velocity_vector}"
},
"outlet": {
"type": "zeroGradient"
},
"nozzle": {
"type": "noSlip" # Nozzle surface as no-slip wall
},
"top": {
"type": "slip" # Far field boundary
},
"bottom": {
"type": "slip" # Far field boundary
},
"front": {
"type": "slip" # Far field boundary
},
"back": {
"type": "slip" # Far field boundary
}
}

return velocity_field


def generate_pressure_field(parsed_params: Dict[str, Any], geometry_info: Dict[str, Any], mesh_config: Dict[str, Any]) -> Dict[str, Any]:
def generate_pressure_field(parsed_params: Dict[str, Any], geometry_info: Dict[str, Any], mesh_config: Dict[str, Any], solver_type: SolverType = None) -> Dict[str, Any]:
"""Generate pressure field (p) boundary conditions."""
pressure = parsed_params.get("pressure", 0.0)

# Determine if this is a compressible flow solver
is_compressible_solver = solver_type in [
SolverType.RHO_PIMPLE_FOAM,
SolverType.SONIC_FOAM,
SolverType.CHT_MULTI_REGION_FOAM, # Can be compressible
SolverType.REACTING_FOAM # Can be compressible
]

# For incompressible flow simulations, use gauge pressure (0) instead of absolute pressure
# This ensures proper pressure gradients for visualization
# Check if pressure looks like absolute pressure (atmospheric ~101325 Pa)
if pressure > 50000: # Anything above 50kPa is likely absolute pressure
# Convert to gauge pressure for incompressible flow
logger.info(f"Converting absolute pressure ({pressure} Pa) to gauge pressure (0 Pa) for incompressible flow")
# For compressible flow simulations, keep absolute pressure
if pressure > 50000 and not is_compressible_solver: # Anything above 50kPa is likely absolute pressure
# Convert to gauge pressure for incompressible flow only
logger.info(f"Converting absolute pressure ({pressure} Pa) to gauge pressure (0 Pa) for incompressible flow solver {solver_type}")
pressure = 0.0 # Use gauge pressure for incompressible flow
elif pressure > 50000 and is_compressible_solver:
# Keep absolute pressure for compressible flows
logger.info(f"Keeping absolute pressure ({pressure} Pa) for compressible flow solver {solver_type}")
elif pressure <= 50000 and is_compressible_solver and pressure <= 0:
# For compressible flows, ensure we don't have zero or negative pressure
logger.warning(f"Compressible flow solver {solver_type} requires positive absolute pressure. Setting to atmospheric pressure (101325 Pa)")
pressure = 101325.0 # Default to atmospheric pressure

geometry_type = geometry_info["type"]
flow_context = geometry_info.get("flow_context", {})
Expand Down Expand Up @@ -1093,6 +1147,32 @@ def generate_pressure_field(parsed_params: Dict[str, Any], geometry_info: Dict[s
pressure_field["boundaryField"][stl_name] = {
"type": "zeroGradient" # No pressure gradient at STL surface
}
elif geometry_type == GeometryType.NOZZLE:
# Internal flow through nozzle - using snappyHexMesh
pressure_field["boundaryField"] = {
"inlet": {
"type": "zeroGradient" # Let velocity drive the flow
},
"outlet": {
"type": "fixedValue",
"value": f"uniform {pressure}" # Fix pressure at outlet
},
"nozzle": {
"type": "zeroGradient" # No pressure gradient at nozzle surface
},
"top": {
"type": "zeroGradient" # Far field boundary
},
"bottom": {
"type": "zeroGradient" # Far field boundary
},
"front": {
"type": "zeroGradient" # Far field boundary
},
"back": {
"type": "zeroGradient" # Far field boundary
}
}

return pressure_field

Expand Down Expand Up @@ -2450,6 +2530,32 @@ def generate_temperature_field(parsed_params: Dict[str, Any], geometry_info: Dic
temp_field["boundaryField"][stl_name] = {
"type": "zeroGradient" # No temperature gradient at STL surface
}
elif geometry_type == GeometryType.NOZZLE:
# Internal flow through nozzle - using snappyHexMesh
temp_field["boundaryField"] = {
"inlet": {
"type": "fixedValue",
"value": f"uniform {temperature}"
},
"outlet": {
"type": "zeroGradient"
},
"nozzle": {
"type": "zeroGradient" # No temperature gradient at nozzle surface
},
"top": {
"type": "zeroGradient" # Far field boundary
},
"bottom": {
"type": "zeroGradient" # Far field boundary
},
"front": {
"type": "zeroGradient" # Far field boundary
},
"back": {
"type": "zeroGradient" # Far field boundary
}
}

return temp_field

Expand Down
121 changes: 111 additions & 10 deletions src/foamai-core/foamai_core/case_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,8 @@ def get_resolution_for_geometry(geom_type: str, res_str: str) -> Dict[str, int]:
return {"circumferential": count, "radial": count//2, "meridional": count//2}
elif geom_type == "cube":
return {"x": count, "y": count, "z": count}
elif geom_type == "nozzle":
return {"x": count, "y": count, "z": count}
else:
return {"x": count, "y": count, "z": count}

Expand All @@ -698,6 +700,8 @@ def get_resolution_for_geometry(geom_type: str, res_str: str) -> Dict[str, int]:
return generate_sphere_blockmesh_dict(dimensions, resolution)
elif geometry_type == "cube":
return generate_cube_blockmesh_dict(dimensions, resolution)
elif geometry_type == "nozzle":
return generate_nozzle_blockmesh_dict(dimensions, resolution)
else:
raise ValueError(f"Unsupported geometry type for blockMesh: {geometry_type}")

Expand Down Expand Up @@ -1379,6 +1383,38 @@ def write_solver_files(case_directory: Path, state: CFDState) -> None:
# Write transportProperties
write_foam_dict(case_directory / "constant" / "transportProperties", solver_settings["transportProperties"])

# Write GPU-specific configuration files
if state.get("use_gpu", False):
gpu_info = state.get("gpu_info", {})
if gpu_info.get("gpu_backend") == "amgx":
# Write AmgX configuration file
amgx_config = {
"config_version": 2,
"solver": {
"preconditioner": {
"scope": "precond",
"solver": "BLOCK_JACOBI"
},
"solver": "PCG",
"print_solve_stats": 1,
"obtain_timings": 1,
"max_iters": 200,
"monitor_residual": 1,
"store_res_history": 1,
"scope": "main",
"convergence": "RELATIVE_INI_CORE",
"tolerance": 1e-03
}
}

# Write AmgX configuration as JSON
import json
with open(case_directory / "system" / "amgxOptions", "w") as f:
json.dump(amgx_config, f, indent=4)

if state["verbose"]:
logger.info("Case Writer: Wrote AmgX configuration file")

# Write solver-specific files
# interFoam specific files
if "g" in solver_settings:
Expand Down Expand Up @@ -1452,19 +1488,21 @@ def write_solver_files(case_directory: Path, state: CFDState) -> None:
if state["verbose"]:
logger.info("Case Writer: Wrote p_rgh field for interFoam")

# rhoPimpleFoam specific files
# Compressible solver files (rhoPimpleFoam, sonicFoam, etc.)
if "thermophysicalProperties" in solver_settings:
write_foam_dict(case_directory / "constant" / "thermophysicalProperties", solver_settings["thermophysicalProperties"])
if state["verbose"]:
logger.info("Case Writer: Wrote thermophysicalProperties for rhoPimpleFoam")
solver_name = solver_settings.get("solver", "compressible solver")
logger.info(f"Case Writer: Wrote thermophysicalProperties for {solver_name}")

if "T" in solver_settings:
# Only write temperature field if it doesn't already exist from boundary conditions
temp_file_path = case_directory / "0" / "T"
if not temp_file_path.exists():
write_foam_dict(temp_file_path, solver_settings["T"])
if state["verbose"]:
logger.info("Case Writer: Wrote initial temperature field T for rhoPimpleFoam")
solver_name = solver_settings.get("solver", "compressible solver")
logger.info(f"Case Writer: Wrote initial temperature field T for {solver_name}")
else:
if state["verbose"]:
logger.info("Case Writer: Temperature field T already exists from boundary conditions")
Expand Down Expand Up @@ -1930,12 +1968,9 @@ def validate_case_structure(case_directory: Path) -> Dict[str, Any]:
if not (case_directory / "0" / "alpha.water").exists():
errors.append("Missing alpha.water field for interFoam")

# Check for rhoPimpleFoam specific files
elif solver == "rhoPimpleFoam":
if not (case_directory / "constant" / "thermophysicalProperties").exists():
errors.append("Missing thermophysicalProperties for rhoPimpleFoam")
if not (case_directory / "0" / "T").exists():
errors.append("Missing temperature field T for rhoPimpleFoam")
# For compressible solvers, files are written based on solver_settings
# so we don't need to check for file existence here since this validation
# runs before the files are actually written

# Check boundary condition files
zero_dir = case_directory / "0"
Expand Down Expand Up @@ -2298,4 +2333,70 @@ def generate_snappyhexmesh_dict(mesh_config: Dict[str, Any], state: CFDState) ->
"levels": [(1e15, snappy_settings.get("refinement_levels", {}).get("max", 2) - 1)]
}

return snappy_dict
return snappy_dict


def generate_nozzle_blockmesh_dict(dimensions: Dict[str, float], resolution: Dict[str, int]) -> Dict[str, Any]:
"""Generate blockMeshDict for nozzle geometry."""
# Extract nozzle dimensions
length = dimensions.get("length", 0.3)
max_diameter = dimensions.get("max_diameter", 0.1)

# For nozzle, create a rectangular domain that encompasses the nozzle
# The actual nozzle profile will be handled by boundary conditions
domain_length = length
domain_height = max_diameter * 2.0 # 2x max diameter
domain_width = max_diameter * 2.0 # 2x max diameter

# Cell counts
nx = resolution.get("x", 60)
ny = resolution.get("y", 30)
nz = resolution.get("z", 30)

# Check if this is 2D or 3D
is_2d = nz == 1

return {
"convertToMeters": 1.0,
"vertices": [
"(0 0 0)",
f"({domain_length} 0 0)",
f"({domain_length} {domain_height} 0)",
f"(0 {domain_height} 0)",
f"(0 0 {domain_width})",
f"({domain_length} 0 {domain_width})",
f"({domain_length} {domain_height} {domain_width})",
f"(0 {domain_height} {domain_width})"
],
"blocks": [
f"hex (0 1 2 3 4 5 6 7) ({nx} {ny} {nz}) simpleGrading (1 1 1)"
],
"edges": [],
"boundary": {
"inlet": {
"type": "patch",
"faces": ["(0 4 7 3)"]
},
"outlet": {
"type": "patch",
"faces": ["(1 2 6 5)"]
},
"top": {
"type": "slip",
"faces": ["(3 7 6 2)"]
},
"bottom": {
"type": "slip",
"faces": ["(0 1 5 4)"]
},
"front": {
"type": "slip",
"faces": ["(0 3 2 1)"]
},
"back": {
"type": "slip",
"faces": ["(4 5 6 7)"]
}
},
"mergePatchPairs": []
}
Loading
Loading