Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
SimpleInterThreadQueueOf<const void, true> rows;
Semaphore fullSem;
size32_t totSize;
size32_t peakTotSize;
unsigned waiting = 0;
bool stopped;
Linked<IOutputRowSerializer> serializer;
Expand All @@ -161,6 +162,8 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
}
rows.enqueue(row);
totSize += rowSize;
if (totSize > peakTotSize)
peakTotSize = totSize;
if (totSize > FUNNEL_MIN_BUFF_SIZE)
{
waiting++;
Expand Down Expand Up @@ -193,6 +196,8 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
}
rows.enqueueMany(numRows, newRows);
totSize += rowSizes;
if (totSize > peakTotSize)
peakTotSize = totSize;
if (totSize > FUNNEL_MIN_BUFF_SIZE)
{
waiting++;
Expand Down Expand Up @@ -229,6 +234,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
stopped = false;
waiting = 0;
totSize = 0;
peakTotSize = 0;
eoss = 0;
serializer.set(activity.queryRowSerializer());
for (unsigned i=0; i<activity.queryNumInputs(); i++)
Expand Down Expand Up @@ -331,6 +337,11 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
fullSem.signal(numToSignal);
return row.getClear();
}

// Returns the largest value totSize has reached since peakTotSize was last
// zeroed (it is raised alongside totSize whenever rows are enqueued).
// NOTE(review): peakTotSize is a plain size32_t; if this is called from a
// statistics-gathering thread while input threads enqueue rows the read is
// unsynchronised — confirm that is acceptable for a monitoring value.
size32_t getPeakRowMemory() const
{
    const size32_t peakSoFar = peakTotSize;
    return peakSoFar;
}

friend class CInputHandler;
};
Expand Down Expand Up @@ -562,6 +573,15 @@ class FunnelSlaveActivity : public CSlaveActivity
calcMetaInfoSize(info, inputs);
}
virtual bool isGrouped() const override { return grouped; }
// Adds this activity's in-flight statistics to activeStats.
// After the base class has contributed its statistics, the parallel funnel's
// peak buffered row size (if a parallel funnel is in use) is folded into
// StSizePeakRowMemory.
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override
{
    PARENT::gatherActiveStats(activeStats);
    if (!parallelOutput)
        return;
    const CParallelFunnel *funnel = static_cast<const CParallelFunnel *>(parallelOutput.get());
    // Merge rather than set: the base implementation may already have recorded
    // StSizePeakRowMemory for look-ahead inputs, and setStatistic would discard
    // that value. This mirrors how StSizePeakTempDisk is recorded.
    // NOTE(review): relies on the merge mode of StSizePeakRowMemory being "max" — confirm in jstats.
    activeStats.mergeStatistic(StSizePeakRowMemory, funnel->getPeakRowMemory());
}
};

/////
Expand Down
4 changes: 4 additions & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
virtual void stop() override;
virtual const void *nextRow() override;
virtual void resetEOF() { throwUnexpected(); }
// Required by IStartableEngineRowStream. An individual splitter output always
// reports 0 here.
virtual memsize_t getPeakRowMemory() const override
{
    return 0;
}
};


Expand Down Expand Up @@ -407,7 +408,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
{
PARENT::gatherActiveStats(activeStats);
if (sharedRowStream)
{
mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap);
activeStats.setStatistic(StSizePeakRowMemory, sharedRowStream->getPeakRowMemory());
}
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down
6 changes: 6 additions & 0 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
throw getexception.getClear();
}
}
// IStartableEngineRowStream implementation.
// Returns the peak row memory reported by the underlying smart buffer, or 0
// when no smart buffer is currently set.
// Declared virtual/override so the compiler checks the signature against the
// interface, consistent with the other implementers of this method.
virtual memsize_t getPeakRowMemory() const override
{
    if (!smartbuf)
        return 0;
    return smartbuf->getPeakRowMemory();
}
};


Expand Down
18 changes: 18 additions & 0 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
// An engine row stream that additionally exposes an explicit start() step and
// a peak row memory figure.
interface IStartableEngineRowStream : extends IEngineRowStream
{
virtual void start() = 0;
// Peak row memory figure for this stream; the visible implementers return 0
// when they have nothing to report.
virtual memsize_t getPeakRowMemory() const = 0;
};

class COutputTiming
Expand Down Expand Up @@ -186,6 +187,12 @@ class CThorInput : public CSimpleInterfaceOf<IInterface>
}
bool isFastThrough() const;
bool suppressLookAhead() const;
// Peak row memory reported by this input's look-ahead stream.
// Yields 0 unless a look-ahead exists and lookAheadActive is set.
memsize_t getPeakRowMemory() const
{
    if (!lookAhead || !lookAheadActive)
        return 0;
    return lookAhead->getPeakRowMemory();
}
};
typedef IArrayOf<CThorInput> CThorInputArray;

Expand Down Expand Up @@ -229,6 +236,17 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
offset_t peakTempSize = queryPeakTempSize();
if (peakTempSize)
activeStats.mergeStatistic(StSizePeakTempDisk, peakTempSize);

// Gather peak row memory from all inputs with look-ahead
memsize_t totalPeakRowMemory = 0;
ForEachItemIn(i, inputs)
{
memsize_t inputPeak = inputs.item(i).getPeakRowMemory();
if (inputPeak > totalPeakRowMemory)
totalPeakRowMemory = inputPeak;
}
if (totalPeakRowMemory)
activeStats.setStatistic(StSizePeakRowMemory, totalPeakRowMemory);
}
public:
IMPLEMENT_IINTERFACE_USING(CActivityBase)
Expand Down
37 changes: 36 additions & 1 deletion thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
CActivityBase *activity;
ThorRowQueue *in;
size32_t insz;
size32_t peakInsz;
ThorRowQueue *out;
CFileOwner tmpFileOwner;
Owned<IFileIO> tempFileIO;
Expand Down Expand Up @@ -269,6 +270,7 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
numblocks = 0;
insz = 0;
peakInsz = 0;
eoi = false;
diskfree.setown(createThreadSafeBitSet());

