Cassandra 19480 Additional task execution specific instrumentation of job stats #51
arjunashok wants to merge 6 commits into apache:trunk
yifan-c left a comment
It looks good in general. A rebase is required, though.
force-pushed from 668a44e to 7bb12eb
| import java.util.Map; | ||
| public class JobEventDetail |
Can we add a Javadoc here?
| this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()); | ||
| this.broadcastContext = sparkContext.<BulkWriterContext>broadcast(writerContext); | ||
| this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { | ||
| if (writerContext.job().getId().toString().equals(jobEventDetail.internalJobID())) |
Can we add a code comment explaining why this condition is needed here?
| } | ||
| catch (Throwable throwable) | ||
| { | ||
| publishFailureJobStats(throwable.getMessage()); |
Not sure what happens here. Do we no longer publish failure stats?
Yes, we are not explicitly publishing failure stats at the point of failure. Instead, we rely on the job failure event, and the listener now publishes these stats.
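To illustrate the approach described above, here is a minimal sketch of a listener that publishes stats when the job ends, rather than at the point of failure. It does not use the Spark listener API; `JobEndListenerSketch`, `onJobEnd`, the `"jobId"` key, and the `"Failed"` status value are hypothetical names chosen for illustration. The `jobStatus`, `failureReason`, and `jobElapsedTimeMillis` keys are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the job stats listener; not the actual class in this PR.
final class JobEndListenerSketch
{
    private final Consumer<Map<String, String>> publisher;
    private final long startTimeMillis;

    JobEndListenerSketch(Consumer<Map<String, String>> publisher, long startTimeMillis)
    {
        this.publisher = publisher;
        this.startTimeMillis = startTimeMillis;
    }

    // Called once when the job ends; failureReason is null when the job succeeded.
    void onJobEnd(String jobId, String failureReason, long endTimeMillis)
    {
        Map<String, String> jobStats = new HashMap<>();
        jobStats.put("jobId", jobId); // assumed key name
        jobStats.put("jobStatus", failureReason == null ? "Succeeded" : "Failed"); // "Failed" is assumed
        if (failureReason != null)
        {
            jobStats.put("failureReason", failureReason);
        }
        jobStats.put("jobElapsedTimeMillis", String.valueOf(endTimeMillis - startTimeMillis));
        publisher.accept(jobStats);
    }
}
```

With this shape, the catch block no longer needs to publish anything itself, since a failed job still reaches the listener through its end event.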
| put("sparkVersion", sparkVersion); | ||
| put("keyspace", jobInfo.getId().toString()); | ||
| put("table", jobInfo.getId().toString()); | ||
| put("keyspace", jobInfo.getId()); |
Is jobInfo.getId() the keyspace? Shouldn't we use qualifiedTableName().keyspace() here instead?
Yes, probably got mixed up during rebase. Corrected.
| put("keyspace", jobInfo.getId().toString()); | ||
| put("table", jobInfo.getId().toString()); | ||
| put("keyspace", jobInfo.getId()); | ||
| put("table", jobInfo.qualifiedTableName().toString()); |
Should this be qualifiedTableName().table() instead?
Yes, probably got mixed up during rebase. Corrected.
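For reference, a small sketch of what the corrected puts could look like. `QualifiedTableNameSketch` is a hypothetical stand-in for the type returned by `jobInfo.qualifiedTableName()`, and the assumption that its `toString()` returns the `keyspace.table` form is mine, not confirmed by the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the qualified table name type; not the real class.
final class QualifiedTableNameSketch
{
    private final String keyspace;
    private final String table;

    QualifiedTableNameSketch(String keyspace, String table)
    {
        this.keyspace = keyspace;
        this.table = table;
    }

    String keyspace()
    {
        return keyspace;
    }

    String table()
    {
        return table;
    }

    // Assumption: toString() yields the fully qualified "keyspace.table" form.
    @Override
    public String toString()
    {
        return keyspace + "." + table;
    }
}

final class TableStatsSketch
{
    // Uses the individual accessors so each stat holds only its own component.
    static Map<String, String> tableStats(QualifiedTableNameSketch name)
    {
        Map<String, String> stats = new HashMap<>();
        stats.put("keyspace", name.keyspace());
        stats.put("table", name.table());
        return stats;
    }
}
```

Under that assumption, putting `toString()` into the "table" stat would record the keyspace twice, which is why the accessor-based version seems safer.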
| */ | ||
| void publish(Map<String, String> stats); | ||
| Map<String, String> stats(); |
This is no longer being used. Removed.
| dataLayer.startupValidate(); | ||
NIT: unnecessary extra line?
bbotella left a comment
There is an opportunity here to move all the hardcoded stat names to a Consts file. Maybe worth a separate ticket?
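A rough sketch of what such a constants file might look like. The class names `JobStatNames` and `JobStatNamesUsage` are hypothetical; the string values are the stat names that appear in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical constants holder for stat names; the class name is illustrative only.
final class JobStatNames
{
    static final String SPARK_VERSION = "sparkVersion";
    static final String KEYSPACE = "keyspace";
    static final String TABLE = "table";
    static final String BYTES_WRITTEN = "bytesWritten";
    static final String JOB_STATUS = "jobStatus";
    static final String CLUSTER_RESIZE_DETECTED = "clusterResizeDetected";
    static final String JOB_ELAPSED_TIME_MILLIS = "jobElapsedTimeMillis";
    static final String FAILURE_REASON = "failureReason";

    private JobStatNames()
    {
        // constants only, no instances
    }
}

final class JobStatNamesUsage
{
    // Builds a stats map using the shared constants instead of string literals.
    static Map<String, String> successStats(long totalBytesWritten, boolean hasClusterTopologyChanged)
    {
        Map<String, String> stats = new HashMap<>();
        stats.put(JobStatNames.BYTES_WRITTEN, Long.toString(totalBytesWritten));
        stats.put(JobStatNames.JOB_STATUS, "Succeeded");
        stats.put(JobStatNames.CLUSTER_RESIZE_DETECTED, String.valueOf(hasClusterTopologyChanged));
        return stats;
    }
}
```

Publishers and any consumers of the stats could then share one definition of each key, so a typo in a stat name becomes a compile error rather than a silently missing metric.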
| put("bytesWritten", Long.toString(totalBytesWritten)); | ||
| put("jobStatus", "Succeeded"); | ||
| put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged)); | ||
| put("jobElapsedTimeMillis", Long.toString(elapsedTimeMillis())); |
Why are we removing the jobElapsedTimeMillis stat?
| @@ -258,28 +269,17 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, Str | |||
| private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged) | |||
Does it make sense to keep the Successful name on the method if we are ignoring failure stats?
| onCloudStorageTransport(ignored -> this.heartbeatReporter = new HeartbeatReporter()); | ||
| this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { | ||
| // Note: Consumers are called for all jobs and tasks. We only publish for the existing job | ||
| if (writerContext.job().getId().equals(jobEventDetail.internalJobID())) |
Should we also check for !internalJobId.isEmpty()?
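One possible shape for the guarded condition, as a sketch only. It assumes both IDs are plain strings, and `JobIdFilterSketch` and `shouldPublish` are hypothetical names. Note that the emptiness check only changes behavior if the current job's ID could itself be empty; otherwise the equality check already rejects an empty internal ID.

```java
// Hypothetical helper; not part of the PR.
final class JobIdFilterSketch
{
    // Returns true only when the event carries a non-empty ID matching the current job.
    static boolean shouldPublish(String currentJobId, String internalJobId)
    {
        return internalJobId != null
               && !internalJobId.isEmpty()
               && internalJobId.equals(currentJobId);
    }
}
```

Pulling the check into a small helper would also give the explanatory comment about consumers being called for all jobs and tasks a natural home.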
| jobStats.put("failureReason", reason); | ||
| jobStats.put("jobElapsedTimeMillis", String.valueOf(elapsedTimeMillis)); | ||
| LOGGER.debug("Job END for jobId:{} status:{} Reason:{} ElapsedTime: {}", |
There is an extra space after ElapsedTime: in the log message.
| new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); | ||
| private ConsistencyLevel.CL consistencyLevel; | ||
| private int sstableDataSizeInMB = 128; | ||
| private int sstableWriteBatchSize = 2; |
What is this new variable doing? Where is it used?
… job stats
Changes
jobGroup-based UUID to reader, similar to existing implementation in writer. This is used as the jobId to uniquely identify and potentially merge multiple stats published from the same job.
Testing