Source code for OpenPisco.MuscatExtentions.FoamReader
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
import itertools
import os
from struct import unpack,calcsize
from typing import Optional,Iterable
import numpy as np
from Muscat.MeshContainers.Mesh import Mesh
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.IO.ReaderBase import ReaderBase
from Muscat.Types import MuscatIndex, MuscatFloat
[docs]def FilterRelevantContent(fileContent:Iterable):
startIndex = None
for lineId,line in enumerate(fileContent):
if line and line[0].isdigit():
startIndex = lineId+1
break
assert startIndex is not None,"Could not find relevant content in file"
filteredFileContent = fileContent[startIndex:]
return filteredFileContent
[docs]def ReadSingleColumnFile(file:str)->np.ndarray:
"""Read data within single column file
Parameters
----------
file : str
file path
Returns
-------
np.ndarray
data
"""
with open(file, "r") as fp:
fileContent = fp.read().splitlines()
fileContent = FilterRelevantContent(fileContent)
column = np.array([int(singleLine) for singleLine in fileContent if any(char.isdigit() for char in singleLine)])
return column
[docs]def ReadFoam(inputFile:str)->Mesh:
"""Read mesh written using FOAM format
Parameters
----------
file : str
file path
Returns
-------
Mesh
Mesh object
"""
reader = FoamReader()
reader.fileName = inputFile
return reader.Read(inputFile=inputFile)
[docs]class FoamReader(ReaderBase):
def __init__(self):
super(FoamReader,self).__init__()
[docs] def Read(self,inputFile:Optional[str]=None)->Mesh:
"""Read mesh written using FOAM format from file
Parameters
----------
inputFile : str
input file
Returns
-------
Mesh
Mesh object
"""
return self.ReadFoam(directoryPath=inputFile)
[docs] def ReadFoam(self,directoryPath:str)->Mesh:
"""Read mesh written using FOAM format from directoryPath
Parameters
----------
inputFile : str
input file
Returns
-------
Mesh
Mesh object
"""
nodesCoordinates = self.ReadCoordinates(directoryPath=directoryPath)
ownerFile=os.path.join(directoryPath,"owner")
faceTags = ReadSingleColumnFile(file=ownerFile)
neighbourFile=os.path.join(directoryPath,"neighbour")
neighbours = ReadSingleColumnFile(file=neighbourFile)
faceConnectivity = np.array(self.ReadFaceConnectivity(directoryPath=directoryPath))
boundaryTags = self.ReadBoundaryTags(directoryPath=directoryPath)
tagNames = boundaryTags.keys()
externalFacesConnectivityByTagName = {tagName:faceConnectivity[np.array(list(boundaryTags[tagName]))] for tagName in tagNames}
externalFacesConnectivity = np.array(list(itertools.chain.from_iterable((externalFacesConnectivityByTagName.values()))))
tetraNumber = max(max(faceTags),max(neighbours))+1
tetraFaces = [ [] for _ in range(tetraNumber) ]
for faceIndex,faceOwner in enumerate(faceTags):
tetraFaces[faceOwner].append(faceConnectivity[faceIndex])
for faceIndex,faceNeighbour in enumerate(neighbours):
tetraFaces[faceNeighbour].append(faceConnectivity[faceIndex])
tetraConnectivities = [ [] for _ in range(tetraNumber) ]
for tetraIndex,tetraFaceConnectivity in enumerate(tetraFaces):
tetraConnectivities[tetraIndex] = list(set([nodeIndex for face in tetraFaceConnectivity for nodeIndex in face ]))
tetraConnectivities = np.array(tetraConnectivities)
connectivities = {ED.Triangle_3:externalFacesConnectivity,ED.Tetrahedron_4:tetraConnectivities}
return self.BuildMeshFromInformation(nodesCoordinates,connectivities,boundaryTags)
[docs] def BuildMeshFromInformation(self,nodesCoordinates:np.ndarray,connectivities:dict,boundaryTags:dict)->Mesh:
"""Build mesh based on data parsed from FOAM format
Parameters
----------
nodesCoordinates : np.ndarray
nodes coordinates
connectivities : dict
mapping between element type and element connectivities
boundaryTags : dict
mapping between boundary tag names and element ids within tags
Returns
-------
Mesh
Mesh object
"""
res = Mesh()
dataType = MuscatFloat
res.nodes = np.array(nodesCoordinates, dtype = dataType)
res.originalIDNodes= np.empty((len(nodesCoordinates),),dtype=MuscatIndex)
for cpt in range(len(nodesCoordinates)):
res.originalIDNodes[cpt] = cpt
globalElementCounter = 0
externalFacesConnectivity = connectivities[ED.Triangle_3]
triaElements = res.GetElementsOfType(ED.Triangle_3)
nbElements = len(externalFacesConnectivity)
triaElements.Reserve(nbElements)
triaElements.originalIds = np.arange(globalElementCounter,globalElementCounter+nbElements)
triaElements.cpt = nbElements
globalElementCounter +=nbElements
triaElements.connectivity = np.array(externalFacesConnectivity)
elementsurfacicId = 0
for tagName,elemsByTagName in boundaryTags.items():
numElemByTagName = len(elemsByTagName)
elements = res.GetElementsOfType(ED.Triangle_3)
elementIndices = range(elementsurfacicId,elementsurfacicId+numElemByTagName)
elements.GetTag(str(tagName)).AddToTag(elementIndices)
elementsurfacicId += numElemByTagName
tetraElements = res.GetElementsOfType(ED.Tetrahedron_4)
tetraConnectivities = connectivities[ED.Tetrahedron_4]
nbElements = len(tetraConnectivities)
tetraElements.Reserve(nbElements)
tetraElements.originalIds = np.arange(globalElementCounter,globalElementCounter+nbElements)
tetraElements.cpt = nbElements
globalElementCounter +=nbElements
tetraElements.connectivity = np.array(tetraConnectivities)
res.GenerateManufacturedOriginalIDs()
res.PrepareForOutput()
return res
[docs] def ReadCoordinates(self,directoryPath:str)->np.ndarray:
"""Read nodes coordinates from file
Parameters
----------
directoryPath : str
OpenFOAM directory path
Returns
-------
np.ndarray
nodes coordinates
"""
file=os.path.join(directoryPath,"points")
with open(file, "r") as fp:
fileContent = fp.read().splitlines()
fileContent = FilterRelevantContent(fileContent)
fileContent = [singleLine.replace("(","").replace(")","") for singleLine in fileContent if any(char.isdigit() for char in singleLine)]
nodesCoordinates = np.array([[float(coord) for coord in nodeCoordinates.split(" ")] for nodeCoordinates in fileContent])
return nodesCoordinates
[docs] def ReadFaceConnectivity(self,directoryPath:str)->dict:
"""Read faces connectivities from file
Parameters
----------
directoryPath : str
OpenFOAM directory path
Returns
-------
np.ndarray
face connectivity
"""
file=os.path.join(directoryPath,"faces")
with open(file, "r") as fp:
fileContent = fp.read().splitlines()
fileContent = FilterRelevantContent(fileContent)
fileContent = [singleLine.replace("("," ").replace(")","") for singleLine in fileContent if any(char.isdigit() for char in singleLine)]
facesConnectivity = [[int(digit) for digit in digits.split(" ")] for digits in fileContent]
nbNodesPerFace = []
faceIndicesByNbNodes = dict()
for faceConnectivity in facesConnectivity:
nbNodesInFace = faceConnectivity[0]
if nbNodesInFace not in nbNodesPerFace:
nbNodesPerFace.append(nbNodesInFace)
faceIndicesByNbNodes[nbNodesInFace] = []
faceIndicesByNbNodes[nbNodesInFace].append(faceConnectivity[1:])
return faceIndicesByNbNodes[3]
[docs] def ReadBoundaryTags(self,directoryPath:str)->dict:
"""Read boundary tags from file
Parameters
----------
directoryPath : str
directory path
Returns
-------
np.ndarray
mapping between boundary tags and element ids is tags
"""
file=os.path.join(directoryPath,"boundary")
with open(file, "r") as fp:
fileContent = fp.read().splitlines()
fileContent = FilterRelevantContent(fileContent)
tagNameIndices = [lineIndex-1 for lineIndex,line in enumerate(fileContent) if "{" in line]
tagNames = [fileContent[tagNameIndex].split()[0] for tagNameIndex in tagNameIndices]
tagsCaracteristics = [int(fileLine.replace(";","").split(" ")[-1]) for fileLine in fileContent if "nFaces" in fileLine or "startFace" in fileLine]
elementsInTags = np.array_split(tagsCaracteristics,len(tagsCaracteristics)//2)
elementsInTags = [range(elementsInTag[1],elementsInTag[1]+elementsInTag[0]) for elementsInTag in elementsInTags]
tagMapping = dict(zip(tagNames,elementsInTags))
return tagMapping
[docs]def ReadExtraField(path:str,fieldNames:Iterable[str],fieldSupports:Iterable["ElementType"],binary:bool=True)->np.ndarray:
"""Read field produced by OpenFOAM
Parameters
----------
path : str
input file location
fieldNames : Iterable[str]
fields name to be read
fieldSupports : Iterable["ElementType"]
Field support associated to fields to be read
binary : bool
read from binary or ASCII, default is binary
Returns
-------
np.ndarray
field array
"""
assert len(fieldNames)==len(fieldSupports),"Number of fields should match number of field supports associated to them"
try:
lastStepDirectory = str(max([int(directory) for directory in os.listdir(path) if directory.isdigit()]))
except ValueError as e:
raise Exception('Directory '+path+' may not contain any field') from e
fieldsValues = []
for fieldName,fieldSupport in zip(fieldNames,fieldSupports):
if fieldSupport!=ED.Tetrahedron_4:
raise NotImplementedError("Can only handle Tetrahedron_4 support for now")
directoryPath = os.path.join(path,lastStepDirectory,fieldName)
assert os.path.exists(directoryPath),"Field "+fieldName+" not found in path "+path
if binary:
with open(directoryPath, "rb") as fp:
fileContent = fp.readlines()
fieldFound = False
for lineId, line in enumerate(fileContent):
if line.startswith(b'internalField') and b'nonuniform' in line:
num = int(fileContent[lineId + 1])
dimensions = 1
if b'vector' in line:
dimensions = 3
elif b'symmTensor' in line:
dimensions = 6
elif b'tensor' in line:
dimensions = 9
dataBytes = b''.join(fileContent[lineId + 2:len(fileContent)+1])
fieldValues = unpack(f'{num * dimensions}d', dataBytes[calcsize('c'):num*dimensions*calcsize('d')+calcsize('c')] )
fieldValues = np.array(fieldValues).reshape((num, dimensions))
fieldFound = True
break
if not fieldFound:
raise Exception("InternalField "+fieldName+" not found in path "+str(directoryPath))
else:
with open(directoryPath, "r") as fp:
fileContent = fp.read().splitlines()
startIndex = fileContent.index('(')
endtIndex = fileContent.index(')')
fileContent = fileContent[startIndex-1:endtIndex]
fileContent = [singleLine.replace("(","").replace(")","") for singleLine in fileContent if any(char.isdigit() for char in singleLine)]
fileContent = [singleLine for singleLine in fileContent][1:]
fieldValues = np.array([[float(coord) for coord in fieldValue.split(" ")] for fieldValue in fileContent])
fieldsValues.append(fieldValues)
return fieldsValues
[docs]def CheckIntegrity():
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
from Muscat.MeshTools.MeshCreationTools import CreateCube
from OpenPisco.MuscatExtentions.FoamWriter import FoamWriter
mesh = CreateCube(dimensions=[8,8,8],spacing=[2., 2., 2.],origin=[0, 0, 0],ofTetras=True)
print(mesh)
numTetra = mesh.GetElementsOfType(ED.Tetrahedron_4).GetNumberOfElements()
physicalTagNames = ["X0","X1","Y0","Y1","Z0","Z1"]
tempdir = TemporaryDirectory.GetTempPath()
path = os.path.join(tempdir,"FoamMesh")
dummyArray = np.arange(3*numTetra).reshape(numTetra,3)
fieldsToExport = {"DummyField":[dummyArray,2*dummyArray,3*dummyArray],
"DummyFieldAgain":[18*dummyArray,10*dummyArray,5*dummyArray]
}
writer = FoamWriter(location=path)
writer.Write(meshObject=mesh,
physicalTagNames=physicalTagNames,
CellFieldsNames=["DummyField","DummyFieldAgain"],
CellFields=[fieldsToExport["DummyField"],fieldsToExport["DummyFieldAgain"]])
mesh = ReadFoam(inputFile=path)
tet = mesh.GetElementsOfType(ED.Tetrahedron_4)
assert np.min(tet.connectivity)==0
assert np.max(tet.connectivity)==mesh.GetNumberOfNodes()-1
coordinatesField = ReadExtraField(path=path,fieldNames=["DummyField"],fieldSupports=[ED.Tetrahedron_4],binary=False)[0]
np.testing.assert_almost_equal(fieldsToExport["DummyField"][-1],coordinatesField)
coordinatesField = ReadExtraField(path=path,fieldNames=["DummyFieldAgain"],fieldSupports=[ED.Tetrahedron_4],binary=False)[0]
np.testing.assert_almost_equal(fieldsToExport["DummyFieldAgain"][-1],coordinatesField)
return "OK"
if __name__ == '__main__':
print(CheckIntegrity()) # pragma: no cover