Sparkdeduplication #429
A new version of the clustering mechanism with auto-sizing clusters, to obtain better scalability.
Added programmatic logging to stdout, for easier log reading.
Added reshuffle for better work balance.
Needs code cleanup and quality assurance.
Version used for performance testing.
Task tiling class rewritten in Scala, with tests.
Cleaning up project files. Fixed workflow building for Oozie.
…into sparkdeduplication
import pl.edu.icm.coansys.deduplication.document.comparator.VotesProductComparator
import pl.edu.icm.coansys.deduplication.document.comparator.WorkComparator
import scala.collection.mutable.ListBuffer
import pl.edu.icm.coansys.document.deduplication._
miconi left a comment
The code is overall good, though there is one major issue: the CartesianTaskSplit.processPairs function returns an empty list, so it doesn't look like it should work.
There are also some minor style and performance observations (reduceByKey and foldByKey used to achieve the result of a groupByKey operation) noted in comments.
Apart from the places indicated in the comments, it would be great to go through the IDE suggestions and reformat the whole codebase.
Thanks for the code.
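The reviewer's point about reduceByKey/foldByKey being used where a grouping operation would do can be illustrated with plain Scala collections (a hypothetical sketch, not the PR's Spark code; `pairs` is an invented example dataset):

```scala
val pairs = Seq(("a", 1), ("a", 2), ("b", 3))

// fold-style accumulation of per-key lists, the pattern the review flags
val folded = pairs.foldLeft(Map.empty[String, List[Int]]) {
  case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, Nil) :+ v)
}

// the same result expressed directly as a single grouping operation
val grouped = pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).toList }

assert(folded == grouped)
```

In Spark the analogous simplification would be replacing the fold with a single `groupByKey`, which states the intent directly.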
 *
 */
object DeduplicateDocuments {
  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())
Empty parentheses should be removed in method calls that do not have side effects (here and in other places in this code)
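As a quick illustration of the convention (using a plain `String` rather than the PR's logger setup):

```scala
// Scala convention: call parameterless, side-effect-free methods without
// parentheses; both forms compile and return the same value.
val s = "spark"
val preferred = s.getClass.getName       // no empty parens
val discouraged = s.getClass().getName() // same value, noisier style

assert(preferred == discouraged)
assert(preferred == "java.lang.String")
```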
'log' variable can be private
object DeduplicateDocuments {
  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())

  implicit def toJavaBiPredicate[A, B](predicate: (A, B) => Boolean) =
Import scala.language.implicitConversions to turn off compiler warnings about implicits
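A minimal sketch of how this looks, assuming the conversion targets `java.util.function.BiPredicate` (the target type is not shown in the quoted diff, so that part is an assumption):

```scala
import scala.language.implicitConversions // silences the implicit-conversion warning
import java.util.function.BiPredicate

// Hypothetical version of the conversion under review.
implicit def toJavaBiPredicate[A, B](predicate: (A, B) => Boolean): BiPredicate[A, B] =
  new BiPredicate[A, B] { def test(a: A, b: B): Boolean = predicate(a, b) }

// Assigning a Scala Function2 value where a BiPredicate is expected
// exercises the implicit conversion.
val f: (String, String) => Boolean = (s, prefix) => s.startsWith(prefix)
val startsWith: BiPredicate[String, String] = f

assert(startsWith.test("spark", "sp"))
assert(!startsWith.test("spark", "de"))
```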
  } else {
    false
  }
}
isValidDocument could be rewritten in a functional programming style using the Option monad:

def isValidDocument(doc: DocumentWrapper): Boolean =
  Option(doc.getDocumentMetadata)
    .flatMap(md => Option(md.getBasicMetadata))
    .exists(bmd => bmd.getTitleCount > 0 || bmd.getAuthorCount > 0 || bmd.hasDoi || bmd.hasJournal)

"The Option companion object's apply method serves as a conversion function from nullable references" https://stackoverflow.com/questions/4692506/wrapping-null-returning-method-in-java-with-option-in-scala

It would be even better if we used the Scala protobuf compiler, which directly supports Option for optional values in protobufs (see https://scalapb.github.io/generated-code.html).

flatMap acts on monads like bind in Haskell, if that clarifies anything. A simple introduction to using Option as a monad in Scala and how it makes code clearer: https://www.slideshare.net/jankrag/introduction-to-option-monad-in-scala
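A runnable illustration of the pattern, using hypothetical stand-in case classes instead of the real generated protobuf types:

```scala
// Stand-ins for the protobuf wrappers (hypothetical, for illustration only).
case class BasicMetadata(titleCount: Int)
case class DocumentMetadata(basicMetadata: BasicMetadata)

// Option(...) lifts a possibly-null reference into Some/None, and
// flatMap/exists short-circuit on None, replacing nested null checks.
def hasTitle(md: DocumentMetadata): Boolean =
  Option(md)
    .flatMap(m => Option(m.basicMetadata))
    .exists(_.titleCount > 0)

assert(hasTitle(DocumentMetadata(BasicMetadata(2))))
assert(!hasTitle(null))
assert(!hasTitle(DocumentMetadata(null)))
```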
def main(args: Array[String]): Unit = {
It would be more readable if the main method was placed before the helper methods.
    }
  }
}).mapValues(_._1)
inputDocs.join(selectedClusters).map(p => (p._2._2, p._2._1)).groupByKey
The last expression could be simplified to:
selectedClusters.join(inputDocs).values.groupByKey
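A plain-collections sketch (not Spark; `join` and the sample data below are invented for illustration) of why the two expressions agree: joining in the reversed order already yields (cluster, doc) pairs, so the per-pair reordering becomes unnecessary.

```scala
// Toy inner join on the first tuple element, mimicking RDD.join semantics.
def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
  for ((k, a) <- left; (k2, b) <- right if k == k2) yield (k, (a, b))

val inputDocs = Seq((1, "docA"), (2, "docB"))
val selectedClusters = Seq((1, "clusterX"), (2, "clusterX"))

// original shape: join then swap each pair before grouping
val original = join(inputDocs, selectedClusters)
  .map(p => (p._2._2, p._2._1))
  .groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

// simplified shape: reversed join, then just take the values
val simplified = join(selectedClusters, inputDocs)
  .map(_._2)
  .groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

assert(original == simplified)
```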
 */
def mergeDocuments(docs: List[DocumentWrapper]): DocumentWrapper = {
  val merger = buildDocumentsMerger()
  val merged = merger.merge(docs);
No need to define val merged
val normalized = StringTools.normalize(title);
// seems that normalize removes stopwords, which is wrong, and quite expensive
//val normalized = StringTools.removeStopWords(StringTools.normalize(title));
val res = normalized.replaceAll("\\s+", "")
There is no need to define val res here; the expression assigned to it could simply be the last expression in this function.
def generateKeys(title: String): Seq[String] = {
  val ctitle = cleanUpString(title)
  val mlen = keySizes.max
  val longestKey = ctitle.zipWithIndex.filter(_._2 % 2 == 0).map(_._1).take(mlen).mkString
  keySizes.map(keyLength => longestKey.substring(0, Math.min(keyLength, longestKey.size))).distinct
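For readers following the snippet above: it builds one long key from the characters at even positions of the cleaned title, then emits a distinct prefix per requested key size. A self-contained sketch, with a trivial stand-in for cleanUpString (the real one presumably normalizes more than lowercasing and whitespace removal):

```scala
// Hypothetical standalone version of the key-generation logic above.
def generateKeys(title: String, keySizes: Seq[Int]): Seq[String] = {
  val ctitle = title.toLowerCase.replaceAll("\\s+", "") // simplified cleanUpString
  val mlen = keySizes.max
  // keep only characters at even positions, up to the longest requested key
  val longestKey = ctitle.zipWithIndex.filter(_._2 % 2 == 0).map(_._1).take(mlen).mkString
  keySizes.map(n => longestKey.substring(0, math.min(n, longestKey.length))).distinct
}

// "Spark Deduplication" cleans to "sparkdeduplication";
// even positions give "sakeulcto", so sizes 3 and 5 yield "sak" and "sakeu".
assert(generateKeys("Spark Deduplication", Seq(3, 5)) == Seq("sak", "sakeu"))
```

Sampling every other character keeps the key cheap to compute while still discriminating between most titles; identical prefixes of different sizes collapse via `distinct`.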
A ready, working version of the Scala/Spark deduplication, to replace the original MapReduce/Pig solution.