Source code for distarray.mpi_engine

# encoding: utf-8
# ---------------------------------------------------------------------------
#  Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc.
#  Distributed under the terms of the BSD License.  See COPYING.rst.
# ---------------------------------------------------------------------------
"""
The engine_loop function and utilities necessary for it.
"""

from functools import reduce
from importlib import import_module
import types

from distarray.metadata_utils import arg_kwarg_proxy_converter
from distarray.localapi import LocalArray
from distarray.localapi.proxyize import Proxy

from distarray.mpionly_utils import (initial_comm_setup,
                                     make_targets_comm,
                                     get_comm_world)


[docs]class Engine(object): INTERCOMM = None def __init__(self): self.world = get_comm_world() self.world_ranks = list(range(self.world.size)) # make engine and client comm self.client_rank = 0 self.engine_ranks = [i for i in self.world_ranks if i != self.client_rank] # make engines intracomm (Context._base_comm): Engine.INTERCOMM = initial_comm_setup() assert self.world.rank != self.client_rank while True: msg = Engine.INTERCOMM.recv(source=self.client_rank) val = self.parse_msg(msg) if val == 'kill': break Engine.INTERCOMM.Free()
[docs] def is_engine(self): if self.world.rank != self.client_rank: return True else: return False
[docs] def parse_msg(self, msg): to_do = msg[0] what = {'func_call': self.func_call, 'execute': self.execute, 'push': self.push, 'pull': self.pull, 'kill': self.kill, 'free_comm': self.free_comm, 'delete': self.delete, 'make_targets_comm': self.engine_make_targets_comm, 'builtin_call': self.builtin_call} func = what[to_do] ret = func(msg) return ret
[docs] def delete(self, msg): obj = msg[1] if isinstance(obj, Proxy): obj.cleanup() else: name = obj try: module = import_module('__main__') delattr(module, name) except AttributeError: pass
[docs] def func_call(self, msg): func_data = msg[1] args = msg[2] kwargs = msg[3] nonce, context_key = msg[4] autoproxyize = msg[5] module = import_module('__main__') module.proxyize.set_state(nonce) args, kwargs = arg_kwarg_proxy_converter(args, kwargs) new_func_globals = module.__dict__ # add proper proxyize, context_key new_func_globals.update({'proxyize': module.proxyize, 'context_key': context_key}) new_func = types.FunctionType(func_data[0], new_func_globals, func_data[1], func_data[2], func_data[3]) res = new_func(*args, **kwargs) if autoproxyize and isinstance(res, LocalArray): res = module.proxyize(res) Engine.INTERCOMM.send(res, dest=self.client_rank)
[docs] def execute(self, msg): main = import_module('__main__') code = msg[1] exec(code, main.__dict__)
[docs] def push(self, msg): d = msg[1] module = import_module('__main__') for k, v in d.items(): pieces = k.split('.') place = reduce(getattr, [module] + pieces[:-1]) setattr(place, pieces[-1], v)
[docs] def pull(self, msg): name = msg[1] module = import_module('__main__') res = reduce(getattr, [module] + name.split('.')) Engine.INTERCOMM.send(res, dest=self.client_rank)
[docs] def free_comm(self, msg): comm = msg[1].dereference() comm.Free()
[docs] def kill(self, msg): """Break out of the engine loop.""" return 'kill'
[docs] def engine_make_targets_comm(self, msg): targets = msg[1] make_targets_comm(targets)
[docs] def builtin_call(self, msg): func = msg[1] args = msg[2] kwargs = msg[3] args, kwargs = arg_kwarg_proxy_converter(args, kwargs) res = func(*args, **kwargs) Engine.INTERCOMM.send(res, dest=self.client_rank)