Environment
- Spark Connect Go client v34
- Delta Lake operations via DataFrameWriter.Save()
- Parquet files stored in S3
Issue Description
Intermittent org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame errors occur during DataFrame.Write().Format("delta").Mode("append").Save() operations when loading a series of Parquet files from S3 and appending them to Delta tables. In my experiments the error is intermittent and can safely be ignored as long as the request is retried. The Spark Connect client also retries this automatically when receiving row streams, so it does not (seemingly) cause a problem in real-world usage beyond being a performance bottleneck.
Error Details
25/06/23 23:28:46 WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
Full context shows:
25/06/23 23:28:46 INFO SparkConnectExecutionManager: ExecuteHolder ExecuteKey(...) is removed.
25/06/23 23:28:46 INFO ExecuteResponseObserver: Release all for opId=.... Execution stats: total=CachedSize(3552,20) autoRemoved=CachedSize(3372,19) cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) maxCachedUntilConsumed=CachedSize(360,2) maxCachedUntilProduced=CachedSize(360,2)
25/06/23 23:28:46 WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
Reproduction Pattern
- Loading Parquet files from S3 using
sess.Read().Format("parquet").Load(s3URI)
- Writing to Delta table using
df.Write().Format("delta").Mode("append").Save(deltaPath)
- Error occurs intermittently, not on every operation
- Appears to be related to gRPC stream handling during the save operation
Code Pattern
df, err := sess.Read().Format("parquet").Load(parquetURI)
if err != nil {
    return err
}
err = df.Write().Format("delta").Mode("append").Save(deltaPath)
// ^ This is where the mid-frame error occurs intermittently
if err != nil {
    return err
}
Workaround
Retry logic with exponential backoff works around the issue, because Delta appends are transactional:
var err error
for attempt := 0; attempt < 3; attempt++ {
    err = df.Write().Format("delta").Mode("append").Save(deltaPath)
    if err == nil {
        return nil
    }
    if !strings.Contains(err.Error(), "mid-frame") {
        return err
    }
    time.Sleep(backoffDuration << attempt) // exponential backoff
}
return err
Expected Behavior
DataFrame.Save() operations should complete reliably without gRPC stream interruptions. This was not reproducible using SQL.
Use Case Context
I've been experimenting with bulk data uploads to Delta Lake via the Spark Connect Go client. Since the Go client lacks parameterized queries, I've been using the pattern of writing Parquet to S3 and then loading it via DataFrames for Delta table operations. This works very well, achieving throughput of around a million rows per second, except for these intermittent stream errors.
The gRPC stream appears to be closing prematurely during the save operation, which suggests a potential issue with stream lifecycle management in the Spark Connect server. It may be related to the "ExecuteHolder ExecuteKey(...) is removed." log line above.