Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import kotlinx.serialization.json.JsonElement
import org.redisson.client.codec.StringCodec
import reactor.core.Disposable
import reactor.core.publisher.Mono
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.concurrent.locks.ReentrantReadWriteLock
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import kotlin.concurrent.write

@Suppress("UnstableApiUsage")
Expand Down Expand Up @@ -66,6 +70,9 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {

private val topic by lazy { api.redissonReactive.getTopic(REDIS_CHANNEL, StringCodec.INSTANCE) }
private lateinit var subscription: Disposable
private val eventAuthSecret = System.getenv("SURF_REDIS_EVENT_SECRET")
?.takeIf { it.isNotBlank() }
?: System.getProperty("surf.redis.event.secret")?.takeIf { it.isNotBlank() }

companion object {
private val log = logger()
Expand All @@ -86,6 +93,10 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
* Incoming messages are dispatched to handler coroutines on `Dispatchers.Default`.
*/
private fun setupSubscription() {
require(!eventAuthSecret.isNullOrBlank()) {
"Missing event-bus authentication secret. Set SURF_REDIS_EVENT_SECRET or -Dsurf.redis.event.secret."
}

val listenerId =
topic.addListener(String::class.java) { _, msg -> handleIncomingMessage(msg) }.block()
subscription = {
Expand Down Expand Up @@ -113,6 +124,11 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
return
}

if (!envelope.isValid(eventAuthSecret!!)) {
log.atWarning().log("Rejected event with invalid signature for type: ${envelope.eventClass}")
return
}

val eventClass = eventTypeRegistry[envelope.eventClass]

if (eventClass == null) {
Expand Down Expand Up @@ -143,7 +159,7 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
RedisComponentProvider.injectOriginId(event)

val eventData = serializeEvent(event) ?: return CompletableDeferred(0L)
val envelope = EventEnvelope.forEvent(event, eventData)
val envelope = EventEnvelope.forEvent(event, eventData, eventAuthSecret!!)

val message = api.json.encodeToString(envelope)

Expand Down Expand Up @@ -264,16 +280,31 @@ class RedisEventBusImpl(private val api: RedisApi) : RedisEventBus {
@Serializable
private data class EventEnvelope(
val eventClass: String,
val eventData: JsonElement
val eventData: JsonElement,
val signature: String
) {
fun isValid(secret: String): Boolean {
return signature == sign(secret, eventClass, eventData)
}

companion object {
fun forEvent(event: RedisEvent, data: JsonElement): EventEnvelope {
fun forEvent(event: RedisEvent, data: JsonElement, secret: String): EventEnvelope {
return EventEnvelope(
eventClass = event.javaClass.name,
eventData = data
eventData = data,
signature = sign(secret, event.javaClass.name, data)
)
}

private fun sign(secret: String, eventClass: String, eventData: JsonElement): String {
val mac = Mac.getInstance("HmacSHA256")
mac.init(SecretKeySpec(secret.toByteArray(StandardCharsets.UTF_8), "HmacSHA256"))

val payload = "$eventClass:$eventData"
val digest = mac.doFinal(payload.toByteArray(StandardCharsets.UTF_8))
return Base64.getEncoder().encodeToString(digest)
}

}
}
}
}