通过 SCOOP 使用 map 函数

当处理 list 或其他类型的序列时,一种很常用的操作是对序列中的每一个元素都执行相同的操作,然后收集结果。举例说,通过 Python IDLE 可以对一个 list 这样更新:

>>>items = [1,2,3,4,5,6,7,8,9,10]
>>>updated_items = []
>>>for x in items:
>>>    updated_items.append(x*2)
>>> updated_items
>>>  [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

这是比较常见的一个操作,但是 Python 内置了一些功能可以更优雅地完成类似操作。

Python 的函数 map(aFunction, aSequence) 将在序列的每一个元素上调用传入的函数,并将结果以 list 的形式返回。用这个函数完成上面的操作:

>>>items = [1,2,3,4,5,6,7,8,9,10]
>>>def multiplyFor2(x):return x*2
>>>print(list(map(multiplyFor2,items)))
>>>[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

这里,我们将自定义的 multiplyFor2 函数传给 map 。 它将对 items 中所有的元素执行此函数,最后以 list 的形式返回结果。

我们也可以将 lambda 函数(不用绑定变量名的匿名函数)当做参数传给 map 函数。这段代码就变成以下这样:

>>>items = [1,2,3,4,5,6,7,8,9,10]
>>>print(list(map(lambda x:x*2,items)))
>>>[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

内置的 map 函数比手动写 for 循环的性能更高。

1. 准备工作

SCOOP 模块提供了多个 map 函数可以将异步计算任务下发到多个计算节点:

  • futures.map(func, iterables, kargs): 此函数返回一个生成器,可以按照输入的顺序遍历结果。可以说是内置 map 函数的一个并行执行版本。
  • futures.map_as_completed(func, iterables, kargs): 每当有结果出现时,就立刻 yield 出来。
  • futures.scoop.futures.mapReduce(mapFunc, reductionOp, iterables, kargs): map 函数执行过后可以通过此函数执行 reduction 函数。返回结果是一个元素。

2. 如何做…

在这个例子中,我们将对比 SCOOP 实现 MapReduce 和 Python 内置函数实现:

"""
Compare SCOOP MapReduce with a serial implementation
"""
import operator
import time
from scoop import futures
def simulateWorkload(inputData):
    time.sleep(0.01)
    return sum(inputData)
def CompareMapReduce():
    mapScoopTime = time.time()
    res = futures.mapReduce(
        simulateWorkload,
        operator.add,
        list([a] * a for a in range(1000)),
    )
    mapScoopTime = time.time() - mapScoopTime
    print("futures.map in SCOOP executed in {0:.3f}s with result:{1}".format(
          mapScoopTime, res))
    mapPythonTime = time.time()
    res = sum(map(simulateWorkload, list([a] * a for a in range(1000))))
    mapPythonTime = time.time() - mapPythonTime
    print("map Python executed in: {0:.3f}s with result: {1}".format(
        mapPythonTime, res))
if __name__ == '__main__':
    CompareMapReduce()

这段代码通过以下命令来执行:

python -m scoop map_reduce.py
> [2015-06-12 20:13:25,602] launcher  INFO    SCOOP 0.7.2 dev on win32
using Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 24 2015, 22:43:06) [MSC
v.1600 32 bit (Intel)], API: 1013
[2015-06-12 20:13:25,602] launcher  INFO Deploying 2 worker(s) over 1
host(s).
[2015-06-12 20:13:25,602] launcher  INFO Worker d--istribution:
[2015-06-12 20:13:25,602] launcher  INFO 127.0.0.1:       1 + origin
Launching 2 worker(s) using an unknown shell.
futures.map in SCOOP executed in 8.459s with result: 332833500
map Python executed in: 10.034s with result: 332833500
[2015-06-12 20:13:45,344] launcher  (127.0.0.1:2559) INFO
is done.
[2015-06-12 20:13:45,368] launcher  (127.0.0.1:2559) INFO
cleaning spawned subprocesses.

3. 讨论

在这个例子中,我们将 SCOOP 实现的 MapReduce 和 Python 内置函数的 MapReduce 来对比。主要的函数是 CompareMapReduce(),里面有两种实现和对时间的统计。程序结构如下:

mapScoopTime = tme.time()
#Run SCOOP MapReduce
mapScoopTime = time.time() – mapScoopTime
mapPythonTime = time.time()
#Run serial MapReduce
mapPythonTime = time.time() - mapPythonTime

在输出中,我们打印了执行时间:

futures.map in SCOOP executed in 8.459s with result: 332833500
map Python executed in: 10.034s with result: 332833500

为了得到比较明显的时间比较,我们在 simulatedWordload 函数中引入了 time.sleep 来延长计算时间。

def simulateWorkload(inputData, chose=None):
    time.sleep(0.01)
    return sum(inputData)

SCOOP 版本的 MapReduce 如下:

res = futures.mapReduce(
    simulateWorkload,
    operator.add,
    list([a] * a for a in range(1000)),
)

futures.mapReduce 函数需要以下参数:

  • simulateWork : 这是要执行的 Futures,注意这个 callable 必须有返回值。
  • operator.add : 此函数将会在 reduce 操作的时候调用,必须接收两个参数,返回一个值。
  • list(...) : 一个可迭代对象,其中每一个元素都会传给 callable 对象作为 Future。 使用 Python 内置函数实现的 MapReduce 如下:
res = sum(map(simulateWorkload,
              list([a] * a for a in range(1000))))

Python 内置的 map() 函数接受两个参数: simulateWorkload 函数和可迭代的 list() 对象。Reduce 操作我们只是简单地用 Python 内置的 sum() 函数。

下一节:Python Remote Objects (Pyro4) 实现了类似 Java 的远程方法调用(Remote Method Invocation, RMI). 可以调用一个远程对象(存在于另一个进程中,甚至是另一台机器上),就像调用本地对象一样(处于和调用者一样的进程)。从概念的角度讲,RMI 的技术可以追溯到远程过程调用(remote procedure call,RPC),RMI 是远程过程调用技术针对面向对象范式进行改造——方法替换过程。在面向对象系统中,对远程方法调用使用这样一种机制可以在项目的统一性和对称性上有很多优势,因为这样我们可以复用同一应用不同对象或方法之间调用的模型。