feat: Implement Message Updates, Deletes, and Appends#697
feat: Implement Message Updates, Deletes, and Appends#697
Conversation
Signed-off-by: Lewis Marshall <lewis.marshall@ably.com>
WalkthroughAdds message versioning and actions (create/update/delete/append), publish-with-result APIs for REST and Realtime, serial-aware ACK delivery via an ackCallback wrapper, default protocol bumped to v5, per-message retrieval/version history, supporting decoders, and extensive tests and examples for message lifecycle flows. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant REST_Channel as "REST Channel"
participant Server
Client->>REST_Channel: PublishWithResult(name, data)
REST_Channel->>Server: POST /messages (Action=Create)
Server-->>REST_Channel: {serials: ["s1"]}
REST_Channel-->>Client: PublishResult{Serial:"s1"}
Client->>REST_Channel: UpdateMessage(msg{Serial:"s1"}, opts)
REST_Channel->>REST_Channel: validate serial, build Version
REST_Channel->>Server: POST /messages (Action=Update, Version)
Server-->>REST_Channel: {versionSerial: "v2"}
REST_Channel-->>Client: UpdateResult{VersionSerial:"v2"}
sequenceDiagram
participant Client
participant Realtime_Channel as "Realtime Channel"
participant Connection
participant Server
Client->>Realtime_Channel: PublishWithResultAsync(name, data, cb)
Realtime_Channel->>Realtime_Channel: create ackCallback wrapper
Realtime_Channel->>Connection: send(msg, callback)
Connection->>Server: send PUBLISH
Server-->>Connection: ACK {serials: ["s1"]}
Connection->>Realtime_Channel: callback.call(serials, nil)
Realtime_Channel-->>Client: cb(PublishResult{Serial:"s1"}, nil)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ably/paginated_result.go (1)
189-194:⚠️ Potential issue | 🟠 MajorBug:
copyHeaderassigns the original slice instead of the copy.Line 192 creates a copy
dof the header values slice, but line 193 assigns the originalvtodest[k], making the copy dead code. The destination header will share the same backing array as the source.🐛 Proposed fix
func copyHeader(dest, src http.Header) { for k, v := range src { d := make([]string, len(v)) copy(d, v) - dest[k] = v + dest[k] = d } }ably/rest_channel.go (1)
112-119:⚠️ Potential issue | 🟡 Minor
publishMultiplemutates caller's message slice elements in place.The
*m, err = (*m).withEncodedData(cipher)dereferences and overwrites the pointed-toMessagevalue. Sincemessagesis[]*Message, this modifies the caller's original messages (e.g., overwritingDatawith encoded bytes and settingEncoding). This could surprise callers who reuse message objects. The same pattern exists in theUpdateMessage/DeleteMessage/AppendMessagemethods, but those copy first (updateMsg := *msg), which is safer.
🤖 Fix all issues with AI agents
In `@ably/message_updates_integration_test.go`:
- Around line 315-329: The async callback passed to channel.AppendMessageAsync
currently calls t.Logf from a goroutine (in the loop over tokens) which can
panic if the test ends; change the callback to never call testing.T methods and
instead send results over the existing completed channel (e.g., send an index or
an error value/struct) so the main test goroutine can observe failures; update
the completion loop that reads from completed to detect and fail the test via
require/ t.Fatalf when an error result is received, and remove the t.Logf call
inside the callback (refer to the loop variable tokens, the
channel.AppendMessageAsync callback, and the completed channel when making the
change).
In `@ably/state.go`:
- Line 138: The Ack method on pendingEmitter has an unused conn *Connection
parameter; remove the parameter from the function signature (func (q
*pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo)) and update all
callers (e.g., the calls from realtime_conn.go that currently pass a Connection)
to stop passing conn. Ensure you update any interface definitions or
implementations that referenced pendingEmitter.Ack to match the new signature
and run build/tests to catch remaining references.
🧹 Nitpick comments (6)
ably/rest_client.go (1)
812-819: Header ordering: custom headers copied beforeAcceptis set.Custom headers from
r.headerare copied at line 813 beforeAcceptis unconditionally set at line 815. If a caller ever passes a customAcceptheader, it would be silently overwritten. Currently this isn't an issue (only protocol version is overridden), but the conditional guard applied toablyProtocolVersionHeader(line 817) is not applied toAccept.Consider applying the same "set only if absent" pattern to
Acceptfor consistency, or document that certain headers cannot be overridden.ably/proto_message.go (1)
34-106: DRY: action↔numeric mapping is duplicated four times.The same switch logic for converting between
MessageActionand its numeric wire representation is repeated inMarshalJSON,UnmarshalJSON,CodecEncodeSelf, andCodecDecodeSelf. Consider extracting helpers likemessageActionToNumandnumToMessageActionto centralize the mapping.Also, the
defaultbranch silently maps unknown actions toMessageActionCreate(0) during both marshaling and unmarshaling. This could mask bugs if an invalid action is accidentally used. Consider returning an error for unknown values instead.♻️ Proposed helper extraction
+var messageActionToNum = map[MessageAction]int{ + MessageActionCreate: 0, + MessageActionUpdate: 1, + MessageActionDelete: 2, + MessageActionAppend: 5, +} + +var numToMessageAction = map[int]MessageAction{ + 0: MessageActionCreate, + 1: MessageActionUpdate, + 2: MessageActionDelete, + 5: MessageActionAppend, +} + func (a MessageAction) MarshalJSON() ([]byte, error) { - var num int - switch a { - case MessageActionCreate: - num = 0 - case MessageActionUpdate: - num = 1 - case MessageActionDelete: - num = 2 - case MessageActionAppend: - num = 5 - default: - num = 0 + num, ok := messageActionToNum[a] + if !ok { + return nil, fmt.Errorf("unknown MessageAction: %q", a) } return json.Marshal(num) }Apply the same pattern to
UnmarshalJSON,CodecEncodeSelf, andCodecDecodeSelf.ably/realtime_channel.go (1)
852-1046: Significant DRY opportunity:UpdateMessageAsync,DeleteMessageAsync, andAppendMessageAsyncare nearly identical.The three async methods differ only in the
MessageActionconstant assigned. The blocking variants follow the same boilerplate pattern too. Consider extracting a shared helper to reduce ~180 lines of near-duplicate code.♻️ Sketch of a shared helper
+func (c *RealtimeChannel) messageOperationAsync(msg *Message, action MessageAction, onAck func(*UpdateResult, error), options ...UpdateOption) error { + if err := validateMessageSerial(msg); err != nil { + return err + } + var opts updateOptions + for _, o := range options { + o(&opts) + } + version := &MessageVersion{ + Description: opts.description, + ClientID: opts.clientID, + Metadata: opts.metadata, + } + opMsg := *msg + opMsg.Action = action + opMsg.Version = version + protoMsg := &protocolMessage{ + Action: actionMessage, + Channel: c.Name, + Messages: []*Message{&opMsg}, + } + return c.sendWithSerialCallback(protoMsg, func(serials []string, err error) { + if err != nil { + onAck(nil, err) + return + } + result := &UpdateResult{} + if len(serials) > 0 { + result.VersionSerial = serials[0] + } + onAck(result, nil) + }) +} + +func (c *RealtimeChannel) messageOperation(ctx context.Context, msg *Message, action MessageAction, options ...UpdateOption) (*UpdateResult, error) { + type resultOrError struct { + result *UpdateResult + err error + } + listen := make(chan resultOrError, 1) + if err := c.messageOperationAsync(msg, action, func(result *UpdateResult, err error) { + listen <- resultOrError{result, err} + }, options...); err != nil { + return nil, err + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case result := <-listen: + return result.result, result.err + } +}Then each public method becomes a one-liner:
func (c *RealtimeChannel) UpdateMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { return c.messageOperation(ctx, msg, MessageActionUpdate, options...) }ably/message_updates_integration_test.go (1)
205-209: Assertion on version ordering may be fragile.The test asserts that
versions[0]isMessageActionCreateandversions[1]/versions[2]areMessageActionUpdate. This depends on the server returning versions in a specific order. If the API's default ordering changes, this test will break. Consider documenting or parameterizing the expected direction.ably/rest_channel.go (2)
208-209: Remove debug log from production code.This
Debugfline logs response details including serials. While it's at debug level, it appears to be a development artifact (// Debug: log response). Consider removing it or ensuring it's intentional.Proposed fix
- // Debug: log response - c.log().Debugf("PublishMultipleWithResult response: serials=%v, count=%d", response.Serials, len(response.Serials)) - // Build results from serials
221-363: Significant code duplication acrossUpdateMessage,DeleteMessage, andAppendMessage.These three methods are nearly identical, differing only in the
Actionfield (MessageActionUpdate,MessageActionDelete,MessageActionAppend). Consider extracting a shared helper.Proposed refactor
+func (c *RESTChannel) performMessageAction(ctx context.Context, action MessageAction, msg *Message, options ...UpdateOption) (*UpdateResult, error) { + if err := validateMessageSerial(msg); err != nil { + return nil, err + } + + var opts updateOptions + for _, o := range options { + o(&opts) + } + + version := &MessageVersion{ + Description: opts.description, + ClientID: opts.clientID, + Metadata: opts.metadata, + } + + actionMsg := *msg + actionMsg.Action = action + actionMsg.Version = version + + cipher, _ := c.options.GetCipher() + var err error + actionMsg, err = actionMsg.withEncodedData(cipher) + if err != nil { + return nil, fmt.Errorf("encoding data for message: %w", err) + } + + var response publishResponse + res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&actionMsg}, &response) + if err != nil { + return nil, err + } + defer res.Body.Close() + + result := &UpdateResult{} + if len(response.Serials) > 0 { + result.VersionSerial = response.Serials[0] + } + return result, nil +} + func (c *RESTChannel) UpdateMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { - // ... ~45 lines of duplicated code + return c.performMessageAction(ctx, MessageActionUpdate, msg, options...) } func (c *RESTChannel) DeleteMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { - // ... ~45 lines of duplicated code + return c.performMessageAction(ctx, MessageActionDelete, msg, options...) } func (c *RESTChannel) AppendMessage(ctx context.Context, msg *Message, options ...UpdateOption) (*UpdateResult, error) { - // ... ~45 lines of duplicated code + return c.performMessageAction(ctx, MessageActionAppend, msg, options...) }
|
It would have been nice as an experiment to derive the tests from https://github.com/ably/uts/blob/uts-experiments/test/completion-status.md |
|
@paddybyers yes apologies I did start with that but got a bit bogged down so I thought I'd just get this implemented (because I want it for load testing), and then follow up with you on the UTS stuff (we can try it out for annotations/summaries, which I haven't added here). |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
ably/state_test.go (3)
20-161: Tests cover the happy path well. Consider adding edge-case coverage.The three sub-tests correctly verify per-message serial routing for the straightforward ACK case. The test logic and assertions are sound.
A few additional scenarios would strengthen confidence in the
Ackimplementation:
- Implicit NACK (serialShift > 0): Enqueue messages starting at serial N, send an ACK starting at serial N+1 — verify the first message gets
errImplictNACKand a nil result, while the rest get their correct serials.- NACK with errInfo: Call
Ackwith a non-nilerrInfoand verify the error propagates to all callbacks.- Nil callback safety: Enqueue a message with a nil
*ackCallbackand verifyAckdoesn't panic.onAck(non-serial) path: Enqueue with anackCallbackthat only setsonAck, verify it receives the error correctly.
21-23: Optional: Extract repeated logger setup into a test helper.
testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}}is duplicated in all three sub-tests. A small helper or asetupat the top ofTestPendingEmitter_AckResultwould reduce repetition.♻️ Suggested refactor
func TestPendingEmitter_AckResult(t *testing.T) { + testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}} + t.Run("two messages with single serial each", func(t *testing.T) { - testLogger := logger{l: &stdLogger{log.New(io.Discard, "", 0)}} emitter := newPendingEmitter(testLogger)
34-48: Optional: Assert thaterris nil in callbacks for ACK (non-NACK) scenarios.The callbacks capture
serialsbut silently discarderr. Addingassert.NoError(t, err)inside each callback would catch unexpected error propagation.♻️ Example for callback1
callback1 := &ackCallback{ - onAckWithSerials: func(serials []string, err error) { + onAckWithSerials: func(serials []string, err error) { + assert.NoError(t, err) msg1Serials = serials }, }
There was a problem hiding this comment.
Pull request overview
This PR implements message updates, deletes, and appends functionality for the ably-go SDK, enabling full message lifecycle management including publishing with result tracking, updating, deleting, and appending messages. The implementation follows the design specified in issue #696 and updates the protocol version from 2 to 5 to support message serials and version information.
Changes:
- Adds message operations API (PublishWithResult, UpdateMessage, DeleteMessage, AppendMessage) for both REST and Realtime channels with sync and async variants
- Introduces new message-related types: MessageAction enum, MessageVersion, PublishResult, UpdateResult, and UpdateOption functional options
- Updates protocol version to v5 by default, with special handling to keep Stats on v2 for backward compatibility
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| ablytest/sandbox.go | Adds MutableMessages flag to namespace configuration for testing message operations |
| ably/state_test.go | Adds comprehensive unit tests for ACK result distribution across multiple messages |
| ably/state.go | Refactors callback handling to support serial extraction via new ackCallback struct |
| ably/rest_client.go | Implements protocol version override for Stats requests to maintain v2 compatibility |
| ably/rest_channel.go | Adds REST message operation methods (PublishWithResult, UpdateMessage, DeleteMessage, AppendMessage, GetMessage, GetMessageVersions) |
| ably/realtime_presence.go | Updates presence code to use new ackCallback pattern |
| ably/realtime_experimental_objects_test.go | Updates experimental objects tests for new callback signature |
| ably/realtime_experimental_objects.go | Updates experimental objects to use new ackCallback pattern |
| ably/realtime_conn.go | Updates connection send() to use ackCallback with nil safety checks |
| ably/realtime_channel.go | Adds Realtime message operation methods with both blocking and async variants |
| ably/proto_protocol_message.go | Adds Res field to protocolMessage for ACK serial responses |
| ably/proto_message_operations_test.go | Adds unit tests for MessageAction encoding/decoding and validation |
| ably/proto_message.go | Adds MessageAction, MessageVersion, PublishResult, UpdateResult types and Message struct fields |
| ably/proto_http.go | Updates default protocol version to v5 with documentation |
| ably/paginated_result.go | Adds header field to paginatedRequest for custom HTTP headers |
| ably/message_updates_integration_test.go | Adds comprehensive integration tests for message operations |
| ably/export_test.go | Updates test helper to use new callback pattern |
| ably/example_message_updates_test.go | Adds usage examples for new message operation APIs |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
I reviewed current ably-go PR with respect to
Key Spec Points: RSL15 (REST update/delete/append), RTL32 (Realtime update/delete/append), TM5 (MessageAction enum), TM2s (version initialization), PBR2a (PublishResult), UDR2a (UpdateDeleteResult), MOP2 (MessageOperation), TR4s (ACK res field) 1. Critical Issues (Missing or Incorrect Spec Implementation)1.1 ❌ REST Update/Delete/Append Uses POST Instead of PATCH (RSL15b)Spec Requirement (
ably-go Implementation ( // UpdateMessage uses POST instead of PATCH, wrong URL, and sends array instead of single Message
res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&updateMsg}, &response)ably-java Implementation ( scheduler.patch(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial), ...)Issues:
Fix Required:
1.2 ❌ REST Response Parsing Uses Wrong Type (RSL15e)Spec Requirement (
ably-go Implementation ( var response publishResponse // Uses publishResponse (serials array) instead of UpdateDeleteResultably-java Implementation ( HttpCore.BodyHandler<UpdateDeleteResult> bodyHandler = UpdateDeleteResult.getBodyHandler();Issue: The ably-go REST update/delete/append methods parse the response as a Fix Required: Parse the response as an 1.3
|
…for message update, delete and append
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (3)
ably/rest_channel.go (2)
208-210: Remove debug artifact logging raw serial arrays.This debug line looks temporary and can add noisy logs with internal identifiers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ably/rest_channel.go` around lines 208 - 210, Remove the temporary debug log that prints raw serial arrays in the PublishMultipleWithResult flow: delete the c.log().Debugf("PublishMultipleWithResult response: serials=%v, count=%d", response.Serials, len(response.Serials)) line in rest_channel.go (inside the PublishMultipleWithResult handling) or replace it with a non-sensitive summary (e.g., logging only the count or a masked identifier) to avoid leaking internal identifiers.
68-70: Preserve nullable serial semantics in publish responses.
[]stringcannot representnullserial entries, so you lose the distinction between “discarded/superseded” and empty string. Consider using nullable elements in the wire struct (and document the public ergonomic mapping if you keepstringoutwardly).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ably/rest_channel.go` around lines 68 - 70, The publishResponse struct currently declares Serials as []string which cannot represent null elements; change Serials to a nullable-element slice (e.g., []*string) in the wire struct publishResponse to preserve null vs empty-string semantics when marshaling/unmarshaling, and update any serializers/deserializers and tests that touch publishResponse to handle pointers; if the public API must remain []string, add a clear conversion layer (documented) that maps nil pointers to a sentinel or explicitly documents the loss of null information.ably/realtime_channel.go (1)
788-794: Extract repeated clientId compatibility validation into a helper.This same check already exists in
PublishMultipleAsync; consolidating it avoids drift between result/non-result publish paths.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ably/realtime_channel.go` around lines 788 - 794, Extract the repeated clientId compatibility check into a shared helper (e.g., validateClientIDCompatibility) that accepts the slice of messages and the Auth/client used to obtain clientIDForCheck; move the loop and the existing error construction into that helper (use client.Auth.clientIDForCheck() and preserve the exact error text), then replace the inlined loop in the current function and in PublishMultipleAsync with a call to validateClientIDCompatibility and return its error if non-nil.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ably/proto_message_operations_test.go`:
- Around line 43-44: The test currently asserts that an unknown action code
("999") maps to MessageActionCreate which codifies a fallback and hides
unsupported actions; update the assertions that reference "999" and
MessageActionCreate (and the duplicate occurrences later) so they no longer
expect Create — instead assert that parsing/validation returns an explicit error
or an Unknown/invalid action result (e.g., an error value or a dedicated
MessageActionUnknown) so unknown/new action codes are surfaced rather than
silently mapped to MessageActionCreate; update any helper parsing/assertion
logic used by the test to check for the error/unknown outcome.
In `@ably/realtime_channel.go`:
- Around line 859-881: The protocolMessage being built for realtime
update/delete/append never sets Params so operation-level params are dropped;
add a Params field to updateOptions (e.g., updateOptions.params) if it doesn't
exist, populate it via the provided option functions, and then set
protoMsg.Params = opts.params when constructing protocolMessage (the code that
creates version := &MessageVersion{...}, opMsg := *msg, and protoMsg :=
&protocolMessage{...}). Ensure option setters wire into updateOptions.params so
operation params flow into ProtocolMessage.params.
In `@ably/rest_channel.go`:
- Around line 254-267: The code is using the publish contract (posting an array
to "/messages" and reading publishResponse.Serials) instead of the
operation-specific request/response for update/delete/append; change the call in
the method that builds opMsg so it posts to the operation-specific path and body
(use the operation request contract expected by the server rather than
c.client.post(..., c.baseURL+"/messages", []*Message{&opMsg}, ...)), swap the
response type from publishResponse to the operation result schema (e.g.,
UpdateResponse/OperationResult type) and decode that response, then set
result.VersionSerial from the operation response's version field (e.g.,
response.VersionSerial or response.Version) instead of reading
publishResponse.Serials; keep using c.client.post and res.Body.Close but ensure
URL, request body shape, and response decoding match the operation contract.
---
Nitpick comments:
In `@ably/realtime_channel.go`:
- Around line 788-794: Extract the repeated clientId compatibility check into a
shared helper (e.g., validateClientIDCompatibility) that accepts the slice of
messages and the Auth/client used to obtain clientIDForCheck; move the loop and
the existing error construction into that helper (use
client.Auth.clientIDForCheck() and preserve the exact error text), then replace
the inlined loop in the current function and in PublishMultipleAsync with a call
to validateClientIDCompatibility and return its error if non-nil.
In `@ably/rest_channel.go`:
- Around line 208-210: Remove the temporary debug log that prints raw serial
arrays in the PublishMultipleWithResult flow: delete the
c.log().Debugf("PublishMultipleWithResult response: serials=%v, count=%d",
response.Serials, len(response.Serials)) line in rest_channel.go (inside the
PublishMultipleWithResult handling) or replace it with a non-sensitive summary
(e.g., logging only the count or a masked identifier) to avoid leaking internal
identifiers.
- Around line 68-70: The publishResponse struct currently declares Serials as
[]string which cannot represent null elements; change Serials to a
nullable-element slice (e.g., []*string) in the wire struct publishResponse to
preserve null vs empty-string semantics when marshaling/unmarshaling, and update
any serializers/deserializers and tests that touch publishResponse to handle
pointers; if the public API must remain []string, add a clear conversion layer
(documented) that maps nil pointers to a sentinel or explicitly documents the
loss of null information.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
ably/proto_message_operations_test.goably/realtime_channel.goably/realtime_conn.goably/rest_channel.go
🚧 Files skipped from review as they are similar to previous changes (1)
- ably/realtime_conn.go
| {"999", MessageActionCreate}, // Unknown values default to Create | ||
| } |
There was a problem hiding this comment.
Do not codify unknown MessageAction values as Create.
These assertions lock in fallback behavior that masks unsupported action codes instead of surfacing them. That will make it harder to align runtime behavior with protocol expectations for unknown/new actions.
Also applies to: 98-99
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ably/proto_message_operations_test.go` around lines 43 - 44, The test
currently asserts that an unknown action code ("999") maps to
MessageActionCreate which codifies a fallback and hides unsupported actions;
update the assertions that reference "999" and MessageActionCreate (and the
duplicate occurrences later) so they no longer expect Create — instead assert
that parsing/validation returns an explicit error or an Unknown/invalid action
result (e.g., an error value or a dedicated MessageActionUnknown) so unknown/new
action codes are surfaced rather than silently mapped to MessageActionCreate;
update any helper parsing/assertion logic used by the test to check for the
error/unknown outcome.
| // Apply options | ||
| var opts updateOptions | ||
| for _, o := range options { | ||
| o(&opts) | ||
| } | ||
|
|
||
| // Build version from options | ||
| version := &MessageVersion{ | ||
| Description: opts.description, | ||
| ClientID: opts.clientID, | ||
| Metadata: opts.metadata, | ||
| } | ||
|
|
||
| // Create message for the operation | ||
| opMsg := *msg | ||
| opMsg.Action = action | ||
| opMsg.Version = version | ||
|
|
||
| protoMsg := &protocolMessage{ | ||
| Action: actionMessage, | ||
| Channel: c.Name, | ||
| Messages: []*Message{&opMsg}, | ||
| } |
There was a problem hiding this comment.
Realtime update/delete/append operation params are currently dropped.
protocolMessage is created without Params, so operation-level params cannot reach ProtocolMessage.params.
💡 Proposed direction
protoMsg := &protocolMessage{
Action: actionMessage,
Channel: c.Name,
Messages: []*Message{&opMsg},
+ Params: opts.params,
}If updateOptions does not yet expose params, add it there first.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ably/realtime_channel.go` around lines 859 - 881, The protocolMessage being
built for realtime update/delete/append never sets Params so operation-level
params are dropped; add a Params field to updateOptions (e.g.,
updateOptions.params) if it doesn't exist, populate it via the provided option
functions, and then set protoMsg.Params = opts.params when constructing
protocolMessage (the code that creates version := &MessageVersion{...}, opMsg :=
*msg, and protoMsg := &protocolMessage{...}). Ensure option setters wire into
updateOptions.params so operation params flow into ProtocolMessage.params.
There was a problem hiding this comment.
@coderabbitai I didn't get what you mean here?
There was a problem hiding this comment.
@sacOO7, let me clarify!
The spec requires that realtime UpdateMessage, DeleteMessage, and AppendMessage methods accept an optional params argument (similar to how publish methods work). These params are query-string-like parameters that should be sent to the server in ProtocolMessage.params.
Example use case:
// User might want to pass custom params with an update operation
channel.UpdateMessage(ctx, msg,
UpdateWithDescription("Fixed typo"),
UpdateWithParams(url.Values{"custom_key": {"custom_value"}}),
)Current situation:
- ✅ REST methods (
rest_channel.go) already support params - they're passed as query strings - ❌ Realtime methods (
realtime_channel.go) currently don't wire params through to the protocol message
What needs to happen:
- Add a
paramsfield to theupdateOptionsstruct (inproto_message.go) - Create an
UpdateWithParams(params url.Values)option function - In
performMessageOperationAsync(line ~879), setprotoMsg.Params = opts.params
This way, when users provide params, they'll be included in the ProtocolMessage sent to the server, matching the spec requirement (RTL32a, RTL32e).
Does that clarify the issue? The realtime path is currently dropping any params that might be provided.
| var opts updateOptions | ||
| for _, o := range options { | ||
| o(&opts) | ||
| } |
There was a problem hiding this comment.
UpdateMessage/DeleteMessage/AppendMessage are not propagating request params.
Options are applied only to MessageVersion; query params are not added to the HTTP request path, so operation params are effectively ignored.
💡 Proposed direction
var opts updateOptions
for _, o := range options {
o(&opts)
}
+ var query string
+ if len(opts.params) > 0 {
+ q := url.Values{}
+ for k, v := range opts.params {
+ q.Set(k, v)
+ }
+ query = "?" + q.Encode()
+ }
- res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&opMsg}, &response)
+ res, err := c.client.patch(ctx, path+query, &opMsg, &response)Also applies to: 256-257
There was a problem hiding this comment.
@coderabbitai I didn't get what you mean here? Can you cross check once again
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the updateOptions struct definition to confirm what fields it has
ast-grep --pattern 'type updateOptions struct {
$$$
}'
# Also search for any UpdateWith functions to see all available options
rg -A 5 'func UpdateWith' ably/proto_message.go
Length of output: 616
@sacOO7 Let me clarify! The issue is that UpdateMessage, DeleteMessage, and AppendMessage don't support query parameters, even though the spec requires them (RSL15a, RSL15f).
Current state:
Looking at the code, the updateOptions struct only has three fields:
type updateOptions struct {
description string
clientID string
metadata map[string]string
}And performMessageOperation doesn't build or append any query string - it just POSTs to c.baseURL+"/messages".
Compare with PublishMultiple:
The publish operations DO support params:
publishMultipleOptionshas aparams map[string]stringfield- There's a
PublishWithParams(params map[string]string)option publishMultiplebuilds the query string:
if params := publishOpts.params; len(params) > 0 {
queryParams := url.Values{}
for k, v := range params {
queryParams.Set(k, v)
}
query = "?" + queryParams.Encode()
}What's missing:
- Add a
params map[string]stringfield toupdateOptionsstruct - Create an
UpdateWithParams(params map[string]string) UpdateOptionfunction - In
performMessageOperation, build and append the query string to the request path (similar to howpublishMultipledoes it)
This way users can pass query parameters like UpdateMessage(ctx, msg, UpdateWithParams(map[string]string{"key": "value"})) and they'll be sent in the HTTP request.
Does this clarify the issue?
| // POST to API | ||
| var response publishResponse | ||
| res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&opMsg}, &response) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer res.Body.Close() | ||
|
|
||
| // Extract version serial | ||
| result := &UpdateResult{} | ||
| if len(response.Serials) > 0 { | ||
| result.VersionSerial = response.Serials[0] | ||
| } | ||
| return result, nil |
There was a problem hiding this comment.
Use the operation-specific request contract (method/path/body) and decode the operation result schema.
This path currently posts a message array to /messages and reads serials[]; that is a publish-style flow, not an update/delete/append operation flow. It can return incorrect VersionSerial values (or none).
💡 Proposed direction
- var response publishResponse
- res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&opMsg}, &response)
+ var response UpdateDeleteResult // wire schema with versionSerial
+ path := c.baseURL + "/messages/" + url.PathEscape(msg.Serial)
+ res, err := c.client.patch(ctx, path, &opMsg, &response)
if err != nil {
return nil, err
}
defer res.Body.Close()
- result := &UpdateResult{}
- if len(response.Serials) > 0 {
- result.VersionSerial = response.Serials[0]
- }
+ result := &UpdateResult{VersionSerial: response.VersionSerial}
return result, nil🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ably/rest_channel.go` around lines 254 - 267, The code is using the publish
contract (posting an array to "/messages" and reading publishResponse.Serials)
instead of the operation-specific request/response for update/delete/append;
change the call in the method that builds opMsg so it posts to the
operation-specific path and body (use the operation request contract expected by
the server rather than c.client.post(..., c.baseURL+"/messages",
[]*Message{&opMsg}, ...)), swap the response type from publishResponse to the
operation result schema (e.g., UpdateResponse/OperationResult type) and decode
that response, then set result.VersionSerial from the operation response's
version field (e.g., response.VersionSerial or response.Version) instead of
reading publishResponse.Serials; keep using c.client.post and res.Body.Close but
ensure URL, request body shape, and response decoding match the operation
contract.
There was a problem hiding this comment.
@coderabbitai I didn't get what you mean here? Can you explain in detail
There was a problem hiding this comment.
🧩 Analysis chain
@sacOO7, happy to clarify! The issue is that performMessageOperation is using the publish API contract when it should be using the update/delete/append operation contract. Let me break down the differences:
What the current code does (WRONG):
// POST to API
var response publishResponse
res, err := c.client.post(ctx, c.baseURL+"/messages", []*Message{&opMsg}, &response)This sends:
- Method: POST
- URL:
/channels/{channelName}/messages - Body: Array of messages
[message] - Response:
publishResponsewithSerials []string
What the spec requires (CORRECT):
According to RSL15b and RSL15e (which you documented in your review):
- Method: PATCH
- URL:
/channels/{channelName}/messages/{messageSerial} - Body: Single message object (not an array)
- Response:
UpdateDeleteResultwithversionSerial stringfield
Why this matters:
- Different endpoints: Publish uses
/messages, but update/delete/append use/messages/{serial} - Different HTTP methods: Publish uses POST, operations use PATCH
- Different request shape: Publish accepts an array; operations accept a single message
- Different response shape: Publish returns
{serials: [...]}, operations return{versionSerial: "..."}
What breaks:
- The server might reject the request entirely (wrong method/path)
- Even if it works, you're reading
response.Serials[0]which is publish-specific, not the operation'sversionSerial - The
versionSerialyou return might be incorrect or empty
What needs to be fixed:
🏁 Script executed:
#!/bin/bash
# Check if rest_client.go has a patch() method
rg -n "func.*patch\(" ably/rest_client.go
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Check what HTTP methods are currently available in rest_client.go
rg -n "func \(c \*REST\) (get|post|put|delete|patch)" ably/rest_client.go
Length of output: 334
As you can see, there's no patch() method in rest_client.go (only get(), getWithHeader(), and post()), which is exactly what you noted in your original review!
Here's what needs to be fixed:
1. Add a patch() method to rest_client.go:
func (c *REST) patch(ctx context.Context, path string, in, out interface{}) (*http.Response, error) {
return c.do(ctx, &request{
Method: "PATCH",
Path: path,
In: in,
Out: out,
})
}2. Create the correct response type (per RSL15e):
// UpdateDeleteResult represents the response from update/delete/append operations
type UpdateDeleteResult struct {
VersionSerial string `json:"versionSerial,omitempty" codec:"versionSerial,omitempty"`
}3. Fix performMessageOperation to use the operation contract:
func (c *RESTChannel) performMessageOperation(ctx context.Context, msg *Message, action MessageAction, options ...UpdateOption) (*UpdateResult, error) {
// ... validation and encoding (unchanged) ...
// PATCH to the operation-specific endpoint
var response UpdateDeleteResult // ← Use operation response type
path := c.baseURL + "/messages/" + url.PathEscape(msg.Serial) // ← Include serial in path
res, err := c.client.patch(ctx, path, &opMsg, &response) // ← Use PATCH, single message body
if err != nil {
return nil, err
}
defer res.Body.Close()
// Extract version serial from operation response
return &UpdateResult{
VersionSerial: response.VersionSerial, // ← Read versionSerial field
}, nil
}Summary:
The current code treats update/delete/append like a publish (POST array → get serials array), but the spec defines them as distinct operations with different request/response contracts. This matches points #1 and #2 from your detailed review in the PR comments!
Does this clarify the issue?
|
@sacOO7 thanks for taking the time to review this with the agents, I see you've pushed some changes but let's please discuss this first because quite a few of the issues the agents have identified here aren't accurate. For example the agents are encouraging the use of I think this highlights two issues with how the agents have reviewed this PR:
Let's discuss this as a team, because I think if we're using agents to check that SDK implementations are correct, I think we should be comparing them with the reference implementation (i.e. ably-js), rather than a spec + ably-java. |
This implements message updates, deletes, and appends as proposed in #696.
Summary by CodeRabbit
New Features
Tests