Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.launch
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.SerializationException
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.descriptors.element
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.encoding.encodeStructure
import kotlinx.serialization.json.JsonElement
import org.redisson.client.codec.StringCodec
import reactor.core.Disposable
Expand Down Expand Up @@ -125,8 +133,11 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
val handlers = eventHandlers[eventClass]
if (handlers.isNullOrEmpty()) return

for (invoker in handlers) {
api.redisListenerScope.launch {
// Dispatch all handlers from a single coroutine to avoid one Job + scheduler hop
// per handler. Exceptions are still isolated per handler so one failing handler
// does not abort the others.
api.redisListenerScope.launch {
for (invoker in handlers) {
try {
invoker.invoke(event)
} catch (e: Throwable) {
Expand All @@ -142,10 +153,24 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
override fun publish(event: RedisEvent): Deferred<Long> {
RedisComponentProvider.injectOriginId(event)

val eventData = serializeEvent(event) ?: return CompletableDeferred(0L)
val envelope = EventEnvelope.forEvent(event, eventData)
val serializer = serializerCache.get(event.javaClass)
if (serializer == null) {
log.atWarning()
.log("No serializer found for event ${event::class.simpleName} — cannot serialize.")
return CompletableDeferred(0L)
}

val message = api.json.encodeToString(envelope)
val message = try {
api.json.encodeToString(
EventEnvelopeSerializer(serializer),
EventEnvelopePayload(event.javaClass.name, event)
)
} catch (e: SerializationException) {
log.atWarning()
.withCause(e)
.log("Unable to serialize event ${event::class.simpleName}: ${e.message}")
return CompletableDeferred(0L)
}

return topic.publish(message).asDeferred()
}
Expand Down Expand Up @@ -192,45 +217,17 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
@Suppress("UNCHECKED_CAST")
firstParamType as Class<out RedisEvent>

registrationLock.write {
eventTypeRegistry[firstParamType.name] = firstParamType
}

val invoker = INVOKER_FACTORY.create(listener, method, firstParamType)

registrationLock.write {
eventTypeRegistry[firstParamType.name] = firstParamType
eventHandlers.computeIfAbsent(firstParamType) { ObjectArrayList() }
.add(invoker)
}
}
}
}

/**
* Serializes the given event to JSON.
*
* @return the serialized event, or `null` if no serializer is available
*/
private fun serializeEvent(event: RedisEvent): JsonElement? {
val serializer = serializerCache.get(event.javaClass)

if (serializer == null) {
log.atWarning()
.log("No serializer found for event ${event::class.simpleName} — cannot serialize.")
return null
}

try {
return api.json.encodeToJsonElement(serializer, event)
} catch (e: SerializationException) {
log.atWarning()
.withCause(e)
.log("Unable to serialize event ${event::class.simpleName}: ${e.message}")

return null
}
}

/**
* Deserializes an event of the given type from JSON.
*
Expand Down Expand Up @@ -260,20 +257,53 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {

/**
* Wire format for Redis event messages.
*
* Used only for **decoding** incoming messages. For encoding, see [EventEnvelopeSerializer]
* which streams the typed event payload directly into the output without first materializing
* it as a [JsonElement] tree.
*/
@Serializable
private data class EventEnvelope(
val eventClass: String,
val eventData: JsonElement
) {
companion object {
fun forEvent(event: RedisEvent, data: JsonElement): EventEnvelope {
return EventEnvelope(
eventClass = event.javaClass.name,
eventData = data
)
)

/**
* Encode-only payload pairing an event with its serialized class name.
*/
private data class EventEnvelopePayload(
val eventClass: String,
val event: RedisEvent
)

/**
* Streaming JSON serializer for outgoing event envelopes.
*
* Writes the envelope wrapper and then delegates payload encoding directly to the typed
* [dataSerializer], avoiding the intermediate [JsonElement] tree allocation that the
* symmetric data-class based path would incur.
*
* The descriptor element names match [EventEnvelope] so that the configured
* [kotlinx.serialization.json.JsonNamingStrategy] produces an identical wire format.
*/
@OptIn(ExperimentalSerializationApi::class)
private class EventEnvelopeSerializer(
private val dataSerializer: KSerializer<RedisEvent>
) : KSerializer<EventEnvelopePayload> {
override val descriptor: SerialDescriptor =
buildClassSerialDescriptor("EventEnvelope") {
element<String>("eventClass")
element("eventData", dataSerializer.descriptor)
}

override fun serialize(encoder: Encoder, value: EventEnvelopePayload) {
encoder.encodeStructure(descriptor) {
encodeStringElement(descriptor, 0, value.eventClass)
encodeSerializableElement(descriptor, 1, dataSerializer, value.event)
}
}

override fun deserialize(decoder: Decoder): EventEnvelopePayload =
error("EventEnvelopeSerializer is encode-only")
}
}
Loading