Source code for OpenPisco.MuscatExtentions.FoamReader

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
import itertools
import os
from struct import unpack,calcsize
from typing import Optional,Iterable

import numpy as np

from Muscat.MeshContainers.Mesh import Mesh
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.IO.ReaderBase import ReaderBase
from Muscat.Types import MuscatIndex, MuscatFloat

[docs]def FilterRelevantContent(fileContent:Iterable): startIndex = None for lineId,line in enumerate(fileContent): if line and line[0].isdigit(): startIndex = lineId+1 break assert startIndex is not None,"Could not find relevant content in file" filteredFileContent = fileContent[startIndex:] return filteredFileContent
[docs]def ReadSingleColumnFile(file:str)->np.ndarray: """Read data within single column file Parameters ---------- file : str file path Returns ------- np.ndarray data """ with open(file, "r") as fp: fileContent = fp.read().splitlines() fileContent = FilterRelevantContent(fileContent) column = np.array([int(singleLine) for singleLine in fileContent if any(char.isdigit() for char in singleLine)]) return column
[docs]def ReadFoam(inputFile:str)->Mesh: """Read mesh written using FOAM format Parameters ---------- file : str file path Returns ------- Mesh Mesh object """ reader = FoamReader() reader.fileName = inputFile return reader.Read(inputFile=inputFile)
[docs]class FoamReader(ReaderBase): def __init__(self): super(FoamReader,self).__init__()
[docs] def Read(self,inputFile:Optional[str]=None)->Mesh: """Read mesh written using FOAM format from file Parameters ---------- inputFile : str input file Returns ------- Mesh Mesh object """ return self.ReadFoam(directoryPath=inputFile)
[docs] def ReadFoam(self,directoryPath:str)->Mesh: """Read mesh written using FOAM format from directoryPath Parameters ---------- inputFile : str input file Returns ------- Mesh Mesh object """ nodesCoordinates = self.ReadCoordinates(directoryPath=directoryPath) ownerFile=os.path.join(directoryPath,"owner") faceTags = ReadSingleColumnFile(file=ownerFile) neighbourFile=os.path.join(directoryPath,"neighbour") neighbours = ReadSingleColumnFile(file=neighbourFile) faceConnectivity = np.array(self.ReadFaceConnectivity(directoryPath=directoryPath)) boundaryTags = self.ReadBoundaryTags(directoryPath=directoryPath) tagNames = boundaryTags.keys() externalFacesConnectivityByTagName = {tagName:faceConnectivity[np.array(list(boundaryTags[tagName]))] for tagName in tagNames} externalFacesConnectivity = np.array(list(itertools.chain.from_iterable((externalFacesConnectivityByTagName.values())))) tetraNumber = max(max(faceTags),max(neighbours))+1 tetraFaces = [ [] for _ in range(tetraNumber) ] for faceIndex,faceOwner in enumerate(faceTags): tetraFaces[faceOwner].append(faceConnectivity[faceIndex]) for faceIndex,faceNeighbour in enumerate(neighbours): tetraFaces[faceNeighbour].append(faceConnectivity[faceIndex]) tetraConnectivities = [ [] for _ in range(tetraNumber) ] for tetraIndex,tetraFaceConnectivity in enumerate(tetraFaces): tetraConnectivities[tetraIndex] = list(set([nodeIndex for face in tetraFaceConnectivity for nodeIndex in face ])) tetraConnectivities = np.array(tetraConnectivities) connectivities = {ED.Triangle_3:externalFacesConnectivity,ED.Tetrahedron_4:tetraConnectivities} return self.BuildMeshFromInformation(nodesCoordinates,connectivities,boundaryTags)
[docs] def BuildMeshFromInformation(self,nodesCoordinates:np.ndarray,connectivities:dict,boundaryTags:dict)->Mesh: """Build mesh based on data parsed from FOAM format Parameters ---------- nodesCoordinates : np.ndarray nodes coordinates connectivities : dict mapping between element type and element connectivities boundaryTags : dict mapping between boundary tag names and element ids within tags Returns ------- Mesh Mesh object """ res = Mesh() dataType = MuscatFloat res.nodes = np.array(nodesCoordinates, dtype = dataType) res.originalIDNodes= np.empty((len(nodesCoordinates),),dtype=MuscatIndex) for cpt in range(len(nodesCoordinates)): res.originalIDNodes[cpt] = cpt globalElementCounter = 0 externalFacesConnectivity = connectivities[ED.Triangle_3] triaElements = res.GetElementsOfType(ED.Triangle_3) nbElements = len(externalFacesConnectivity) triaElements.Reserve(nbElements) triaElements.originalIds = np.arange(globalElementCounter,globalElementCounter+nbElements) triaElements.cpt = nbElements globalElementCounter +=nbElements triaElements.connectivity = np.array(externalFacesConnectivity) elementsurfacicId = 0 for tagName,elemsByTagName in boundaryTags.items(): numElemByTagName = len(elemsByTagName) elements = res.GetElementsOfType(ED.Triangle_3) elementIndices = range(elementsurfacicId,elementsurfacicId+numElemByTagName) elements.GetTag(str(tagName)).AddToTag(elementIndices) elementsurfacicId += numElemByTagName tetraElements = res.GetElementsOfType(ED.Tetrahedron_4) tetraConnectivities = connectivities[ED.Tetrahedron_4] nbElements = len(tetraConnectivities) tetraElements.Reserve(nbElements) tetraElements.originalIds = np.arange(globalElementCounter,globalElementCounter+nbElements) tetraElements.cpt = nbElements globalElementCounter +=nbElements tetraElements.connectivity = np.array(tetraConnectivities) res.GenerateManufacturedOriginalIDs() res.PrepareForOutput() return res
[docs] def ReadCoordinates(self,directoryPath:str)->np.ndarray: """Read nodes coordinates from file Parameters ---------- directoryPath : str OpenFOAM directory path Returns ------- np.ndarray nodes coordinates """ file=os.path.join(directoryPath,"points") with open(file, "r") as fp: fileContent = fp.read().splitlines() fileContent = FilterRelevantContent(fileContent) fileContent = [singleLine.replace("(","").replace(")","") for singleLine in fileContent if any(char.isdigit() for char in singleLine)] nodesCoordinates = np.array([[float(coord) for coord in nodeCoordinates.split(" ")] for nodeCoordinates in fileContent]) return nodesCoordinates
[docs] def ReadFaceConnectivity(self,directoryPath:str)->dict: """Read faces connectivities from file Parameters ---------- directoryPath : str OpenFOAM directory path Returns ------- np.ndarray face connectivity """ file=os.path.join(directoryPath,"faces") with open(file, "r") as fp: fileContent = fp.read().splitlines() fileContent = FilterRelevantContent(fileContent) fileContent = [singleLine.replace("("," ").replace(")","") for singleLine in fileContent if any(char.isdigit() for char in singleLine)] facesConnectivity = [[int(digit) for digit in digits.split(" ")] for digits in fileContent] nbNodesPerFace = [] faceIndicesByNbNodes = dict() for faceConnectivity in facesConnectivity: nbNodesInFace = faceConnectivity[0] if nbNodesInFace not in nbNodesPerFace: nbNodesPerFace.append(nbNodesInFace) faceIndicesByNbNodes[nbNodesInFace] = [] faceIndicesByNbNodes[nbNodesInFace].append(faceConnectivity[1:]) return faceIndicesByNbNodes[3]
[docs] def ReadBoundaryTags(self,directoryPath:str)->dict: """Read boundary tags from file Parameters ---------- directoryPath : str directory path Returns ------- np.ndarray mapping between boundary tags and element ids is tags """ file=os.path.join(directoryPath,"boundary") with open(file, "r") as fp: fileContent = fp.read().splitlines() fileContent = FilterRelevantContent(fileContent) tagNameIndices = [lineIndex-1 for lineIndex,line in enumerate(fileContent) if "{" in line] tagNames = [fileContent[tagNameIndex].split()[0] for tagNameIndex in tagNameIndices] tagsCaracteristics = [int(fileLine.replace(";","").split(" ")[-1]) for fileLine in fileContent if "nFaces" in fileLine or "startFace" in fileLine] elementsInTags = np.array_split(tagsCaracteristics,len(tagsCaracteristics)//2) elementsInTags = [range(elementsInTag[1],elementsInTag[1]+elementsInTag[0]) for elementsInTag in elementsInTags] tagMapping = dict(zip(tagNames,elementsInTags)) return tagMapping
[docs]def ReadExtraField(path:str,fieldNames:Iterable[str],fieldSupports:Iterable["ElementType"],binary:bool=True)->np.ndarray: """Read field produced by OpenFOAM Parameters ---------- path : str input file location fieldNames : Iterable[str] fields name to be read fieldSupports : Iterable["ElementType"] Field support associated to fields to be read binary : bool read from binary or ASCII, default is binary Returns ------- np.ndarray field array """ assert len(fieldNames)==len(fieldSupports),"Number of fields should match number of field supports associated to them" try: lastStepDirectory = str(max([int(directory) for directory in os.listdir(path) if directory.isdigit()])) except ValueError as e: raise Exception('Directory '+path+' may not contain any field') from e fieldsValues = [] for fieldName,fieldSupport in zip(fieldNames,fieldSupports): if fieldSupport!=ED.Tetrahedron_4: raise NotImplementedError("Can only handle Tetrahedron_4 support for now") directoryPath = os.path.join(path,lastStepDirectory,fieldName) assert os.path.exists(directoryPath),"Field "+fieldName+" not found in path "+path if binary: with open(directoryPath, "rb") as fp: fileContent = fp.readlines() fieldFound = False for lineId, line in enumerate(fileContent): if line.startswith(b'internalField') and b'nonuniform' in line: num = int(fileContent[lineId + 1]) dimensions = 1 if b'vector' in line: dimensions = 3 elif b'symmTensor' in line: dimensions = 6 elif b'tensor' in line: dimensions = 9 dataBytes = b''.join(fileContent[lineId + 2:len(fileContent)+1]) fieldValues = unpack(f'{num * dimensions}d', dataBytes[calcsize('c'):num*dimensions*calcsize('d')+calcsize('c')] ) fieldValues = np.array(fieldValues).reshape((num, dimensions)) fieldFound = True break if not fieldFound: raise Exception("InternalField "+fieldName+" not found in path "+str(directoryPath)) else: with open(directoryPath, "r") as fp: fileContent = fp.read().splitlines() startIndex = fileContent.index('(') endtIndex = fileContent.index(')') fileContent = fileContent[startIndex-1:endtIndex] fileContent = [singleLine.replace("(","").replace(")","") for singleLine in fileContent if any(char.isdigit() for char in singleLine)] fileContent = [singleLine for singleLine in fileContent][1:] fieldValues = np.array([[float(coord) for coord in fieldValue.split(" ")] for fieldValue in fileContent]) fieldsValues.append(fieldValues) return fieldsValues
[docs]def CheckIntegrity(): from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory from Muscat.MeshTools.MeshCreationTools import CreateCube from OpenPisco.MuscatExtentions.FoamWriter import FoamWriter mesh = CreateCube(dimensions=[8,8,8],spacing=[2., 2., 2.],origin=[0, 0, 0],ofTetras=True) print(mesh) numTetra = mesh.GetElementsOfType(ED.Tetrahedron_4).GetNumberOfElements() physicalTagNames = ["X0","X1","Y0","Y1","Z0","Z1"] tempdir = TemporaryDirectory.GetTempPath() path = os.path.join(tempdir,"FoamMesh") dummyArray = np.arange(3*numTetra).reshape(numTetra,3) fieldsToExport = {"DummyField":[dummyArray,2*dummyArray,3*dummyArray], "DummyFieldAgain":[18*dummyArray,10*dummyArray,5*dummyArray] } writer = FoamWriter(location=path) writer.Write(meshObject=mesh, physicalTagNames=physicalTagNames, CellFieldsNames=["DummyField","DummyFieldAgain"], CellFields=[fieldsToExport["DummyField"],fieldsToExport["DummyFieldAgain"]]) mesh = ReadFoam(inputFile=path) tet = mesh.GetElementsOfType(ED.Tetrahedron_4) assert np.min(tet.connectivity)==0 assert np.max(tet.connectivity)==mesh.GetNumberOfNodes()-1 coordinatesField = ReadExtraField(path=path,fieldNames=["DummyField"],fieldSupports=[ED.Tetrahedron_4],binary=False)[0] np.testing.assert_almost_equal(fieldsToExport["DummyField"][-1],coordinatesField) coordinatesField = ReadExtraField(path=path,fieldNames=["DummyFieldAgain"],fieldSupports=[ED.Tetrahedron_4],binary=False)[0] np.testing.assert_almost_equal(fieldsToExport["DummyFieldAgain"][-1],coordinatesField) return "OK"
if __name__ == '__main__': print(CheckIntegrity()) # pragma: no cover