Skip to content

Commit

Permalink
Fixes for runBackground mutex and log management (#3971)
Browse files Browse the repository at this point in the history
Fixes #3955.

* We make `runBackground` forward the stdout/stderr to
`$serverDir/{stdout,stderr}` instead of `os.Inherit`. This is necessary
because since we started using
com-lihaoyi/os-lib@59b5fd9,
`os.Inherit` is automatically pumped to the enclosing task logger, which
for `runBackground` ends up being closed immediately so the logs are
lost.
* Now, the logs are instead picked up asynchronously by the
`FileToStreamTailer` infrastructure, which picks them up and forwards
them to any connected client regardless of who started the runBackground
process

* Moved usage of `FileToStreamTailer` from the mill client to the
server.
* This allows better integration with the Mill logging infrastructure,
e.g. ensuring tailed logs properly interact with the multi-line prompt
by clearing the prompt before being printed and re-printing the prompt
after.

* Simplified `BackgroundWrapper`
* Renamed it `MillBackgroundWrapper` so it's more clear what it is when
seen in `jps`
* Use a file-lock for mutex, rather than polling on the process
uuid/tombstone files
* We still need to add a `Thread.sleep` after we take the lock, because
the prior process seems to still hold on to sockets for some period of
time. This defaults to 500ms (what is necessary experimentally) but is
configurable by the new `runBackgroundRestartDelayMillis: T[Int]` task
* Generally unified the creation/shutdown logic within
`MillBackgroundWrapper`, rather than having it split between
`BackgroundWrapper` and `def backgroundSetup` in the Mill server process

Tested manually by running `rm -rf out &&
/Users/lihaoyi/Github/mill/target/mill-release -w runBackground` inside
`example/javalib/web/1-hello-jetty`. Forced updates via `Enter` in the
terminal or via editing server source files. Verified that the
`runBackground` server logs appear in the console and that they do not
conflict with the multi-line status prompt
  • Loading branch information
lihaoyi authored Nov 16, 2024
1 parent 4cddacd commit 7457601
Show file tree
Hide file tree
Showing 15 changed files with 272 additions and 216 deletions.
4 changes: 2 additions & 2 deletions example/scalalib/web/1-todo-webapp/test/src/WebAppTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8181, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8181")
finally server.stop()
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8182, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8182")
finally server.stop()
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8184, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8184")
finally server.stop()
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8185, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8185")
finally server.stop()
res
}
Expand Down
113 changes: 50 additions & 63 deletions main/client/src/mill/main/client/ServerLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public static class Result {
public Path serverDir;
}

static final int tailerRefreshIntervalMillis = 2;
final int serverProcessesLimit = 5;
final int serverInitWaitMillis = 10000;

Expand Down Expand Up @@ -120,75 +119,63 @@ public Result acquireLocksAndRun(String outDir) throws Exception {
}

int run(Path serverDir, boolean setJnaNoSys, Locks locks) throws Exception {
try (final FileToStreamTailer stdoutTailer = new FileToStreamTailer(
new java.io.File(serverDir + "/" + ServerFiles.stdout),
stdout,
tailerRefreshIntervalMillis);
final FileToStreamTailer stderrTailer = new FileToStreamTailer(
new java.io.File(serverDir + "/" + ServerFiles.stderr),
stderr,
tailerRefreshIntervalMillis); ) {
stdoutTailer.start();
stderrTailer.start();
String serverPath = serverDir + "/" + ServerFiles.runArgs;
try (OutputStream f = Files.newOutputStream(Paths.get(serverPath))) {
f.write(System.console() != null ? 1 : 0);
Util.writeString(f, BuildInfo.millVersion);
Util.writeArgs(args, f);
Util.writeMap(env, f);
}

if (locks.processLock.probe()) {
initServer(serverDir, setJnaNoSys, locks);
}

while (locks.processLock.probe()) Thread.sleep(3);

String socketName = ServerFiles.pipe(serverDir.toString());
AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName));
String serverPath = serverDir + "/" + ServerFiles.runArgs;
try (OutputStream f = Files.newOutputStream(Paths.get(serverPath))) {
f.write(System.console() != null ? 1 : 0);
Util.writeString(f, BuildInfo.millVersion);
Util.writeArgs(args, f);
Util.writeMap(env, f);
}

