Source code for OpenPisco.QtApp.QtWorker

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#


import OpenPisco.QtApp.QtImplementation as QT
class Worker(QT.QtCore.QObject):
    """Run one function at a time from a polling loop, then an optional callback.

    State is kept in ``self.payload``, a four-slot list:

    * ``payload[0]`` -- the function to run,
    * ``payload[1]`` -- True while that function is waiting to run or running,
    * ``payload[2]`` -- the optional callback,
    * ``payload[3]`` -- True while the callback still has to be run.

    ``WaitForActions`` is the polling loop (meant to be the target of a
    separate thread, as done in ``CheckIntegrity``); it stops once
    ``self.exiting`` is set to True.
    """

    # Emitted by WaitForActions after each function has been executed.
    finished = QT.Signal()

    def __init__(self, parent=None):
        """Create an idle worker and connect ``finished`` to ``RunCallback``."""
        super().__init__(parent=parent)
        # All slots falsy: nothing to run, no callback pending.
        self.payload = [False, False, False, False]
        self.exiting = False
        self.finished.connect(self.RunCallback)

    def WaitForActions(self):
        """Poll every 50 ms for a submitted function and run it.

        Exceptions raised by the function are printed and forwarded to
        ``sys.excepthook``; they never stop the loop. After each run the
        pending flag is cleared and ``finished`` is emitted.
        """
        import sys
        from time import sleep

        while not self.exiting:
            if self.payload is not None and self.payload[1]:
                try:
                    self.payload[0]()
                except Exception as e:
                    print(e)
                    # Local names chosen so the builtin `type` is not shadowed.
                    exc_type, exc_value, exc_traceback = sys.exc_info()
                    print("********* Error Begin *********")
                    print(exc_value)
                    print("********** Error End * *********")
                    sys.excepthook(exc_type, exc_value, exc_traceback)
                # Clear the flag even after a failure so new work is accepted.
                self.payload[1] = False
                self.finished.emit()
                print("Run DONE")
            sleep(0.05)

    def SetFunctionToRunAndRun(self, func, callback=None):
        """Submit ``func`` for execution, with an optional ``callback``.

        The request is refused (a message is printed and nothing is stored)
        while a previous function or its callback is still pending.
        """
        if self.payload[1] or self.payload[3]:
            print("Action already running please Wait")
            return
        self.payload = [func, True, callback, callback is not None]

    def RunCallback(self):
        """Run the pending callback, if any, and clear its pending flag.

        Connected to ``finished`` in ``__init__``. The original author notes
        that the callback is always run in the main thread (presumably through
        Qt's signal delivery -- confirm against the Qt binding in use).
        """
        if self.payload is not None and self.payload[3]:
            try:
                if self.payload[2] is not None:
                    self.payload[2]()
            finally:
                # Reset even if the callback raises; otherwise the worker
                # would refuse every later submission.
                self.payload[3] = False

    def wait(self):
        """Block until neither a function nor a callback is pending.

        NOTE(review): this loop does not process Qt events; if the callback is
        delivered through the caller's own event loop, waiting with a callback
        pending may never return -- confirm with the callers.
        """
        from time import sleep

        while self.payload[1] or self.payload[3]:
            # Yield instead of hot-spinning so the worker thread can progress.
            sleep(0.005)
def CheckIntegrity():
    """Smoke-test ``Worker`` by running a few functions through it.

    A ``Worker`` polling loop is started in a background thread, several
    actions are submitted, and the loop is always stopped and joined before
    returning, even when a step raises.

    Returns:
        The string ``"ok"`` when every step completed.
    """
    from functools import partial
    from threading import Thread

    # strange !!! coverage unable to track inside the QT library
    def TestFunc(size):  # pragma: no cover
        for n in range(size):
            print(n)

    # strange !!! coverage unable to track inside the QT library
    def PrintCoucou():  # pragma: no cover
        print("coucou")

    obj = Worker()
    QtThread = Thread(target=obj.WaitForActions)
    QtThread.start()
    try:
        obj.SetFunctionToRunAndRun(partial(TestFunc, 5))
        # Submitted right away, presumably while the first action is still
        # pending, which exercises the "already running" refusal path.
        obj.SetFunctionToRunAndRun(PrintCoucou)
        obj.wait()
        obj.SetFunctionToRunAndRun(PrintCoucou)
        obj.wait()
    finally:
        # Always stop the polling loop: the thread is not a daemon and would
        # otherwise keep the interpreter alive after a failure.
        obj.exiting = True
        QtThread.join()
    return "ok"
if __name__ == "__main__":  # pragma: no cover
    # Script entry point: run the self-check and show its result.
    integrity_report = CheckIntegrity()
    print(integrity_report)