Source code for OpenPisco.ExternalTools.Aster.AsterInterface

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
"""AsterInterface

This is the dedicated interface for the external physical solver Code\_Aster. 
Its main responsability is to facilitate the interactions with Code\_Aster, either by providing features to build the command line required to run the solver or to retrieve field computed by said solver.
"""

import os
import os.path
import shutil
import csv
import subprocess
from typing import List,Tuple,Iterable,Optional

import numpy as np

from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
from Muscat.Helpers.IO.Which import Which
from Muscat.Helpers.Logger import Info, Error
import Muscat.Helpers.ParserHelper as PH
import Muscat.MeshContainers.ElementsDescription as ED

from OpenPisco.ExternalTools.Aster import GetScriptsPath as GetScriptsPath
from OpenPisco.Unstructured.AppExecutableTools import AppExecutableBase, CommandLineBuilder
from OpenPisco.MuscatExtentions.MedReader import MedReader
import OpenPisco.PhysicalSolvers.FieldsNames as FN
from OpenPisco import RETURN_SUCCESS, RETURN_FAIL

#Name of the required scalar value in csv file by type of post-processing
keysbyValue={}
keysbyValue[FN.int_potential_energy]='TRAV_ELAS'
keysbyValue[FN.EigenFreqSquared]='EigenFreqSquared'
keysbyValue[FN.ERP]='ERPABS'
keysbyValue[FN.eigen_Frequencies]="eigen_Frequencies"
keysbyValue[FN.GeneDisp]="GeneDisp"
keysbyValue[FN.load_crit]='CHAR_CRIT'
keysbyValue[FN.ModalCumulMassFractiondX] = 'ModalCumulMassFractiondX'
keysbyValue[FN.ModalCumulMassFractiondY] = 'ModalCumulMassFractiondY'
keysbyValue[FN.ModalCumulMassFractiondZ] = 'ModalCumulMassFractiondZ'
keysbyValue[FN.ModalLastEnergyRatio] = 'ModalLastEnergyRatio'

AsterElemNameToRuleNames = {}
AsterElemNameToRuleNames[ED.Tetrahedron_10] = ['T10_____FPG5']
AsterElemNameToRuleNames[ED.Tetrahedron_4] = ['TE4_____FPG1','TE4_____FPG4']
AsterElemNameToRuleNames[ED.Triangle_3] = ['TR3_____FPG1']
AsterElemNameToRuleNames[ED.Triangle_6] = ['TR6_____FPG3']


AsterComputedFieldsSupport={
                  FN.potential_energy:FN.IPField,
                  FN.strain:FN.IPField,
                  FN.strain_xx:FN.IPField,
                  FN.strain_yy:FN.IPField,
                  FN.strain_zz:FN.IPField,
                  FN.strain_xy:FN.IPField,
                  FN.strain_xz:FN.IPField,
                  FN.strain_yz:FN.IPField,
                  FN.stress:FN.IPField,
                  FN.stress_xx:FN.IPField,
                  FN.stress_yy:FN.IPField,
                  FN.stress_zz:FN.IPField,
                  FN.stress_xy:FN.IPField,
                  FN.stress_xz:FN.IPField,
                  FN.stress_yz:FN.IPField,
                  FN.von_mises:FN.IPField,
                  FN.flux_temperature:FN.IPField,
                  FN.EigenFreqSquared_sensitivity:FN.Nodes,
                  FN.ERP_sensitivity:FN.Nodes,
                  FN.ERP_density:FN.Nodes,
                  FN.dominant_SolutionMode:FN.Nodes,
                  FN.grad_Ure:FN.Nodes,
                  FN.grad_Uim:FN.Nodes,
                  FN.AllModeDEPL:FN.Nodes,
                  FN.elastic_energy_buckling:FN.IPField,
                  FN.strain_buckling:FN.IPField,
                  FN.strain_green_buckling:FN.IPField,
                  FN.mode_buckling:FN.Nodes,
                  FN.temperature:FN.Nodes,
                  FN.zzError:FN.Centroids,
}

