Using a Thread Pool
Python's concurrent.futures library provides a high-level interface for multithreaded programming. The concurrent.futures.ThreadPoolExecutor class supplies the pooled thread-management infrastructure.
```python
import concurrent.futures
import multiprocessing

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work (example computation)
    WorkResult = Param1 * Param2 + Param3
    return WorkResult
#End Function

# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()

    # Preprocessing
    # Do some pre processing here

    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ]

    # Array of workers
    arrWorkers = []

    # Create thread pool object
    # max_workers must be a positive integer, so use floor division
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
            # Submit work
            ftrCurrentWorker = polWorkers.submit(DoWork, Param1Value, Param2Value, Param3Value)
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next

        # Register completion list
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)

        # Collect results (in completion order, not submission order)
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
#End If
```
References:
https://blog.csdn.net/huapingqi/article/details/132521391
https://blog.csdn.net/waitan2018/article/details/108386898
Using a Process Pool
Python's concurrent.futures library also provides an interface for multiprocess programming. The concurrent.futures.ProcessPoolExecutor class supplies the pooled process-management infrastructure.
Using process-pool parallelism can, to some extent, avoid problems such as the multithreaded-plotting warnings raised by Matplotlib.
Note that when another script calls a function that creates a process pool or child processes, the entry code of that script must be wrapped in an "if __name__ == "__main__" :" block; otherwise execution fails with the error "An attempt has been made to start a new process before the current process has finished its bootstrapping phase."
Also, if a child process calls functions or methods that may create concurrent.futures.Future objects (for example, some parallel computation tools), the process pool may block for a long time, or even indefinitely, when it shuts down (on leaving the scope of the with statement, or on an explicit call to the shutdown() method). See: https://github.com/python/cpython/issues/94440 .
```python
import concurrent.futures
import multiprocessing

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work (example computation)
    WorkResult = Param1 * Param2 + Param3
    return WorkResult
#End Function

# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()

    # Preprocessing
    # Do some pre processing here

    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ]

    # Array of workers
    arrWorkers = []

    # Create process pool object
    # max_workers must be a positive integer, so use floor division
    with concurrent.futures.ProcessPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
            # Submit work
            ftrCurrentWorker = polWorkers.submit(DoWork, Param1Value, Param2Value, Param3Value)
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next

        # Register completion list
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)

        # Collect results (in completion order, not submission order)
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
#End If
```
Optimizing the Process Pool
`concurrent.futures.ProcessPoolExecutor` creates child processes even when `max_workers` is 1, which can cause a severe performance hit, or cause problems when it is nested inside calls to functions that may themselves create child processes.
One option is therefore a wrapper function that skips creating child processes when the requested worker count is 1:
```python
import concurrent.futures
import multiprocessing

# Run jobs in parallel with a process pool
# This is a wrapper of concurrent.futures.ProcessPoolExecutor
# It also prevents child processes from being created when nWorkers <= 1
# arrFunctions is a list of functions
# arrArgs is a list of tuples, with positional arguments (*args)
# arrKwArgs is a list of dicts, with keyword arguments (**kwargs)
# Returns a list of results
def RunJobsWithProcessPool(arrFunctions : list, arrArgs : list = None, arrKwArgs : list = None, nWorkers : int = None) -> list :
    # Handle null references
    if arrArgs is None :
        arrArgs = []
    #End If
    if arrKwArgs is None :
        arrKwArgs = []
    #End If

    # Initialize result buffer
    arrResults = []

    # Check if we really need to create child workers
    IsChildWorkersNeeded = False
    if nWorkers is None :
        IsChildWorkersNeeded = True
    elif nWorkers > 1 :
        IsChildWorkersNeeded = True
    #End If

    # Run jobs
    if IsChildWorkersNeeded :
        # Multiprocess workers
        arrWorkers = []
        # Create process pool object
        with concurrent.futures.ProcessPoolExecutor(max_workers=nWorkers) as polWorkers :
            # Create and submit job packages
            for i in range(0, len(arrFunctions)) :
                # Get function object and arguments
                fncFunction = arrFunctions[i]
                tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
                dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
                # Submit a job
                ftrCurrentWorker = polWorkers.submit(fncFunction, *tplArgs, **dctKwArgs)
                # Cache current worker
                arrWorkers.append(ftrCurrentWorker)
            #Next

            # Register completion list
            arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)

            # Collect results (in completion order, not submission order)
            for WorkerResult in arrCompletedTasks :
                arrResults.append(WorkerResult.result())
            #Next
        #End With
    else :
        # Run jobs in the main process directly
        for i in range(0, len(arrFunctions)) :
            # Get function object and arguments
            fncFunction = arrFunctions[i]
            tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
            dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
            # Run the job
            resCurrentWorker = fncFunction(*tplArgs, **dctKwArgs)
            # Collect the result
            arrResults.append(resCurrentWorker)
        #Next
    #End If
    return arrResults
#End Function

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work (example computation)
    WorkResult = Param1 * Param2 + Param3
    return WorkResult
#End Function

# Main entry point
if __name__ == "__main__" :
    # Run jobs in parallel
    # Create functions & arguments lists
    arrWorkerFuncs = []
    arrWorkerArgs = []
    # Result = DoWork(2,4,5)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,4,5))
    # Result = DoWork(2,5,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,5,0))
    # Result = DoWork(3,8,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,0))
    # Result = DoWork(3,8,1)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,1))

    # Choose a parallel job count (example: half of the logical processors)
    nParallelCPUJobCount = max(1, multiprocessing.cpu_count() // 2)

    # Run jobs
    arrWorkerResults = RunJobsWithProcessPool(arrWorkerFuncs, arrWorkerArgs, nWorkers=nParallelCPUJobCount)

    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        pass
    #Next
#End If
```
References:
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods





