import concurrent.futures import multiprocessing # Worker function def DoWork(Param1, Param2, Param3) : # Do some work return WorkResult #End Function # Script entry if __name__ == "__main__" : # Get processor count nProcessorCount = multiprocessing.cpu_count() # Preprocessing # Do some pre processing here # arrWorkerParams is an example of worker's parameters arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ] # Array of workers arrWorkers = [] # Create thread pool object with concurrent.futures.ThreadPoolExecutor(max_workers=nProcessorCount/2) as polWorkers : for Params in arrWorkerParams : # Assign parameter values Param1Value = Params[0] Param2Value = Params[1] Param3Value = Params[2] # Submit works resCurrentWorker = polWorkers.submit(DoWork, Param1Value, Param2Value, Param3Value) # Cache current worker arrWorkers.append(resCurrentWorker) #Next # Register completion list arrCompletedTasks = concurrent.futures.as_completed(arrWorkers) # Collect results arrWorkerResults = [] for WorkerResult in arrCompletedTasks : # Do post processting arrWorkerResults.append(WorkResult) #Next #End With #End If
需要注意的是,在其它脚本中调用会创建进程池或子进程的函数时,需将该脚本的入口代码使用“if __name__ == "__main__" :”结构包裹,否则在执行时会产生“An attempt has been made to start a new process before the current process has finished its bootstrapping phase.”错误。
同时,如果子进程中调用了可能建立concurrent.futures.Future对象的函数或方法(例如:某些并行化计算工具),可能导致进程池在退出(离开with子句确定的域,或显式调用shutdown()方法)时进入长时间甚至无限等待状态。请参考: https://github.com/python/cpython/issues/94440 。
import concurrent.futures import multiprocessing # Worker function def DoWork(Param1, Param2, Param3) : # Do some work return WorkResult #End Function # Script entry if __name__ == "__main__" : # Get processor count nProcessorCount = multiprocessing.cpu_count() # Preprocessing # Do some pre processing here # arrWorkerParams is an example of worker's parameters arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ] # Array of workers arrWorkers = [] # Create process pool object with concurrent.futures.ProcessPoolExecutor(max_workers=nProcessorCount/2) as polWorkers : for Params in arrWorkerParams : # Assign parameter values Param1Value = Params[0] Param2Value = Params[1] Param3Value = Params[2] # Submit works resCurrentWorker = polWorkers.submit(DoWork, Param1Value, Param2Value, Param3Value) # Cache current worker arrWorkers.append(resCurrentWorker) #Next # Register completion list arrCompletedTasks = concurrent.futures.as_completed(arrWorkers) # Collect results arrWorkerResults = [] for WorkerResult in arrCompletedTasks : # Do post processting arrWorkerResults.append(WorkResult) #Next #End With #End If