在Python中进行多线程编程

利用线程池

Python的concurrent.futures库提供了多线程编程接口。concurrent.futures.ThreadPoolExecutor类则提供了线程池化的线程管理基础设施。

import concurrent.futures
import multiprocessing
 
# Worker function
# Worker function
# Example job: combines its three parameters into one number so the
# snippet is runnable end-to-end. Replace the body with real work.
def DoWork(Param1, Param2, Param3) :
    # The original left WorkResult undefined, which raised NameError
    # the first time a worker ran; compute a concrete value instead.
    WorkResult = Param1 * Param2 + Param3

    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create thread pool object
    with concurrent.futures.ThreadPoolExecutor(max_workers=nProcessorCount/2) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
 
            # Submit works
            ftrCurrentWorker = polWorkers.submit(DoWork,
                Param1Value,
                Param2Value,
                Param3Value)
 
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
 
        # Register completion list
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processting
            arrWorkerResults.append(WorkResult)
        #Next
    #End With
#End If

参考资料:

https://blog.csdn.net/huapingqi/article/details/132521391

https://deepinout.com/python/python-qa/18_python_pass_multiple_parameters_to_concurrentfuturesexecutormap.html

https://blog.csdn.net/waitan2018/article/details/108386898

利用进程池

Python的concurrent.futures库提供了并行编程接口。concurrent.futures.ProcessPoolExecutor类则提供了进程池化的进程管理基础设施。

利用进程池化的多进程编程,可以在一定程度上避免如MatPlotLib的多线程绘图报警等问题。

需要注意的是,在其它脚本中调用会创建进程池或子进程的函数时,需将该脚本的入口代码使用“if __name__ == "__main__" :”结构包裹,否则在执行时会产生“An attempt has been made to start a new process before the current process has finished its bootstrapping phase.”错误。

同时,如果子进程中调用了可能建立concurrent.futures.Future对象的函数或方法(例如:某些并行化计算工具),可能导致进程池在退出(离开with子句确定的域,或显式调用shutdown()方法)时进入长时间甚至无限等待状态。请参考: https://github.com/python/cpython/issues/94440

import concurrent.futures
import multiprocessing
 
# Worker function
# Worker function
# Example job: combines its three parameters into one number so the
# snippet is runnable end-to-end. Replace the body with real work.
# NOTE: with ProcessPoolExecutor this function must stay at module
# level so it can be pickled for the child processes.
def DoWork(Param1, Param2, Param3) :
    # The original left WorkResult undefined, which raised NameError
    # the first time a worker ran; compute a concrete value instead.
    WorkResult = Param1 * Param2 + Param3

    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create process pool object
    with concurrent.futures.ProcessPoolExecutor(max_workers=nProcessorCount/2) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
 
            # Submit works
            ftrCurrentWorker = polWorkers.submit(DoWork,
                Param1Value,
                Param2Value,
                Param3Value)
 
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
 
        # Register completion list
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processting
            arrWorkerResults.append(WorkResult)
        #Next
    #End With
#End If

优化进程池

`concurrent.futures.ProcessPoolExecutor`即使在`max_workers`为1时也会创建子进程,这可能造成严重的性能下降,或在嵌套调用可以创建子进程的函数时产生问题。

因此,可以考虑建立一个包裹函数,在指定子进程个数为1时,不创建子进程:

import concurrent.futures
import multiprocessing
 
# Run jobs in parallel with process pool
# This is a wrapper of concurrent.futures.ProcessPoolExecutor
# This also prevents child processes from creating when nWorkers <= 1
#     arrFunctions is list of functions
#     arrArgs is list of tuple, with arguments (*args)
#     arrKwArgs is list of dict, with keyed arguments (**kwargs)
# Returns list of results
# Run jobs in parallel with process pool
# This is a wrapper of concurrent.futures.ProcessPoolExecutor
# This also prevents child processes from being created when nWorkers <= 1
#     arrFunctions is list of functions (must be picklable, i.e. module-level)
#     arrArgs is list of tuple, with arguments (*args)
#     arrKwArgs is list of dict, with keyed arguments (**kwargs)
#     nWorkers is the process count; None lets the executor pick a default
# Returns list of results, in the same order as arrFunctions
def RunJobsWithProcessPool(arrFunctions : list, arrArgs : list = None, arrKwArgs : list = None,
    nWorkers : int = None) -> list :

    # Handling null-references (avoids the mutable-default-argument trap)
    if arrArgs is None :
        arrArgs = []
    #End If
    if arrKwArgs is None :
        arrKwArgs = []
    #End If

    # Initialize result buffer
    arrResults = []

    # Child workers are needed unless the caller explicitly asked for <= 1
    IsChildWorkersNeeded = (nWorkers is None) or (nWorkers > 1)

    # Run jobs
    if IsChildWorkersNeeded :
        # Futures of the submitted jobs
        arrWorkers = []

        # Create process pool object
        with concurrent.futures.ProcessPoolExecutor(max_workers=nWorkers) as polWorkers :
            # Create and submit job package
            for i in range(len(arrFunctions)) :
                # Get function object and arguments; missing entries
                # fall back to empty args/kwargs
                fncFunction = arrFunctions[i]
                tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
                dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()

                # Submit a job
                ftrCurrentWorker = polWorkers.submit(fncFunction, *tplArgs, **dctKwArgs)

                # Cache current worker
                arrWorkers.append(ftrCurrentWorker)
            #Next

            # Collect results in SUBMISSION order so the parallel path
            # matches the serial path below (as_completed() would yield
            # in completion order instead). Future.result() blocks until
            # the job is done and re-raises any worker exception.
            for ftrWorker in arrWorkers :
                arrResults.append(ftrWorker.result())
            #Next
        #End With
    else :
        # Run jobs in main process directly
        for i in range(len(arrFunctions)) :
            # Get function object and arguments
            fncFunction = arrFunctions[i]
            tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
            dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()

            # Run job
            resCurrentWorker = fncFunction(*tplArgs, **dctKwArgs)

            # Collect results
            arrResults.append(resCurrentWorker)
        #Next
    #End If

    return arrResults
#End Function
 
# Worker function
# Worker function
# Example job: combines its three parameters into one number so the
# snippet is runnable end-to-end. Replace the body with real work.
# NOTE: must stay at module level so the process pool can pickle it.
def DoWork(Param1, Param2, Param3) :
    # The original left WorkResult undefined, which raised NameError
    # the first time a worker ran; compute a concrete value instead.
    WorkResult = Param1 * Param2 + Param3

    return WorkResult
#End Function
 
# Main entry point
if __name__ == "__main__" :
    # Run jobs in parallel
    # Create functions & arguments list
    arrWorkerFuncs = []
    arrWorkerArgs = []
    # Reult=DoWork(2,4,5)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,4,5))
    # Reult=DoWork(2,5,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,5,0))
    # Reult=DoWork(3,8,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,0))
    # Reult=DoWork(3,8,1)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,1))
    # Run jobs
    arrWorkerResults = SharedUtil.RunJobsWithProcessPool(arrWorkerFuncs, arrWorkerArgs,
        nWorkers=nParallelCPUJobCount)
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post processting
        pass
    #Next

参考资料:

https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

https://gairuo.com/p/python-multiprocessing-pool

https://www.cnblogs.com/piperliu/articles/18615898

it
除非特别注明,本页内容采用以下授权方式: Creative Commons Attribution-ShareAlike 3.0 License