Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- etcd connection failing with 404 when gRPC gateway uses a different API prefix (auto-detects `/v3/`, `/v3beta/`, `/v3alpha/`)

## [0.21.0] - 2026-03-19

### Added
Expand Down
136 changes: 111 additions & 25 deletions Plugins/EtcdDriverPlugin/EtcdHttpClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ internal final class EtcdHttpClient: @unchecked Sendable {
private var currentTask: URLSessionDataTask?
private var authToken: String?
private var _isAuthenticating = false
private var apiPrefix = "v3"

private static let logger = Logger(subsystem: "com.TablePro", category: "EtcdHttpClient")

Expand All @@ -332,6 +333,13 @@ internal final class EtcdHttpClient: @unchecked Sendable {
return "\(scheme)://\(config.host):\(config.port)"
}

private func apiPath(_ suffix: String) -> String {
lock.lock()
let prefix = apiPrefix
lock.unlock()
return "\(prefix)/\(suffix)"
}

// MARK: - Connection Lifecycle

func connect() async throws {
Expand Down Expand Up @@ -366,7 +374,14 @@ internal final class EtcdHttpClient: @unchecked Sendable {
lock.unlock()

do {
try await ping()
try await detectApiPrefix()
} catch let etcdError as EtcdError {
lock.lock()
session?.invalidateAndCancel()
session = nil
lock.unlock()
Self.logger.error("Connection test failed: \(etcdError.localizedDescription)")
throw etcdError
} catch {
lock.lock()
session?.invalidateAndCancel()
Expand Down Expand Up @@ -399,56 +414,125 @@ internal final class EtcdHttpClient: @unchecked Sendable {
session = nil
authToken = nil
_isAuthenticating = false
apiPrefix = "v3"
lock.unlock()
}

func ping() async throws {
let _: EtcdStatusResponse = try await post(path: "v3/maintenance/status", body: EmptyBody())
let _: EtcdStatusResponse = try await post(path: apiPath("maintenance/status"), body: EmptyBody())
}

/// Probes etcd gateway prefixes in order and selects the first that responds
/// with a non-404 status. Covers all etcd versions:
/// 3.5+ → /v3/ only
/// 3.4 → /v3/ + /v3beta/
/// 3.3 → /v3beta/ + /v3alpha/
/// 3.2- → /v3alpha/ only
private func detectApiPrefix() async throws {
let candidates = ["v3", "v3beta", "v3alpha"]

lock.lock()
guard let session else {
lock.unlock()
throw EtcdError.notConnected
}
lock.unlock()

for candidate in candidates {
guard let url = URL(string: "\(baseUrl)/\(candidate)/maintenance/status") else {
continue
}

var request = URLRequest(url: url)
request.httpMethod = "POST"
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
request.httpBody = try JSONEncoder().encode(EmptyBody())

let response: URLResponse
do {
(_, response) = try await session.data(for: request)
} catch {
// Network-level failure — server is unreachable regardless of prefix
throw error
}

guard let httpResponse = response as? HTTPURLResponse else {
throw EtcdError.serverError("Invalid response type")
}

switch httpResponse.statusCode {
case 404:
continue
case 200:
lock.lock()
apiPrefix = candidate
lock.unlock()
Self.logger.debug("Detected etcd API prefix: \(candidate)")
return
case 401 where !config.username.isEmpty:
// Auth required but credentials are configured — prefix is valid,
// authenticate() will run after detection
lock.lock()
apiPrefix = candidate
lock.unlock()
Self.logger.debug("Detected etcd API prefix: \(candidate) (auth required)")
return
case 401:
throw EtcdError.authFailed("Authentication required")
default:
Self.logger.warning("Prefix probe \(candidate) returned HTTP \(httpResponse.statusCode)")
throw EtcdError.serverError("Unexpected HTTP \(httpResponse.statusCode) from \(candidate)/maintenance/status")
}
}

throw EtcdError.serverError(
"No supported etcd API found (tried: \(candidates.joined(separator: ", ")))"
)
}

// MARK: - KV Operations

func rangeRequest(_ req: EtcdRangeRequest) async throws -> EtcdRangeResponse {
try await post(path: "v3/kv/range", body: req)
try await post(path: apiPath("kv/range"), body: req)
}

func putRequest(_ req: EtcdPutRequest) async throws -> EtcdPutResponse {
try await post(path: "v3/kv/put", body: req)
try await post(path: apiPath("kv/put"), body: req)
}

func deleteRequest(_ req: EtcdDeleteRequest) async throws -> EtcdDeleteResponse {
try await post(path: "v3/kv/deleterange", body: req)
try await post(path: apiPath("kv/deleterange"), body: req)
}

// MARK: - Lease Operations

func leaseGrant(ttl: Int64) async throws -> EtcdLeaseGrantResponse {
let req = EtcdLeaseGrantRequest(TTL: String(ttl))
return try await post(path: "v3/lease/grant", body: req)
return try await post(path: apiPath("lease/grant"), body: req)
}

func leaseRevoke(leaseId: Int64) async throws {
let req = EtcdLeaseRevokeRequest(ID: String(leaseId))
try await postVoid(path: "v3/lease/revoke", body: req)
try await postVoid(path: apiPath("lease/revoke"), body: req)
}

func leaseTimeToLive(leaseId: Int64, keys: Bool) async throws -> EtcdLeaseTimeToLiveResponse {
let req = EtcdLeaseTimeToLiveRequest(ID: String(leaseId), keys: keys)
return try await post(path: "v3/lease/timetolive", body: req)
return try await post(path: apiPath("lease/timetolive"), body: req)
}

func leaseList() async throws -> EtcdLeaseListResponse {
try await post(path: "v3/lease/leases", body: EmptyBody())
try await post(path: apiPath("lease/leases"), body: EmptyBody())
}

// MARK: - Cluster Operations

func memberList() async throws -> EtcdMemberListResponse {
try await post(path: "v3/cluster/member/list", body: EmptyBody())
try await post(path: apiPath("cluster/member/list"), body: EmptyBody())
}

func endpointStatus() async throws -> EtcdStatusResponse {
try await post(path: "v3/maintenance/status", body: EmptyBody())
try await post(path: apiPath("maintenance/status"), body: EmptyBody())
}

// MARK: - Watch
Expand All @@ -469,8 +553,9 @@ internal final class EtcdHttpClient: @unchecked Sendable {
}
let watchReq = EtcdWatchRequest(createRequest: createReq)

guard let url = URL(string: "\(baseUrl)/v3/watch") else {
throw EtcdError.serverError("Invalid URL: \(baseUrl)/v3/watch")
let watchPath = apiPath("watch")
guard let url = URL(string: "\(baseUrl)/\(watchPath)") else {
throw EtcdError.serverError("Invalid URL: \(baseUrl)/\(watchPath)")
}

var request = URLRequest(url: url)
Expand Down Expand Up @@ -525,58 +610,58 @@ internal final class EtcdHttpClient: @unchecked Sendable {
// MARK: - Auth Management

func authEnable() async throws {
try await postVoid(path: "v3/auth/enable", body: EmptyBody())
try await postVoid(path: apiPath("auth/enable"), body: EmptyBody())
}

func authDisable() async throws {
try await postVoid(path: "v3/auth/disable", body: EmptyBody())
try await postVoid(path: apiPath("auth/disable"), body: EmptyBody())
}

func userAdd(name: String, password: String) async throws {
let req = EtcdUserAddRequest(name: name, password: password)
try await postVoid(path: "v3/auth/user/add", body: req)
try await postVoid(path: apiPath("auth/user/add"), body: req)
}

func userDelete(name: String) async throws {
let req = EtcdUserDeleteRequest(name: name)
try await postVoid(path: "v3/auth/user/delete", body: req)
try await postVoid(path: apiPath("auth/user/delete"), body: req)
}

func userList() async throws -> [String] {
let resp: EtcdUserListResponse = try await post(path: "v3/auth/user/list", body: EmptyBody())
let resp: EtcdUserListResponse = try await post(path: apiPath("auth/user/list"), body: EmptyBody())
return resp.users ?? []
}

func roleAdd(name: String) async throws {
let req = EtcdRoleAddRequest(name: name)
try await postVoid(path: "v3/auth/role/add", body: req)
try await postVoid(path: apiPath("auth/role/add"), body: req)
}

func roleDelete(name: String) async throws {
let req = EtcdRoleDeleteRequest(name: name)
try await postVoid(path: "v3/auth/role/delete", body: req)
try await postVoid(path: apiPath("auth/role/delete"), body: req)
}

func roleList() async throws -> [String] {
let resp: EtcdRoleListResponse = try await post(path: "v3/auth/role/list", body: EmptyBody())
let resp: EtcdRoleListResponse = try await post(path: apiPath("auth/role/list"), body: EmptyBody())
return resp.roles ?? []
}

func userGrantRole(user: String, role: String) async throws {
let req = EtcdUserGrantRoleRequest(user: user, role: role)
try await postVoid(path: "v3/auth/user/grant", body: req)
try await postVoid(path: apiPath("auth/user/grant"), body: req)
}

func userRevokeRole(user: String, role: String) async throws {
let req = EtcdUserRevokeRoleRequest(user: user, role: role)
try await postVoid(path: "v3/auth/user/revoke", body: req)
try await postVoid(path: apiPath("auth/user/revoke"), body: req)
}

// MARK: - Maintenance

func compaction(revision: Int64, physical: Bool) async throws {
let req = EtcdCompactionRequest(revision: String(revision), physical: physical)
try await postVoid(path: "v3/kv/compaction", body: req)
try await postVoid(path: apiPath("kv/compaction"), body: req)
}

// MARK: - Cancellation
Expand Down Expand Up @@ -710,7 +795,8 @@ internal final class EtcdHttpClient: @unchecked Sendable {
}

let authReq = EtcdAuthRequest(name: config.username, password: config.password)
guard let url = URL(string: "\(baseUrl)/v3/auth/authenticate") else {
let authPath = apiPath("auth/authenticate")
guard let url = URL(string: "\(baseUrl)/\(authPath)") else {
throw EtcdError.serverError("Invalid auth URL")
}

Expand Down
Loading