Expand Down Expand Up @@ -309,6 +311,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
diskflush();
in->enqueue(row);
insz += sz;
if (insz > peakInsz)
peakInsz = insz;
if (waiting) {
waitsem.signal();
waiting = false;
Expand Down Expand Up @@ -447,6 +451,10 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
else
return 0;
}
size32_t getPeakRowMemory() const
{
return peakInsz;
}
};


Expand All @@ -457,6 +465,7 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
IThorRowInterfaces *rowIf;
ThorRowQueue *in;
size32_t insz;
size32_t peakInsz;
SpinLock lock; // MORE: This lock is held for quite long periods. I suspect it could be significantly optimized.
bool waitingin;
Semaphore waitinsem;
Expand Down Expand Up @@ -484,6 +493,7 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
waitingout = false;
blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
insz = 0;
peakInsz = 0;
eoi = false;
}

Expand Down Expand Up @@ -524,6 +534,8 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
if (!eoi) {
in->enqueue(row);
insz += sz;
if (insz > peakInsz)
peakInsz = insz;
#ifdef _TRACE_SMART_PUTGET
ActPrintLog(activity, "***putRow2(%x) insize = %d ",insz);
#endif
Expand Down Expand Up @@ -649,6 +661,10 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
{
return 0;
}
size32_t getPeakRowMemory() const
{
return peakInsz;
}
};


Expand Down Expand Up @@ -741,6 +757,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
// in-memory related members
CSPSCQueue<RowEntry> inMemRows;
std::atomic<memsize_t> inMemRowsMemoryUsage = 0; // NB updated from writer and reader threads
std::atomic<memsize_t> peakInMemRowsMemoryUsage = 0;
Semaphore moreRows;
std::atomic<bool> readerWaitingForQ = false; // set by reader, cleared by writer

Expand Down Expand Up @@ -960,7 +977,10 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
if (queued)
{
trace("WRITE: Q: nextOutputRow: %" RCPF "u", nextOutputRow.load());
inMemRowsMemoryUsage += rowSz;
memsize_t newUsage = inMemRowsMemoryUsage += rowSz;
memsize_t currentPeak = peakInMemRowsMemoryUsage.load();
while (newUsage > currentPeak && !peakInMemRowsMemoryUsage.compare_exchange_weak(currentPeak, newUsage))
;
++nextOutputRow;
recentlyQueued = true;
}
Expand Down Expand Up @@ -1111,6 +1131,10 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
v += currentOutputIFileIO->getStatistic(kind);
return v;
}
// ISmartRowBuffer implementation.
// Returns the highest in-memory row usage observed so far.
// peakInMemRowsMemoryUsage is a std::atomic, so this load is safe to perform
// while the writer is raising the peak.
// Declared virtual/override so the signature is checked against the interface.
virtual memsize_t getPeakRowMemory() const override
{
    return peakInMemRowsMemoryUsage.load();
}
// IRowStream
virtual const void *nextRow() override
{
Expand Down Expand Up @@ -1846,6 +1870,10 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
{
return 0;
}
// Overrides the interface's peak row memory query; this implementation always
// reports 0.
virtual memsize_t getPeakRowMemory() const override { return 0; }
friend class COutput;
friend class CRowSet;
};
Expand Down Expand Up @@ -2531,6 +2559,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
std::vector<Owned<COutputRowStream>> outputs;
std::deque<std::tuple<const void *, size32_t>> rows;
memsize_t rowsMemUsage = 0;
memsize_t peakRowsMemUsage = 0;
std::atomic<rowcount_t> totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read
rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen
CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows)
Expand Down Expand Up @@ -2751,6 +2780,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
size32_t sz = thorRowMemoryFootprint(serializer, row);
rows.emplace_back(row, sz);
rowsMemUsage += sz;
if (rowsMemUsage > peakRowsMemUsage)
peakRowsMemUsage = rowsMemUsage;
if ((rowsMemUsage >= options.inMemReadAheadGranularity) ||
(rows.size() >= options.inMemReadAheadGranularityRows))
break;
Expand Down Expand Up @@ -2813,6 +2844,10 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
return stats.getStatisticValue(kind);
}
// ISharedRowStreamReader implementation.
// Returns the highest value rowsMemUsage has reached while rows were being
// read ahead into memory.
// Declared virtual/override so the signature is checked against the interface.
// NOTE(review): peakRowsMemUsage is a plain memsize_t; if it is only written
// under readAheadCS, this read from another thread is unsynchronised —
// confirm that is acceptable for a monitoring value.
virtual memsize_t getPeakRowMemory() const override
{
    return peakRowsMemUsage;
}
};

ISharedRowStreamReader *createSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *compressHandler)
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ interface ISmartRowBuffer: extends IRowStream
{
virtual IRowWriter *queryWriter() = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
virtual memsize_t getPeakRowMemory() const = 0;
};

class CActivityBase;
Expand Down Expand Up @@ -89,6 +90,7 @@ interface ISharedRowStreamReader : extends IInterface
virtual void cancel()=0;
virtual void reset() = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
virtual memsize_t getPeakRowMemory() const = 0;
};


Expand Down