Multithreaded Programming in Python

Using a Thread Pool

Python's concurrent.futures library provides an interface for multithreaded programming. Its concurrent.futures.ThreadPoolExecutor class manages a pool of worker threads.

import concurrent.futures
import multiprocessing
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create thread pool object
    # max(1, ...) avoids max_workers=0 (a ValueError) on single-core machines
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
 
            # Submit works
            ftrCurrentWorker = polWorkers.submit(DoWork,
                Param1Value,
                Param2Value,
                Param3Value)
 
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
 
        # Register completion list
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
 
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
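
Note that concurrent.futures.as_completed() yields futures in the order they finish, so the results collected above are not guaranteed to match the order of arrWorkerParams. Where the order matters, one option is to call result() on the futures in submission order. A minimal sketch; the names do_work and run_in_submission_order are hypothetical and introduced here only for illustration:

```python
import concurrent.futures


def do_work(param1, param2, param3):
    # Same placeholder job as DoWork in the example above
    return f"Pokemon#{param1}{param2}{param3}"


def run_in_submission_order(worker_params, max_workers=2):
    # Submit one job per parameter tuple and remember the futures in order
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(do_work, *params) for params in worker_params]
        # result() blocks until that particular job is done, so the output
        # list lines up with worker_params regardless of completion order
        return [future.result() for future in futures]


if __name__ == "__main__":
    for result in run_in_submission_order([(2, 4, 5), (2, 5, 0), (3, 8, 0), (3, 8, 1)]):
        print(result)
```

The trade-off is that a slow early job delays post-processing of later jobs that have already finished. as_completed() remains the better fit when results can be handled in any order.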

References:

https://blog.csdn.net/huapingqi/article/details/132521391

https://deepinout.com/python/python-qa/18_python_pass_multiple_parameters_to_concurrentfuturesexecutormap.html

https://blog.csdn.net/waitan2018/article/details/108386898

Using a Process Pool

Python's concurrent.futures library also provides an interface for process-based parallelism. Its concurrent.futures.ProcessPoolExecutor class manages a pool of worker processes.

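Running work in a process pool instead of a thread pool can, to some extent, avoid problems such as the warnings Matplotlib emits when plotting from multiple threads.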

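Note that a script which calls functions that create a process pool or child processes must wrap its entry code in an if __name__ == "__main__" : block. Otherwise, on platforms where child processes re-import the main module (for example Windows), execution fails with the error "An attempt has been made to start a new process before the current process has finished its bootstrapping phase."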

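In addition, if a child process calls functions or methods that may create concurrent.futures.Future objects (for example, some parallel computing tools), the process pool may wait for a long time, or even indefinitely, when it shuts down (on leaving the with block or on an explicit shutdown() call). See: https://github.com/python/cpython/issues/94440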

import concurrent.futures
import multiprocessing
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create process pool object
    # max(1, ...) avoids max_workers=0 (a ValueError) on single-core machines
    with concurrent.futures.ProcessPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
 
            # Submit works
            ftrCurrentWorker = polWorkers.submit(DoWork,
                Param1Value,
                Param2Value,
                Param3Value)
 
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
 
        # Register completion list
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
 
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If

concurrent.futures.ProcessPoolExecutor may run into performance problems and a limit on the number of processors on Windows, so multiprocessing.Pool is worth considering as an alternative.
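
The Python documentation for ProcessPoolExecutor states that on Windows max_workers must be less than or equal to 61. A minimal sketch of clamping a requested worker count accordingly; the helper name clamp_worker_count and its platform parameter are assumptions made for illustration:

```python
import os
import sys

# Documented upper bound for ProcessPoolExecutor(max_workers=...) on Windows
WINDOWS_MAX_WORKERS = 61


def clamp_worker_count(requested=None, platform=sys.platform):
    # Fall back to the logical CPU count when no count is requested
    count = requested if requested is not None else (os.cpu_count() or 1)
    # Never go below one worker
    count = max(1, count)
    # Apply the Windows limit only on Windows
    if platform.startswith("win"):
        count = min(count, WINDOWS_MAX_WORKERS)
    return count


if __name__ == "__main__":
    print(clamp_worker_count())
```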

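A multiprocessing.Pool process pool submits asynchronous tasks with multiprocessing.Pool.starmap_async(). The first argument is the target function, and the second is a list of tuples, each tuple holding the arguments for one call. Note that even if the target function takes only one argument, that argument must be placed in a tuple, and even if the target function is called only once, the argument tuple must be placed in a list. Each submission returns a multiprocessing.pool.AsyncResult object. Calling its get() method waits for the corresponding task(s) to finish and returns a list of results (even if the target function was called only once, get() still wraps the result in a list).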

import multiprocessing
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create process pool object
    # max(1, ...) avoids processes=0 (a ValueError) on single-core machines
    with multiprocessing.Pool(processes=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Submit works
            resCurrentWorker = polWorkers.starmap_async(DoWork, [Params])
 
            # Cache async results current worker
            arrWorkers.append(resCurrentWorker)
        #Next
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrWorkers :
            # Do post processing
            arrWorkerResults.append(WorkerResult.get()[0])
        #Next
    #End With
 
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
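
Since starmap_async() already accepts a list of argument tuples, all jobs for the same function can also be submitted in a single call; get() then returns the results in the same order as the argument list. A minimal sketch, where do_work and run_batch are hypothetical names. run_batch accepts any pool object that offers starmap_async(), which includes multiprocessing.Pool:

```python
import multiprocessing


def do_work(param1, param2, param3):
    # Same placeholder job as DoWork in the example above
    return f"Pokemon#{param1}{param2}{param3}"


def run_batch(pool, worker_params):
    # One call submits every argument tuple in worker_params
    async_result = pool.starmap_async(do_work, worker_params)
    # get() waits for all calls and returns their results in input order
    return async_result.get()


if __name__ == "__main__":
    worker_params = [(2, 4, 5), (2, 5, 0), (3, 8, 0), (3, 8, 1)]
    with multiprocessing.Pool(processes=2) as pool:
        for result in run_batch(pool, worker_params):
            print(result)
```

Submitting one call per job, as in the listing above, is still useful when the jobs call different functions or when each result should be awaited separately.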

Optimizing the Process Pool

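concurrent.futures.ProcessPoolExecutor creates a child process even when max_workers=1, and so does multiprocessing.Pool. This can cause severe performance degradation, or cause problems when calls to functions that themselves create child processes are nested.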

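Therefore, consider writing a wrapper function that does not create child processes when the requested number of child processes is 1. The wrapper can also take care of tasks such as selecting the process pool backend: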

import concurrent.futures
import math
import multiprocessing
import sys
 
# Constants
MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS = 61
PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS = 61
 
# Python 3.14 has fixed windows process pool limit for multiprocessing.Pool
# But the limitation still applies to concurrent.futures.ProcessPoolExecutor
# See https://github.com/python/cpython/pull/107873
# See also https://github.com/python/cpython/issues/89240 and https://docs.python.org/3/whatsnew/changelog.html#id247
if sys.version_info >= (3,14) :
    MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS = 3969
#End If
 
# Checks if current platform is Windows
def IsRunningOnWindows() -> bool :
    return sys.platform.lower().startswith("win")
#End Function
 
# Check if Python version greater than version specified by tuple (Major, Minor) or (Major, Minor, Micro)
def IsPythonVersionGreaterThan(tplVersion : tuple, IsEqualAllowed : bool = True) :
    if IsEqualAllowed :
        return sys.version_info >= tplVersion
    else :
        return sys.version_info > tplVersion
    #End If
#End Function
 
# Calculate CPU core count for parallel computing
def GetCPUCoreCountForParallelComputing(dCPUCoreDividingDenominator : float = 2, nMaxCoreCountToUse : int = -1, 
                                        IsPlatformCheckingsSkipped : bool = False, IsStrictPlatformCheckingsEnabled : bool = False) -> int :
    # Check arguments
    if dCPUCoreDividingDenominator < 1 :
        raise ValueError("dCPUCoreDividingDenominator must be equal to or greater than 1")
    #End If
 
    # Platform checking
    if not IsPlatformCheckingsSkipped :
        # On Windows, some multiprocessing libraries (e.g. concurrent.futures.ProcessPoolExecutor) cannot use too many CPUs
        # Python 3.14 has fixed windows process pool limit for multiprocessing.Pool
        # But the limitation still applies to concurrent.futures.ProcessPoolExecutor
        if IsRunningOnWindows() :
            if IsStrictPlatformCheckingsEnabled or (not IsPythonVersionGreaterThan((3,14))) :
                if nMaxCoreCountToUse <= 0 or nMaxCoreCountToUse > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
                    nMaxCoreCountToUse = PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
                #End If
            else :
                if nMaxCoreCountToUse <= 0 or nMaxCoreCountToUse > MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS :
                    nMaxCoreCountToUse = MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS
                #End If
            #End If
        #End If
    #End If
 
    # Calculate CPU count
    nCPUCount = multiprocessing.cpu_count()
    nCPUCountToUse = math.ceil(nCPUCount / dCPUCoreDividingDenominator)
    if nMaxCoreCountToUse > 0 :
        nCPUCountToUse = min(nMaxCoreCountToUse, nCPUCountToUse)
    #End If
 
    # print() stands in for the LogInfo() helper, which is not defined in this listing
    print(f"ParallelComputing: {nCPUCount} logical processor(s) found on your system, recalibrating parallel job count to {nCPUCountToUse}...")
 
    return nCPUCountToUse
#End Function
 
# Run jobs in parallel with process pool
# This is a wrapper of concurrent.futures.ProcessPoolExecutor
# This also prevents child processes from being created when nWorkers <= 1
#     arrFunctions is list of functions
#     arrArgs is list of tuple, with arguments (*args)
#     arrKwArgs is list of dict, with keyed arguments (**kwargs)
#     iPoolBackend selects which backend is used:
#         0 - Uses concurrent.futures.ProcessPoolExecutor
#         1 - Uses multiprocessing.Pool (warning: in this case, arrKwArgs will be ignored)
#     If IsBackendAutoSelectingEnabled, will use the following logic to select backend:
#         When running on Windows, nWorkers is None or nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS, arrKwArgs is None: use multiprocessing.Pool
#         Otherwise, use concurrent.futures.ProcessPoolExecutor
# Returns list of results
def RunJobsWithProcessPool(arrFunctions : list, arrArgs : list = None, arrKwArgs : list = None,
    nWorkers : int = None, iPoolBackend : int = 0, IsBackendAutoSelectingEnabled : bool = True) -> list :
 
    # Handling null-references
    if arrArgs is None :
        arrArgs = []
    #End If
    if arrKwArgs is None :
        arrKwArgs = []
    #End If
    if iPoolBackend < 0 :
        iPoolBackend = 0
    elif iPoolBackend > 1 :
        iPoolBackend = 1
    #End If
 
    # Initialize result buffer
    arrResults = []
 
    # Check if we really need to create child workers
    IsChildWorkersNeeded = False
    if nWorkers is None :
        IsChildWorkersNeeded = True
    elif nWorkers > 1 :
        IsChildWorkersNeeded = True
    #End If
 
    # Backend auto selecting
    if IsBackendAutoSelectingEnabled :
        # Check if we are using Windows
        if IsRunningOnWindows() :
            # Check if we need a lot of processor cores
            IsProcessorCoreCountLarge = False
            if nWorkers is None :
                IsProcessorCoreCountLarge = True
            elif nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
                IsProcessorCoreCountLarge = True
            #End If
            # Select backend
            if IsProcessorCoreCountLarge :
                iPoolBackend = 1
                # Avoid too many workers
                if nWorkers is None :
                    nWorkers = MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS
                elif nWorkers > MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS :
                    nWorkers = MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS
                #End If
            else :
                iPoolBackend = 0
            #End If
        else :
            iPoolBackend = 0
        #End If
    else :
        # Avoid ValueError caused by concurrent.futures.ProcessPoolExecutor on Windows
        if IsRunningOnWindows() :
            if iPoolBackend == 0 :
                if nWorkers is None :
                    nWorkers = PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
                elif nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
                    nWorkers = PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
                #End If
            elif iPoolBackend == 1 :
                if nWorkers is None :
                    nWorkers = MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS
                elif nWorkers > MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS :
                    nWorkers = MULTIPROCESSING_POOL_MAX_WORKERS_LIMIT_WINDOWS
                #End If
            #End If
        #End If
    #End If
 
    # Run jobs
    if IsChildWorkersNeeded :
        # Use backend selected by iPoolBackend to run works
        if iPoolBackend == 0 :
            # Using concurrent.futures.ProcessPoolExecutor
            # Futures of submitted jobs
            arrWorkers = []
 
            # Create process pool object
            with concurrent.futures.ProcessPoolExecutor(max_workers=nWorkers) as polWorkers :
                # Create and submit job package
                for i in range(0, len(arrFunctions)) :
                    # Get function object and arguments
                    fncFunction = arrFunctions[i]
                    tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
                    dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
 
                    # Submit a job
                    ftrCurrentWorker = polWorkers.submit(fncFunction, *tplArgs, **dctKwArgs)
 
                    # Cache current worker
                    arrWorkers.append(ftrCurrentWorker)
                #Next
 
                # Register completion list
                arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
                # Collect results
                for WorkerResult in arrCompletedTasks :
                    arrResults.append(WorkerResult.result())
                #Next
            #End With
        elif iPoolBackend == 1 :
            # Using multiprocessing.Pool
            # Async results of submitted jobs
            arrWorkers = []
 
            # Create process pool object
            with multiprocessing.Pool(processes=nWorkers) as polWorkers :
                # Create and submit job package
                for i in range(0, len(arrFunctions)) :
                    # Get function object and arguments
                    fncFunction = arrFunctions[i]
                    tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
 
                    # Submit a job
                    resCurrentWorker = polWorkers.starmap_async(fncFunction, [tplArgs])
 
                    # Cache async results current worker
                    arrWorkers.append(resCurrentWorker)
                #Next
 
                # Collect results
                for WorkerResult in arrWorkers :
                    arrResults.append(WorkerResult.get()[0])
                #Next
            #End With
        #End If
    else :
        # Run jobs in current process directly
        for i in range(0, len(arrFunctions)) :
            # Get function object and arguments
            fncFunction = arrFunctions[i]
            tplArgs = arrArgs[i] if (i < len(arrArgs)) else ()
            dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
 
            # Run job
            resCurrentWorker = fncFunction(*tplArgs, **dctKwArgs)
 
            # Collect results
            arrResults.append(resCurrentWorker)
        #Next
    #End If
 
    return arrResults
#End Function
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Main entry point
if __name__ == "__main__" :
    # Run jobs in parallel
    # Create functions & arguments list
    arrWorkerFuncs = []
    arrWorkerArgs = []
    # Result=DoWork(2,4,5)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,4,5))
    # Result=DoWork(2,5,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,5,0))
    # Result=DoWork(3,8,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,0))
    # Result=DoWork(3,8,1)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,1))
    # Run jobs
    nParallelCPUJobCount = GetCPUCoreCountForParallelComputing()
    arrWorkerResults = RunJobsWithProcessPool(arrWorkerFuncs, arrWorkerArgs,
        nWorkers=nParallelCPUJobCount)
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
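
As the comments of RunJobsWithProcessPool point out, arrKwArgs is ignored when the multiprocessing.Pool backend is used, because starmap_async() passes positional arguments only. Pool.apply_async() accepts both args and kwds, so it could be used to keep keyword arguments. A minimal sketch of that alternative, which is not part of the wrapper above; submit_jobs and make_name are hypothetical names:

```python
import multiprocessing


def make_name(prefix, number, suffix=""):
    # Placeholder job that takes both positional and keyword arguments
    return f"{prefix}#{number}{suffix}"


def submit_jobs(pool, functions, args_list, kwargs_list):
    # apply_async takes positional arguments as a tuple and keyword
    # arguments as a dict, one call per job
    pending = [
        pool.apply_async(function, args=args, kwds=kwargs)
        for function, args, kwargs in zip(functions, args_list, kwargs_list)
    ]
    # Collect in submission order; get() blocks until each job is done
    return [async_result.get() for async_result in pending]


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        results = submit_jobs(
            pool,
            [make_name, make_name],
            [("Pokemon", 245), ("Pokemon", 250)],
            [{"suffix": "!"}, {}],
        )
    print(results)
```

Unlike the wrapper, this sketch expects the three lists to have the same length, since zip() stops at the shortest one.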

Memory Usage of Process Pools

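When child processes are created in quick succession, worker processes may not exit promptly, which can lead to very high memory usage. In that case, consider passing the maxtasksperchild argument when creating the multiprocessing.Pool object. It limits the number of tasks each worker process can complete before it exits and is replaced by a fresh worker process.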
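
A minimal sketch of maxtasksperchild, assuming that one apply_async() call counts as one task. With maxtasksperchild=1, each worker process exits after a single task and the pool starts a fresh one, which typically shows up as a different process ID per task. report_pid and run_with_recycling are hypothetical names:

```python
import multiprocessing
import os


def report_pid(task_id):
    # Return the task id together with the PID of the process that ran it
    return (task_id, os.getpid())


def run_with_recycling(task_count, tasks_per_child=1):
    # Each worker process is replaced after tasks_per_child completed tasks
    with multiprocessing.Pool(processes=2, maxtasksperchild=tasks_per_child) as pool:
        pending = [pool.apply_async(report_pid, (task_id,)) for task_id in range(task_count)]
        return [async_result.get() for async_result in pending]


if __name__ == "__main__":
    for task_id, pid in run_with_recycling(task_count=4):
        print(f"task {task_id} ran in process {pid}")
```

Restarting workers has a startup cost of its own, so a larger maxtasksperchild value may be a better balance between memory usage and throughput.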

References:

https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

https://gairuo.com/p/python-multiprocessing-pool

https://www.cnblogs.com/piperliu/articles/18615898

https://superfastpython.com/multiprocessing-pool-starmap_async/

https://martinlwx.github.io/en/data-parallel-with-two-different-api/

https://www.cnblogs.com/midworld/p/14614634.html

https://blog.winghau.com/article/000006/.html

Unless otherwise stated, the content of this page is licensed under the Creative Commons Attribution-ShareAlike 3.0 License