Source code for distarray.localapi.maps

# encoding: utf-8
# ---------------------------------------------------------------------------
#  Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc.
#  Distributed under the terms of the BSD License.  See COPYING.rst.
# ---------------------------------------------------------------------------

Classes to manage the distribution-specific aspects of a LocalArray.

The Distribution class is the main entry point and is meant to be used by
LocalArrays to help translate between local and global index spaces.  It
manages `ndim` one-dimensional map objects.

The one-dimensional map classes BlockMap, CyclicMap, BlockCyclicMap, and
UnstructuredMap all manage the mapping tasks for their particular dimension.
All are subclasses of MapBase.  The reason for the several subclasses is to
allow more compact and efficient operations.


from __future__ import division

import operator
from functools import reduce
from numbers import Integral

import numpy as np
from distarray.externals.six.moves import range, zip, reduce

from distarray.localapi import construct
from distarray.metadata_utils import (make_grid_shape, normalize_grid_shape,
                                      normalize_dist, distribute_indices,

# Register numpy integer types with numbers.Integral ABC.

[docs]class Distribution(object): """Multi-dimensional Map class. Manages one or more one-dimensional map classes. """ def __init__(self, comm, dim_data): """Create a Distribution from a `dim_data` structure.""" self._maps = tuple(map_from_dim_dict(dim_dict) for dim_dict in dim_data) self.base_comm = construct.init_base_comm(comm) self.comm = construct.init_comm(self.base_comm, self.grid_shape) @classmethod
[docs] def from_shape(cls, comm, shape, dist=None, grid_shape=None): """Create a Distribution from a `shape` and optional arguments.""" dist = {0: 'b'} if dist is None else dist ndim = len(shape) dist_tuple = normalize_dist(dist, ndim) base_comm = construct.init_base_comm(comm) comm_size = base_comm.Get_size() if grid_shape is None: # Make a new grid_shape if not provided. grid_shape = make_grid_shape(shape, dist_tuple, comm_size) grid_shape = normalize_grid_shape(grid_shape, shape, dist_tuple, comm_size) comm = construct.init_comm(base_comm, grid_shape) grid_coords = comm.Get_coords(comm.Get_rank()) dim_data = [] for dist, size, grid_rank, grid_size in zip(dist_tuple, shape, grid_coords, grid_shape): dim_dict = dict(dist_type=dist, size=size, proc_grid_rank=grid_rank, proc_grid_size=grid_size) distribute_indices(dim_dict) dim_data.append(dim_dict) return cls(comm=base_comm, dim_data=dim_data)
def __getitem__(self, idx): return self._maps[idx] def __len__(self): return len(self._maps) @property def dim_data(self): return tuple(m.dim_dict for m in self._maps) @property def grid_shape(self): return tuple(m.grid_size for m in self._maps) @property def global_shape(self): return tuple(m.global_size for m in self._maps) @property def global_size(self): return reduce(operator.mul, self.global_shape, 1) @property def local_shape(self): return tuple(m.size for m in self._maps) @property def local_size(self): return reduce(operator.mul, self.local_shape, 1) @property def ndim(self): return len(self._maps) @property def comm_size(self): return self.base_comm.Get_size() @property def comm_rank(self): return self.base_comm.Get_rank() @property def dist(self): return tuple(m.dist for m in self._maps) @property def cart_coords(self): coords = tuple(m.grid_rank for m in self._maps) assert coords == tuple(self.comm.Get_coords(self.comm_rank)) return coords @property def global_slice(self): """Return a slice representing the global index space of this dimension. """ return tuple(m.global_slice for m in self._maps)
[docs] def coords_from_rank(self, rank): return self.comm.Get_coords(rank)
[docs] def rank_from_coords(self, coords): return self.comm.Get_cart_rank(coords)
[docs] def local_from_global(self, global_ind): """ Given `global_ind` indices, translate into local indices.""" _, idxs = sanitize_indices(global_ind, self.ndim, self.global_shape) local_idxs = [] for m, idx in zip(self._maps, global_ind): if isinstance(idx, Integral): local_idxs.append(m.local_from_global_index(idx)) elif isinstance(idx, slice): local_idxs.append(m.local_from_global_slice(idx)) else: raise TypeError("Index must be Integral or slice.") return tuple(local_idxs)
[docs] def global_from_local(self, local_ind): """ Given `local_ind` indices, translate into global indices.""" global_idxs = [] for m, idx in zip(self._maps, local_ind): if isinstance(idx, Integral): global_idxs.append(m.global_from_local_index(idx)) elif isinstance(idx, slice): global_idxs.append(m.global_from_local_slice(idx)) else: raise TypeError("Index must be Integral or slice.") return tuple(global_idxs)
[docs] def local_flat_from_local(self, local_ind): local_strides = _get_strides(self.local_shape) def flatten(idx, strides): return sum(a * b for (a, b) in zip(idx, strides)) return flatten(local_ind, local_strides)
[docs]def map_from_dim_dict(dd): """ Factory function that returns a 1D map for a given dimension dictionary. """ # Extract parameters from the dimension dictionary. dist_type = dd['dist_type'] size = dd['size'] start = dd.get('start', None) stop = dd.get('stop', None) grid_rank = dd.get('proc_grid_rank', 0) grid_size = dd.get('proc_grid_size', 1) block_size = dd.get('block_size', 1) indices = dd.get('indices', None) if dist_type == 'n': return BlockMap(global_size=size, grid_size=grid_size, grid_rank=grid_rank, start=0, stop=size) if dist_type == 'b': return BlockMap(global_size=size, grid_size=grid_size, grid_rank=grid_rank, start=start, stop=stop) if dist_type == 'c' and block_size == 1: return CyclicMap(global_size=size, grid_size=grid_size, grid_rank=grid_rank, start=start) if dist_type == 'c' and block_size > 1: return BlockCyclicMap(global_size=size, grid_size=grid_size, grid_rank=grid_rank, start=start, block_size=block_size) if dist_type == 'u': return UnstructuredMap(global_size=size, grid_size=grid_size, grid_rank=grid_rank, indices=indices) raise ValueError("Unsupported dist_type of %r" % dist_type)
def _accum(start, next): return tuple(s * next for s in start) + (next,) def _get_strides(shape): return reduce(_accum, tuple(shape[1:]) + (1,), ())
[docs]class MapBase(object): """ Base class for all one dimensional Map classes. """ pass
[docs]class BlockMap(MapBase): """ One-dimensional block map class. """ dist = 'b' def __init__(self, global_size, grid_size, grid_rank, start, stop): self.start = start self.stop = stop self.local_size = stop - start self.global_size = global_size self.grid_size = grid_size self.grid_rank = grid_rank
[docs] def local_from_global_index(self, gidx): if gidx < self.start or gidx >= self.stop: raise IndexError("Global index %s out of bounds" % gidx) return gidx - self.start
[docs] def local_from_global_slice(self, gidx): # we don't make the effort to compute the exact slice # `__getitem__` doesn't care about overly-large slices, we just # have to get the offset from the start correct based on the `step` start = gidx.start if gidx.start is not None else 0 stop = gidx.stop if gidx.stop is not None else self.global_size step = gidx.step if gidx.step is not None else 1 new_start = start - self.start if new_start < 0: # don't allow negative starts new_start += step * abs(new_start // step) if new_start < 0: new_start += step new_stop = stop - self.start return slice(new_start, new_stop, gidx.step)
[docs] def global_from_local_index(self, lidx): if lidx >= self.local_size: raise IndexError("Local index %s out of bounds" % lidx) return lidx + self.start
[docs] def global_from_local_slice(self, lidx): start = lidx.start if lidx.start is not None else 0 stop = lidx.stop if lidx.stop is not None else self.global_size new_start = start + self.start new_stop = stop + self.start return slice(new_start, new_stop)
@property def dim_dict(self): return {'dist_type': self.dist, 'size': self.global_size, 'proc_grid_rank': self.grid_rank, 'proc_grid_size': self.grid_size, 'start': self.start, 'stop': self.stop, } @property def global_iter(self): return iter(range(self.start, self.stop)) @property def size(self): return self.local_size @property def global_slice(self): """Return a slice representing the global index space of this dimension. """ return slice(self.start, self.stop)
[docs]class CyclicMap(MapBase): """ One-dimensional cyclic map class. """ dist = 'c' def __init__(self, global_size, grid_size, grid_rank, start): if start != grid_rank: msg = "start value (given %d) does not equal grid_rank (given %d)" raise ValueError(msg % (start, grid_rank)) if start >= grid_size: msg = "start (%d) is greater or equal to grid_size (%d)" raise ValueError(msg % (start, grid_size)) self.start = start self.grid_size = grid_size self.grid_rank = grid_rank self.local_size = (global_size - 1 - grid_rank) // grid_size + 1 self.global_size = global_size
[docs] def local_from_global_index(self, gidx): if (gidx - self.start) % self.grid_size: raise IndexError("Global index %s out of bounds" % gidx) return (gidx - self.start) // self.grid_size
[docs] def global_from_local_index(self, lidx): if lidx >= self.local_size: raise IndexError("Local index %s out of bounds" % lidx) return (lidx * self.grid_size) + self.start
@property def dim_dict(self): return {'dist_type': self.dist, 'size': self.global_size, 'proc_grid_rank': self.grid_rank, 'proc_grid_size': self.grid_size, 'start': self.start, } @property def global_iter(self): return iter(range(self.start, self.global_size, self.grid_size)) @property def size(self): return self.local_size @property def global_slice(self): """Return a slice representing the global index space of this dimension. """ return slice(self.start, self.global_size, self.grid_size)
[docs]class BlockCyclicMap(MapBase): """ One-dimensional block cyclic map class. """ dist = 'c' def __init__(self, global_size, grid_size, grid_rank, start, block_size): if start % block_size: msg = "Value of start (%r) does not evenly divide block_size (%r)." raise ValueError(msg % (start, block_size)) self.start = start self.start_block = start // block_size self.block_size = block_size global_nblocks, partial = divmod(global_size, block_size) self.grid_size = grid_size self.grid_rank = grid_rank local_nblocks = (global_nblocks - 1 - grid_rank) // grid_size + 1 local_partial = partial if grid_rank == 0 else 0 self.local_size = local_nblocks * block_size + local_partial self.global_size = global_size
[docs] def local_from_global_index(self, gidx): global_block, offset = divmod(gidx, self.block_size) if (global_block - self.start_block) % self.grid_size: raise IndexError("Global index %s out of bounds" % gidx) return self.block_size * ((global_block - self.start_block) // self.grid_size) + offset
[docs] def global_from_local_index(self, lidx): if lidx >= self.local_size: raise IndexError("Local index %s out of bounds" % lidx) local_block, offset = divmod(lidx, self.block_size) global_block = (local_block * self.grid_size) + self.start_block return global_block * self.block_size + offset
@property def dim_dict(self): return {'dist_type': self.dist, 'size': self.global_size, 'proc_grid_rank': self.grid_rank, 'proc_grid_size': self.grid_size, 'start': self.start, 'block_size': self.block_size, } @property def global_iter(self): _global_index = np.empty((self.local_size,), dtype=np.int32) # FIXME: this is the slow way to do this... for i in range(self.local_size): _global_index[i] = self.global_from_local_index(i) return iter(_global_index) @property def size(self): return self.local_size @property def global_slice(self): """Return a slice representing the global index space of this dimension; only possible for block_size == 1. """ if self.block_size == 1: return slice(self.start, self.global_size, self.grid_size) else: msg = "`global_slice` not possible for block_size > 1." raise AttributeError(msg)
[docs]class UnstructuredMap(MapBase): """ One-dimensional unstructured map class. """ dist = 'u' def __init__(self, global_size, grid_size, grid_rank, indices): self.global_size = global_size self.grid_size = grid_size self.grid_rank = grid_rank self.indices = np.asarray(indices) self.local_size = len(self.indices) local_indices = range(self.local_size) self._local_index = dict(zip(self.indices, local_indices))
[docs] def local_from_global_index(self, gidx): try: lidx = self._local_index[gidx] except KeyError: raise IndexError("Global index %s out of bounds" % gidx) return lidx
[docs] def global_from_local_index(self, lidx): return self.indices[lidx]
@property def dim_dict(self): return {'dist_type': self.dist, 'size': self.global_size, 'proc_grid_rank': self.grid_rank, 'proc_grid_size': self.grid_size, 'indices': self.indices, } @property def global_iter(self): return iter(self.indices) @property def size(self): return self.local_size