在下面的示例问题中,主程序创建一个长度随机字符串列表data_size。如果没有多处理,数据将直接发送到Test.iterate()类只是将字符串添加Test-到每个随机字符串的开头的位置。在没有多处理的情况下运行时,代码可以很好地处理小值data_size和大值data_size。

我决定为这个测试问题添加一个多处理能力,并将多处理的核心组件分解为类标题MultiProc。成员函数Multiproc.run_processes()管理类中的所有函数。该函数假定输入列表将被划分为x个较小的列表,具体取决于用户希望使用的进程数。结果,该函数通过确定每个子列表相对于初始列表的上下索引来开始,因此代码知道要为每个线程迭代哪些部分。然后,该函数启动进程,启动进程,加入进程,从中提取数据Queue,然后根据传递给主函数的计数器重新排序返回的数据。MultiProc类在小值时工作得相当好data_size,但高于~500的值,代码永远不会终止,但我怀疑根据内存,值会因计算机而异。但是,在某些时候,多进程函数停止工作,我怀疑它与从多进程返回数据的方式有关。有谁知道可能导致此问题的原因以及如何解决?

from multiprocessing import Process, Queue

from itertools import chain

import string

import random

class Test:

def __init__(self, array_list):

self.array_list = array_list

def func(self, names):

return ‘Test-‘ + names

def iterate(self, upper, lower, counter):

output = [self.func(self.array_list[i]) for i in range(lower, upper)]

return output, counter

class MultiProc:

def __init__(self, num_procs, data_array, func):

self.num_procs = num_procs

self.data_array = data_array

self.func = func

if self.num_procs > len(self.data_array):

self.num_procs = len(self.data_array)

self.length = int((len(self.data_array) / self.num_procs) // 1)

def run_processes(self):

upper = self.__determine_upper_indices()

lower = self.__determine_lower_indices(upper)

p, q = self.__initiate_proc(self.func, upper, lower)

self.__start_thread(p)

self.__join_threads(p)

results = self.__extract_data(q)

new = self.__reorder_data(results)

return new

def __determine_upper_indices(self):

upper = [i * self.length for i in range(1, self.num_procs)]

upper.append(len(self.data_array))

return upper

def __determine_lower_indices(self, upper):

lower = [upper[i] for i in range(len(upper) – 1)]

lower = [0] + lower

return lower

def __initiate_proc(self, func, upper, lower):

q = Queue()

p = [Process(target=self.run_and_send_back_output,

args=(q, func, upper[i], lower[i], i))

for i in range(self.num_procs)]

return p, q

def __start_thread(self, p):

[p[i].start() for i in range(self.num_procs)]

def __join_threads(self, p):

[p[i].join() for i in range(self.num_procs)]

def __extract_data(self, q):

results = []

while not q.empty():

results.extend(q.get())

return results

def __reorder_data(self, results):

new = [results[i – 1] for j in range(self.num_procs)

for i in range(len(results)) if results[i] == j]

new = list(chain.from_iterable(new))

return new

def run_and_send_back_output(self, queue, func, *args):

result = func(*args) # run the func

queue.put(result) # send the result back

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):

return ”.join(random.choice(chars) for _ in range(size))

if __name__ == “__main__”:

random.seed(1234)

data_size = 9

num_proc = 2

test_list = [id_generator() for i in range(data_size)]

obj1 = Test(test_list)

result1 = obj1.iterate(data_size, 0, 1)

print(result1)

multi = MultiProc(num_proc, test_list, obj1.iterate)

result2 = multi.run_processes()

print(result2)

# >> [‘Test-2HAFCF’, ‘Test-GWPBBB’, ‘Test-W43JFL’, ‘Test-HA65PE’,

# ‘Test-83EF6C’, ‘Test-R9ET4W’, ‘Test-RPM37B’, ‘Test-6EAVJ4’,

# ‘Test-YKDE5K’]

写回答关注邀请回答

提问于 2018-09-06

当数据集太大时,Python多处理示例永远不会终止

写回答关注

1 个回答

热门排序

用户回答回答于 2018-09-06

你的主要问题是:

self.__start_thread(p)

self.__join_threads(p)

results = self.__extract_data(q)

您启动尝试将某些内容放入队列的工作人员,然后加入工作人员,然后才开始从队列中检索数据。然而,工作人员只能在将所有数据刷新到底层管道后退出,否则将在退出时阻止。在开始从管道中检索元素之前,连接过程像这样被阻止可能导致死锁。

也许你应该调查multiprocessing.Pool,因为你想要实现的是某种map()操作。你的例子可以更优雅地重写这样的东西:

from multiprocessing import Pool

import string

import random

def func(name):

return ‘Test-‘ + name

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):

return ”.join(random.choice(chars) for _ in range(size))

if __name__ == “__main__”:

random.seed(1234)

data_size = 5000

num_proc = 2

test_list = [id_generator() for i in range(data_size)]

with Pool(num_proc) as pool:

result = pool.map(func, test_list)

print(result)

发表评论

电子邮件地址不会被公开。 必填项已用*标注