long retryStart = System.currentTimeMillis();
Socket ioSocket = null;
Throwable socketThrowable = null;
while (ioSocket == null && System.currentTimeMillis() - retryStart < serverInitWaitMillis) {
try {
ioSocket = AFUNIXSocket.connectTo(addr);
} catch (Throwable e) {
socketThrowable = e;
Thread.sleep(10);
}
}
if (locks.processLock.probe()) {
initServer(serverDir, setJnaNoSys, locks);
}

if (ioSocket == null) {
throw new Exception("Failed to connect to server", socketThrowable);
}
while (locks.processLock.probe()) Thread.sleep(3);

InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outPumperThread = new Thread(outPumper, "outPump");
outPumperThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
inThread.setDaemon(true);
outPumperThread.start();
inThread.start();

if (forceFailureForTestingMillisDelay > 0) {
Thread.sleep(forceFailureForTestingMillisDelay);
throw new Exception("Force failure for testing: " + serverDir);
}
outPumperThread.join();
String socketName = ServerFiles.pipe(serverDir.toString());
AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName));

long retryStart = System.currentTimeMillis();
Socket ioSocket = null;
Throwable socketThrowable = null;
while (ioSocket == null && System.currentTimeMillis() - retryStart < serverInitWaitMillis) {
try {
return Integer.parseInt(
Files.readAllLines(Paths.get(serverDir + "/" + ServerFiles.exitCode))
.get(0));
ioSocket = AFUNIXSocket.connectTo(addr);
} catch (Throwable e) {
return Util.ExitClientCodeCannotReadFromExitCodeFile();
} finally {
ioSocket.close();
socketThrowable = e;
Thread.sleep(10);
}
}

if (ioSocket == null) {
throw new Exception("Failed to connect to server", socketThrowable);
}

InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outPumperThread = new Thread(outPumper, "outPump");
outPumperThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
inThread.setDaemon(true);
outPumperThread.start();
inThread.start();

if (forceFailureForTestingMillisDelay > 0) {
Thread.sleep(forceFailureForTestingMillisDelay);
throw new Exception("Force failure for testing: " + serverDir);
}
outPumperThread.join();

try {
return Integer.parseInt(
Files.readAllLines(Paths.get(serverDir + "/" + ServerFiles.exitCode)).get(0));
} catch (Throwable e) {
return Util.ExitClientCodeCannotReadFromExitCodeFile();
} finally {
ioSocket.close();
}
}
}
15 changes: 13 additions & 2 deletions main/util/src/mill/util/Jvm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mill.util

import mill.api.Loose.Agg
import mill.api._
import mill.main.client.ServerFiles
import os.{ProcessOutput, SubProcess}

import java.io._
Expand Down Expand Up @@ -117,8 +118,18 @@ object Jvm extends CoursierSupport {
mainArgs,
workingDir,
if (!background) None
else if (runBackgroundLogToConsole) Some((os.Inherit, os.Inherit))
else Jvm.defaultBackgroundOutputs(ctx.dest),
else if (runBackgroundLogToConsole) {
val pwd0 = os.Path(java.nio.file.Paths.get(".").toAbsolutePath)
// Hack to forward the background subprocess output to the Mill server process
// stdout/stderr files, so the output will get properly slurped up by the Mill server
// and shown to any connected Mill client even if the current command has completed
Some(
(
os.PathAppendRedirect(pwd0 / ".." / ServerFiles.stdout),
os.PathAppendRedirect(pwd0 / ".." / ServerFiles.stderr)
)
)
} else Jvm.defaultBackgroundOutputs(ctx.dest),
useCpPassingJar
)
}
Expand Down
2 changes: 1 addition & 1 deletion mill
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
set -e

if [ -z "${DEFAULT_MILL_VERSION}" ] ; then
DEFAULT_MILL_VERSION=0.11.12
DEFAULT_MILL_VERSION=0.12.2
fi

if [ -z "$MILL_VERSION" ] ; then
Expand Down
3 changes: 0 additions & 3 deletions runner/src/mill/runner/MillBuildBootstrap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ class MillBuildBootstrap(
}

