diff --git a/nutils/parallel.py b/nutils/parallel.py
index ef57da177..e682a35f6 100644
--- a/nutils/parallel.py
+++ b/nutils/parallel.py
@@ -166,4 +166,34 @@ def _wait(pid):
     treelog.error('process {} {}'.format(pid, msg))
     return False
 
+
+def Manager():
+    '''Return a manager for sharing data between processes.
+
+    The returned manager only supports the methods
+    :meth:`multiprocessing.managers.SyncManager.dict` and
+    :meth:`multiprocessing.managers.SyncManager.list`.
+    '''
+
+    if maxprocs.current > 1:
+        return multiprocessing.Manager()
+    else:
+        return _NoManager()
+
+
+class _NoManager:
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        pass
+
+    def dict(self, value=()):
+        return dict(value)
+
+    def list(self, value=()):
+        return list(value)
+
+
 # vim:sw=4:sts=4:et
diff --git a/nutils/topology.py b/nutils/topology.py
index eb8ffd76a..4941d1e8b 100644
--- a/nutils/topology.py
+++ b/nutils/topology.py
@@ -1416,39 +1416,44 @@ def trim(self, levelset, maxrefine, ndivisions=8, name='trimmed', leveltopo=None
         if arguments is None:
             arguments = {}
 
-        refs = []
-        if leveltopo is None:
-            ielem_arg = evaluable.Argument('_trim_index', (), dtype=int)
-            coordinates = self.references.getpoints('vertex', maxrefine).get_evaluable_coords(ielem_arg)
-            transform_chains = self.transforms.get_evaluable(ielem_arg), self.opposites.get_evaluable(ielem_arg)
-            levelset = levelset.lower(function.LowerArgs.for_space(self.space, transform_chains, coordinates)).optimized_for_numpy
-            with log.iter.percentage('trimming', range(len(self)), self.references) as items:
-                for ielem, ref in items:
-                    levels = levelset.eval(_trim_index=ielem, **arguments)
-                    refs.append(ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions))
-        else:
-            log.info('collecting leveltopo elements')
-            coordinates = evaluable.Points(evaluable.NPoints(), self.ndims)
-            transform_chain = transform.EvaluableTransformChain.from_argument('trans', self.transforms.todims, self.transforms.fromdims)
-            levelset = levelset.lower(function.LowerArgs.for_space(self.space, (transform_chain, transform_chain), coordinates)).optimized_for_numpy
-            bins = [set() for ielem in range(len(self))]
-            for trans in leveltopo.transforms:
-                ielem, tail = self.transforms.index_with_tail(trans)
-                bins[ielem].add(tail)
-            fcache = cache.WrapperCache()
-            with log.iter.percentage('trimming', self.references, self.transforms, bins) as items:
-                for ref, trans, ctransforms in items:
-                    levels = numpy.empty(ref._nlinear_by_level(maxrefine))
-                    cover = list(fcache[ref._linear_cover](frozenset(ctransforms), maxrefine))
-                    # confirm cover and greedily optimize order
-                    mask = numpy.ones(len(levels), dtype=bool)
-                    while mask.any():
-                        imax = numpy.argmax([mask[indices].sum() for tail, points, indices in cover])
-                        tail, points, indices = cover.pop(imax)
-                        levels[indices] = levelset.eval(trans=trans + tail, _points=points, **arguments)
-                        mask[indices] = False
-                    refs.append(ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions))
-            log.debug('cache', fcache.stats)
+        with parallel.Manager() as manager:
+            refs = manager.list([None]*len(self))
+            if leveltopo is None:
+                ielem_arg = evaluable.Argument('_trim_index', (), dtype=int)
+                coordinates = self.references.getpoints('vertex', maxrefine).get_evaluable_coords(ielem_arg)
+                transform_chains = self.transforms.get_evaluable(ielem_arg), self.opposites.get_evaluable(ielem_arg)
+                levelset = levelset.lower(function.LowerArgs.for_space(self.space, transform_chains, coordinates)).optimized_for_numpy
+                with parallel.ctxrange('trimming', len(self)) as ielems:
+                    for ielem in ielems:
+                        levels = levelset.eval(_trim_index=ielem, **arguments)
+                        refs[ielem] = self.references[ielem].trim(levels, maxrefine=maxrefine, ndivisions=ndivisions)
+            else:
+                log.info('collecting leveltopo elements')
+                coordinates = evaluable.Points(evaluable.NPoints(), self.ndims)
+                transform_chain = transform.EvaluableTransformChain.from_argument('trans', self.transforms.todims, self.transforms.fromdims)
+                levelset = levelset.lower(function.LowerArgs.for_space(self.space, (transform_chain, transform_chain), coordinates)).optimized_for_numpy
+                bins = [set() for ielem in range(len(self))]
+                for trans in leveltopo.transforms:
+                    ielem, tail = self.transforms.index_with_tail(trans)
+                    bins[ielem].add(tail)
+                fcache = cache.WrapperCache()
+                with parallel.ctxrange('trimming', len(self)) as ielems:
+                    for ielem in ielems:
+                        ref = self.references[ielem]
+                        trans = self.transforms[ielem]
+                        ctransforms = bins[ielem]
+                        levels = numpy.empty(ref._nlinear_by_level(maxrefine))
+                        cover = list(fcache[ref._linear_cover](frozenset(ctransforms), maxrefine))
+                        # confirm cover and greedily optimize order
+                        mask = numpy.ones(len(levels), dtype=bool)
+                        while mask.any():
+                            imax = numpy.argmax([mask[indices].sum() for tail, points, indices in cover])
+                            tail, points, indices = cover.pop(imax)
+                            levels[indices] = levelset.eval(trans=trans + tail, _points=points, **arguments)
+                            mask[indices] = False
+                        refs[ielem] = ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions)
+                log.debug('cache', fcache.stats)
+            refs = tuple(refs)
         return SubsetTopology(self, refs, newboundary=name)
 
     def subset(self, topo, newboundary=None, strict=False):