
Error "INTERNAL: Encountered end-of-stream mid-frame" during DataFrame.Save() operations #149

@caldempsey

Description

Environment

  • Spark Connect Go client v34
  • Delta Lake operations via DataFrameWriter.Save()
  • Parquet files stored in S3

Issue Description

Intermittent org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame errors occur during df.Write().Format("delta").Mode("append").Save() operations when loading a series of Parquet files from S3 and appending them to Delta tables via the DataFrame write API. In my experiments the error is intermittent and can safely be ignored while the request is retried. The Spark Connect server also retries automatically when receiving row streams, so in real-world usage the error seemingly causes no problems beyond a performance bottleneck.

Error Details

25/06/23 23:28:46 WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame

Full context shows:

25/06/23 23:28:46 INFO SparkConnectExecutionManager: ExecuteHolder ExecuteKey(...) is removed.
25/06/23 23:28:46 INFO ExecuteResponseObserver: Release all for opId=.... Execution stats: total=CachedSize(3552,20) autoRemoved=CachedSize(3372,19) cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) maxCachedUntilConsumed=CachedSize(360,2) maxCachedUntilProduced=CachedSize(360,2)
25/06/23 23:28:46 WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame

Reproduction Pattern

  • Loading Parquet files from S3 using sess.Read().Format("parquet").Load(s3URI)
  • Writing to Delta table using df.Write().Format("delta").Mode("append").Save(deltaPath)
  • Error occurs intermittently, not on every operation
  • Appears to be related to gRPC stream handling during the save operation

Code Pattern

df, err := sess.Read().Format("parquet").Load(parquetURI)
if err != nil {
    return err
}
err = df.Write().Format("delta").Mode("append").Save(deltaPath)
// ^ This is where the mid-frame error occurs intermittently

Workaround

Retry logic with exponential backoff works around the issue; because Delta appends are transactional, a failed Save leaves nothing behind and retrying is safe:

var lastErr error
for attempt := 0; attempt < 3; attempt++ {
    lastErr = df.Write().Format("delta").Mode("append").Save(dst)
    if lastErr == nil {
        return nil
    }
    // Only the mid-frame stream error is retryable; fail fast on anything else.
    if !strings.Contains(lastErr.Error(), "mid-frame") {
        return lastErr
    }
    // Exponential backoff: backoffDuration, 2*backoffDuration, 4*backoffDuration.
    time.Sleep(backoffDuration << attempt)
}
return lastErr

Expected Behavior

DataFrame.Save() operations should complete reliably without gRPC stream interruptions. This was not reproducible using SQL.

Use Case Context

I've been experimenting with bulk data uploads to Delta Lake via the Spark Connect Go client. Since the Go client lacks parameterized queries, I've adopted the pattern of writing Parquet to S3 and then loading it via DataFrames for Delta table operations. This works very well, achieving throughput on the order of a million rows per second, except for these intermittent stream errors.

The gRPC stream appears to close prematurely during the save operation, which suggests a potential issue with stream lifecycle management in the Spark Connect server. It may be related to the "ExecuteHolder ExecuteKey(...) is removed" log line shown above.
