Trouble using a lock with multiprocessing.Pool: pickling error

Problem description


I'm building a python module to extract tags from a large corpus of text, and while its results are high quality it executes very slowly. I'm trying to speed the process up by using multiprocessing, and that was working too, until I tried to introduce a lock so that only one process was connecting to our database at a time. I can't figure out for the life of me how to make this work - despite much searching and tweaking I am still getting a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here's the offending code - it worked fine until I tried to pass a lock object as an argument for f.

from functools import partial
from multiprocessing import Manager

def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize 
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock) 

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

Answer


Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.
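A quick way to confirm the diagnosis (a small sketch of mine, not part of the original answer): try pickling a lock the same way Pool pickles every task argument. On Python 3 the failure surfaces as a RuntimeError rather than the Python 2 PicklingError quoted in the question, but the cause is the same.

```python
import multiprocessing
import pickle


def can_pickle(obj):
    # Pool pickles each task argument before sending it to a worker;
    # simulate that step directly.
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, RuntimeError):
        # multiprocessing.Lock raises RuntimeError ("Lock objects should
        # only be shared between processes through inheritance");
        # plain thread locks raise TypeError.
        return False
```

Here `can_pickle("a string")` succeeds, while `can_pickle(multiprocessing.Lock())` fails, which is exactly why the lock cannot travel as an argument to `pool.map`.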


  • To avoid this, you can make your lock variable a global variable. Then you will be able to reference it within your pool process function directly as a global variable, and will not have to pass it as an argument to the pool process function. This works because, on Unix, Python uses the OS fork mechanism when creating the pool processes, so each worker inherits the entire contents of the process that created the pool. This is the most direct way of getting a lock into a Python process created with the multiprocessing package. Incidentally, it is not necessary to use the Manager class just for this lock. With this change your code would look like this:

import multiprocessing
from functools import partial

lock = None  # Global definition of lock
pool = None  # Global definition of pool


def get_more_tags(tag, max_tags):
    global lock
    # The (relatively cheap) database call goes here; the global lock
    # serializes access without ever being pickled.
    with lock:
        pass  # ... query the database for tags related to `tag` ...


def make_network(initial_tag, max_tags=2, max_iter=3):
    global lock
    global pool
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(8)

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    f = partial(get_more_tags, max_tags=max_tags)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)


In your real code, it is possible that the lock and pool variables might be class instance variables.
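If module-level globals feel awkward, a related variant (my own sketch, not part of the original answer) is Pool's initializer hook, which runs once in each worker and installs the lock as a worker-side global; unlike the fork-only trick above, this also works under Windows' spawn semantics. The bodies of `get_more_tags` and `make_network` here are placeholders that only mirror the shape of the question's code:

```python
import multiprocessing

_worker_lock = None  # set inside each worker by _init_worker


def _init_worker(shared_lock):
    # Runs once in every worker process; stores the inherited lock in a
    # worker-side global so task functions never need to pickle it.
    global _worker_lock
    _worker_lock = shared_lock


def get_more_tags(tag, max_tags=2):
    # Hypothetical stand-in for the real tagging function.
    with _worker_lock:  # serializes the (pretend) database access
        return [tag + "-db"][:max_tags]


def make_network(tags):
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(2, initializer=_init_worker, initargs=(lock,))
    try:
        return pool.map(get_more_tags, tags)
    finally:
        pool.close()
        pool.join()
```

Because the lock reaches the workers through `initargs` (inheritance at worker startup) rather than through `pool.map`, nothing unpicklable ever crosses the task queue.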

  • A second solution which avoids the use of locks altogether, but which might have slightly higher overhead, would be to create another process with multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process would be responsible for running your database query. You would use the queue to allow your pool processes to send parameters to the process that manages the database query. Since all the pool processes would use the same queue, access to the database would automatically be serialized. The additional overhead would come from the pickling/unpickling of the database query arguments and the query response. Note that a plain multiprocessing.Queue cannot be passed to a pool task as an argument either; share it through inheritance (for example via the Pool initializer) or use a Manager().Queue(), which is picklable. Note also that the multiprocessing.Lock based solution would not work on Windows, where processes are not created with fork semantics.
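The dedicated database process can be sketched as follows (my own minimal version, reduced to one requester for clarity; with several concurrent pool workers you would need per-worker reply queues or tagged replies so responses reach the right requester):

```python
import multiprocessing


def db_worker(request_q, response_q):
    # The only process that touches the database, so queries are
    # serialized automatically, with no lock at all.
    while True:
        tag = request_q.get()
        if tag is None:          # sentinel: shut down cleanly
            break
        # The real database query would go here; a fake result for the sketch:
        response_q.put((tag, [tag + "-related"]))


def query_tag(tag, request_q, response_q):
    # What a pool worker would do: send the query, then wait for the reply.
    request_q.put(tag)
    return response_q.get()


def demo():
    request_q = multiprocessing.Queue()
    response_q = multiprocessing.Queue()
    worker = multiprocessing.Process(target=db_worker,
                                     args=(request_q, response_q))
    worker.start()
    result = query_tag("python", request_q, response_q)
    request_q.put(None)          # stop the worker
    worker.join()
    return result
```

The `None` sentinel is a common shutdown convention; any value that can never be a real query would do.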
