
Acquire fewer locks in TaskRunner #8394

Open
wants to merge 3 commits into master

Conversation

swankjesse
Collaborator

Previously each run did this:

  • acquire a lock to take a task
  • acquire a lock to finish a task
  • if crashed, acquire a lock to start a new thread

So to run 10 tasks without any crashes, we'd acquire the lock 20 times.

With this update, we do this:

  • acquire a lock to take the first task
  • acquire a lock to release task N and take task N + 1

So to run 10 tasks without any crashes, we now acquire the lock 11 times.
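
The difference is easiest to see in a side-by-side sketch. This is not the actual TaskRunner code, just a minimal hypothetical Kotlin model (Task, WorkerLoop, and the plain ArrayDeque are simplified stand-ins, and the crash path is omitted): the old loop pays two lock acquisitions per task, while the new loop releases task N and takes task N + 1 under a single acquisition.

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class Task(val name: String, val block: () -> Unit)

class WorkerLoop(private val queue: ArrayDeque<Task>) {
  private val lock = ReentrantLock()

  // Old shape: one acquisition to take each task and another to finish it,
  // so roughly two acquisitions per task.
  fun runAllOld() {
    while (true) {
      val task = lock.withLock { queue.removeFirstOrNull() } ?: return
      try {
        task.block()
      } finally {
        lock.withLock { /* mark the task finished, reschedule, etc. */ }
      }
    }
  }

  // New shape: finishing task N and taking task N + 1 share one acquisition,
  // so roughly one acquisition per task after the first.
  fun runAllNew() {
    var task = lock.withLock { queue.removeFirstOrNull() } ?: return
    while (true) {
      task.block()
      task = lock.withLock {
        // Release the finished task and take the next one under the same lock.
        queue.removeFirstOrNull()
      } ?: return
    }
  }
}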

lock.withLock {
  afterRun(task, delayNanos)
}
currentThread.name = oldName
Collaborator Author

We also change the thread name fewer times!
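
A quick sketch of the idea (hypothetical code, not the actual TaskRunner; the real logic lives around the snippet above): the thread can still take each task's name while it runs, but the original name is restored once per batch rather than once per task.

fun runBatch(tasks: List<Pair<String, () -> Unit>>) {
  val currentThread = Thread.currentThread()
  val oldName = currentThread.name
  try {
    for ((name, block) in tasks) {
      currentThread.name = name // named per task while it runs
      block()
    }
  } finally {
    currentThread.name = oldName // restored once, after the whole batch
  }
}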

"FINE: Q10000 finished run in 0 µs: task",
"FINE: Q10000 run again after 50 µs: task",
Collaborator Author

I like this new order better

Collaborator

+1 more logical

@yschimke
Collaborator

Does this failure need an update?

ConnectionPoolTest > interruptStopsThread() FAILED
    org.opentest4j.AssertionFailedError: expected to be empty but was:<[Q10505]>
        at app//okhttp3.internal.connection.ConnectionPoolTest.interruptStopsThread(ConnectionPoolTest.kt:203)

@swankjesse
Collaborator Author

I think there’s non-determinism that I need to fix in that test, as I can’t reproduce that failure locally! Lemme do that first, then rebase this on top.

@Endeavour233
Contributor

> I think there’s non-determinism that I need to fix in that test, as I can’t reproduce that failure locally! Lemme do that first, then rebase this on top.

I am confused about what interruptStopsThread is designed to verify. Is the expectation that a TaskRunner thread (the coordinator) gets interrupted while it's waiting to execute a scheduled cleanupTask in cleanupQueue, and then cancels all cancellable tasks of the queues in activeQueues (readyQueues + busyQueues)? In that case cleanupTask is cancelled and removed from cleanupQueue, leaving cleanupQueue empty, so it is removed from readyQueues, and that is how we would observe that activeQueues is empty.

If so, I think the idleAtNs of the added connection should be Long.MAX_VALUE - 101 instead of the default value, Long.MAX_VALUE.

 @Test fun interruptStopsThread() {
    val realTaskRunner = TaskRunner.INSTANCE
    val pool =
      factory.newConnectionPool(
        taskRunner = TaskRunner.INSTANCE,
        maxIdleConnections = 2,
      )
    // factory.newConnection(pool, routeA1, Long.MAX_VALUE - 101)?
    factory.newConnection(pool, routeA1)
    assertThat(realTaskRunner.activeQueues()).isNotEmpty()
    Thread.sleep(100)
    val threads = arrayOfNulls<Thread>(Thread.activeCount() * 2)
    Thread.enumerate(threads)
    for (t in threads) {
      if (t != null && t.name == "OkHttp TaskRunner") {
        t.interrupt()
      }
    }
    Thread.sleep(100)
    assertThat(realTaskRunner.activeQueues()).isEmpty()
  }

This default value makes the connection get ignored when cleanupTask tries to figure out whether there is any connection remaining to be closed. Consequently, cleanupTask isn't added back to cleanupQueue and cleanupQueue isn't added back to readyQueues after its initial execution, so activeQueues is empty even if we never interrupt any threads.

// RealConnectionPool.kt
fun closeConnections(now: Long): Long {
  // ...
  if (idleAtNs < earliestEvictableIdleAtNs) { // idleAtNs = Long.MAX_VALUE, earliestEvictableIdleAtNs = Long.MAX_VALUE
    earliestEvictableIdleAtNs = idleAtNs
    earliestEvictableConnection = connection
  }
  // ...
}
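
A tiny standalone illustration of that comparison (hypothetical helper, not the real pool code): with the default idleAtNs of Long.MAX_VALUE the strict less-than never fires, so the connection is never picked as evictable, whereas Long.MAX_VALUE - 101 is.

fun earliestEvictable(idleTimesNs: List<Long>): Long? {
  var earliestEvictableIdleAtNs = Long.MAX_VALUE
  var earliestEvictable: Long? = null
  for (idleAtNs in idleTimesNs) {
    if (idleAtNs < earliestEvictableIdleAtNs) { // MAX_VALUE < MAX_VALUE is false
      earliestEvictableIdleAtNs = idleAtNs
      earliestEvictable = idleAtNs
    }
  }
  return earliestEvictable
}

fun main() {
  println(earliestEvictable(listOf(Long.MAX_VALUE)))       // null: nothing evictable
  println(earliestEvictable(listOf(Long.MAX_VALUE - 101))) // 9223372036854775706
}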
