Demo of implementation to the local directory #6

Closed
ifilonenko wants to merge 32 commits into mccheah:operation-remote-shuffles from ifilonenko:SPARK-25299-v3

@ifilonenko ifilonenko commented Dec 21, 2018

What changes were proposed in this pull request?

Built off of yifeih#1

How was this patch tested?

The following manifest runs the External Shuffle Service in a ReplicaSet:

apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: shuffle-service
  labels:
    shuffle-service-selector: shuffle-service
spec:
  # modify replicas according to your case
  replicas: 3
  selector:
    matchLabels:
      shuffle-service-selector: shuffle-service
    matchExpressions:
      - {key: shuffle-service-selector, operator: In, values: [shuffle-service]}
  template:
    metadata:
      labels:
        shuffle-service-selector: shuffle-service
    spec:
      containers:
      - name: shuffle-service
        image: ifilonenko/spark:shuffle-service
        env:
        - name: SPARK_NO_DAEMONIZE
          value: "1"
        command: ["/bin/bash"]
        args: [ "/opt/spark/sbin/start-k8s-shuffle-service.sh"]
        securityContext:
          runAsUser: 0
        ports:
        - containerPort: 7337

After running

> kubectl create -f shuffle-service.yml
> bin/spark-submit --deploy-mode cluster --master k8s://https://localhost:6443 \
--conf spark.executor.memory=2g \
--conf spark.kubernetes.container.image=<YOUR_IMAGE> \
--conf spark.kubernetes.shuffle.service.remote.label.shuffle-service-selector=shuffle-service \
--conf spark.kubernetes.shuffle.service.remote.pods.namespace=default \
--conf spark.k8s.shuffle.service.enabled=true \
--conf spark.shuffle.provider.plugin.class=org.apache.spark.shuffle.k8s.KubernetesShuffleServiceAddressProviderFactory \
--conf spark.shuffle.io.plugin.class=org.apache.spark.shuffle.external.ExternalShuffleDataIO \
--class org.apache.spark.examples.GroupByShuffleTest /opt/spark/examples/jars/spark-examples_2.12-3.0.0-SNAPSHOT.jar

This tests a shuffle by running a groupByKey() operation.

ifilonenko commented Jan 10, 2019

@squito @vanzin @mccheah @felixcheung @tgravescs @yifeih for review and comments
This PR still needs to:

  • look at style optimizations w.r.t creating shared constructs between MesosExternalShuffleService and KubernetesExternalShuffleService
  • not use the same BlockManagerId for each executor
  • investigate .transferTo() for zero-copy transfer
  • enable encryption in the FileWriterStreamCallback
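
For the `.transferTo()` item above, here is a minimal sketch of a file-to-file copy built on `FileChannel#transferTo` from plain Java NIO. The class and method names (`ZeroCopySketch`, `copyWithTransferTo`, `demo`) are hypothetical and not part of this PR, and whether the copy is truly zero-copy depends on the operating system and the type of the target channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not code from this PR.
final class ZeroCopySketch {
  /** Copies all of src into dst via transferTo and returns the bytes moved. */
  static long copyWithTransferTo(Path src, Path dst) throws IOException {
    try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
         FileChannel out = FileChannel.open(dst,
             StandardOpenOption.CREATE, StandardOpenOption.WRITE,
             StandardOpenOption.TRUNCATE_EXISTING)) {
      long size = in.size();
      long position = 0;
      // transferTo may move fewer bytes than requested, so loop until done.
      while (position < size) {
        long moved = in.transferTo(position, size - position, out);
        if (moved <= 0) {
          break; // defensive: avoid spinning if nothing can be transferred
        }
        position += moved;
      }
      return position;
    }
  }

  /** Round-trips a small payload through two temp files and returns the copy. */
  static String demo(String payload) {
    try {
      Path src = Files.createTempFile("shuffle-src", ".data");
      Path dst = Files.createTempFile("shuffle-dst", ".data");
      try {
        Files.write(src, payload.getBytes(StandardCharsets.UTF_8));
        copyWithTransferTo(src, dst);
        return new String(Files.readAllBytes(dst), StandardCharsets.UTF_8);
      } finally {
        Files.deleteIfExists(src);
        Files.deleteIfExists(dst);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The loop matters because a single `transferTo` call is allowed to transfer less than the requested count.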

@yifeih yifeih left a comment

thanks for working on the implementation, looks good so far! I left a few comments for now

ssId._1, ssId._2,
conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))

This config value is used for executor heartbeats to the driver, right? So we'll probably need a new one for the driver's heartbeats to the ESS?

extends BlockDataManager with BlockEvictionHandler with Logging {

private[spark] val externalShuffleServiceEnabled =
private[spark] val externalNonK8sShuffleService =

It's kind of weird to have the initialization here in the BlockManager because I don't think you use any of the setup variables in the rest of the BlockManager code? I think the goal with the pluggable API is so that you can separate the code having to do with the new ESS into the initialization function so as to not have to think about it in the BlockManager which is run in every spark app regardless of configuration

}
}

private def pollForPods(): Unit = {

do you need to reregister with each External shuffle service when you find new ones?

Author

Hmmm, good point, maybe that is appropriate

private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);

// TODO: Dont necessarily write to local
private final File shuffleDir;

i don't think this gets used?

offset += length;
longBuffer.put(offset);
}
client = clientFactory.createUnmanagedClient(hostName, port);

I think I might have written this part of the code, but we should be using the same hostname/port for all the writers in a single MapOutputWriter so they can share a client. Can we pass in that client instead of the clientFactory, hostname and port?

Author

The client will become idle by the time a new partitionReader is requested. The ClientFactory will just generate a Client to use. It should handle the smarts, AFAIK


Hmm I'm not sure what you mean by it becoming idle? Don't you have to manually close the client for the connection to be closed? Or does it actually timeout if you don't send info through within a time period?


Ah nevermind, I see the comment below #6 (comment)

}

@Override
public void commitAllPartitions(long[] partitionLengths) {

I think I was actually envisioning this to have no parameters, and then this function would call commitAndGetTotalLength() for each of the writers (which it keeps track of), and then use those lengths to commit the IndexWriter?

Having the user of this API have to commit each writer separately and gather the lengths seems less than ideal because it could accidentally be misused if somebody passes in the wrong length values.

I guess at that point, commitAndGetTotalLength feels like an internal API. Perhaps we can remove that method from the ShuffleOutputWriter interface, and since ExternalShuffleMapOutputWriter should be creating instances of ExternalShufflePartitionWriter, we can call that method on those instances.

also, thoughts @mccheah ?


Actually, on second thought, you probably don't want to keep that many buffers open, so they should be written to and committed independently and sequentially instead of all at once. But I would still like to see the partitionLengths become internal to the MapOutputWriter, perhaps through another function inside of the PartitionWriter that returns the lengths of the committed file.

Owner

This - which is the reason why the API shouldn't be opinionated about partition lengths here. There's a few ways we can do this without having to pass the partition lengths:

  1. Add a field to this top-level class which is of type Map<Integer, AtomicInteger>. Call this partitionLengths. This is the count of the partition bytes written by partition ID. On the invocation of newPartitionWriter, create a new AtomicInteger that keeps a running count of that partition's bytes. Pass the AtomicInteger into the ExternalShufflePartitionWriter. Thus the partition byte count is now shared between this top-level writer and the child writers it makes.

  2. Add a field to this top-level class which is of type Map<Integer, ExternalShufflePartitionWriter>. These are all the writers we've created so far. Create additional methods on ExternalShufflePartitionWriter called getWrittenBytes and isClosed. On ExternalShufflePartitionWriter#commitAndGetTotalLength make sure to deallocate the byte buffer by setting the partition stream to null (thus it will be garbage collected). On commitAllPartitions, cycle through all the writers and assert that they are closed, and derive the length of the partition via the writer's getWrittenBytes method.

I think option (1) is easier to write and is less prone to memory leaks, but is more opaque / less expressive to what it is accomplishing and is harder to read. I don't have a strong opinion between the two options. Both of these accomplish the goal of making partition lengths and index files an implementation detail.
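
Option (1) could be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `SketchMapOutputWriter` and `SketchPartitionWriter` are hypothetical stand-ins for the real writer classes, the actual upload of bytes is omitted, and the sketch swaps in `AtomicLong` for the suggested `AtomicInteger` so that counts line up with the `long[]` partition lengths used elsewhere in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ExternalShufflePartitionWriter.
final class SketchPartitionWriter {
  private final AtomicLong writtenBytes; // shared with the parent writer

  SketchPartitionWriter(AtomicLong writtenBytes) {
    this.writtenBytes = writtenBytes;
  }

  void write(byte[] bytes) {
    // A real writer would also send the bytes to the shuffle service here.
    writtenBytes.addAndGet(bytes.length);
  }
}

// Hypothetical stand-in for ExternalShuffleMapOutputWriter.
final class SketchMapOutputWriter {
  private final int numPartitions;
  // Partition ID -> running count of bytes written for that partition.
  private final Map<Integer, AtomicLong> partitionLengths = new HashMap<>();

  SketchMapOutputWriter(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  SketchPartitionWriter newPartitionWriter(int partitionId) {
    AtomicLong counter =
        partitionLengths.computeIfAbsent(partitionId, id -> new AtomicLong());
    return new SketchPartitionWriter(counter);
  }

  /** Takes no lengths: they are derived from the shared counters. */
  long[] commitAllPartitions() {
    long[] lengths = new long[numPartitions];
    partitionLengths.forEach((id, count) -> lengths[id] = count.get());
    // A real implementation would write the index file from `lengths` here.
    return lengths;
  }
}
```

With this shape the caller never handles partition lengths, so it cannot pass in wrong values; partitions that were never written simply report a length of zero.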

Seq(clazz), conf)).getOrElse(Seq())
val serviceLoaders = shuffleProvider
.filter(_.canCreate(conf.get("spark.master")))
if (serviceLoaders.size > 1) {

can this be more than one? you're only ever specifying a single class in the conf? but I could be misunderstanding this

Owner

Technically shouldn't be possible based on the implementation of loadExtensions, but I'm fine with this if it catches changes in the loadExtensions method. After all, the contract of loadExtensions is returning a result that can theoretically have more than one of these.

endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val shuffleIoPlugin = conf.get(SHUFFLE_IO_PLUGIN_CLASS)

was there a reason why this was changed from a private variable?

Owner

+1 - this should really be initialized once per shuffle manager instance. Possibly one can make the initialization lazy though. But I don't see why one would want to make initialization lazy.

Author

@ifilonenko ifilonenko Jan 14, 2019

This wasn't possible because the ExternalShuffleService needed the SparkEnv to grab the securityManager and host:port information. And it wasn't initialized yet.

getFile(appId, shuffleId, mapId, "data"),
shuffleIndexRecord.getOffset,
shuffleIndexRecord.getLength)
callback.onSuccess(managedBuffer.nioByteBuffer())

we probably want to make this a stream eventually so we can send back more data?

Owner

+1 - we really want this to send back streaming data because of the potential size of this partition.


FileSegmentManagedBuffers already are sent in a smart way, so that you can send a large amount of data backed by a file as a stream (see convertToNetty). However, the receiving end needs to know how to get the result back without just slurping it all into memory -- I left a comment elsewhere about that.

conf.get(config.SHUFFLE_SERVICE_ENABLED)

private[spark] val externalk8sShuffleServiceEnabled =
conf.get(config.K8S_SHUFFLE_SERVICE_ENABLED)

I think there's some overlap between this config and the SHUFFLE_SERVICE_PROVIDER_CLASS config. Perhaps we can use this flag to determine which provider class to load, thus simplifying the configs?

Owner

@mccheah mccheah left a comment

A starting point for feedback. Thanks @yifeih for reviewing, I agree with her feedback as well. There's more work to do here, like supporting transferTo with this API and putting more existing code behind the API. But this is a good start.

return executors.size();
}


Owner

Whitespace changes can be reverted


private final String typeString;

FileType(String typeString) {
Owner

Indentation is weird here

BlockTransferMessage header,
TransportClient client,
RpcResponseCallback callback) {
throw new UnsupportedOperationException("Unexpected message header: " + header);
Owner

This looks odd at least to me - why is this change needed?

Author

Oh this is for the case in which a stream that is handled by the KubernetesExternalShuffleBlockResolver is malformed and would then be defaulted to this class via a super.handleStream() request. I can take this out, it isn't necessary

private final List<String> knownManagers = Arrays.asList(
"org.apache.spark.shuffle.sort.SortShuffleManager",
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
"org.apache.spark.shuffle.sort.SortShuffleManager",
Owner

Change isn't necessary. Might as well minimize the diff to make future reviews more manageable.

}
try {
// TODO encryption
fileOutputChannel = Channels.newChannel(new FileOutputStream(file));
Owner

Try FileChannel#open - sounds more idiomatic for what we're trying to do here anyway.
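
As a rough illustration of the `FileChannel#open` suggestion, the sketch below opens a file for writing with explicit options. `FileChannelOpenSketch` and its methods are hypothetical names, and the option set (`CREATE`, `WRITE`, `TRUNCATE_EXISTING`) is an assumption chosen to mirror the create-or-truncate behaviour of `new FileOutputStream(file)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not code from this PR.
final class FileChannelOpenSketch {
  /** Opens (or creates) the file for writing, discarding any old contents. */
  static FileChannel openForWrite(Path file) throws IOException {
    return FileChannel.open(file,
        StandardOpenOption.CREATE,
        StandardOpenOption.WRITE,
        StandardOpenOption.TRUNCATE_EXISTING);
  }

  /** Writes the payload to a temp file and returns the resulting file size. */
  static long demo(byte[] payload) {
    try {
      Path file = Files.createTempFile("shuffle-partition", ".data");
      try {
        try (FileChannel channel = openForWrite(file)) {
          ByteBuffer buffer = ByteBuffer.wrap(payload);
          // A single write may be partial, so drain the buffer in a loop.
          while (buffer.hasRemaining()) {
            channel.write(buffer);
          }
        }
        return Files.size(file);
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```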

extends ExternalShuffleBlockHandler(transportConf, null) with Logging {


ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
Owner

Let's put this in a start method - technically this is being started in the constructor of this class and doing side-effect actions in constructors is an antipattern.
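
A minimal sketch of that shape, assuming a handler that owns a periodic cleaner task: the constructor only stores state, and the executor is created and scheduled in an explicit `start()`. The class name `SketchCleanerHandler` and its methods are hypothetical; the PR's handler is Scala and uses Spark's `ThreadUtils`, which this sketch replaces with the plain JDK `Executors` API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical handler for illustration; not code from this PR.
final class SketchCleanerHandler {
  private final Runnable cleaner;
  private final long periodMs;
  private final AtomicBoolean started = new AtomicBoolean(false);
  private volatile ScheduledExecutorService executor; // created in start()

  // The constructor has no side effects: no threads, no scheduling.
  SketchCleanerHandler(Runnable cleaner, long periodMs) {
    this.cleaner = cleaner;
    this.periodMs = periodMs;
  }

  /** Starts the periodic cleaner once; later calls are no-ops. */
  void start() {
    if (!started.compareAndSet(false, true)) {
      return;
    }
    executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread thread = new Thread(runnable, "shuffle-cleaner-watcher");
      thread.setDaemon(true);
      return thread;
    });
    executor.scheduleAtFixedRate(cleaner, 0, periodMs, TimeUnit.MILLISECONDS);
  }

  boolean isStarted() {
    return started.get();
  }

  /** Stops the cleaner thread if it was started. */
  void stop() {
    ScheduledExecutorService current = executor;
    if (current != null) {
      current.shutdownNow();
    }
  }
}
```

Separating construction from `start()` also makes the handler easier to test, since a test can build it without spawning a thread.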


private class CleanerThread extends Runnable {
override def run(): Unit = {
val now = System.nanoTime()
Owner

Is this done for something like Mesos or YARN as well? Surprised this cleanup code has to be introduced as a new idea.

Author

Yeah this is done in Mesos world, as well. I am gonna share the logic between the two so there is no code duplication. Just wanted to get a WIP working

extends ExternalShuffleService(conf, securityManager) {

protected override def newShuffleBlockHandler(
conf: TransportConf): ExternalShuffleBlockHandler = {
Owner

Just name this transportConf and then we don't have to refer to this.

shuffleServerId = if (externalk8sShuffleServiceEnabled) {
// TODO: Investigate better methods of load balancing
// note: might break if re-initialized
randomShuffleServiceAddress = remoteShuffleServiceAddress.head
Owner

What's stopping us from just doing remoteShuffleServiceAddress.shuffle.head? Picking a random one arbitrarily shouldn't be any different than picking the first one every time.
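
The code under review is Scala; as a language-neutral illustration of the idea, here is a small Java sketch that picks one address uniformly at random instead of always taking the head of the list. `AddressPicker` and `pickRandom` are hypothetical names, not part of this PR.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration; not code from this PR.
final class AddressPicker {
  /** Returns one element of the list, chosen uniformly at random. */
  static <T> T pickRandom(List<T> addresses) {
    if (addresses.isEmpty()) {
      throw new IllegalArgumentException("no shuffle service addresses available");
    }
    int index = ThreadLocalRandom.current().nextInt(addresses.size());
    return addresses.get(index);
  }
}
```

Random selection spreads executors across the shuffle service replicas only in expectation, so it is a placeholder for the "better methods of load balancing" mentioned in the TODO rather than a substitute for them.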

TransportConf conf,
boolean authEnabled,
SecretKeyHolder secretKeyHolder,
String hostName,
Owner

I think what needs to happen is that we don't bind the reader to a single hostName. Instead:

  • The write side should be able to write to any given selected shuffle services (although all partition blocks belong to the same partition file on one given host)
  • The write side should be storing locations of blocks on the MapOutputTracker. That probably requires the write implementation to create its own connection to the MapOutputTrackerMaster. Or we can inspect into whether or not the existing code reports the right block locations to the MapOutputTrackerMaster already.
  • The read side should directly ask the MapOutputTracker for locations of blocks. Thus the host and port to read from via the external shuffle client is dynamic per block. Tricky here is making sure we're not opening and closing clients too frequently, so ideally we'd like to pool and reuse clients wherever possible.
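
The "pool and reuse clients" point in the last bullet could look roughly like the sketch below: a cache keyed by host and port that creates a client only the first time an address is seen. `SketchClientPool` is a hypothetical class, the client type is left generic, and the factory function stands in for whatever actually creates a connection; eviction and closing of idle clients are deliberately left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical pool for illustration; not code from this PR.
final class SketchClientPool<C> {
  private final Map<String, C> clients = new ConcurrentHashMap<>();
  private final BiFunction<String, Integer, C> factory;

  /** The factory is an assumed hook that opens a client for (host, port). */
  SketchClientPool(BiFunction<String, Integer, C> factory) {
    this.factory = factory;
  }

  /** Returns the cached client for this address, creating it on first use. */
  C clientFor(String host, int port) {
    return clients.computeIfAbsent(host + ":" + port,
        key -> factory.apply(host, port));
  }

  int size() {
    return clients.size();
  }
}
```

Because the read side would look up a location per block, repeated blocks on the same shuffle service resolve to the same client instead of opening a new connection each time.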

Owner

@yifeih is going to work on a proof of concept of the above ideas.


The write side should be storing locations of blocks on the MapOutputTracker

This is what I was trying to mention in our call; I feel something in the API needs to return a MapStatus that is automatically sent to the driver and provided to the read API on the reduce side. That would avoid every implementation having to deal with this. (MapStatus would be an opaque type instead of the current implementation.)

I still have to go and page all this code back in to see if what I'm talking about makes sense given the current way the MapOutputTracker works, though...

Owner

The thing is, most implementations will want to manage their locations on their own. For example, in the Ignite prototype the location is completely unnecessary because it can look up partition bytes simply by the block id as the cache key, letting the Ignite cluster handle the routing.

I don't think we want the API to expose the concept of a shuffle location as a matter of principle. This was in a previous proposal. However, I think we would revisit the concept of a shuffle location that would be stored alongside the MapStatus only as a concession to the way the code currently works. Unfortunately it adds more complexity to the API layer and such locations would be meaningless in many other implementations of the API.

Owner

The answers here might be more obvious if we put the existing code paths behind this API as much as possible. A preliminary suggestion could be to have the write methods return locations to the shuffle blocks which are passed along on the read side as well. But, one would ideally like such locations to be optional. A benefit of making the locations optional is that we don't have to take up more driver memory in storing such locations for implementations that don't need them.


It's less a question of exposing shuffle location and more a question of exposing an interface between the mapper and reducer that is more than just a block id, in case the implementation needs extra context that just the block id does not provide.

If you don't expose this in the API, which seems pretty easy to do for me, everybody that needs it will need to figure out how to do it. They may go to the map output tracker, and then that class now officially becomes part of the API too, whether it's encoded in the interface or not.

And those who don't need it can just ignore it (return null / ignore input parameter).

Anyway, no need to tackle that now. We can add it later if it makes sense.

@Override
public ShufflePartitionReader newPartitionReader(String appId, int shuffleId, int mapId) {
// TODO combine this into a function with ExternalShuffleWriteSupport
TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), false);

As far as implementation details go, it would be a good idea to have the read and write support share the same transport context, instead of creating separate ones for each instance of each.

// Block transfer rate in byte per second
private final Meter blockTransferRateBytes = new Meter();
// Partition upload latency in ms
private final Timer uploadPartitionkStreamMillis = new Timer();
Author

@ifilonenko ifilonenko Jan 14, 2019

This should actually be moved to the KubernetesExternalShuffleBlockHandler

}

@Override
public void initialize() {

I started playing with this code a bit, and pretty quickly ran into this: it would be good for the interfaces to declare at least throws IOException, otherwise implementations will have to write extra code for no good reason.

override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
override def createShuffleServiceAddressProvider(): ShuffleServiceAddressProvider =

FYI neither this nor the Mesos version of this method compile.

Author

I was building distribution without the -Pyarn and -Pmesos commands :) I'll fix that

vanzin commented Jan 18, 2019

BTW, I haven't looked at which of the 3 PRs implements that part... but there are a few calls to ByteBuffer.reset() in the code that break the shuffle when you're using the external sorter. You can force that path with spark.shuffle.sort.bypassMergeThreshold=0. Without it, you're using BypassMergeSortShuffleWriter which seems to work.

I haven't yet fixed that in my current hacks.

vanzin commented Jan 18, 2019

Figured out. Basically ShufflePartitionWriterOutputStream is pretty broken... I'll open a PR in my repo and link here, but it's starting to get complicated to track things with 3 (and soon 4) separate forks making progress in parallel.

vanzin commented Jan 19, 2019

Pushed my code and made a PR at: vanzin#52

Of interest to you guys probably are the fixes outside of the "DFS" classes I'm creating as a p.o.c. for the API.

* Returns a {@link CommittedPartition} indicating information about that written partition.
*/
long commitAndGetTotalLength();
CommittedPartition commitPartition();

I merged my branch with your recent changes and I'm not a big fan of this.

I think it's useful to have the API expose something like this, but your implementation has two very important issues:

  • it's opinionated about what CommittedPartition is
  • it doesn't allow the implementation to return null

There may be a third issue, which is what happens with very large shuffles, where you'll have a ton of these objects.

Owner

What would you propose as an alternative?

Owner

(Regarding the third issue, we have plans on how to compress these on the driver, such that we only store the set of unique locations and create an index to look up the location per reduce id for a given map output. But perhaps more efficient is making ShuffleLocation be an opaque Array[Byte].)


My original idea was for the map side to just return an opaque Object (or Serializable, I guess), and the reduce side to take that as an argument.

It might need some extra APIs, though (the driver must be able to re-organize things so that the reducers have a view of the data corresponding to all of the different mappers).

Something like:

ShuffleMapOutputWriter {
  Serializable commitAllPartitions() throws IOException
}

ShuffleDataIO {
  Serializable mergeMapOutput(Serializable mapOutput);
}

ShuffleReadSupport {
  ShufflePartitionReader newPartitionReader(String appId, int shuffleId, int mapId, Serializable metadata)
}

That's just a quick dump off the top of my head, so may not really be workable.
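
To make the outline above concrete, here is one way it might compile as Java, together with a toy implementation that returns `null` metadata. Everything here is an assumption for illustration: the `Opaque*` interface names are renamed stand-ins for the types in the outline, `OpaquePartitionReader#describe` exists only so the sketch has something observable, and `NoMetadataWriter` and `EchoReadSupport` are hypothetical.

```java
import java.io.IOException;
import java.io.Serializable;

// Placeholder for ShufflePartitionReader; describe() is illustrative only.
interface OpaquePartitionReader {
  String describe();
}

// Map side: commit returns opaque metadata, or null if there is none.
interface OpaqueMapOutputWriter {
  Serializable commitAllPartitions() throws IOException;
}

// Driver side: lets the implementation reorganize per-map metadata.
interface OpaqueShuffleDataIO {
  Serializable mergeMapOutput(Serializable mapOutput);
}

// Reduce side: receives whatever the map side returned.
interface OpaqueReadSupport {
  OpaquePartitionReader newPartitionReader(
      String appId, int shuffleId, int mapId, Serializable metadata);
}

// Toy writer for an implementation that needs no extra context.
final class NoMetadataWriter implements OpaqueMapOutputWriter {
  @Override
  public Serializable commitAllPartitions() {
    return null; // nothing beyond the block id is required
  }
}

// Toy read support that tolerates null metadata.
final class EchoReadSupport implements OpaqueReadSupport {
  @Override
  public OpaquePartitionReader newPartitionReader(
      String appId, int shuffleId, int mapId, Serializable metadata) {
    final String description =
        appId + "/" + shuffleId + "/" + mapId + "@" + metadata;
    return () -> description;
  }
}
```

The point of the toy classes is only that an implementation which does not need the metadata can return `null` on the write side and ignore the parameter on the read side.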


(But, really, for my current use, just being able to return null and have things work would have been great.)


@vanzin I think we may have to expose something in the API to return the partition lengths because they're currently used for jobs stats and compressing the MapStatus data, so unless we refactor that part of the current code too, I think we need to at least expose the partition lengths. However, I do like how your interface is more symmetrical, and that you're exposing the object that the writer returns as a parameter into the reader, I think I will change it to that.

Here's a document of the thoughts that went into the current implementation, if it further clarifies anything. The current implementation is at the bottom of the doc: https://docs.google.com/document/d/1Eyuoo7p0DIGDRciSaOOACpqSiY9bW32cllTV8SRm1zI/edit#

logger.info("response is: " + response.toString() +
" " + response.array() + " " + response.hasArray());
if (response.hasArray()) {
logger.info("response hashcode: " + Arrays.hashCode(response.array()));

just want to check -- this is all your WIP debug logging, right? The response might be quite large (GB even), I don't think you want to be taking a hashcode here just for logging

Author

Yeah, all for debug logging. This will definitely be removed

client.setClientId(requestID);
logger.info("clientid: " + client.getClientId() + " " + client.isActive());

ByteBuffer response = client.sendRpcSync(openMessage.toByteBuffer(), 60000);

here you're always fetching the remote block into memory, you won't use fetch-to-disk at all.

Also you won't do any pipelining of requests. You'll request one block, fetch it all into memory, consume the entire block, and only then start the request for the next block. I mentioned this on matt's work on the api too, but wanted to mention it again here.

I understand if this doesn't go in right away, but we want to make sure we design things so this is possible, at least.

Owner

How is this done currently? Does the prior implementation do fetch to disk directly?


It's quite a bit more complicated.

(1) ShuffleBlockFetcherIterator looks at the block size to determine whether or not to fetch it to disk:
https://github.com/apache/spark/blob/dc2da72100811988ee1b31190f219b620f88f8de/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L253-L261
(btw it also uses that size info to know how many requests to pipeline)

(2) eventually that gets down to OneForOneBlockFetcher here:
https://github.com/apache/spark/blob/dc2da72100811988ee1b31190f219b620f88f8de/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L123-L127

the msg it sends is slightly different, but more or less it's still just saying "I want shuffle block X".

(3) The receiving end still looks at that msg to figure out what shuffle block is requested, and then streams back that data. The file on disk is still pushed directly to the socket, without copying into memory in the JVM, regardless of whether the receiving end wants them on disk or not. But the header it sends back is slightly different.

(4) because the response is a StreamResponse, the TransportResponseHandler puts an interceptor in the frame decoder, so that rather than buffering up the entire msg, it just send the bytes in small chunks on to the callback.

https://github.com/apache/spark/blob/dc2da72100811988ee1b31190f219b620f88f8de/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java#L209-L220

Also see MessageEncoder and how it uses isBodyInFrame.

Going back up to OneForOneBlockFetcher, that callback is a DownloadCallback which just writes bytes to a file:

https://github.com/apache/spark/blob/dc2da72100811988ee1b31190f219b620f88f8de/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L155-L172

and when it's fetched all the data, it gives a view of the bytes in that file as a ManagedBuffer, so other parts of Spark don't care whether it's on disk or in memory (the details here get complicated by encryption).
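
The steps above can be sketched roughly as follows: a size check decides whether a block goes to disk, and a callback appends each incoming chunk to a file instead of buffering the whole block. This is a plain-JDK illustration only. `FetchToDiskSketch`, `ChunkToFileCallback`, and the `maxInMemoryBytes` parameter are hypothetical names, and the real `DownloadCallback` in Spark differs in detail.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of the size check and of a callback that spills chunks to a file. */
final class FetchToDiskSketch {

    /** Step (1): blocks above the assumed in-memory limit are streamed to disk. */
    static boolean shouldFetchToDisk(long blockSize, long maxInMemoryBytes) {
        return blockSize > maxInMemoryBytes;
    }

    /** Step (4): receives the body in small chunks and appends each one to a temp file. */
    static final class ChunkToFileCallback {
        private final Path target;
        private final FileChannel channel;

        ChunkToFileCallback() {
            try {
                target = Files.createTempFile("shuffle-block", ".data");
                channel = FileChannel.open(target, StandardOpenOption.WRITE);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Writes one chunk; only this chunk is ever held in memory. */
        void onData(ByteBuffer chunk) {
            try {
                while (chunk.hasRemaining()) {
                    channel.write(chunk); // a single write may be partial
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Closes the file and returns its path, which a caller could wrap in a file-backed buffer. */
        Path onComplete() {
            try {
                channel.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return target;
        }
    }
}
```

The point of the design is that the consumer only ever sees "a buffer", whether its bytes live on the heap or in the file returned by `onComplete()`.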

Owner

Does implementing this require changes at the API layer? Or is it only on the implementation of our API?

Pipelining would almost certainly require API changes, but probably just pushing down a bit of the logic here: https://github.com/mccheah/spark/pull/4/files#r246172407

Fetch-to-disk, I think, mostly just requires a change to the implementation. But it does require knowledge of the size of the shuffle block that is about to be fetched (or at least, whether or not it's bigger than the limit) -- I'm not sure I follow which parts of that will be changed in the API yet.

Btw, another thing I was wondering is whether you could push the API change down even lower, so that you only change the ShuffleClient used but keep the ShuffleBlockFetcherIterator. You'd have to change the arguments to ShuffleClient.fetchBlocks (and I'm not exactly sure how atm), but otherwise maybe you could reuse the rest of ShuffleBlockFetcherIterator?

Owner

To clarify, is the suggestion to make ShuffleClient the API that developers would customize to work with pluggable backends?

Yes, that's what I was thinking (I haven't really thought through whether that would actually work or not, though).

getFile(appId, shuffleId, mapId, "data"),
shuffleIndexRecord.getOffset,
shuffleIndexRecord.getLength)
callback.onSuccess(managedBuffer.nioByteBuffer())

FileSegmentManagedBuffers are already sent in a smart way, so that you can send a large amount of data backed by a file as a stream (see convertToNetty). However, the receiving end needs to know how to get the result back without just slurping it all into memory -- I left a comment elsewhere about that.
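
As a rough plain-JDK analogue of sending a file-backed segment as a stream, the sketch below uses `FileChannel.transferTo` to push a byte range of a file to a channel without first reading the whole range into a heap array. It is not Spark's or Netty's code: `FileSegmentSendSketch`, `sendSegment`, and `demo` are hypothetical names, and the in-memory target in `demo` exists only to make the example self-checking (the benefit in practice comes when the target is a socket channel).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: stream one segment of a file to a channel. */
final class FileSegmentSendSketch {

    /** Streams bytes [offset, offset + length) of the file to the target; returns bytes sent. */
    static long sendSegment(Path file, long offset, long length, WritableByteChannel target) {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long sent = 0;
            while (sent < length) {
                // transferTo may move fewer bytes than requested, so loop until done.
                long n = source.transferTo(offset + sent, length - sent, target);
                if (n <= 0) {
                    break; // nothing more to transfer, e.g. the segment runs past end of file
                }
                sent += n;
            }
            return sent;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo helper: writes a six-byte temp file and returns the three bytes starting at offset 2. */
    static byte[] demo() {
        try {
            Path file = Files.createTempFile("segment", ".data");
            Files.write(file, new byte[] {10, 11, 12, 13, 14, 15});
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            sendSegment(file, 2, 3, Channels.newChannel(out));
            Files.delete(file);
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```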


/** An extractor object for matching BlockTransferMessages. */
private object RegisterDriverParam {
def unapply(r: RegisterDriver): Option[(String, AppState)] =

Spark doesn't really use this pattern of having objects with just unapply methods for pattern matching. It's a bit harder for folks who have just a Java background to follow (and arguably this is a part of the code where we want lower-level expertise, and too many Scala-isms would discourage that).

vanzin and others added 3 commits January 22, 2019 14:38
- add exceptions to API
- fix write side: use serializer manager, fix usage of ByteBuffer.
- misc compilation / style fixes to get things to build.
* compiles

* fix UnsafeShuffleWriter

* remove unnecessary changes

vanzin commented Jan 31, 2019

One thing I ran into while playing with my code is that failure to write shuffle files doesn't seem to be handled correctly. writePartitionedToExternalShuffleWriteSupport is catching an exception from my openPartitionStream implementation, but instead of the tasks / job failing, I'm getting an empty output.
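
One way a symptom like this can arise, shown here purely as an illustration and not as a diagnosis of this PR's code, is a write path that catches the exception and returns as if nothing had been written. In the sketch below, `WriteFailureSketch`, `PartitionStreamOpener`, and both write methods are hypothetical names. The first variant swallows the failure, and the second propagates it so the caller can fail the task.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch contrasting a swallowed write failure with a propagated one. */
final class WriteFailureSketch {

    /** Stand-in for a plugin hook that opens a stream for one partition. */
    interface PartitionStreamOpener {
        OutputStream open(int partitionId) throws IOException;
    }

    /** Swallows the failure: the caller sees a "successful" write of zero bytes. */
    static long writeSwallowing(PartitionStreamOpener opener, byte[] data) {
        try (OutputStream out = opener.open(0)) {
            out.write(data);
            return data.length;
        } catch (IOException e) {
            return 0; // bug: the task carries on and produces empty output
        }
    }

    /** Propagates the failure so that the task, and eventually the job, can fail. */
    static long writePropagating(PartitionStreamOpener opener, byte[] data) {
        try (OutputStream out = opener.open(0)) {
            out.write(data);
            return data.length;
        } catch (IOException e) {
            throw new UncheckedIOException("shuffle write failed", e);
        }
    }
}
```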

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions Bot added the Stale label Mar 19, 2020
@github-actions github-actions Bot closed this Mar 20, 2020

5 participants