diff --git a/src/org/thoughtcrime/securesms/dependencies/SignalCommunicationModule.java b/src/org/thoughtcrime/securesms/dependencies/SignalCommunicationModule.java index 0b3d82c7960..07e8d47915e 100644 --- a/src/org/thoughtcrime/securesms/dependencies/SignalCommunicationModule.java +++ b/src/org/thoughtcrime/securesms/dependencies/SignalCommunicationModule.java @@ -131,11 +131,10 @@ synchronized SignalServiceMessageSender provideSignalMessageSender() { new SignalProtocolStoreImpl(context), BuildConfig.USER_AGENT, TextSecurePreferences.isMultiDevice(context), - Optional.fromNullable(IncomingMessageObserver.getPipe()), - Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()), + IncomingMessageObserver.getPipeReference(), + IncomingMessageObserver.getUnidentifiedPipeReference(), Optional.of(new SecurityEventListener(context))); } else { - this.messageSender.setMessagePipe(IncomingMessageObserver.getPipe(), IncomingMessageObserver.getUnidentifiedPipe()); this.messageSender.setIsMultiDevice(TextSecurePreferences.isMultiDevice(context)); } diff --git a/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java b/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java index c0d7ac95aab..e51f19085e5 100644 --- a/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java +++ b/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java @@ -1,11 +1,18 @@ package org.thoughtcrime.securesms.service; import android.app.Service; +import android.content.BroadcastReceiver; import android.arch.lifecycle.DefaultLifecycleObserver; import android.arch.lifecycle.LifecycleOwner; import android.arch.lifecycle.ProcessLifecycleOwner; import android.content.Context; import android.content.Intent; +import android.content.IntentFilter; +import android.net.ConnectivityManager; +import android.net.Network; +import android.net.NetworkRequest; +import android.net.NetworkInfo; +import android.os.Build; import android.os.IBinder; import android.support.annotation.NonNull; import android.support.annotation.Nullable; @@ -28,6 +35,8 @@ import org.whispersystems.signalservice.api.SignalServiceMessagePipe; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -46,6 +55,14 @@ public class IncomingMessageObserver implements InjectableType, ConstraintObserv private final Context context; private final NetworkConstraint networkConstraint; + public static AtomicReference pipeReference = new AtomicReference<>(); + public static AtomicReference unidentifiedPipeReference = new AtomicReference<>(); + + private final MessageRetrievalThread retrievalThread; + + private BroadcastReceiver connectivityChangeReceiver; + private ConnectivityManager.NetworkCallback connectivityChangeCallback; + private boolean appVisible; @Inject SignalServiceMessageReceiver receiver; @@ -58,10 +75,13 @@ public IncomingMessageObserver(@NonNull Context context) { this.networkConstraint = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create(); new NetworkConstraintObserver(ApplicationContext.getInstance(context)).register(this); - new MessageRetrievalThread().start(); + + retrievalThread = new MessageRetrievalThread(); + retrievalThread.start(); if (TextSecurePreferences.isFcmDisabled(context)) { ContextCompat.startForegroundService(context, new Intent(context, ForegroundService.class)); + setupNetworkMonitoring(); } ProcessLifecycleOwner.get().getLifecycle().addObserver(new DefaultLifecycleObserver() { @@ -108,10 +128,12 @@ private synchronized boolean isConnectionNecessary() { } private synchronized void waitForConnectionNecessary() { - try { - while (!isConnectionNecessary()) wait(); - } catch (InterruptedException e) { - throw new AssertionError(e); + while (!isConnectionNecessary()) { + try { + wait(); + } catch (InterruptedException e) { + Log.d(TAG, "Retrieval thread interrupted while not connected; ignoring."); + } } } @@ -132,6 +154,14 @@ private void shutdown(SignalServiceMessagePipe pipe, SignalServiceMessagePipe un return unidentifiedPipe; } + public static AtomicReference getPipeReference() { + return pipeReference; + } + + public static AtomicReference getUnidentifiedPipeReference() { + return unidentifiedPipeReference; + } + private class MessageRetrievalThread extends Thread implements Thread.UncaughtExceptionHandler { MessageRetrievalThread() { @@ -149,11 +179,14 @@ public void run() { pipe = receiver.createMessagePipe(); unidentifiedPipe = receiver.createUnidentifiedMessagePipe(); - SignalServiceMessagePipe localPipe = pipe; - SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe; + pipeReference.set(pipe); + unidentifiedPipeReference.set(unidentifiedPipe); + + final SignalServiceMessagePipe localPipe = pipe; + final SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe; try { - while (isConnectionNecessary()) { + while (isConnectionNecessary() && !interrupted()) { try { Log.i(TAG, "Reading message..."); localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, @@ -167,6 +200,10 @@ public void run() { Log.w(TAG, e); } } + } catch (InterruptedException e) { + Log.d(TAG, "Retrieval thread interrupted."); + } catch (IOException e) { + Log.d(TAG, "Message pipe failed: " + e.getMessage()); } catch (Throwable e) { Log.w(TAG, e); } finally { @@ -185,6 +222,78 @@ public void uncaughtException(Thread t, Throwable e) { } } + private void setupNetworkMonitoring() { + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { + final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE); + + connectivityChangeCallback = new ConnectivityManager.NetworkCallback() { + private Network current; + + private void update(Network network) { + final Network previous = current; + current = network; + + Log.d(TAG, "Currently active network: " + network); + + if (previous != null && (current == null || !current.equals(previous))) { + Log.d(TAG, + "Active network changed (" + previous + " -> " + current + + "); interrupting the retrieval thread to recycle the pipe."); + + retrievalThread.interrupt(); + } + } + + @Override + public void onAvailable(Network network) { + final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE); + + update(connectivityManager.getActiveNetwork()); + } + + @Override + public void onLost(Network network) { + final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE); + + update(connectivityManager.getActiveNetwork()); + } + }; + + connectivityManager.registerNetworkCallback(new NetworkRequest.Builder().build(), + connectivityChangeCallback); + } else { + connectivityChangeReceiver = new BroadcastReceiver() { + private int current = -1; + + @Override + public void onReceive(Context context, Intent intent) { + final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE); + + final NetworkInfo info = connectivityManager.getActiveNetworkInfo(); + final int previous = current; + + if (info == null) { + current = -1; + } else if (info.isConnected()) { + current = info.getType(); + } + + Log.d(TAG, "Currently active network: " + current); + + if (previous != -1 && previous != current) { + Log.d(TAG, + "Active network changed (" + previous + " -> " + current + + "); interrupting the retrieval thread to recycle the pipe."); + retrievalThread.interrupt(); + } + } + }; + + context.registerReceiver(connectivityChangeReceiver, + new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); + } + } + public static class ForegroundService extends Service { @Override