Skip to content

Commit b4406c9

Browse files
committed
versioned cleanup for mongodb
1 parent 0e8368a commit b4406c9

4 files changed

Lines changed: 262 additions & 30 deletions

File tree

src/main/java/picoded/dstack/core/Core_FileWorkspaceMap.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ protected static String normalizeFilePathString(final String filePath) {
125125
if (res.endsWith("/")) {
126126
res = res.substring(0, res.length() - 1);
127127
}
128+
129+
// Block empty filepath
130+
if( res.isEmpty() ) {
131+
throw new RuntimeException("Empty file path is not allowed");
132+
}
133+
128134
return res;
129135
}
130136

src/main/java/picoded/dstack/mongodb/MongoDB_FileWorkspaceMap.java

Lines changed: 210 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88
import java.util.Map;
99
import java.util.Set;
1010
import java.util.regex.Pattern;
11-
12-
import javax.management.RuntimeErrorException;
13-
1411
import java.io.ByteArrayInputStream;
1512
import java.io.ByteArrayOutputStream;
1613
import java.io.IOException;
@@ -359,9 +356,110 @@ protected void ensureParentPath(String oid, String path) {
359356
/**
360357
* Because mongoDB does file versioning on each save, we would need to clean up
361358
* older file versions where applicable, in a safe way
359+
*
360+
* In general, removing an "old version" immediately is difficult to do safely, due to
361+
* possible race conditions where that version could still be "read" mid-way.
362+
*
363+
* First we scan for the list of all the file versions.
364+
*
365+
* We find the latest version that is at least 10 seconds old (what we consider a safe window)
366+
* and delete all versions before it.
367+
*
368+
* If, after the above, we find that there are still "10 versions" (as there were
369+
* 10 writes in the past 10 seconds), we force a thread.sleep in increments of 1 second,
370+
* and remove any versions that match the above criteria, up to a full 10 seconds of delay.
371+
*
372+
* This will forcefully throttle down any write-heavy flows, to avoid contention.
373+
*
374+
* This safety measure is used in addition to the checks performed on file write
362375
*/
363376
protected void performVersionedFileCleanup(String oid, String path) {
364-
// @TODO !!!
377+
378+
// Lets get the list of files and their respective versions
379+
// We query the file table directly, to reduce the required
380+
// back and forth queries
381+
382+
// Get the full filename
383+
String filename = oid+"/"+path;
384+
385+
// Get the current timestamp
386+
long now = System.currentTimeMillis();
387+
long tenSecondsAgo = now - (10 * 1000);
388+
389+
// Lets fetch the full list in descending date order
390+
FindIterable<Document> search = filesCollection.find( Filters.eq("filename", filename) );
391+
search = search.sort( (new Document()).append("uploadDate", -1) );
392+
393+
// Lets remap from cursor to list
394+
List<Document> searchList = new ArrayList<>();
395+
try (MongoCursor<Document> cursor = search.iterator()) {
396+
while (cursor.hasNext()) {
397+
searchList.add(cursor.next());
398+
}
399+
}
400+
401+
// Safe anchor point: all items after this are "safe to be deleted"
402+
// if this is detected properly (do not delete the safeAnchorPoint file itself)
403+
int safeAnchorPoint = -1;
404+
405+
// Let's find the document that's at least 10 seconds old
406+
for( int i=1; i<searchList.size(); ++i ) {
407+
Document doc = searchList.get(i);
408+
409+
// Check if it meets the required timestamp
410+
if(doc.getDate("uploadDate").getTime() < tenSecondsAgo) {
411+
safeAnchorPoint = i;
412+
break;
413+
}
414+
}
415+
416+
// Lets clear the old files, if safeAnchorPoint is found
417+
if( safeAnchorPoint >= 1 ) {
418+
// Lets loop through all items after the safeAnchorPoint
419+
while( searchList.size() > (safeAnchorPoint + 1) ) {
420+
// Get and remove the last item
421+
Document doc = searchList.remove( searchList.size() - 1 );
422+
ObjectId objID = doc.getObjectId("_id");
423+
424+
// Lets remove the file (and its chunks)
425+
try {
426+
gridFSBucket.delete(objID);
427+
} catch(Exception e) {
428+
// do nothing, as there could be a race condition delete
429+
// (2 deletes by separate write commands happening together)
430+
}
431+
}
432+
}
433+
434+
// If the list has 10 items or fewer, let's return
435+
if( searchList.size() <= 10 ) {
436+
return;
437+
}
438+
439+
// We have more than 10 files that are less than 10 seconds old
440+
// Let's do a forced 10 seconds halt, so we can forcefully clear the files
441+
try {
442+
Thread.sleep(10 * 1000);
443+
} catch(InterruptedException e) {
444+
throw new RuntimeException(e);
445+
}
446+
447+
// And clear the various outdated files
448+
// after the latest, and its immediate previous version
449+
while( searchList.size() > 2 ) {
450+
// Get and remove the last item
451+
Document doc = searchList.remove( searchList.size() - 1 );
452+
ObjectId objID = doc.getObjectId("_id");
453+
454+
// Lets remove the file (and its chunks)
455+
try {
456+
gridFSBucket.delete(objID);
457+
} catch(Exception e) {
458+
// do nothing, as there could be a race condition delete
459+
// (2 deletes by separate write commands happening together)
460+
}
461+
}
462+
365463
}
366464

367465
//--------------------------------------------------------------------------
@@ -382,16 +480,111 @@ protected void performVersionedFileCleanup(String oid, String path) {
382480
**/
383481
@Override
384482
public void backend_fileWrite(String oid, String filepath, byte[] data) {
385-
// Build the input stream
386-
ByteArrayInputStream buffer = null;
483+
484+
// Build the full path
485+
String fullPath = oid + "/" + filepath;
387486

388-
// Only build if its not null
389-
if (data != null) {
390-
buffer = new ByteArrayInputStream(data);
487+
//
488+
// Due to the rather huge penalty of writing files, without actual content changes,
489+
// and the performance implications of a high number of back to back file changes.
490+
//
491+
// We will employ the following throttling safeguards
492+
//
493+
// 1) Throttling file writes, when the existing file is less than 2 seconds old
494+
// 2) Check against the current values, and skip the write if they match.
495+
//
496+
// This prevents the creation of a "new version" unless it's needed, and slows down
497+
// any flooding of back-to-back file writes.
498+
//
499+
500+
// 1) Lets check the previous write timing, and throttle it if needed
501+
// ---
502+
503+
// Lets get the time "NOW"
504+
long now = System.currentTimeMillis();
505+
506+
// Lets build the query for the file involved
507+
Bson query = Filters.eq("filename", fullPath);
508+
509+
// Read timestamp, and objectid
510+
ObjectId readObjId = null;
511+
long readUploadTimestamp = -1;
512+
513+
// Lets iterate the search result, and return true on an item
514+
try (MongoCursor<GridFSFile> cursor = gridFSBucket.find(query).limit(1).iterator()) {
515+
if (cursor.hasNext()) {
516+
GridFSFile fileObj = cursor.next();
517+
readUploadTimestamp = fileObj.getUploadDate().getTime();
518+
readObjId = fileObj.getObjectId();
519+
}
520+
}
521+
522+
// Check if the current file is less than 2 seconds old
523+
// If so, we induce a wait for it to occur (if file exists)
524+
if( readObjId != null && readUploadTimestamp + 2000 >= now ) {
525+
try {
526+
Thread.sleep( Math.min( Math.max( readUploadTimestamp + 2000 - now, 500), 2000 ) );
527+
} catch(InterruptedException e) {
528+
throw new RuntimeException(e);
529+
}
530+
531+
// And get the latest objectID again (in case of any changes)
532+
try (MongoCursor<GridFSFile> cursor = gridFSBucket.find(query).limit(1).iterator()) {
533+
if (cursor.hasNext()) {
534+
GridFSFile fileObj = cursor.next();
535+
readUploadTimestamp = fileObj.getUploadDate().getTime();
536+
readObjId = fileObj.getObjectId();
537+
}
538+
}
539+
}
540+
541+
// 2) Lets check against current value
542+
// ---
543+
544+
// Handle null byte[]
545+
if( data == null ) {
546+
data = EmptyArray.BYTE;
547+
}
548+
549+
// Lets map the current value to an inputstream, in closable blocks
550+
// We intentionally use inputstream, to avoid needing 2 byte[] blocks in memory
551+
// (if file exists)
552+
if( readObjId == null ) {
553+
// does nothing if the object does not exist
554+
} else {
555+
try (ByteArrayInputStream inBuffer = new ByteArrayInputStream(data) ) {
556+
try(InputStream existingValue = gridFSBucket.openDownloadStream(readObjId)) {
557+
if(IOUtils.contentEquals(inBuffer, existingValue)) {
558+
// They are the same, skip the write
559+
return;
560+
}
561+
}
562+
} catch (IOException e) {
563+
throw new RuntimeException(e);
564+
}
565+
}
566+
567+
// Finally, lets write the update
568+
// ---
569+
570+
try (ByteArrayInputStream inBuffer = new ByteArrayInputStream(data) ) {
571+
// Setup the metadata for the file
572+
Document metadata = new Document();
573+
metadata.append("oid", oid);
574+
metadata.append("type", "file");
575+
576+
// Prepare the upload options
577+
GridFSUploadOptions opt = (new GridFSUploadOptions()).metadata(metadata);
578+
ObjectId objID = gridFSBucket.uploadFromStream(fullPath, inBuffer, opt);
579+
objID.toString();
580+
} catch (IOException e) {
581+
throw new RuntimeException(e);
391582
}
392583

393-
// Then pump it
394-
backend_fileWriteInputStream(oid, filepath, buffer);
584+
// Perform post file write cleanup (if there was a previous version)
585+
if( readObjId != null ) {
586+
performVersionedFileCleanup(oid, filepath);
587+
}
395588
}
396589

397590
/**
@@ -408,25 +601,11 @@ public void backend_fileWrite(String oid, String filepath, byte[] data) {
408601
**/
409602
@Override
410603
public void backend_fileWriteInputStream(final String oid, final String filepath, InputStream data) {
411-
// Build the full path
412-
String fullPath = oid + "/" + filepath;
413-
414-
if (data == null) {
415-
data = new ByteArrayInputStream(EmptyArray.BYTE);
416-
}
417-
418-
// Write the file
604+
// Converts it to bytearray respectively
605+
byte[] rawBytes = null;
419606
try {
420-
// Setup the metadata for the file
421-
Document metadata = new Document();
422-
metadata.append("oid", oid);
423-
metadata.append("type", "file");
424-
425-
// Prepare the upload options
426-
GridFSUploadOptions opt = (new GridFSUploadOptions()).metadata(metadata);
427-
ObjectId objID = gridFSBucket.uploadFromStream(fullPath, data, opt);
428-
objID.toString();
429-
} catch (Exception e) {
607+
rawBytes = IOUtils.toByteArray(data);
608+
} catch (IOException e) {
430609
throw new RuntimeException(e);
431610
} finally {
432611
try {
@@ -435,6 +614,8 @@ public void backend_fileWriteInputStream(final String oid, final String filepath
435614
throw new RuntimeException(e);
436615
}
437616
}
617+
// Does the bytearray writes
618+
backend_fileWrite(oid, filepath, rawBytes);
438619
}
439620

440621
//--------------------------------------------------------------------------

src/main/java/picoded/dstack/stack/Stack_FileWorkspaceMap.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,12 @@ public InputStream backend_fileReadInputStream(final String oid, final String fi
223223
public void backend_fileWriteInputStream(final String oid, final String filepath,
224224
final InputStream data) {
225225

226+
//
226227
// Due to the behaviour of how the file data needs to be handled across multiple layers
227-
// we only use an optimized "readStream" call if the filesystem is a single stack layer
228+
// we use an optimized "writeStream" call ONLY if the filesystem is a single stack layer
229+
//
230+
// Else we will revert to byte[] that can be applied multiple times across the stack
231+
//
228232
if (dataLayers.length == 1) {
229233
dataLayers[0].backend_fileWriteInputStream(oid, filepath, data);
230234
return;

src/test/java/picoded/dstack/struct/simple/StructSimple_FileWorkspaceMap_test.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,47 @@ public void fileWrite_andProperlySetupFolder() {
166166
assertTrue(fileWorkspace.folderPathExist("test"));
167167
}
168168

169+
//-----------------------------------------------------------------------------------
170+
//
171+
// Multiple Writes
172+
//
173+
//-----------------------------------------------------------------------------------
174+
175+
@Test
176+
public void fileWrite_fiveTimes() {
177+
// Get the file workspace to use
178+
FileWorkspace fileWorkspace = testObj.newEntry();
179+
assertNotNull(fileWorkspace);
180+
181+
// Folder does not exist first
182+
assertFalse(fileWorkspace.folderPathExist("test/folder"));
183+
184+
// Write and read file
185+
for(int i=0; i < 5; ++i) {
186+
fileWorkspace.writeString("test/folder/file.txt", "ver-"+i);
187+
assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt"));
188+
fileWorkspace.writeString("test/folder/file.txt", "ver-"+i);
189+
assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt"));
190+
}
191+
}
192+
@Test
193+
public void fileWrite_twentyTimes() {
194+
// Get the file workspace to use
195+
FileWorkspace fileWorkspace = testObj.newEntry();
196+
assertNotNull(fileWorkspace);
197+
198+
// Folder does not exist first
199+
assertFalse(fileWorkspace.folderPathExist("test/folder"));
200+
201+
// Write and read file
202+
for(int i=0; i < 20; ++i) {
203+
fileWorkspace.writeString("test/folder/file.txt", "ver-"+i);
204+
assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt"));
205+
fileWorkspace.writeString("test/folder/file.txt", "ver-"+i);
206+
assertEquals("ver-"+i, fileWorkspace.readString("test/folder/file.txt"));
207+
}
208+
}
209+
169210
//-----------------------------------------------------------------------------------
170211
//
171212
// Move test

0 commit comments

Comments
 (0)