# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
"""For more details about OpenFOAM format, we refer to section 4.1.2 the polymesh description in
https://www.openfoam.com/documentation/user-guide/4-mesh-generation-and-conversion/4.1-mesh-description
see also
https://openfoamwiki.net/index.php/Write_OpenFOAM_meshes
see also
https://www.cfd-online.com/Forums/openfoam-meshing/61656-creating-your-own-mesh-files.html
"""
import os
import itertools
import time
from typing import Iterable,Optional
import numpy as np
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.MeshContainers.Mesh import Mesh
from Muscat.IO.WriterBase import WriterBase
from Muscat.Helpers.Logger import Info
[docs]class FoamWriter(WriterBase):
def __init__(self,location:Optional[str]=None):
"""Constructor (can only handle ASCII output for now, binary to implement)
Parameters
----------
location : str
main directory output path
"""
super(FoamWriter,self).__init__()
self._isBinary = False
self.location = location
[docs] def Write(self,
meshObject:Mesh,
physicalTagNames:Iterable,
PointFields:Optional[Iterable]=None,
PointFieldsNames:Optional[str]=None,
CellFields:Optional[Iterable]=None,
CellFieldsNames:Optional[str]=None
):
"""Write mesh in FOAM format
Parameters
----------
meshObject : Mesh
Original mesh to be written
physicalTagNames : Iterable
collection of physical tag names to be used in a FOAM analysis
PointFields : Optional[Iterable], optional
Values of nodal fields to be written if required, by default None
PointFieldsNames : Optional[str], optional
Names of nodal fields to be written if required, by default None
CellFields : Optional[Iterable], optional
Values of cell fields to be written if required, by default None
CellFieldsNames : Optional[str], optional
Names of cell fields to be written if required, by default None
"""
if PointFields is not None or PointFieldsNames is not None:
raise NotImplementedError("Can not write nodal fields for now")
start = time.time()
os.makedirs(self.location, exist_ok=True)
self.WritePoints(filename="points",mesh=meshObject)
self.WriteFaceCaracteristics(mesh=meshObject,physicalTagNames=physicalTagNames)
if CellFields is not None:
assert len(CellFields)==len(CellFieldsNames),"Number of cell fields names should match number of cell fields values provided"
numTimeStepsForFields = set(len(fieldValue) for fieldValue in CellFields)
assert len(numTimeStepsForFields)==1, "All fields do not have the same number of time step! Not allowed"
numTimeSteps = list(numTimeStepsForFields)[0]
for stepNum in range(numTimeSteps):
stepPath = os.path.join(self.location,str(stepNum))
os.mkdir(stepPath)
for CellFieldsName,CellField in zip(CellFieldsNames,CellFields):
self.WriteSingleNodalField(CellFieldsName,CellField)
Info("Time to write mesh: "+str(time.time() - start))
[docs] def WriteSingleNodalField(self,fieldName:str,fieldValue:Iterable)->str:
"""Write single nodal field in file
Parameters
----------
fieldName : str
field name to be written
fieldValue : Iterable
field value
"""
location = self.location
for singleFieldValueStepId,singleFieldValue in enumerate(fieldValue):
fieldLocation = os.path.join(location,str(singleFieldValueStepId),fieldName)
with open(fieldLocation, "w+") as fieldFile:
header = CreateHeader(filePath=str(singleFieldValueStepId),
keyWordClass="volVectorField",
keyWordObject=fieldName)
fieldFile.write(header)
fieldFile.write("\ndimensions [0 1 -1 0 0 0 0];\n")
fieldFile.write("internalField nonuniform List<vector>\n")
fieldFile.write(str(len(singleFieldValue))+"\n")
fieldFile.write("(\n")
for nodalValue in singleFieldValue:
fieldFile.write("("+" ".join(map(str,nodalValue))+")\n")
fieldFile.write(")\n;\n")
[docs] def WritePoints(self,filename:str,mesh:Mesh):
"""Write nodes coordinates in FOAM format
Parameters
----------
filename : str
Output file name
meshObject : Mesh
Original mesh to be written
"""
filePath = os.path.join(self.location,filename)
header = CreateHeader(filePath="polyMesh",keyWordClass="vectorField",keyWordObject="points")
with open(filePath, "w+") as pointFile:
pointFile.write(header)
pointFile.write("\n\n"+str(mesh.GetNumberOfNodes()))
nodes = mesh.nodes
pointFile.write("\n(\n")
for node in nodes:
pointFile.write("("+str(node[0])+" "+str(node[1])+" "+str(node[2])+")\n")
pointFile.write(")\n")
endLine = r"""// ************************************************************************* //"""
pointFile.write("\n\n"+endLine)
[docs] def WriteFaceCaracteristics(self,mesh:Mesh,physicalTagNames:Iterable):
"""Write face caracteristics, including:
- boundary file (related to physical tag names in physical problem)
- faces file
- owner file
- neighbour file
Parameters
----------
mesh : Mesh
Mesh to be written
physicalTagNames : Iterable
collection of physical tag names used in physical problem setting
"""
internalFacesCarac,externalFacesCarac= self.ComputeFaceCaracteristics(mesh)
nbInternalFace = len(internalFacesCarac[0])
internalFacesCarac = self.RenumberInternalFaces(internalFacesCarac)
externalFacesCarac, faceIdsByTagName = self.RenumberExternalFacesForBoundary(externalFacesCarac,mesh,physicalTagNames)
self.WriteFaceBoundary(filename="boundary",nbInternalFace=nbInternalFace,faceIdsByTagName=faceIdsByTagName)
internalTriaFaces,externalTriaFaces = internalFacesCarac[0],externalFacesCarac[0]
triaFaces = np.concatenate([internalTriaFaces,externalTriaFaces])
internalFaceTetra, externalFaceTetra = internalFacesCarac[1],externalFacesCarac[1]
facesTetra = np.concatenate([internalFaceTetra,externalFaceTetra])
triaFaces = self.RenumerateFaceConnectivity(triaFaces,facesTetra,mesh)
self.WriteFaceConnectivity(filename="faces",triaFaces=triaFaces)
nbPoints = mesh.GetNumberOfNodes()
nbCells = mesh.GetElementsOfType(ED.Tetrahedron_4).GetNumberOfElements()
nInternalFaces = len(internalTriaFaces)
nExternalFaces = len(externalTriaFaces)
nAllFaces = nInternalFaces+nExternalFaces
note = "note \"nPoints:"+str(nbPoints)+" nCells:"+str(nbCells)+" nFaces:"+str(nAllFaces)+" nInternalFaces:"+str(nInternalFaces)+"\";"
self.WriteFaceNeighbour(filename="neighbour",internalFacesNeighbours=internalFacesCarac[2],note=note)
internalFaceOwner,externalFaceOwner = internalFacesCarac[1],externalFacesCarac[1]
facesOwner= np.concatenate([internalFaceOwner,externalFaceOwner])
self.WriteFaceOwner(filename="owner",facesOwner=facesOwner,note=note)
[docs] def ComputeFaceCaracteristics(self,mesh:Mesh)->tuple:
"""Compute internal/external triangular face caracteristics:
- internal (2 tetra faces in common):
triangular faces connectivity
faces tetra (owners) indices (lowest tetra indices linked to internal faces)
face tetra (neighbours) indices (other tetra indices linked to internal faces)
- external:
triangular faces connectivity
faces tetra indices (tetra indices linked to external faces)
Parameters
----------
mesh : Mesh
mesh to be written
Returns
-------
tuple
internal/external face caracteristics
"""
faceNumbers, tetraCorresponding, triaFaces = self.ComputeFaceNumbering(mesh)
internalFaces = np.array(list(filter(lambda x: x.size > 1, faceNumbers)))
internalFacesTetra = tetraCorresponding[internalFaces]
internalTriaFaces = np.array(triaFaces)[internalFaces]
np.testing.assert_almost_equal(internalTriaFaces[:,0,:],internalTriaFaces[:,1,:])
assert internalFacesTetra.shape[-1]==2,"Only for acceptable FE mesh (faces can only belong to 2 volumic elements at most)"
internalFacesOwner = np.min(internalFacesTetra,axis=1)
internalFacesNeighbours = np.max(internalFacesTetra,axis=1)
renumbering = np.argsort(internalFacesOwner)
internalFacesOwner = internalFacesOwner[renumbering]
internalFacesNeighbours = internalFacesNeighbours[renumbering]
internalTriaFaces = internalTriaFaces[renumbering][:,0,:]
externalFaces = np.array(list(filter(lambda x: x.size == 1, faceNumbers)))
externalTriaFaces = np.array(triaFaces)[externalFaces][:,0,:]
externalFacesTetra = tetraCorresponding[externalFaces][:,0]
renumbering = np.argsort(externalFacesTetra)
externalTriaFaces = externalTriaFaces[renumbering]
externalFacesTetra = externalFacesTetra[renumbering]
return (internalTriaFaces,internalFacesOwner,internalFacesNeighbours),(externalTriaFaces,externalFacesTetra)
[docs] def ComputeFaceNumbering(self,mesh:Mesh)->tuple:
"""Compute face numbering from tetrahedron:
- Extract all faces associated to each individual tetrahedron and map it to associated tetrahedron
- Collect unique triangle faces, defined unique integer for each, map all triangles faces to such integer
- Build unique face numbering mapping to corresponding triangle faces numbering
For instance, in faceNumbers = [array([8]), array([13]), array([10, 14])...]
it means unique triangular face 2 can be found in triangular face 10 and 14
Note that, for tetrahedron, a triangular face can belong to 1 (external face) or 2 tetrahedrons (internal face)
Parameters
----------
mesh : Mesh
mesh to be written
Returns
-------
tuple
unique face number mapping to all triangular faces,tetrahedron indices associated to each face,triangle face connectivity
"""
bodyElements = mesh.GetElementsOfType(ED.Tetrahedron_4)
myCombo = list(itertools.combinations(range(4),3))
nbTetra = bodyElements.GetNumberOfElements()
triaToTetraMapping = np.sort(bodyElements.connectivity[:,myCombo].reshape(nbTetra*4,3))
tetraIndex = np.repeat(np.arange(nbTetra), 4)
triaToTetraMapping = np.c_[ triaToTetraMapping, tetraIndex ]
triaFaces = triaToTetraMapping[:,:3].tolist()
tetraCorresponding = triaToTetraMapping[:,3]
uniqueTriaFaces = np.unique(triaFaces,axis=0).tolist()
triaToInt = dict(zip(list(map(tuple,uniqueTriaFaces)),range(len(uniqueTriaFaces))))
triaFacesAsInt = np.fromiter(map(triaToInt.get, map(tuple,triaFaces)),dtype=int)
idxSort = np.argsort(triaFacesAsInt)
sortedTriaFacesAsInt = triaFacesAsInt[idxSort]
_, idxStart = np.unique(sortedTriaFacesAsInt, return_index=True)
faceNumbers = np.split(idxSort, idxStart[1:])
return faceNumbers, tetraCorresponding, triaFaces
[docs] def RenumberInternalFaces(self,internalFacesCarac:tuple)->tuple:
"""Renumber internal faces: OpenFOAM is slightly picky about face ordering.
Internal faces get ordered such that when stepping through the higher numbered neighbouring cells in
incremental order one also steps through the corresponding faces in incremental order (upper-triangular ordering)
Parameters
----------
internalFacesCarac : tuple
unique face number mapping to all triangular faces,tetrahedron indices associated to each face,triangle face connectivity
Returns
-------
tuple
renumerated internal face caracteristics
"""
internalTriaFaces,internalFacesOwner,internalFacesNeighbours = internalFacesCarac
_, idxStart = np.unique(internalFacesOwner, return_index=True)
faceIndicesPerTetraOwner = np.split(np.arange(len(internalFacesOwner)), idxStart[1:])
internalFaceMapping = np.arange(len(internalFacesOwner))
startingIndex = 0
for faceIndicesInTetra in faceIndicesPerTetraOwner:
internalFacesNeighboursInTetra = internalFacesNeighbours[faceIndicesInTetra]
indicesSort = np.argsort(internalFacesNeighboursInTetra)
internalFaceMapping[faceIndicesInTetra] = startingIndex+indicesSort
startingIndex += len(faceIndicesInTetra)
renumerotedInternalFaces = internalTriaFaces[internalFaceMapping]
renumerotedInternalFacesOwner = internalFacesOwner[internalFaceMapping]
renumerotedinternalFacesNeighbours = internalFacesNeighbours[internalFaceMapping]
renumerotedInternalFacesCarac = renumerotedInternalFaces,renumerotedInternalFacesOwner,renumerotedinternalFacesNeighbours
return renumerotedInternalFacesCarac
[docs] def RenumberExternalFacesForBoundary(self,externalFacesCarac:tuple,mesh:Mesh,physicalTagNames:Iterable):
"""Renumber external faces: OpenFOAM is slightly picky about face ordering.
Boundary faces are supposed to be bunched per patch (contiguous numerotation for elem ids in tag).
Case where intersection between tags is not empty not handled. The physical tag names should be a partition of the external surface
Parameters
----------
externalFacesCarac : tuple
triangular faces connectivity,faces tetra indices (tetra indices linked to external faces)
mesh : Mesh
mesh to be written
physicalTagNames : Iterable
physical tag names
Returns
-------
tuple
renumerated external face caracteristics
"""
triaElements = mesh.GetElementsOfType(ED.Triangle_3)
sortedTria = np.array([sorted(triaElement) for triaElement in triaElements.connectivity])
assert sortedTria.shape == externalFacesCarac[0].shape, "Number of extracted external faces and number of surfacic elements should be the same"
if np.linalg.norm(sortedTria-externalFacesCarac[0])>1e-8:
#Face/nodes ordering not the same for original mesh and extracted external surface
from scipy.spatial import cKDTree
myTree = cKDTree(externalFacesCarac[0])
_, mapping=myTree.query(sortedTria)
externalFacesCarac= (externalFacesCarac[0][mapping],externalFacesCarac[1][mapping])
allTagNames = set([tag.name for tag in triaElements.tags])
physicalTagNotInMesh = set(physicalTagNames).difference(allTagNames)
assert len(physicalTagNotInMesh)==0,"Physical tag(s) "+str(list(physicalTagNotInMesh))+" not part of the mesh"
allIds = [triaElements.GetTag(physicalTagName).GetIds() for physicalTagName in physicalTagNames]
for combo in itertools.combinations(range(len(allIds)),2):
intersection = set(allIds[combo[0]]) & set(allIds[combo[1]])
assert len(intersection)==0, "Intersection between physical tags should be empty"
mappingContiguousBoundaryTags = np.zeros(sortedTria.shape[0]).astype(int)-1
currentElementId = 0
faceIdsByTagName = dict()
for physicalTagId,physicalTagName in enumerate(physicalTagNames):
tagElementsIds = allIds[physicalTagId]
newOrder = np.arange(currentElementId,currentElementId+len(tagElementsIds))
mappingContiguousBoundaryTags[newOrder] = tagElementsIds
faceIdsByTagName[physicalTagName] = newOrder
currentElementId+=len(tagElementsIds)
assert np.min(mappingContiguousBoundaryTags)>-1, "Mapping not done for all faces! Tagnames should be a partition"
renumerotedExternalFaces = externalFacesCarac[0][mappingContiguousBoundaryTags]
renumerotedExternalFacesTetra = externalFacesCarac[1][mappingContiguousBoundaryTags]
externalFacesCaracRenumeroted = renumerotedExternalFaces,renumerotedExternalFacesTetra
return externalFacesCaracRenumeroted, faceIdsByTagName
[docs] def RenumerateFaceConnectivity(self,triaFaces:np.ndarray,facesTetra:np.ndarray,mesh:Mesh):
"""Renumerate face connectivity: OpenFOAM is slightly picky about normal orientation.
The normal (righthand rule) should point away from the owner cell (so the boundary faces point out of the domain).
If this condition is not respected for specific faces, the faces connectivity is changed
Parameters
----------
triaFaces : np.ndarray
triangular faces connectivity
facesTetra : np.ndarray
physical tag names
mesh : Mesh
mesh to be written
Returns
-------
tuple
renumerated face connectivity
"""
bodyElements = mesh.GetElementsOfType(ED.Tetrahedron_4).connectivity
bodyElementExtended = bodyElements[facesTetra][:, :, np.newaxis]
triaFacesExpanded = triaFaces[:, np.newaxis, :]
comparison = (bodyElementExtended != triaFacesExpanded)
notInTriaFace = np.all(comparison, axis=2)
complementaryNodes = np.where(notInTriaFace[:, :, np.newaxis], bodyElementExtended, np.nan).flatten()
complementaryNodes = complementaryNodes[~np.isnan(complementaryNodes)].astype(int)
nodesCoordinates = mesh.nodes
complementaryNodeCoords = nodesCoordinates[complementaryNodes]
wrongOrientationFaces = self.RetrieveWrongOrientedFaces(nodesCoordinates,triaFaces,complementaryNodeCoords)
Info("There are "+str(len(wrongOrientationFaces))+" faces not correctly oriented. Correcting orientation")
triaFaces[wrongOrientationFaces,0],triaFaces[wrongOrientationFaces,1] = triaFaces[wrongOrientationFaces,1],triaFaces[wrongOrientationFaces,0].copy()
wrongOrientationFaces = self.RetrieveWrongOrientedFaces(nodesCoordinates,triaFaces,complementaryNodeCoords)
Info("Remaining wrong oriented faces: "+str(len(wrongOrientationFaces)))
return triaFaces
[docs] def RetrieveWrongOrientedFaces(self,nodesCoordinates:np.ndarray,triaFaces:np.ndarray,complementaryNodeCoords:np.ndarray)->np.ndarray:
"""Retrieve wrong oriented faces
Find which faces do not have an acceptable orientation. For a given face, if we move in the direction of the normal,
the distance between the triangle center and the complementary node should increase. Else, the orientation is wrong.
Parameters
----------
nodesCoordinates : np.ndarray
nodes coordinates
triaFaces : np.ndarray
triangular faces connectivity
complementaryNodeCoords : np.ndarray
each node coordinates belonging to the tetra attached to the face but not belonging to this very face
Returns
-------
tuple
renumerated face connectivity
"""
triaFaceNodes = nodesCoordinates[triaFaces]
triaFacesCenter = np.mean(triaFaceNodes,axis=1)
normalTria = np.cross(triaFaceNodes[:,1,:] - triaFaceNodes[:,0,:],triaFaceNodes[:,2,:] - triaFaceNodes[:,1,:])
distCenterToCompementaryNodes = np.linalg.norm(triaFacesCenter-complementaryNodeCoords,axis=1)
distMoveAlongNormalToCompementaryNodes = np.linalg.norm(triaFacesCenter+1e-4*normalTria-complementaryNodeCoords,axis=1)
closerToComplementaryNodes = distMoveAlongNormalToCompementaryNodes<distCenterToCompementaryNodes
wrongOrientationFaces = np.nonzero(closerToComplementaryNodes==True)[0]
return wrongOrientationFaces
[docs] def WriteFaceBoundary(self,filename:str,nbInternalFace:int,faceIdsByTagName:dict):
"""Write face boundary
Write face boundary in file
Parameters
----------
filename : str
output file name
nbInternalFace : int
number of internal face
physicalTagNames : dict
mapping between tag name and associated face ids (assume contiguous numerotation for boundary)
"""
filePath = os.path.join(self.location,filename)
header = CreateHeader(filePath="polyMesh",keyWordClass="polyBoundaryMesh",keyWordObject="boundary")
with open(filePath, "w+") as faceFile:
faceFile.write(header)
faceFile.write("\n\n"+str(len(faceIdsByTagName.keys()))+"\n(\n")
for tagName,tagIds in faceIdsByTagName.items():
faceFile.write(" "+tagName+"\n {\n")
faceFile.write(" type wall;\n")
faceFile.write(" inGroups List<word> 1(wall);\n")
faceFile.write(" nFaces "+str(len(tagIds))+";\n")
faceFile.write(" startFace "+str(min(tagIds)+nbInternalFace)+";\n }\n")
faceFile.write(")")
endLine = r"""// ************************************************************************* //"""
faceFile.write("\n\n"+endLine)
[docs] def WriteFaceConnectivity(self,filename:str,triaFaces:np.ndarray):
"""Write face connectivity
Write face connectivity in file
Parameters
----------
filename : str
output file name
triaFaces : np.ndarray
number of internal face
"""
filePath = os.path.join(self.location,filename)
header = CreateHeader(filePath="polyMesh",keyWordClass="faceList",keyWordObject="faces")
with open(filePath, "w+") as faceFile:
faceFile.write(header)
faceFile.write("\n\n"+str(triaFaces.shape[0])+"\n(\n")
for triaFace in triaFaces:
faceFile.write("3("+str(triaFace[0])+" "+str(triaFace[1])+" "+str(triaFace[2])+")\n")
faceFile.write(")\n")
endLine = r"""// ************************************************************************* //"""
faceFile.write("\n\n"+endLine)
[docs] def WriteFaceNeighbour(self,filename:str,internalFacesNeighbours:np.ndarray,note:str):
"""Write face neighbours
Write face neighbours (for each tetra: tetra index attached to each face except minimal index amongst tetra attached to face) in file
Parameters
----------
filename : str
output file name
internalFacesNeighbours : np.ndarray
internal face neighbours
note : str
extra note
"""
filePath = os.path.join(self.location,filename)
header = CreateHeader(filePath="polyMesh",keyWordClass="labelList",keyWordObject="neighbour",note=note)
with open(filePath, "w+") as faceFile:
faceFile.write(header)
faceFile.write("\n\n"+str(internalFacesNeighbours.shape[0])+"\n(\n")
for internalFaceNeighbour in internalFacesNeighbours:
faceFile.write(str(internalFaceNeighbour)+"\n")
faceFile.write(")\n")
endLine = r"""// ************************************************************************* //"""
faceFile.write("\n\n"+endLine)
[docs] def WriteFaceOwner(self,filename:str,facesOwner:np.ndarray,note:str):
"""Write face owners
Write face owner (for each tetra: tetra indices attached to each face whose index is minimal amongst tetra attached to face) in file
Parameters
----------
filename : str
output file name
internalFacesNeighbours : np.ndarray
internal face neighbours
note : str
extra note
"""
filePath = os.path.join(self.location,filename)
header = CreateHeader(filePath="polyMesh",keyWordClass="labelList",keyWordObject="owner",note=note)
with open(filePath, "w+") as faceFile:
faceFile.write(header)
faceFile.write("\n\n"+str(facesOwner.shape[0])+"\n(\n")
for faceOwner in facesOwner:
faceFile.write(str(faceOwner)+"\n")
faceFile.write(")\n")
endLine = r"""// ************************************************************************* //"""
faceFile.write("\n\n"+endLine)
[docs]def CheckIntegrity():
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
from Muscat.MeshTools.MeshCreationTools import CreateCube
from OpenPisco.MuscatExtentions.FoamReader import FoamReader
mesh = CreateCube(dimensions=[2,2,2],spacing=[2., 2., 2.],origin=[0, 0, 0],ofTetras=True)
physicalTagNames = ["X0","X1","Y0","Y1","Z0","Z1"]
tempdir = TemporaryDirectory.GetTempPath()
path = os.path.join(tempdir,"FoamMesh")
writer = FoamWriter(location=path)
writer.Write(mesh,physicalTagNames)
reader = FoamReader()
meshRead = reader.Read(path)
print(mesh)
assert np.linalg.norm(meshRead.nodes -mesh.nodes)<1e-8,"Nodes coordinates should be the same"
for elementType in [ED.Triangle_3,ED.Tetrahedron_4]:
elementsRead = set(tuple(sorted(connectiv)) for connectiv in meshRead.GetElementsOfType(elementType).connectivity)
elementsOrigin = set(tuple(sorted(connectiv)) for connectiv in mesh.GetElementsOfType(elementType).connectivity)
assert len(elementsRead.difference(elementsOrigin))==0, "Connectivity should be the same"
return "OK"
if __name__ == '__main__':
print(CheckIntegrity()) # pragma: no cover