1+ package picoded .dstack .mongodb ;
2+
3+ // Java imports
4+ import java .util .HashMap ;
5+ import java .util .HashSet ;
6+ import java .util .List ;
7+ import java .util .Map ;
8+ import java .util .Set ;
9+ import java .util .ArrayList ;
10+ import java .util .Arrays ;
11+ import java .util .Date ;
12+ import java .util .concurrent .ConcurrentHashMap ;
13+ import java .util .concurrent .TimeUnit ;
14+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
15+
16+ // JavaCommons imports
17+ import picoded .core .conv .ConvertJSON ;
18+ import picoded .core .conv .GenericConvert ;
19+ import picoded .core .conv .NestedObjectFetch ;
20+ import picoded .core .conv .NestedObjectUtil ;
21+ import picoded .core .conv .StringEscape ;
22+ import picoded .core .struct .MutablePair ;
23+ import picoded .core .struct .query .OrderBy ;
24+ import picoded .core .struct .query .Query ;
25+ import picoded .core .struct .query .QueryType ;
26+ import picoded .core .common .ObjectToken ;
27+ import picoded .dstack .*;
28+ import picoded .dstack .core .*;
29+
30+ // MongoDB imports
31+ import org .bson .Document ;
32+ import org .bson .types .Binary ;
33+ import org .bson .conversions .Bson ;
34+ import com .mongodb .client .*;
35+ import com .mongodb .client .model .Projections ;
36+ import com .mongodb .client .model .Filters ;
37+ import com .mongodb .client .model .IndexOptions ;
38+ import com .mongodb .client .model .Indexes ;
39+ import com .mongodb .client .model .FindOneAndUpdateOptions ;
40+ import com .mongodb .client .model .Aggregates ;
41+
42+ /**
43+ * ## Purpose Support MongoDB implementation of KeyValueMap
44+ *
45+ * Built ontop of the Core_KeyValueMap implementation.
46+ **/
47+ public class MongoDB_KeyValueMap extends Core_KeyValueMap {
48+
49+ // --------------------------------------------------------------------------
50+ //
51+ // Constructor
52+ //
53+ // --------------------------------------------------------------------------
54+
55+ /** MongoDB instance representing the backend connection */
56+ MongoCollection <Document > collection = null ;
57+
58+ /**
59+ * Constructor, with name constructor
60+ *
61+ * @param inStack hazelcast stack to use
62+ * @param name of data object map to use
63+ */
64+ public MongoDB_KeyValueMap (MongoDBStack inStack , String name ) {
65+ super ();
66+ collection = inStack .db_conn .getCollection (name );
67+ }
68+
69+ @ Override
70+ public void systemSetup () {
71+ //
72+ // By mongodb default we use its native _id implementation
73+ // and handle our _oid seperately.
74+ //
75+ // We intentionally DO NOT use mongodb _id, allowing it retain optimal performance.
76+ //
77+
78+ // Lets create the unique key index
79+ IndexOptions opt = new IndexOptions ().unique (true ).name ("key" );
80+ collection .createIndex (Indexes .ascending ("key" ), opt );
81+
82+ // Expirary key support
83+ opt = new IndexOptions ().expireAfter (0L , TimeUnit .SECONDS );
84+ collection .createIndex (Indexes .ascending ("expireAt" ), opt );
85+ }
86+
87+ /**
88+ * Teardown and delete the backend storage table, etc. If needed
89+ **/
90+ public void systemDestroy () {
91+ collection .drop ();
92+ }
93+
94+ /**
95+ * Removes all data, without tearing down setup
96+ **/
97+ @ Override
98+ public void clear () {
99+ // Delete all items
100+ //
101+ // Due to the lack of an all * wildcard
102+ // we are using a exists OR condition, which is true
103+ // for all objects
104+ collection .deleteMany ( //
105+ Filters .or ( //
106+ Filters .exists ("key" , true ), //
107+ Filters .exists ("key" , false ) //
108+ ) //
109+ ); //
110+ }
111+
112+ //--------------------------------------------------------------------------
113+ //
114+ // Internal functions, used by DataObject
115+ //
116+ //--------------------------------------------------------------------------
117+
118+ /**
119+ * Generate a BSON filter set, for unexpired items
120+ * this should be used in combination with an AND clause filter
121+ **/
122+ protected Bson filterForUnexpired () {
123+ // Current timestamp
124+ Date now = new Date ();
125+
126+ // the or array to join
127+ return Filters .or ( //
128+ Filters .exists ("expireAt" , false ), //
129+ Filters .gt ("expireAt" , now ), //
130+ Filters .lte ("expireAt" , 0 ) //
131+ ); //
132+ }
133+
134+ /**
135+ * Search using the value, all the relevent key mappings
136+ *
137+ * Handles re-entrant lock where applicable
138+ *
139+ * @param key, note that null matches ALL
140+ *
141+ * @return array of keys
142+ **/
143+ @ Override
144+ public Set <String > keySet (String value ) {
145+ // The return hashset
146+ HashSet <String > ret = new HashSet <String >();
147+
148+ // Search result
149+ FindIterable <Document > search = null ;
150+
151+ // Lets either fetch with a value, or everything
152+ if (value == null ) {
153+ // Lets fetch everything ... D=
154+ search = collection .find (filterForUnexpired ());
155+ } else {
156+ search = collection .find (Filters .and ( //
157+ filterForUnexpired (), //
158+ Filters .eq ("val" , value ) //
159+ )); //
160+ }
161+
162+ // Get all the various keys
163+ search = search .projection (Projections .include ("key" ));
164+
165+ // Lets iterate the search
166+ try (MongoCursor <Document > cursor = search .iterator ()) {
167+ while (cursor .hasNext ()) {
168+ ret .add (cursor .next ().getString ("key" ));
169+ }
170+ }
171+
172+ // Return the full keyset
173+ return ret ;
174+ }
175+
176+ //--------------------------------------------------------------------------
177+ //
178+ // Fundemental set/get value (core)
179+ //
180+ //--------------------------------------------------------------------------
181+
182+ /**
183+ * [Internal use, to be extended in future implementation]
184+ * Sets the value, with validation
185+ *
186+ * @param key
187+ * @param value, null means removal
188+ * @param expire timestamp, 0 means not timestamp
189+ *
190+ * @return null
191+ **/
192+ @ Override
193+ public String setValueRaw (String key , String value , long expire ) {
194+ // Configure this to be an "upsert" query
195+ FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions ();
196+ opt .upsert (true );
197+
198+ // Generate the document of changes
199+ // See: https://www.mongodb.com/docs/manual/reference/operator/update/setOnInsert/
200+ Document set_doc = new Document ();
201+ set_doc .append ("val" , value );
202+
203+ // Expire timestamp if its configured, else it should be ignored
204+ if (expire > 0 ) {
205+ set_doc .append ("expireAt" , new Date (expire ));
206+ }
207+
208+ // Set the key on insert
209+ Document setOnInsert_doc = new Document ();
210+ setOnInsert_doc .append ("key" , key );
211+
212+ // Generate the "update" doc
213+ Document updateDoc = new Document ();
214+ updateDoc .append ("$set" , set_doc );
215+ updateDoc .append ("$setOnInsert" , setOnInsert_doc );
216+
217+ // Upsert the document
218+ collection .findOneAndUpdate (Filters .eq ("key" , key ), updateDoc , opt );
219+ return null ;
220+ }
221+
222+ /**
223+ * [Internal use, to be extended in future implementation]
224+ *
225+ * Returns the value and expiry, with validation against the current timestamp
226+ *
227+ * @param key as String
228+ * @param now timestamp
229+ *
230+ * @return String value
231+ **/
232+ @ Override
233+ public MutablePair <String , Long > getValueExpiryRaw (String key , long now ) {
234+ // Get the find result
235+ FindIterable <Document > res = collection .find (Filters .eq ("key" , key ));
236+
237+ // Get the Document object
238+ Document resObj = res .first ();
239+ if (resObj == null ) {
240+ return null ;
241+ }
242+
243+ // Lets get all the key values
244+ String val = GenericConvert .toString (resObj .get ("val" ), null );
245+ long expireAt = GenericConvert .toLong (resObj .get ("expireAt" ), 0 );
246+
247+ // Check for null objects
248+ if (val == null || val .isEmpty ()) {
249+ return null ;
250+ }
251+
252+ // No valid value found, return null
253+ if (expireAt < 0 ) {
254+ return null ;
255+ }
256+
257+ // Expired value, return null
258+ if (expireAt != 0 && expireAt < now ) {
259+ return null ;
260+ }
261+
262+ // Get the value, and return the pair
263+ return new MutablePair <String , Long >(val , expireAt );
264+ }
265+
266+ /**
267+ * [Internal use, to be extended in future implementation]
268+ * Sets the expire time stamp value, raw without validation
269+ *
270+ * @param key as String
271+ * @param expireAt timestamp in seconds, 0 means NO expire
272+ **/
273+ @ Override
274+ public void setExpiryRaw (String key , long expireAt ) {
275+ // Configure this to be an "update" query
276+ FindOneAndUpdateOptions opt = new FindOneAndUpdateOptions ();
277+
278+ // Generate the document of changes
279+ // See: https://www.mongodb.com/docs/manual/reference/operator/update/setOnInsert/
280+
281+ // Generate the "update" doc
282+ Document updateDoc = new Document ();
283+
284+ // Expire timestamp if its configured, else it should be ignored
285+ if (expireAt > 0 ) {
286+ Document set_doc = new Document ();
287+ set_doc .append ("expireAt" , new Date (expireAt ));
288+ updateDoc .append ("$set" , set_doc );
289+ } else {
290+ Document unset_doc = new Document ();
291+ unset_doc .append ("expireAt" , "" );
292+ updateDoc .append ("$unset" , unset_doc );
293+ }
294+
295+ // Upsert the document
296+ collection .findOneAndUpdate (Filters .eq ("key" , key ), updateDoc , opt );
297+ }
298+
299+ //--------------------------------------------------------------------------
300+ //
301+ // Remove call
302+ //
303+ //--------------------------------------------------------------------------
304+
305+ /**
306+ * Remove the value, given the key
307+ *
308+ * @param key param find the thae meta key
309+ *
310+ * @return null
311+ **/
312+ @ Override
313+ public KeyValue remove (Object key ) {
314+ removeValue (key );
315+ return null ;
316+ }
317+
318+ /**
319+ * Remove the value, given the key
320+ *
321+ * Important note: It does not return the previously stored value
322+ * Its return String type is to maintain consistency with Map interfaces
323+ *
324+ * @param key param find the thae meta key
325+ *
326+ * @return null
327+ **/
328+ @ Override
329+ public String removeValue (Object key ) {
330+ if (key == null ) {
331+ throw new IllegalArgumentException ("delete 'key' cannot be null" );
332+ }
333+
334+ // Delete the data
335+ collection .deleteOne (Filters .eq ("key" , key ));
336+ return null ;
337+ }
338+
339+ //--------------------------------------------------------------------------
340+ //
341+ // Maintenance calls
342+ //
343+ //--------------------------------------------------------------------------
344+
345+ /**
346+ * Incremental maintainance should not trigger maintenance.
347+ * As its potentially blocking with a very long call
348+ **/
349+ public void incrementalMaintenance () {
350+ // does nothing
351+ }
352+
353+ @ Override
354+ public void maintenance () {
355+ // @TODO : something? (not sure what needs to be done)
356+ }
357+
358+ }
0 commit comments