[docs]def GetScalarFromFile(fileName:str,scalarName:str)->List: """Get scalar value from file, typically a post processing value computed by Code\_Aster Parameters ---------- fileName : str input file, assumption it is in the .csv format scalarName : str scalar name to be retrieved If the scalar is not found in the file, an assertion error is raised. Here is an example of such a file # #-------------------------------------------------------------------------------- ##ASTER 17.02.22 CONCEPT 00000036 CALCULE LE 27/10/2025 A 15:06:07 DE TYPE NUME_ORDRE,INST,TRAV_ELAS,TRAV_REEL 1, 0.00000E+00, 1.51764E-05, 0.00000E+00 Returns ------- List scalar value corresponding to scalar name """ value = [] keyNameExist = False counter = 0 lines = [] with open(fileName) as csvfile: reader=csv.reader(csvfile) for row in reader: counter+=1 lines.append(row) if scalarName in row: index = row.index(scalarName) icounter=counter keyNameExist=True assert keyNameExist,"The name "+scalarName+" was not found in "+fileName for i in range(icounter,len(lines)): if len(lines[i]): val = PH.ReadFloat(lines[i][index]) value.append(val) else: break return value
[docs]class AsterPrecisionError(Exception): """Should be raised when the solution returned by Code_Aster is not accurate enough""" pass
[docs]class AsterExecutableNotFound(Exception): """Should be raised when the Code\_Aster executable was not found""" pass
asterdefaultExec = "as_run" asterExecOptions = [] if 'ASTER_EXECUTABLE' not in os.environ.keys(): asterExec = asterdefaultExec else: asterExecCustom = os.environ['ASTER_EXECUTABLE'] if len(asterExecCustom.split())>1: asterExecCustom = asterExecCustom.split() asterExec = asterExecCustom[0] asterExecOptions = asterExecCustom[1:] else: asterExec = asterExecCustom ASTER_AVAILABLE = Which(asterExec)
[docs]class AsterInterface(AppExecutableBase): """ .. py:class:: AsterInterface Interface for the finite element solver Code_Aster """ def __init__(self): super(AsterInterface, self).__init__(asterExec) self.extrafiles=False self.support=None self.reader = MedReader() self.filepath = GetScriptsPath() self.keepStdErr = False self.fileCsvname = "" self.fileoutname = "" self.memoryLimit = None
[docs] def SetExtraFiles(self,extrafiles:bool): """Whether to include additional file to build study Parameters ---------- extrafiles : bool True to include additional files to study. """ self.extrafiles=extrafiles
def _executeSubprocess(self, cmd:Iterable)->str: """Run the solver by using the built command line Parameters ---------- cmd : Iterable commnad line description to run the solver Returns ------- str returned state after running the command line """ #TODO: Should be possible to use run instead but annoying bug to solve first return subprocess.check_output(' '.join(cmd), shell=True,stderr=subprocess.STDOUT)
[docs] def ComputeMemoryLimit(self): """When running the solver, the user is expected to provide beforehand the memory limit. Rather than letting the user estimates the value, it is computed automatically. """ cmdBuilder = CommandLineBuilder(self.GetAppName()) for option in asterExecOptions: cmdBuilder.argumentAdd(option) cmdBuilder.argumentAdd("--info") cmd = cmdBuilder.result() try: out = self._runCommand(cmd) import io keyword = "interactif_memmax : " for line in io.StringIO(out): if line.startswith(keyword): self.memoryLimit = line.rstrip()[len(keyword):] except Exception as e: Info("CodeAster warning : unable to estimate memory limit.") Info(e)
[docs] def IsAsterStudyFileExtension(self,fileName:str)->bool: """Check whether a file is a study file (only .comm and .export are considered as such) Parameters ---------- fileName : str fileName name Returns ------- bool True if the input file is a Code\_Aster study file, else False """ for ext in ['.comm','.export']: if fileName.endswith(ext): return True return False
[docs] def GetStudyFilesFromPath(self,path:str)->List[str]: """Retrieve file names within a path associated to a specific study Parameters ---------- path : str location of studies Returns ------- List[str] Collection of file names associated to a study. """ studyFiles = [fileName for fileName in os.listdir(path) if fileName.startswith(self.filename+'.') and self.IsAsterStudyFileExtension(fileName)] if self.extrafiles: studyFiles+=[fileName for fileName in os.listdir(path) if fileName.startswith(self.filename) and self.IsAsterStudyFileExtension(fileName)] return studyFiles
[docs] def GetSolverComputedError(self)->float: """Retrieve solver computed error after the computation Returns ------- float computed error """ import re logfile=self._filePath(self.filename+".log") with open(logfile) as myfile: targets = list(set([line for line in myfile if "Erreur calcul" in line])) errorComputed=[float(x) for x in re.findall(r'-?\d+.?\d*(?:[Ee]-\d+)?', targets[0])][0] return errorComputed
[docs] def ExecuteAster(self)->int: """Run Code\_Aster executable Returns ------- int RETURN_SUCCESS is sucess, RETURN_FAIL if failure Raises ------ AsterExecutableNotFound Raised when the executable is not found AsterPrecisionError Raised when the solver accuracy is not satisfactory """ source = os.getcwd() studyFiles = self.GetStudyFilesFromPath(source) if not len(studyFiles): Info("Study not found in current directory; looking at templates ") source = self.filepath studyFiles = self.GetStudyFilesFromPath(source) assert len(studyFiles)>0,"Aster files corresponding to "+self.filename+" were not found." for f in studyFiles: if f.endswith(".export"): fileReader = open(os.path.join(source, f),"r+") fileWriter =open(os.path.join(self.workingDirectory, f) ,"w") lines = fileReader.readlines() for line in lines: if line.find("P memory_limit ")>-1 and self.memoryLimit is not None: fileWriter.write(line.replace(line,"P memory_limit "+str(self.memoryLimit)+"\n")) else: fileWriter.write(line) fileReader.close() fileWriter.close() else: shutil.copy(os.path.join(source,f), self.workingDirectory) cmdBuilder = CommandLineBuilder(self.GetAppName()) for option in asterExecOptions: cmdBuilder.argumentAdd(option) cmdBuilder.argumentAdd(self._filePath(self.filename+".export")) #hack to redirect aster stderr if not self.keepStdErr: cmdBuilder.argumentAdd("2> /dev/null") cmd = cmdBuilder.result() #Run executable try: self._runCommand(cmd) except: workDir=self.workingDirectory asterLogFiles=[os.path.join(workDir, f) for f in os.listdir(workDir) if f.endswith(self.filename+'.log')] if len(asterLogFiles)==1: logfile=asterLogFiles[0] elif not asterLogFiles: if not ASTER_AVAILABLE: raise AsterExecutableNotFound("Aster executable not found") raise Exception("Problem arising before Aster log was produced") else: logfile=asterLogFiles[0] Info("Warning: several log files found, only look at" + logfile +" file") with open(logfile) as myfile: if 'La solution du système linéaire est trop imprécise' in myfile.read(): raise AsterPrecisionError("Precision problem when solving FEM") else: myfile.seek(0) last_lines = myfile.readlines()[-200:-1] for line in last_lines: print(line) Error("Aster executable ("+" ".join(cmd)+") had failed for the reason stated above") return RETURN_FAIL return RETURN_SUCCESS
[docs] def CleanWorkingDirectory(self,extensions:Iterable): """Clean working directory, remove specific files with provided extensions Parameters ---------- extensions : Iterable collection of extension to be remove from the working directory """ filesToDelete = [self.filename+extension for extension in extensions] for f in filesToDelete: try: filetodelete = self.workingDirectory+f if os.path.isfile(filetodelete): os.remove(filetodelete) except FileNotFoundError: pass
[docs] def GetField(self,fieldName:str,fieldSupport:str,index:Optional[int]=None)->np.ndarray: """Retrieve field of a given name at a specific location Parameters ---------- fieldName : str field name fieldSupport : str field support index : int, optional field number id (can be time step id for instance), by default None Returns ------- np.ndarray array containing the field Raises ------ KeyError raised if field name provided was not found in the mesh support. """ self.fileoutname = self._filePath(self.filename+"output.rmed") self.support = self.reader.ReadExtraFields(self.support,self.fileoutname,fieldNames=[fieldName],fieldSupports=[fieldSupport],fieldNumOrders=[index]) try: if fieldSupport=="nodes": return self.support.nodeFields[fieldName] else: return self.support.elemFields[fieldName] except KeyError as e: raise Exception("Field "+fieldName+" was not found within support") from e
[docs] def GetIntegrationRuleFor(self,name:str)->Tuple[np.ndarray,np.ndarray]: """Get integration rule from rule name Parameters ---------- name : str integration rule name Returns ------- Tuple[np.ndarray,np.ndarray] Integration rule integration point localization and weights """ locname, _, gscoo , wg = self.reader.ReadGaussPointsLocalization(self.fileoutname) assert locname in AsterElemNameToRuleNames[name],"Cannot find integration rule for this element: "+str(name) return gscoo, wg
[docs] def GetScalar(self,name:str)->float: """Get scalar from name Parameters ---------- name : str scalar name Returns ------- float scalar value """ return self.GetScalarsbyOrders(name)[0]
[docs] def GetScalarsbyOrders(self,name:str,listSize:int=1)->float: """Get scalar by order Parameters ---------- name : str scalar name listSize : int, optional size of collection of scalars expected, by default 1 Returns ------- float scalar values """ self.fileCsvname = self._filePath(self.filename+"_objective.csv") keyName = keysbyValue[name] value = GetScalarFromFile(fileName=self.fileCsvname,scalarName=keyName) assert len(value)==listSize,("Given list size ("+str(listSize)+") is different from the size of scalar list computed by Code_Aster : ",len(value)) return value
[docs] def GetScalarbyLoadCase(self,name:str,loadcase:str): """Get scalar by load case Parameters ---------- name : str scalar name loadcase : str load cases names Returns ------- float scalar values """ self.fileCsvname = self._filePath(self.filename+"_objective.csv") keyName = keysbyValue[name]+"_"+str(loadcase) value = GetScalarFromFile(fileName=self.fileCsvname,scalarName=keyName) return value[0]
[docs]def SkipAsterTest(): """Allow to skip Code_Aster test if dependencies are missing.""" from Muscat.Helpers.CheckTools import SkipTest if SkipTest("ASTER_NO_FAIL"): return True,"skip" from OpenPisco.MuscatExtentions.MedTools import MEDAvailable if not MEDAvailable: return True,"skip MED not Available" from OpenPisco.ExternalTools.Aster.AsterInterface import ASTER_AVAILABLE if not ASTER_AVAILABLE: return True,"skip Aster not Available" return False,"Aster test can be run"
[docs]def PrepareLinToQuadStudy()->Tuple[AsterInterface,str]: """Convert a study based on a linear interpolation (P1) to a quadratic interpolation (P2) for shape functions. Returns ------- Tuple[AsterInterface,str] interface and study path """ import OpenPisco.TopoZones as TZ from Muscat.MeshTools.MeshCreationTools import CreateCube mesh = CreateCube(dimensions=[3,3,3],spacing=[1./2, 1./2, 1./2],origin=[0, 0, 0],ofTetras=True) nbtets = mesh.GetElementsOfType(ED.Tetrahedron_4).GetNumberOfElements() mesh.GetElementsOfType(ED.Tetrahedron_4).GetTag(TZ.Bulk).SetIds(np.arange(nbtets) ) solverInterface = AsterInterface() solverInterface.SetFilename("AsterLinToQuad") path=TemporaryDirectory.GetTempPath() solverInterface.SetWorkingDirectory(path) solverInterface.ComputeMemoryLimit() from OpenPisco.MuscatExtentions.MedWriter import WriteMed WriteMed(path+solverInterface.filename+".med",mesh) return solverInterface, path
[docs]def CheckIntegrity_LinToQuad(GUI=False): """ Convert a linear mesh into a quadratic one. The extra nodes added for the quadratic elements are stored after the original ones in the mesh nodes storage. The following line allows to retrive the original nodes in the quadratic mesh quadMesh.nodes[0:inputmesh.GetNumberOfNodes(),:] The order of elements is left unchanged. For more information see https://www.code-aster.org/V2/doc/default/fr/man_u/u4/u4.23.02.pdf """ skipTestPhase,message = SkipAsterTest() if skipTestPhase: return message solverInterface, path = PrepareLinToQuadStudy() solverInterface.ExecuteAster() from OpenPisco.MuscatExtentions.MedReader import ReadMed newMesh = ReadMed(path+solverInterface.filename+".rmed") nbTet10 = newMesh.GetElementsOfType(ED.Tetrahedron_10) assert nbTet10.GetNumberOfElements()>0, "There should be only Tet10 elements" nbTet4 = newMesh.GetElementsOfType(ED.Tetrahedron_4) assert nbTet4.GetNumberOfElements()==0, "There should not be any Tet4 element" return "ok"
[docs]def CheckIntegrity_AsterExecutableNotFound(): os.environ["ASTER_EXECUTABLE"] = "as_run_dummy" from OpenPisco.MuscatExtentions.MedTools import MEDAvailable if not MEDAvailable: return "skip MED not Available" solverInterface, _ = PrepareLinToQuadStudy() try: solverInterface.ExecuteAster() except AsterExecutableNotFound: print("Aster executable not found") pass return "ok"
[docs]def CheckIntegrity(): totest = [ CheckIntegrity_AsterExecutableNotFound, CheckIntegrity_LinToQuad, ] for test in totest: res = test() if res.lower() != "ok" : return res return "ok"
if __name__ == '__main__': print(CheckIntegrity())