Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions common/api/current.api
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,76 @@ package com.urlaunched.android.common.socket {

}

package com.urlaunched.android.common.socket.socketmanager {

public abstract sealed class DataType {
method public final <T, R> R safeType(kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.DateSeparator,? extends R> dateSeparator, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.TimeSeparator<T>,? extends R> timeSeparator, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.Data<T>,? extends R> data);
method @androidx.compose.runtime.Composable public final <T> void safeType(kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.DateSeparator,kotlin.Unit> dateSeparator, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.TimeSeparator<T>,kotlin.Unit> timeSeparator, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.Data<T>,kotlin.Unit> data);
}

public static final class DataType.Data<T> extends com.urlaunched.android.common.socket.socketmanager.DataType {
ctor public DataType.Data(T data, java.time.Instant createdAt);
method public T component1();
method public java.time.Instant component2();
method public com.urlaunched.android.common.socket.socketmanager.DataType.Data<T> copy(T! data, java.time.Instant createdAt);
method public java.time.Instant getCreatedAt();
method public T getData();
property public final java.time.Instant createdAt;
property public final T data;
}

public static final class DataType.DateSeparator extends com.urlaunched.android.common.socket.socketmanager.DataType {
ctor public DataType.DateSeparator(java.time.Instant time);
method public java.time.Instant component1();
method public com.urlaunched.android.common.socket.socketmanager.DataType.DateSeparator copy(java.time.Instant time);
method public java.time.Instant getTime();
property public final java.time.Instant time;
}

public static final class DataType.TimeSeparator<T> extends com.urlaunched.android.common.socket.socketmanager.DataType {
ctor public DataType.TimeSeparator(T data);
method public T component1();
method public com.urlaunched.android.common.socket.socketmanager.DataType.TimeSeparator<T> copy(T! data);
method public T getData();
property public final T data;
}

public interface SocketEvent<T> {
method public T getData();
property public abstract T data;
}

public static interface SocketEvent.Delete<T> extends com.urlaunched.android.common.socket.socketmanager.SocketEvent<T> {
property public abstract T data;
}

public static interface SocketEvent.New<T> extends com.urlaunched.android.common.socket.socketmanager.SocketEvent<T> {
property public abstract T data;
}

public static interface SocketEvent.Updated<T> extends com.urlaunched.android.common.socket.socketmanager.SocketEvent<T> {
property public abstract T data;
}

public final class SocketManager<I, T> {
ctor public SocketManager(kotlinx.coroutines.flow.Flow<androidx.paging.PagingData<I>> pagingDataFlow, kotlin.jvm.functions.Function1<? super I,? extends T> mapper, kotlin.jvm.functions.Function1<? super T,java.lang.Long> itemId, kotlin.jvm.functions.Function1<? super T,java.time.Instant> itemCreatedAt, kotlinx.coroutines.CoroutineScope scope, kotlinx.coroutines.CoroutineDispatcher coroutineDispatcher);
method public void cancel();
method public void cancelDelete(long id);
method public void delete(long id, T data);
method public kotlinx.coroutines.flow.Flow<androidx.paging.PagingData<T>> getFlow();
method public kotlinx.coroutines.flow.Flow<androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType>> getFlow(kotlin.jvm.functions.Function2<? super com.urlaunched.android.common.socket.socketmanager.SocketManager<I,T>,? super androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType.Data<T>>,androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType>> separators, kotlin.jvm.functions.Function1<? super T,java.time.Instant> createdAt);
method public void handleSocketAction(long id, com.urlaunched.android.common.socket.socketmanager.SocketEvent<T> data);
method public <T> androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType> insertDateSeparators(androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType.Data<T>>, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.Data<T>,java.time.Instant> createdAt);
method public <T> androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType> insertTimeSeparators(androidx.paging.PagingData<com.urlaunched.android.common.socket.socketmanager.DataType>, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.socketmanager.DataType.Data<T>,java.lang.Boolean> isAuthor, optional java.time.Duration? maxDurationBetweenData);
method public void mapHandleSocketAction(long id, com.urlaunched.android.common.socket.socketmanager.SocketEvent<I> data);
method public void mapUpdate(long id, I data);
method public void refresh();
method public <S> void subscribeUpdatesFromRemote(kotlinx.coroutines.flow.Flow<? extends com.urlaunched.android.common.socket.ActionCableSocketEventMessage<S>> flow, kotlin.jvm.functions.Function1<? super com.urlaunched.android.common.socket.ActionCableSocketEventMessage<S>,kotlin.Unit> onEvent);
method public void update(long id, T data);
}

}

