diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/event/RedisEventBusImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/event/RedisEventBusImpl.kt index 8f7040c..d2b607e 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/event/RedisEventBusImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/event/RedisEventBusImpl.kt @@ -19,8 +19,16 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Deferred import kotlinx.coroutines.launch +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable import kotlinx.serialization.SerializationException +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.descriptors.element +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.encodeStructure import kotlinx.serialization.json.JsonElement import org.redisson.client.codec.StringCodec import reactor.core.Disposable @@ -125,8 +133,11 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus { val handlers = eventHandlers[eventClass] if (handlers.isNullOrEmpty()) return - for (invoker in handlers) { - api.redisListenerScope.launch { + // Dispatch all handlers from a single coroutine to avoid one Job + scheduler hop + // per handler. Exceptions are still isolated per handler so one failing handler + // does not abort the others. + api.redisListenerScope.launch { + for (invoker in handlers) { try { invoker.invoke(event) } catch (e: Throwable) { @@ -142,10 +153,24 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus { override fun publish(event: RedisEvent): Deferred { RedisComponentProvider.injectOriginId(event) - val eventData = serializeEvent(event) ?: return CompletableDeferred(0L) - val envelope = EventEnvelope.forEvent(event, eventData) + val serializer = serializerCache.get(event.javaClass) + if (serializer == null) { + log.atWarning() + .log("No serializer found for event ${event::class.simpleName} — cannot serialize.") + return CompletableDeferred(0L) + } - val message = api.json.encodeToString(envelope) + val message = try { + api.json.encodeToString( + EventEnvelopeSerializer(serializer), + EventEnvelopePayload(event.javaClass.name, event) + ) + } catch (e: SerializationException) { + log.atWarning() + .withCause(e) + .log("Unable to serialize event ${event::class.simpleName}: ${e.message}") + return CompletableDeferred(0L) + } return topic.publish(message).asDeferred() } @@ -192,13 +217,10 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus { @Suppress("UNCHECKED_CAST") firstParamType as Class - registrationLock.write { - eventTypeRegistry[firstParamType.name] = firstParamType - } - val invoker = INVOKER_FACTORY.create(listener, method, firstParamType) registrationLock.write { + eventTypeRegistry[firstParamType.name] = firstParamType eventHandlers.computeIfAbsent(firstParamType) { ObjectArrayList() } .add(invoker) } @@ -206,31 +228,6 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus { } } - /** - * Serializes the given event to JSON. - * - * @return the serialized event, or `null` if no serializer is available - */ - private fun serializeEvent(event: RedisEvent): JsonElement? { - val serializer = serializerCache.get(event.javaClass) - - if (serializer == null) { - log.atWarning() - .log("No serializer found for event ${event::class.simpleName} — cannot serialize.") - return null - } - - try { - return api.json.encodeToJsonElement(serializer, event) - } catch (e: SerializationException) { - log.atWarning() - .withCause(e) - .log("Unable to serialize event ${event::class.simpleName}: ${e.message}") - - return null - } - } - /** * Deserializes an event of the given type from JSON. * @@ -260,20 +257,53 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus { /** * Wire format for Redis event messages. + * + * Used only for **decoding** incoming messages. For encoding, see [EventEnvelopeSerializer] + * which streams the typed event payload directly into the output without first materializing + * it as a [JsonElement] tree. */ @Serializable private data class EventEnvelope( val eventClass: String, val eventData: JsonElement - ) { - companion object { - fun forEvent(event: RedisEvent, data: JsonElement): EventEnvelope { - return EventEnvelope( - eventClass = event.javaClass.name, - eventData = data - ) + ) + + /** + * Encode-only payload pairing an event with its serialized class name. + */ + private data class EventEnvelopePayload( + val eventClass: String, + val event: RedisEvent + ) + + /** + * Streaming JSON serializer for outgoing event envelopes. + * + * Writes the envelope wrapper and then delegates payload encoding directly to the typed + * [dataSerializer], avoiding the intermediate [JsonElement] tree allocation that the + * symmetric data-class based path would incur. + * + * The descriptor element names match [EventEnvelope] so that the configured + * [kotlinx.serialization.json.JsonNamingStrategy] produces an identical wire format. + */ + @OptIn(ExperimentalSerializationApi::class) + private class EventEnvelopeSerializer( + private val dataSerializer: KSerializer + ) : KSerializer { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("EventEnvelope") { + element("eventClass") + element("eventData", dataSerializer.descriptor) } + override fun serialize(encoder: Encoder, value: EventEnvelopePayload) { + encoder.encodeStructure(descriptor) { + encodeStringElement(descriptor, 0, value.eventClass) + encodeSerializableElement(descriptor, 1, dataSerializer, value.event) + } } + + override fun deserialize(decoder: Decoder): EventEnvelopePayload = + error("EventEnvelopeSerializer is encode-only") } } \ No newline at end of file diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/request/RequestResponseBusImpl.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/request/RequestResponseBusImpl.kt index 8cd1c85..179b655 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/request/RequestResponseBusImpl.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/request/RequestResponseBusImpl.kt @@ -17,9 +17,18 @@ import dev.slne.surf.redis.util.asDeferred import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap import kotlinx.coroutines.* import kotlinx.coroutines.reactor.awaitSingle +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable import kotlinx.serialization.SerializationException +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.descriptors.element +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.encodeStructure import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.serializer import reactor.core.Disposable import reactor.core.publisher.Mono import java.lang.reflect.ParameterizedType @@ -200,15 +209,35 @@ class RequestResponseBusImpl(private val api: RedisApi) : RequestResponseBus { timeoutMs: Long ): T { RedisComponentProvider.injectOriginId(request) + + @Suppress("UNCHECKED_CAST") + val serializer = (serializerCache.get(request.javaClass) + ?: throw IllegalStateException("No serializer found for request class: ${request::class.simpleName}")) + as KSerializer + val requestId = UUID.randomUUID() - val requestData = serializeRequest(request) ?: throw IllegalStateException() val deferred = CompletableDeferred() pendingRequests[requestId] = deferred - responseTypeRegistry[responseType.name] = responseType + // Avoid a String.hashCode + ConcurrentHashMap put on every request when this response + // type has already been registered. putIfAbsent is atomic, so a redundant put from a + // concurrent thread is harmless. + if (!responseTypeRegistry.containsKey(responseType.name)) { + responseTypeRegistry.putIfAbsent(responseType.name, responseType) + } - val envelope = RequestEnvelope.forRequest(request, requestId, requestData) - val message = api.json.encodeToString(envelope) + val message = try { + api.json.encodeToString( + RequestEnvelopeSerializer(serializer), + RequestEnvelopePayload(requestId, request.javaClass.name, request) + ) + } catch (e: SerializationException) { + pendingRequests.remove(requestId) + log.atWarning() + .withCause(e) + .log("Unable to serialize request: ${e.message}") + throw IllegalStateException("Unable to serialize request: ${request::class.simpleName}", e) + } requestTopic.publish(message).awaitSingle() @@ -233,9 +262,27 @@ class RequestResponseBusImpl(private val api: RedisApi) : RequestResponseBus { * or `0` if the response could not be serialized */ private fun sendResponse(requestId: UUID, response: RedisResponse): Deferred { - val responseData = serializeResponse(response) ?: return CompletableDeferred(0L) - val envelope = ResponseEnvelope.forResponse(response, requestId, responseData) - val message = api.json.encodeToString(envelope) + val rawSerializer = serializerCache.get(response.javaClass) + if (rawSerializer == null) { + log.atWarning() + .log("No serializer found for response class: ${response::class.simpleName} - ignoring response.") + return CompletableDeferred(0L) + } + + @Suppress("UNCHECKED_CAST") + val serializer = rawSerializer as KSerializer + + val message = try { + api.json.encodeToString( + ResponseEnvelopeSerializer(serializer), + ResponseEnvelopePayload(requestId, response.javaClass.name, response) + ) + } catch (e: SerializationException) { + log.atWarning() + .withCause(e) + .log("Unable to serialize response: ${e.message}") + return CompletableDeferred(0L) + } return responseTopic.publish(message).asDeferred() } @@ -306,14 +353,12 @@ class RequestResponseBusImpl(private val api: RedisApi) : RequestResponseBus { @Suppress("UNCHECKED_CAST") requestType as Class - registrationLock.write { + val invoker = INVOKER_FACTORY.create(handler, method, requestType) + val current = registrationLock.write { requestTypeRegistry[requestType.name] = requestType + requestHandlers.putIfAbsent(requestType, invoker) } - val invoker = INVOKER_FACTORY.create(handler, method, requestType) - val current = - registrationLock.write { requestHandlers.putIfAbsent(requestType, invoker) } - if (current != null) { log.atWarning() .withStackTrace(StackSize.MEDIUM) @@ -322,30 +367,6 @@ class RequestResponseBusImpl(private val api: RedisApi) : RequestResponseBus { } } - /** - * Serializes a request to JSON. - * - * @return the serialized request, or `null` if no serializer is available or serialization fails - */ - private fun serializeRequest(request: RedisRequest): JsonElement? { - val serializer = serializerCache.get(request.javaClass) - - if (serializer == null) { - log.atWarning() - .log("No serializer found for request class: ${request::class.simpleName} - ignoring request.") - return null - } - - try { - return api.json.encodeToJsonElement(serializer, request) - } catch (e: SerializationException) { - log.atWarning() - .withCause(e) - .log("Unable to serialize request: ${e.message}") - return null - } - } - /** * Deserializes a request of the given type from JSON. * @@ -380,30 +401,6 @@ class RequestResponseBusImpl(private val api: RedisApi) : RequestResponseBus { } } - /** - * Serializes a response to JSON. - * - * @return the serialized response, or `null` if no serializer is available or serialization fails - */ - private fun serializeResponse(response: RedisResponse): JsonElement? { - val serializer = serializerCache.get(response.javaClass) - - if (serializer == null) { - log.atWarning() - .log("No serializer found for response class: ${response::class.simpleName} - ignoring response.") - return null - } - - try { - return api.json.encodeToJsonElement(serializer, response) - } catch (e: SerializationException) { - log.atWarning() - .withCause(e) - .log("Unable to serialize response: ${e.message}") - return null - } - } - /** * Deserializes a response of the given type from JSON. * @@ -455,51 +452,110 @@ class RequestResponseBusImpl(private val api: RedisApi) : RequestResponseBus { responseTypeRegistry.clear() } + /** + * Serializer used to encode the [SerializableUUID] type alias for the request id wire field. + * + * Resolved once and reused across both envelope serializers. + */ + private val uuidSerializer: KSerializer = + api.json.serializersModule.serializer() + /** * Wire format for request messages published to Redis. + * + * Used only for **decoding** incoming messages. For encoding, see [RequestEnvelopeSerializer]. */ @Serializable private data class RequestEnvelope( val requestId: SerializableUUID, val requestClass: String, val requestData: JsonElement - ) { - companion object { - fun forRequest( - request: RedisRequest, - requestId: UUID, - data: JsonElement - ): RequestEnvelope { - return RequestEnvelope( - requestId = requestId, - requestClass = request.javaClass.name, - requestData = data - ) - } - } - } + ) /** * Wire format for response messages published to Redis. + * + * Used only for **decoding** incoming messages. For encoding, see [ResponseEnvelopeSerializer]. */ @Serializable private data class ResponseEnvelope( val requestId: SerializableUUID, val responseClass: String, val responseData: JsonElement - ) { - companion object { - fun forResponse( - response: RedisResponse, - requestId: UUID, - data: JsonElement - ): ResponseEnvelope { - return ResponseEnvelope( - requestId = requestId, - responseClass = response.javaClass.name, - responseData = data - ) + ) + + /** + * Encode-only payload pairing a request with its routing metadata. + */ + private data class RequestEnvelopePayload( + val requestId: UUID, + val requestClass: String, + val request: RedisRequest + ) + + /** + * Encode-only payload pairing a response with its routing metadata. + */ + private data class ResponseEnvelopePayload( + val requestId: UUID, + val responseClass: String, + val response: RedisResponse + ) + + /** + * Streaming JSON serializer for outgoing request envelopes. + * + * Delegates payload encoding directly to the typed [dataSerializer], avoiding the + * intermediate [JsonElement] tree allocation that the symmetric data-class based path + * would incur. Element names match [RequestEnvelope] so the configured + * [kotlinx.serialization.json.JsonNamingStrategy] yields an identical wire format. + */ + @OptIn(ExperimentalSerializationApi::class) + private inner class RequestEnvelopeSerializer( + private val dataSerializer: KSerializer + ) : KSerializer { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("RequestEnvelope") { + element("requestId", uuidSerializer.descriptor) + element("requestClass") + element("requestData", dataSerializer.descriptor) + } + + override fun serialize(encoder: Encoder, value: RequestEnvelopePayload) { + encoder.encodeStructure(descriptor) { + encodeSerializableElement(descriptor, 0, uuidSerializer, value.requestId) + encodeStringElement(descriptor, 1, value.requestClass) + encodeSerializableElement(descriptor, 2, dataSerializer, value.request) } } + + override fun deserialize(decoder: Decoder): RequestEnvelopePayload = + error("RequestEnvelopeSerializer is encode-only") + } + + /** + * Streaming JSON serializer for outgoing response envelopes. See [RequestEnvelopeSerializer]. + */ + @OptIn(ExperimentalSerializationApi::class) + private inner class ResponseEnvelopeSerializer( + private val dataSerializer: KSerializer + ) : KSerializer { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("ResponseEnvelope") { + element("requestId", uuidSerializer.descriptor) + element("responseClass") + element("responseData", dataSerializer.descriptor) + } + + override fun serialize(encoder: Encoder, value: ResponseEnvelopePayload) { + encoder.encodeStructure(descriptor) { + encodeSerializableElement(descriptor, 0, uuidSerializer, value.requestId) + encodeStringElement(descriptor, 1, value.responseClass) + encodeSerializableElement(descriptor, 2, dataSerializer, value.response) + } + } + + override fun deserialize(decoder: Decoder): ResponseEnvelopePayload = + error("ResponseEnvelopeSerializer is encode-only") } } \ No newline at end of file diff --git a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/util/KotlinSerializerCache.kt b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/util/KotlinSerializerCache.kt index 016bce4..cb5ed1d 100644 --- a/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/util/KotlinSerializerCache.kt +++ b/surf-redis-core/src/main/kotlin/dev/slne/surf/redis/util/KotlinSerializerCache.kt @@ -4,18 +4,35 @@ import kotlinx.serialization.KSerializer import kotlinx.serialization.modules.SerializersModule import kotlinx.serialization.serializerOrNull +/** + * Per-class cache of [KSerializer] instances. + * + * Backed by [ClassValue] for fast, GC-friendly lookups keyed by [Class]. Misses are cached as + * a sentinel value so that repeated lookups for unserializable classes do not re-invoke + * [SerializersModule.serializerOrNull] every time. + */ class KotlinSerializerCache( private val module: SerializersModule, private val type: Class -) : ClassValue?>() { +) { + + private val cache = object : ClassValue() { + @Suppress("UNCHECKED_CAST") + override fun computeValue(type: Class<*>): Any { + if (!this@KotlinSerializerCache.type.isAssignableFrom(type)) return MISSING + return (module.serializerOrNull(type) as? KSerializer) ?: MISSING + } + } @Suppress("UNCHECKED_CAST") - override fun computeValue(type: Class<*>): KSerializer? { - if (!this.type.isAssignableFrom(type)) return null - return module.serializerOrNull(type) as? KSerializer + fun get(type: Class<*>): KSerializer? { + val value = cache.get(type) + return if (value === MISSING) null else value as KSerializer } companion object { + private val MISSING: Any = Any() + inline operator fun invoke( module: SerializersModule, ): KotlinSerializerCache = KotlinSerializerCache(module, T::class.java)