def evaluateRec(depth: Int): RunnerState = {
mill.main.client.DebugLog.println(
"MillBuildBootstrap.evaluateRec " + depth + " " + targetsAndParams.mkString(" ")
)
// println(s"+evaluateRec($depth) " + recRoot(projectRoot, depth))
val prevFrameOpt = prevRunnerState.frames.lift(depth)
val prevOuterFrameOpt = prevRunnerState.frames.lift(depth - 1)
Expand Down
129 changes: 67 additions & 62 deletions runner/src/mill/runner/MillMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.io.{PipedInputStream, PrintStream}
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.jdk.CollectionConverters.*
import scala.util.Properties
import mill.java9rtexport.Export
import mill.api.{MillException, SystemStreams, WorkspaceRoot, internal}
Expand Down Expand Up @@ -33,7 +33,7 @@ object MillMain {
err.println(e.getCause.getMessage())
(false, onError)
case NonFatal(e) =>
err.println("An unexpected error occurred")
err.println("An unexpected error occurred " + e)
throw e
(false, onError)
}
Expand Down Expand Up @@ -221,69 +221,74 @@ object MillMain {
while (repeatForBsp) {
repeatForBsp = false

val (isSuccess, evalStateOpt) = Watching.watchLoop(
ringBell = config.ringBell.value,
watch = config.watch.value,
streams = streams,
setIdle = setIdle,
evaluate = (prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

withOutLock(
config.noBuildLock.value || bspContext.isDefined,
config.noWaitForBuildLock.value,
out,
targetsAndParams,
streams
) {
val logger = getLogger(
streams,
config,
mainInteractive,
enableTicker =
config.ticker
.orElse(config.enableTicker)
.orElse(Option.when(config.disableTicker.value)(false)),
printLoggerState,
serverDir,
colored = colored,
colors = colors
)
Using.resource(logger) { _ =>
try new MillBuildBootstrap(
projectRoot = WorkspaceRoot.workspaceRoot,
output = out,
home = config.home,
keepGoing = config.keepGoing.value,
imports = config.imports,
env = env,
threadCount = threadCount,
targetsAndParams = targetsAndParams,
prevRunnerState = prevState.getOrElse(stateCache),
logger = logger,
disableCallgraph = config.disableCallgraph.value,
needBuildFile = needBuildFile(config),
requestedMetaLevel = config.metaLevel,
config.allowPositional.value,
systemExit = systemExit,
streams0 = streams0
).evaluate()
Using.resource(new TailManager(serverDir)) { tailManager =>
val (isSuccess, evalStateOpt) = Watching.watchLoop(
ringBell = config.ringBell.value,
watch = config.watch.value,
streams = streams,
setIdle = setIdle,
evaluate = (prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

withOutLock(
config.noBuildLock.value || bspContext.isDefined,
config.noWaitForBuildLock.value,
out,
targetsAndParams,
streams
) {
Using.resource(getLogger(
streams,
config,
mainInteractive,
enableTicker =
config.ticker
.orElse(config.enableTicker)
.orElse(Option.when(config.disableTicker.value)(false)),
printLoggerState,
serverDir,
colored = colored,
colors = colors
)) { logger =>
SystemStreams.withStreams(logger.systemStreams) {
tailManager.withOutErr(logger.outputStream, logger.errorStream) {
new MillBuildBootstrap(
projectRoot = WorkspaceRoot.workspaceRoot,
output = out,
home = config.home,
keepGoing = config.keepGoing.value,
imports = config.imports,
env = env,
threadCount = threadCount,
targetsAndParams = targetsAndParams,
prevRunnerState = prevState.getOrElse(stateCache),
logger = logger,
disableCallgraph = config.disableCallgraph.value,
needBuildFile = needBuildFile(config),
requestedMetaLevel = config.metaLevel,
config.allowPositional.value,
systemExit = systemExit,
streams0 = streams0
).evaluate()
}
}
}
}
}
},
colors = colors
)
bspContext.foreach { ctx =>
repeatForBsp =
BspContext.bspServerHandle.lastResult == Some(
BspServerResult.ReloadWorkspace
)
streams.err.println(
s"`$bspCmd` returned with ${BspContext.bspServerHandle.lastResult}"
},
colors = colors
)
}
bspContext.foreach { ctx =>
repeatForBsp =
BspContext.bspServerHandle.lastResult == Some(
BspServerResult.ReloadWorkspace
)
streams.err.println(
s"`$bspCmd` returned with ${BspContext.bspServerHandle.lastResult}"
)
}

loopRes = (isSuccess, evalStateOpt)
loopRes = (isSuccess, evalStateOpt)
}
} // while repeatForBsp
bspContext.foreach { ctx =>
streams.err.println(
Expand Down
Loading

0 comments on commit 7457601

Please sign in to comment.