package com.urlaunched.android.common.string {

public final class OrNullKt {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.urlaunched.android.common.socket.socketmanager

import androidx.compose.runtime.Composable
import java.time.Instant

sealed class DataType {
data class DateSeparator(val time: Instant) : DataType()
data class TimeSeparator<T>(val data: T) : DataType()
data class Data<T>(val data: T, val createdAt: Instant) : DataType()

fun <T, R> safeType(
dateSeparator: (DateSeparator) -> R,
timeSeparator: (TimeSeparator<T>) -> R,
data: (Data<T>) -> R
) = when {
this::class == Data::class -> data(this as Data<T>)
this::class == DateSeparator::class -> dateSeparator(this as DateSeparator)
this::class == TimeSeparator::class -> timeSeparator(this as TimeSeparator<T>)
else -> throw IllegalStateException("Unknown type of DataType class")
}

@Suppress("ComposableNaming")
@Composable
fun <T> safeType(
dateSeparator: @Composable (DateSeparator) -> Unit,
timeSeparator: @Composable (TimeSeparator<T>) -> Unit,
data: @Composable (Data<T>) -> Unit
) = when {
this::class == Data::class -> data(this as Data<T>)
this::class == DateSeparator::class -> dateSeparator(this as DateSeparator)
this::class == TimeSeparator::class -> timeSeparator(this as TimeSeparator<T>)
else -> throw IllegalStateException("Unknown type of DataType class")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.urlaunched.android.common.socket.socketmanager

interface SocketEvent<T> {
val data: T

interface Updated<T> : SocketEvent<T> {
override val data: T
}
interface New<T> : SocketEvent<T> {
override val data: T
}
interface Delete<T> : SocketEvent<T> {
override val data: T
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package com.urlaunched.android.common.socket.socketmanager

import androidx.paging.PagingData
import androidx.paging.cachedIn
import androidx.paging.filter
import androidx.paging.insertHeaderItem
import androidx.paging.insertSeparators
import androidx.paging.map
import com.urlaunched.android.common.socket.ActionCableSocketEventMessage
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import java.time.Duration
import java.time.Instant
import java.time.ZoneId

class SocketManager<I : Any, T : Any>(
private val pagingDataFlow: Flow<PagingData<I>>,
private val mapper: (I) -> T,
private val itemId: (T) -> Long,
private val itemCreatedAt: (T) -> Instant?,
private val scope: CoroutineScope,
private val coroutineDispatcher: CoroutineDispatcher
) {
private val refreshChannel = Channel<Unit>()
private var updatesSocketJob: Job? = null
private val accumulatedUpdates = MutableStateFlow(mapOf<Long, T>())
private val accumulatedInserts = MutableStateFlow(mapOf<Long, T>())
private val accumulatedDeletes = MutableStateFlow(mapOf<Long, T>())

fun getFlow(): Flow<PagingData<T>> = combine(
// Channel for ability to refresh by lifecycle action
refreshChannel
.receiveAsFlow()
.onStart { emit(Unit) }
.flatMapLatest { pagingDataFlow }
.onEach {
// Reset local state on list reload
accumulatedUpdates.update { emptyMap() }
accumulatedInserts.update { emptyMap() }
}
.cachedIn(scope),
accumulatedUpdates,
accumulatedInserts,
accumulatedDeletes
) { pagingData, chatsUpdates, chatInserts, chatDeletes ->
if (chatsUpdates.isEmpty() && chatInserts.isEmpty()) {
pagingData.map { mapper(it) }
} else {
val flow = pagingData.map { data ->
mapper(data)
}

chatInserts.values
.sortedBy { chat -> itemCreatedAt(chat) }
.fold(flow) { acc, data ->
acc.insertHeaderItem(item = data)
}
}
.filter { chatDeletes.contains(itemId(it)).not() }
.let { data ->
if (chatsUpdates.isNotEmpty()) {
data.map {
chatsUpdates[itemId(it)] ?: it
}
} else {
data
}
}
}

fun getFlow(
separators: SocketManager<I, T>.(PagingData<DataType.Data<T>>) -> PagingData<DataType>,
createdAt: (T) -> Instant
): Flow<PagingData<DataType>> = getFlow().map {
it.map { data ->
DataType.Data(data = data, createdAt = createdAt(data))
}.run {
with(this@SocketManager) {
separators(this@run)
}
}
}

fun <T> PagingData<DataType.Data<T>>.insertDateSeparators(createdAt: (DataType.Data<T>) -> Instant) =
insertSeparators { before: DataType.Data<T>?, after: DataType.Data<T>? ->
val dateBefore = before?.let(createdAt)
val dateAfter = after?.let(createdAt)
val localDateBefore = dateBefore?.atZone(ZoneId.systemDefault())
val localDateAfter = dateAfter?.atZone(ZoneId.systemDefault())

when {
dateAfter == null && dateBefore != null -> {
DataType.DateSeparator(dateBefore)
}

dateBefore != null && localDateBefore?.dayOfYear != localDateAfter?.dayOfYear -> {
DataType.DateSeparator(dateBefore)
}

else -> null
}
}

fun <T> PagingData<DataType>.insertTimeSeparators(
isAuthor: (DataType.Data<T>) -> Boolean,
maxDurationBetweenData: Duration? = null
) = insertSeparators { before: DataType?, after: DataType? ->
val beforeData = (before as? DataType.Data<T>)
val afterData = (after as? DataType.Data<T>)

when {
before == null && after is DataType.Data<*> -> {
DataType.TimeSeparator(data = after.data)
}

before is DataType.DateSeparator && (after as? DataType.Data<*>) != null -> {
DataType.TimeSeparator(data = after.data)
}

beforeData?.let(isAuthor) == false && afterData?.let(isAuthor) == true -> {
DataType.TimeSeparator(data = after.data)
}

beforeData?.let(isAuthor) == true && afterData?.let(isAuthor) == false -> {
DataType.TimeSeparator(data = after.data)
}

beforeData != null && afterData != null && maxDurationBetweenData != null && Duration.between(
afterData.createdAt,
beforeData.createdAt
) > maxDurationBetweenData -> {
DataType.TimeSeparator(data = after.data)
}

else -> null
}
}

fun handleSocketAction(id: Long, data: SocketEvent<T>) {
when (data) {
is SocketEvent.New -> accumulatedInserts.update { updates ->
updates.plus(id to data.data)
}

is SocketEvent.Updated -> accumulatedUpdates.update { updates ->
updates.plus(id to data.data)
}

is SocketEvent.Delete -> accumulatedDeletes.update { updates ->
updates.plus(id to data.data)
}
}
}

fun mapHandleSocketAction(id: Long, data: SocketEvent<I>) {
when (data) {
is SocketEvent.New -> accumulatedInserts.update { updates ->
updates.plus(id to mapper(data.data))
}

is SocketEvent.Updated -> accumulatedUpdates.update { updates ->
updates.plus(id to mapper(data.data))
}

is SocketEvent.Delete -> accumulatedDeletes.update { updates ->
updates.plus(id to mapper(data.data))
}
}
}

fun update(id: Long, data: T) {
accumulatedUpdates.update { updates ->
updates.plus(id to data)
}
}

fun delete(id: Long, data: T) {
accumulatedDeletes.update { updates ->
updates.plus(id to data)
}
}

fun cancelDelete(id: Long) {
accumulatedDeletes.update { updates ->
updates.minus(id)
}
}

fun mapUpdate(id: Long, data: I) {
accumulatedUpdates.update { updates ->
updates.plus(id to mapper(data))
}
}

fun <S> subscribeUpdatesFromRemote(
flow: Flow<ActionCableSocketEventMessage<S>>,
onEvent: (ActionCableSocketEventMessage<S>) -> Unit
) {
updatesSocketJob?.cancel()
updatesSocketJob = scope.launch(coroutineDispatcher) {
flow.collectLatest { socketMessage ->
when (socketMessage) {
is ActionCableSocketEventMessage.Message -> {
onEvent(socketMessage)
}

is ActionCableSocketEventMessage.SubscriptionRejected -> {
onEvent(socketMessage)
}

is ActionCableSocketEventMessage.Error -> {
// Quietly resubscribe on network error
subscribeUpdatesFromRemote(flow, onEvent)
onEvent(socketMessage)
}

else -> {
onEvent(socketMessage)
}
}
}
}
}

fun refresh() {
scope.launch(coroutineDispatcher) {
refreshChannel.send(Unit)
}
}

fun cancel() {
updatesSocketJob?.cancel()
}
}