RxJava: thread safety of the Operators and Subjects

Hello, dear reader.

TL;DR: most RxJava Operators and Subjects are NOT thread safe. Is that ok or not? No spoilers!

RxJava is great and so on. But. Concurrency. This topic was on my list for a long time. Let's finally discuss it.

The Observable Contract

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

See it? It is a violation of the contract if you emit data from different threads in parallel from the Observable!
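To make the "different threads, but serially" part concrete, here is a minimal plain-Java sketch (no RxJava involved; SerialEmissionSketch and emitSerially are made-up names). Each value is emitted from its own thread into a deliberately non-thread-safe ArrayList, and join() supplies the happens-before edge between consecutive emissions, so nothing is violated.

```java
import java.util.ArrayList;
import java.util.List;

final class SerialEmissionSketch {

    /** Emits 1..count, each value from its own thread, strictly one after another. */
    static List<Integer> emitSerially(int count) {
        List<Integer> received = new ArrayList<>(); // not thread safe on purpose
        for (int i = 1; i <= count; i++) {
            final int value = i;
            // The add() call plays the role of onNext(value).
            Thread emitter = new Thread(() -> received.add(value));
            emitter.start();
            try {
                // join() creates the happens-before edge: the next emitter
                // thread is started only after this add() is complete and visible.
                emitter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(emitSerially(3)); // prints [1, 2, 3]
    }
}
```

Drop the join() and start all the threads at once, and you are back to the parallel emission the contract forbids.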


Let's take a look at a veeery simple operator: take(n).

If you open the OperatorTake sources, you will find no synchronization or any other thread-safety measures.

// …

int count;  
boolean completed;

@Override
public void onCompleted() {  
    if (!completed) {
        completed = true;
        child.onCompleted();
    }
}

// …

Wait, but why is that ok? Isn't RxJava a solution for asynchronous reactive programming?

Well, it is a solution and it's pretty damn good. Async doesn't mean concurrent.

The reason why most of the Operators and Subjects are NOT thread safe is performance. If every operator, even one as trivial as take(n), were thread safe, we would have to pay a lot for synchronization that is usually useless.

David Karnok (Lead developer of RxJava) added some other reasons why most of the operators are not thread safe:

  • The implementation of the operators would become significantly more complex (take a look at merge()).
  • Operators would be harder to maintain.
  • It would be harder to reason about operators and the flows they are combined into.

Wait, so you're saying that you can break take(n) by emitting from multiple threads?

Well, sorry, but yes, I am 😅.

@Test
fun `break take(n)`() {  
    val numberOfThreads = 10

    repeat(100000) {
        println("Iteration = $it")

        val publishSubject = PublishSubject.create<Int>() // Fix: .toSerialized().
        val actuallyReceived = AtomicInteger()

        publishSubject.take(3).subscribe { actuallyReceived.incrementAndGet() }

        val latch = CountDownLatch(numberOfThreads)
        var threads = listOf<Thread>()

        (0 until numberOfThreads).forEach { // `until` excludes the upper bound, so the thread count matches the latch
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }

        threads.forEach { it.start() }

        latch.await()

        assertThat(actuallyReceived.get()).isEqualTo(3)
    }
}

(Actually, this code ^ crashes the Kotlin compiler 😅, but it works if you rewrite it in Java.)
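If you'd rather see the race without RxJava on the classpath, here is a rough plain-Java model of it. UnsafeTake is a hypothetical stand-in for take(n), not the real OperatorTake, and the synchronized block only imitates the effect of serializing the source; it is not how toSerialized() is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class TakeRaceSketch {

    /** Hypothetical stand-in for take(n): plain fields, no synchronization. */
    static final class UnsafeTake implements IntConsumer {
        private final int limit;
        private final IntConsumer child;
        private int count;

        UnsafeTake(int limit, IntConsumer child) {
            this.limit = limit;
            this.child = child;
        }

        @Override
        public void accept(int value) {
            if (count < limit) {     // several threads can pass this check together...
                count++;             // ...before any of their increments is visible
                child.accept(value);
            }
        }
    }

    /** One value per thread, all released together; returns how many reached the child. */
    static int emitConcurrently(int threads, boolean serialized) {
        AtomicInteger received = new AtomicInteger();
        UnsafeTake take = new UnsafeTake(3, value -> received.incrementAndGet());
        Object lock = new Object();
        CountDownLatch startGate = new CountDownLatch(1);
        Thread[] emitters = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int value = i;
            emitters[i] = new Thread(() -> {
                try {
                    startGate.await(); // wait so that all emitters fire at once
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (serialized) {
                    synchronized (lock) { take.accept(value); } // one emission at a time
                } else {
                    take.accept(value); // contract violation: parallel emission
                }
            });
            emitters[i].start();
        }
        startGate.countDown();
        try {
            for (Thread emitter : emitters) emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("serialized:   " + emitConcurrently(10, true));  // always 3
        System.out.println("unserialized: " + emitConcurrently(10, false)); // usually 3, but not guaranteed
    }
}
```

The unserialized run looks fine most of the time, which is exactly why this kind of bug tends to show up only under load or after many iterations.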

Don't panic and carry your towel

Ok? Good 👍

What Operators are NOT thread safe?

Basically, all Operators that operate over one Observable: take(n), map(), distinctUntilChanged() and so on.

Except operators with scheduling, like: window(…, scheduler), debounce(…, scheduler), etc.

Don't look for a complete list here, just try to understand which kinds of Operators are thread safe and which aren't.

What Operators are thread safe?

Basically, all Operators that operate over multiple Observables are thread safe: merge(), combineLatest(), zip() and so on.

They serialize the resulting emissions for downstream, which makes things work correctly for downstream Operators that are not thread safe.
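One common way to get such serialization without blocking is a queue-drain loop: every caller enqueues its value, and whichever thread bumps a work-in-progress counter from zero delivers everything queued. The sketch below is plain Java with made-up names (QueueDrainSketch, Serializer, PlainCounter) and is only an illustration of the idea; it is not the actual code of merge() or any other RxJava operator.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class QueueDrainSketch {

    /** Hypothetical serializer: many threads may call accept(), downstream sees one call at a time. */
    static final class Serializer implements IntConsumer {
        private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
        private final IntConsumer downstream;

        Serializer(IntConsumer downstream) {
            this.downstream = downstream;
        }

        @Override
        public void accept(int value) {
            queue.offer(value);
            if (wip.getAndIncrement() != 0) {
                return; // another thread is draining and will deliver our value
            }
            do {
                // Each wip increment is preceded by exactly one offer(), so poll() is never null here.
                downstream.accept(queue.poll());
            } while (wip.decrementAndGet() != 0);
        }
    }

    /** Downstream with a plain, unsynchronized counter, like a typical single-source operator. */
    static final class PlainCounter implements IntConsumer {
        int count;

        @Override
        public void accept(int value) {
            count++;
        }
    }

    /** Several source threads feed one serializer; returns how many items the counter saw. */
    static int mergeFromThreads(int threads, int itemsPerThread) {
        PlainCounter counter = new PlainCounter();
        Serializer serializer = new Serializer(counter);
        Thread[] sources = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            sources[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    serializer.accept(i);
                }
            });
            sources[t].start();
        }
        try {
            for (Thread source : sources) source.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.count;
    }

    public static void main(String[] args) {
        System.out.println(mergeFromThreads(8, 1000)); // prints 8000
    }
}
```

The atomic counter is what hands the drain role from one thread to the next, so the plain count field is never touched by two threads at once.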

Again, don't look for a complete list here. Get the idea!

So, the pattern for Operators is…?

Mostly it's:

fun operatorThreadSafety() = if (operator.worksWithOneObservable() &&  
  operator.supportsScheduling == false) {
  Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
} else {
  Operator.MOST_LIKELY_THREAD_SAFE
}

What about Subjects?

So, here is the problem… All Subjects are NOT thread safe, except SerializedSubject.

Yes, all your favorite PublishSubjects and BehaviorSubjects are NOT thread safe.

This ^ is actually dangerous! Subjects are usually shared between different pieces of code, which may run on different threads in parallel.

I learned it the hard way: we had a subject and multiple streams from network requests that were onNext()ing to it. It eventually broke a downstream distinctUntilChanged() and our business logic.

Shit, concurrent writes to Subjects are dangerous… What should I do with them then?

Serialize them! Pattern:

fun threadSafeSubject(subject: Subject) = if (you.writeToItFromMultipleThreads()) {  
  subject.toSerialized() // Serialize it! Now!
} else {
  subject // You're fine, use it as is.
}

This is especially dangerous if you use a Subject as an event bus. For example, we do that in StorIO, and we keep our subject serialized: changes in the database happen on different threads, potentially concurrently, so this keeps our users safe.

What should I do with my custom Observables that emit data concurrently?

First of all, don't use Observable.create(): see RxJava#PR#4253.

Secondly, you need to serialize the emissions of the Observable. The easiest way is to call serialize() on it and use the resulting Observable.

What should I do in general with concurrency and RxJava?

Just don't violate The Observable Contract, and serialize() the Observable if you do emit data concurrently.


Easy breezy. No need to panic. Sorry for saying that your production code may be broken 😁.

Any plans to make it better from RxJava side?

Well, Subjects will probably become serialized by default, and non-serialized versions will be available as a kind of "Unsafe" API for those who understand what they are doing.

If you're not creating custom Observables that emit data in parallel, and you're not writing to Subjects from multiple threads in parallel, you're fine!


Article is ~true for RxJava 1.1.8.
