如何使用多处理在一个非常大的列表中删除重复项?

How to use multiprocessing to drop duplicates in a very big list?(如何使用多处理在一个非常大的列表中删除重复项?)
本文介绍了如何使用多处理在一个非常大的列表中删除重复项?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

假设我有一个包含随机数的巨大列表

Let's say I have a huge list containing random numbers for example

L = [random.randrange(0,25000000000) for _ in range(1000000000)]

我需要删除此列表中的重复项

I need to get rid of the duplicates in this list

我为包含较少元素的列表编写了这段代码

I wrote this code for lists containing a smaller number of elements

def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
    if i not in seen:
        result.append(i)
        seen.add(i)
return result

在上面的代码中,我创建了一个集合,以便我可以记住哪些数字已经出现在我正在处理的列表中,如果该数字不在集合中,那么我将它添加到我需要返回并保存的结果列表中它在集合中,因此它不会再次添加到结果列表中

In the code above I create a set so I can memorize what numbers have already appeared in the list I'm working on if the number is not in the set then I add it to the result list I need to return and save it in the set so it won't be added again in the result list

现在对于列表中的 1000000 个数字,一切都很好,我可以快速得到结果,但是对于比 1000000000 个问题更好的数字,我需要使用我机器上的不同内核来尝试解决问题,然后结合多进程的结果

Now for 1000000 number in a list all is good I can get a result fast but for numbers superior to let's say 1000000000 problems arise I need to use the different cores on my machine to try and break up the problem and then combine the results from multiple processes

我的第一个猜测是让所有进程都可以访问一个集合,但会出现许多复杂情况一个进程如何在另一个进程添加到集合时读取,我什至不知道是否可以在进程之间共享一个集合我知道我们可以使用队列或管道,但我不确定如何使用它

My first guess was to make a set accessible to all processes but many complications will arise How can a process read while maybe another one is adding to the set and I don't even know if it is possible to share a set between processes I know we can use a Queue or a pipe but I'm not sure on how to use it

谁能给我一个建议,告诉我解决这个问题的最佳方法是什么?我对任何新想法都持开放态度

Can someone give me an advice on what is the best way to solve this problem I am open to any new idea

推荐答案

我怀疑即使你最好的列表足够大,以至于多处理会改善时序.使用 numpy 和 多线程 可能是你最好的机会.

I'm skeptic even your greatest list is big enough so that multiprocessing would improve timings. Using numpy and multithreading is probably your best chance.

多处理引入了相当多的开销并增加了内存消耗,就像前面正确提到的@Frank Merrow.但是,对于多线程来说,情况并非如此(在此范围内).重要的是不要混淆这些术语,因为进程和线程是不一样的.同一进程中的线程共享它们的内存,不同的进程不共享.

Multiprocessing introduces quite some overhead and increases memory consumption like @Frank Merrow rightly mentioned earlier. That's not the case (to that extend) for multithreading, though. It's important to not mix these terms up because processes and threads are not the same. Threads within the same process share their memory, distinct processes do not.

在 Python 中实现多核的问题是 GIL,它没有允许多个线程(在同一个进程中)并行执行 Python 字节码.一些像 numpy 这样的 C 扩展可以释放 GIL,这可以从多核并行和多线程中获利.这是您通过使用 numpy 在重大改进的基础上加快速度的机会.

The problem with going multi-core in Python is the GIL, which doesn't allow multiple threads (in the same process) to execute Python bytecode in parallel. Some C-extensions like numpy can release the GIL, this enables profiting from multi-core parallelism with multithreading. Here's your chance to get some speed up on top of a big improvement just by using numpy.

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np

r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8

result = np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()

使用 numpy 和线程池,拆分数组,使子数组在单独的线程中唯一,然后连接子数组并使重新组合的数组再次唯一.最后删除重组数组的重复项是必要的,因为在子数组中只能识别本地个重复项.

Use numpy and a thread-pool, split up the array, make the sub-arrays unique in separate threads, then concatenate the sub-arrays and make the recombined array once more unique again. The final dropping of duplicates for the recombined array is necessary because within the sub-arrays only local duplicates can be identified.

对于低熵数据(许多重复),使用 pandas.unique 而不是 numpy.unique 可以更快.不同于 numpy.unique 它还保留了外观的顺序.

For low entropy data (many duplicates) using pandas.unique instead of numpy.unique can be much faster. Unlike numpy.unique it also preserves order of appearance.

请注意,只有当 numpy 函数还不是多线程的 幕后通过调用低级数学库.因此,请务必进行测试,看看它是否真的能提高性能,不要认为这是理所当然的.

Note that using a thread-pool like above makes only sense if the numpy-function is not already multi-threaded under the hood by calling into low-level math libraries. So, always test to see if it actually improves performance and don't take it for granted.

使用范围内的 100M 随机生成的整数进行测试:

Tested with 100M random generated integers in the range:

  • 高熵:0 - 25_000_000_000(199560 次重复)
  • 低熵:0 - 1000
import time
import timeit
from multiprocessing.dummy import Pool  # .dummy uses threads

import numpy as np
import pandas as pd


def time_stmt(stmt, title=None):
    t = timeit.repeat(
        stmt=stmt,
        timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
    )
    print(f"	{title or stmt}")
    print(f"		{min(t) / 1e9:.2f} s")


if __name__ == '__main__':

    n_threads = 8  # machine with 8 cores (4 physical cores)

    stmt_np_unique_pool = 
"""
np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""

    stmt_pd_unique_pool = 
"""
pd.unique(np.concatenate(
    Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
    # -------------------------------------------------------------------------

    print(f"
high entropy (few duplicates) {'-' * 30}
")
    r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")    
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

    # ---
    print(f"
low entropy (many duplicates) {'-' * 30}
")
    r = np.random.RandomState(42).randint(0, 1000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique() & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

就像您在下面的时序中看到的那样,仅使用 numpy 而不使用多线程已经占到了最大的性能提升.另请注意 pandas.unique()numpy.unique() (仅)对于许多重复项更快.

Like you can see in the timings below, just using numpy without multithreading already accounts for the biggest performance improvement. Also note pandas.unique() being faster than numpy.unique() (only) for many duplicates.

high entropy (few duplicates) ------------------------------

    list(set(r))
        32.76 s
    np.unique(r).tolist()
        12.32 s
    pd.unique(r).tolist()
        23.01 s
    numpy.unique() & Pool
        9.75 s
    pandas.unique() & Pool
        28.91 s

low entropy (many duplicates) ------------------------------

    list(set(r))
        5.66 s
    np.unique(r).tolist()
        4.59 s
    pd.unique(r).tolist()
        0.75 s
    numpy.unique() & Pool
        1.17 s
    pandas.unique() & Pool
        0.19 s

这篇关于如何使用多处理在一个非常大的列表中删除重复项?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

python: iterating through a dictionary with list values(python:遍历具有列表值的字典)
What is the difference between chain and chain.from_iterable in itertools?(itertools中chain和chain.from_iterable有什么区别?)
python JSON only get keys in first level(python JSON只获取第一级的键)
Iterate over n successive elements of list (with overlapping)(迭代列表的 n 个连续元素(重叠))
Loop problem while iterating through a list and removing recurring elements(遍历列表并删除重复元素时出现循环问题)
Elegant way to skip elements in an iterable(跳过可迭代元素的优雅方式)