Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions nutils/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,34 @@ def _wait(pid):
treelog.error('process {} {}'.format(pid, msg))
return False


def Manager():
    '''Return a manager for sharing data between processes.

    The returned manager only supports the methods
    :meth:`multiprocessing.managers.SyncManager.dict` and
    :meth:`multiprocessing.managers.SyncManager.list`.
    '''

    # When no parallelism is configured there is nothing to share across
    # processes, so skip the manager subprocess and hand back a trivial
    # in-process stand-in instead.
    return multiprocessing.Manager() if maxprocs.current > 1 else _NoManager()


class _NoManager:

def __enter__(self):
return self

def __exit__(self, *exc_info):
pass

def dict(self, value=()):
return dict(value)

def list(self, value=()):
return list(value)


# vim:sw=4:sts=4:et
71 changes: 38 additions & 33 deletions nutils/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -1416,39 +1416,44 @@ def trim(self, levelset, maxrefine, ndivisions=8, name='trimmed', leveltopo=None
if arguments is None:
arguments = {}

refs = []
if leveltopo is None:
ielem_arg = evaluable.Argument('_trim_index', (), dtype=int)
coordinates = self.references.getpoints('vertex', maxrefine).get_evaluable_coords(ielem_arg)
transform_chains = self.transforms.get_evaluable(ielem_arg), self.opposites.get_evaluable(ielem_arg)
levelset = levelset.lower(function.LowerArgs.for_space(self.space, transform_chains, coordinates)).optimized_for_numpy
with log.iter.percentage('trimming', range(len(self)), self.references) as items:
for ielem, ref in items:
levels = levelset.eval(_trim_index=ielem, **arguments)
refs.append(ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions))
else:
log.info('collecting leveltopo elements')
coordinates = evaluable.Points(evaluable.NPoints(), self.ndims)
transform_chain = transform.EvaluableTransformChain.from_argument('trans', self.transforms.todims, self.transforms.fromdims)
levelset = levelset.lower(function.LowerArgs.for_space(self.space, (transform_chain, transform_chain), coordinates)).optimized_for_numpy
bins = [set() for ielem in range(len(self))]
for trans in leveltopo.transforms:
ielem, tail = self.transforms.index_with_tail(trans)
bins[ielem].add(tail)
fcache = cache.WrapperCache()
with log.iter.percentage('trimming', self.references, self.transforms, bins) as items:
for ref, trans, ctransforms in items:
levels = numpy.empty(ref._nlinear_by_level(maxrefine))
cover = list(fcache[ref._linear_cover](frozenset(ctransforms), maxrefine))
# confirm cover and greedily optimize order
mask = numpy.ones(len(levels), dtype=bool)
while mask.any():
imax = numpy.argmax([mask[indices].sum() for tail, points, indices in cover])
tail, points, indices = cover.pop(imax)
levels[indices] = levelset.eval(trans=trans + tail, _points=points, **arguments)
mask[indices] = False
refs.append(ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions))
log.debug('cache', fcache.stats)
with parallel.Manager() as manager:
refs = manager.list([None]*len(self))
if leveltopo is None:
ielem_arg = evaluable.Argument('_trim_index', (), dtype=int)
coordinates = self.references.getpoints('vertex', maxrefine).get_evaluable_coords(ielem_arg)
transform_chains = self.transforms.get_evaluable(ielem_arg), self.opposites.get_evaluable(ielem_arg)
levelset = levelset.lower(function.LowerArgs.for_space(self.space, transform_chains, coordinates)).optimized_for_numpy
with parallel.ctxrange('trimming', len(self)) as ielems:
for ielem in ielems:
levels = levelset.eval(_trim_index=ielem, **arguments)
refs[ielem] = self.references[ielem].trim(levels, maxrefine=maxrefine, ndivisions=ndivisions)
else:
log.info('collecting leveltopo elements')
coordinates = evaluable.Points(evaluable.NPoints(), self.ndims)
transform_chain = transform.EvaluableTransformChain.from_argument('trans', self.transforms.todims, self.transforms.fromdims)
levelset = levelset.lower(function.LowerArgs.for_space(self.space, (transform_chain, transform_chain), coordinates)).optimized_for_numpy
bins = [set() for ielem in range(len(self))]
for trans in leveltopo.transforms:
ielem, tail = self.transforms.index_with_tail(trans)
bins[ielem].add(tail)
fcache = cache.WrapperCache()
with parallel.ctxrange('trimming', len(self)) as ielems:
for ielem in ielems:
ref = self.references[ielem]
trans = self.transforms[ielem]
ctransforms = bins[ielem]
levels = numpy.empty(ref._nlinear_by_level(maxrefine))
cover = list(fcache[ref._linear_cover](frozenset(ctransforms), maxrefine))
# confirm cover and greedily optimize order
mask = numpy.ones(len(levels), dtype=bool)
while mask.any():
imax = numpy.argmax([mask[indices].sum() for tail, points, indices in cover])
tail, points, indices = cover.pop(imax)
levels[indices] = levelset.eval(trans=trans + tail, _points=points, **arguments)
mask[indices] = False
refs[ielem] = ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions)
log.debug('cache', fcache.stats)
refs = tuple(refs)
return SubsetTopology(self, refs, newboundary=name)

def subset(self, topo, newboundary=None, strict=False):
Expand Down