From a372514fb5a92be15f19b38518e9bab0ecd066cf Mon Sep 17 00:00:00 2001
From: Kamil Podsiadlo
Date: Wed, 5 Oct 2022 16:25:31 +0200
Subject: [PATCH 1/5] fix classpath hashing

* remove previous mechanism which tried to share work between concurrent
  requests - it was causing too much mental overhead and was error prone
* uncomment already existing tests & fix them
* remove unused code
---
 .../main/scala/bloop/io/ClasspathHasher.scala | 303 +++++-------------
 backend/src/main/scala/bloop/task/Task.scala  |   7 +-
 .../src/test/scala/bloop/task/TaskSpec.scala  |   4 +-
 build.sbt                                     |   3 +-
 .../tasks/compilation/CompileBundle.scala     |   2 +-
 .../scala/bloop/io/ClasspathHasherSpec.scala  | 300 +++++++++++++----
 .../test/scala/bloop/testing/BaseSuite.scala  |  37 +++
 .../src/test/scala/bloop/util/TestUtil.scala  |  11 +-
 project/Dependencies.scala                    |   3 +
 9 files changed, 374 insertions(+), 296 deletions(-)

diff --git a/backend/src/main/scala/bloop/io/ClasspathHasher.scala b/backend/src/main/scala/bloop/io/ClasspathHasher.scala
index 042420e02d..c416e0631a 100644
--- a/backend/src/main/scala/bloop/io/ClasspathHasher.scala
+++ b/backend/src/main/scala/bloop/io/ClasspathHasher.scala
@@ -1,54 +1,37 @@
 package bloop.io
 
-import java.io.File
-import java.io.InputStream
 import java.io.PrintStream
 import java.nio.file.Files
 import java.nio.file.Path
 import java.nio.file.attribute.BasicFileAttributes
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.TimeUnit
-import java.util.zip.ZipEntry
 
-import scala.collection.mutable
 import scala.concurrent.Promise
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import bloop.logging.Logger
 import bloop.task.Task
 import bloop.tracing.BraveTracer
 
-import monix.eval.{Task => MonixTask}
-import monix.execution.Cancelable
-import monix.execution.Scheduler
 import monix.execution.atomic.AtomicBoolean
-import monix.reactive.Consumer
-import monix.reactive.Observable
 import sbt.internal.inc.bloop.internal.BloopStamps
 import sbt.io.IO
 import xsbti.compile.FileHash
+import bloop.logging.DebugFilter
+
+private final case class JarMetadata(
+    modifiedTime: FileTime,
+    size: Long
+)
 
 object ClasspathHasher {
 
-  // For more safety, store both the time and size
-  private type JarMetadata = (FileTime, Long)
-
-  private[this] val hashingPromises = new ConcurrentHashMap[Path, Promise[FileHash]]()
   private[this] val cacheMetadataJar = new ConcurrentHashMap[Path, (JarMetadata, FileHash)]()
 
   /**
-   * Hash the classpath in parallel with Monix's task.
-   *
-   * The hashing works in two steps: first, we try to acquire the hash of a
-   * given entry. This "negotiation" step is required because we may be hashing
-   * other project's classpath concurrently and we want to minimize stalling
-   * and make as much progress as we can hashing. Those entries whose hashing
-   * couldn't be "acquired" are left to the second step, which blocks until the
-   * ongoing hashing finishes.
-   *
-   * This approach allows us to control how many concurrent tasks we spawn to
-   * new threads (and, therefore, how many threads we create in the io pool)
-   * and, at the same time, allows us to do as much progress without blocking.
+   * Hash the classpath in parallel
    *
    * NOTE: When the task returned by this method is cancelled, the promise
    * `cancelCompilation` will be completed and the returned value will be
@@ -66,219 +49,103 @@ object ClasspathHasher {
       classpath: Array[AbsolutePath],
       parallelUnits: Int,
       cancelCompilation: Promise[Unit],
-      scheduler: Scheduler,
       logger: Logger,
       tracer: BraveTracer,
       serverOut: PrintStream
   ): Task[Either[Unit, Vector[FileHash]]] = {
-    val timeoutSeconds: Long = 20L
-    // We'll add the file hashes to the indices here and return it at the end
-    val classpathHashes = new Array[FileHash](classpath.length)
-    case class AcquiredTask(file: Path, idx: Int, p: Promise[FileHash])
-
-    val isCancelled = AtomicBoolean(false)
-    val parallelConsumer = {
-      Consumer.foreachParallelTask[AcquiredTask](parallelUnits) {
-        case AcquiredTask(path, idx, p) =>
-          // Use task.now because Monix's load balancer already forces an async boundary
-          val hashingTask = MonixTask.now {
-            val hash =
-              try {
-                if (cancelCompilation.isCompleted) {
-                  BloopStamps.cancelledHash(path)
-                } else if (isCancelled.get) {
-                  cancelCompilation.trySuccess(())
-                  BloopStamps.cancelledHash(path)
-                } else {
-                  val attrs = Files.readAttributes(path, classOf[BasicFileAttributes])
-                  if (attrs.isDirectory) BloopStamps.directoryHash(path)
-                  else {
-                    val currentMetadata =
-                      (FileTime.fromMillis(IO.getModifiedTimeOrZero(path.toFile)), attrs.size())
-                    Option(cacheMetadataJar.get(path)) match {
-                      case Some((metadata, hashHit)) if metadata == currentMetadata => hashHit
-                      case _ =>
-                        tracer.traceVerbose(s"computing hash ${path.toAbsolutePath.toString}") {
-                          _ =>
-                            val newHash =
-                              FileHash.of(path, ByteHasher.hashFileContents(path.toFile))
-                            cacheMetadataJar.put(path, (currentMetadata, newHash))
-                            newHash
-                        }
-                    }
-                  }
-                }
-              } catch {
-                // Can happen when a file doesn't exist, for example
-                case NonFatal(_) => BloopStamps.emptyHash(path)
-              }
-            classpathHashes(idx) = hash
-            hashingPromises.remove(path, p)
-            p.trySuccess(hash)
-            ()
-          }
+    val timeoutSeconds: Long = 2L
 
-          /*
-           * As a protective measure, set up a task that will be run after 15s
-           * of hashing and will complete the downstream promise to unlock
-           * downstream clients on the assumption that the hashing of this
-           * entry is too slow because of something that happened to this
-           * process. The completion of the downstream promise will also log a
-           * warning to the downstream users so that they know that a hashing
-           * process is unusually slow.
-           */
-          val timeoutCancellation = scheduler.scheduleOnce(
-            timeoutSeconds,
-            TimeUnit.SECONDS,
-            new Runnable {
-              def run(): Unit = {
-                val hash = BloopStamps.cancelledHash(path)
-                // Complete if hashing for this entry hasn't finished in 15s, otherwise ignore
-                hashingPromises.remove(path, p)
-                if (p.trySuccess(hash)) {
-                  val msg =
-                    s"Hashing ${path} is taking more than ${timeoutSeconds}s, detaching downstream clients to unblock them..."
-                  try {
-                    logger.warn(msg)
-                    serverOut.println(msg)
-                  } catch { case _: Throwable => () }
-                }
-                ()
-              }
-            }
-          )
-
-          hashingTask
-            .doOnCancel(MonixTask(timeoutCancellation.cancel()))
-            .doOnFinish(_ => MonixTask(timeoutCancellation.cancel()))
-      }
-    }
+    implicit val debugFilter: DebugFilter = DebugFilter.Compilation
 
-    tracer.traceTaskVerbose("computing hashes") { _ =>
-      val acquiredByOtherTasks = new mutable.ListBuffer[Task[Unit]]()
-      val acquiredByThisHashingProcess = new mutable.ListBuffer[AcquiredTask]()
+    val isCancelled = AtomicBoolean(false)
+    def makeHashingTask(path: Path): Task[FileHash] = {
+      val isHashingComplete = AtomicBoolean(false)
 
-      def acquireHashingEntry(entry: Path, entryIdx: Int): Unit = {
-        if (isCancelled.get) ()
+      def hashFile(): FileHash = {
+        val attrs = Files.readAttributes(path, classOf[BasicFileAttributes])
+        if (attrs.isDirectory) BloopStamps.directoryHash(path)
         else {
-          val entryPromise = Promise[FileHash]()
-          val promise = hashingPromises.putIfAbsent(entry, entryPromise)
-          if (promise == null) { // The hashing is done by this process
-            acquiredByThisHashingProcess.+=(AcquiredTask(entry, entryIdx, entryPromise))
-          } else { // The hashing is acquired by another process, wait on its result
-            acquiredByOtherTasks.+=(
-              Task.fromFuture(promise.future).flatMap { hash =>
-                if (hash == BloopStamps.cancelledHash) {
-                  if (cancelCompilation.isCompleted) Task.now(())
-                  else {
-                    // If the process that acquired it cancels the computation, try acquiring it again
-                    logger
-                      .warn(s"Unexpected hash computation of $entry was cancelled, restarting...")
-                    Task.eval(acquireHashingEntry(entry, entryIdx)).asyncBoundary
-                  }
-                } else {
-                  Task.now {
-                    // Save the result hash in its index
-                    classpathHashes(entryIdx) = hash
-                    ()
-                  }
-                }
+          val currentMetadata =
+            JarMetadata(FileTime.fromMillis(IO.getModifiedTimeOrZero(path.toFile)), attrs.size())
+          val file = path.toAbsolutePath.toString
+          Option(cacheMetadataJar.get(path)) match {
+            case Some((metadata, hashHit)) if metadata == currentMetadata =>
+              logger.debug(s"Using cached hash for $file")
+              hashHit
+            case other =>
+              if (other.isDefined) {
+                logger.debug(
+                  s"Cached entry for $file has different metadata, hash will be recomputed"
+                )
+              }
+              tracer.traceVerbose(s"computing hash $file") { _ =>
+                val newHash =
+                  FileHash.of(path, ByteHasher.hashFileContents(path.toFile))
+                cacheMetadataJar.put(path, (currentMetadata, newHash))
+                newHash
               }
-            )
           }
         }
       }
 
-      val initEntries = Task {
-        classpath.zipWithIndex.foreach {
-          case (absoluteEntry, idx) =>
-            acquireHashingEntry(absoluteEntry.underlying, idx)
-        }
-      }.doOnCancel(Task { isCancelled.compareAndSet(false, true); () })
-
-      // Let's first turn the obtained hash tasks into an observable, don't allow cancellation
-      val acquiredTask = Observable.fromIterable(acquiredByThisHashingProcess)
-
-      val cancelableAcquiredTask = Task.create[Unit] { (scheduler, cb) =>
-        val (out, _) = parallelConsumer.createSubscriber(cb, scheduler)
-        val _ = acquiredTask.subscribe(out)
-        Cancelable { () =>
-          isCancelled.compareAndSet(false, true); ()
+      def hash(path: Path): Task[FileHash] = Task {
+        try {
+          if (cancelCompilation.isCompleted) {
+            BloopStamps.cancelledHash(path)
+          } else if (isCancelled.get) {
+            cancelCompilation.trySuccess(())
+            BloopStamps.cancelledHash(path)
+          } else {
+            val hash = hashFile()
+            isHashingComplete.set(true)
+            hash
+          }
+        } catch {
+          // Can happen when a file doesn't exist, for example
+          case NonFatal(_) => BloopStamps.emptyHash(path)
         }
       }
 
-      initEntries.flatMap { _ =>
-        cancelableAcquiredTask
-          .doOnCancel(Task { isCancelled.compareAndSet(false, true); () })
-          .flatMap { _ =>
-            if (isCancelled.get || cancelCompilation.isCompleted) {
-              cancelCompilation.trySuccess(())
-              Task.now(Left(()))
-            } else {
-              Task.sequence(acquiredByOtherTasks.toList).map { _ =>
-                val hasCancelledHash = classpathHashes.exists(_.hash() == BloopStamps.cancelledHash)
-                if (hasCancelledHash || isCancelled.get || cancelCompilation.isCompleted) {
-                  cancelCompilation.trySuccess(())
-                  Left(())
-                } else {
-                  Right(classpathHashes.toVector)
-                }
-              }
-            }
-          }
+      /*
+       * As a protective measure, set up a timeout for hashing which completes hashing with an empty value to unlock
+       * downstream clients on the assumption that the hashing of this
+       * entry is too slow because of something that happened to this
+       * process. The completion of the downstream promise will also log a
+       * warning to the downstream users so that they know that a hashing
+       * process is unusually slow.
+       *
+       * There is no need for specifying a `doOnCancel` callback, this task won't start before the timeout (see the timeoutTo implementation)
+       */
+      val timeoutFallback: Task[FileHash] = Task {
+        val msg =
+          s"Hashing ${path} is taking more than ${timeoutSeconds}s"
+        try {
+          pprint.log(msg)
+          logger.warn(msg)
+          serverOut.println(msg)
+        } catch { case NonFatal(_) => () }
+        BloopStamps.emptyHash(path)
       }
-  }
-
-  private[this] val definedMacrosJarCache = new ConcurrentHashMap[File, (JarMetadata, Boolean)]()
-
-  private val blackboxReference = "scala/reflect/macros/blackbox/Context".getBytes
-  private val whiteboxReference = "scala/reflect/macros/whitebox/Context".getBytes
-  def containsMacroDefinition(classpath: Seq[File]): Task[Seq[(File, Boolean)]] = {
-    import org.zeroturnaround.zip.commons.IOUtils
-    import org.zeroturnaround.zip.{ZipEntryCallback, ZipUtil}
-    def readJar(jar: File): Task[(File, Boolean)] = Task {
-      if (!jar.exists()) sys.error(s"File ${jar} doesn't exist")
-      else {
-        def detectMacro(jar: File): Boolean = {
-          var found: Boolean = false
-          ZipUtil.iterate(
-            jar,
-            new ZipEntryCallback {
-              override def process(in: InputStream, zipEntry: ZipEntry): Unit = {
-                if (found) ()
-                else if (zipEntry.isDirectory) ()
-                else if (!zipEntry.getName.endsWith(".class")) ()
-                else {
-                  try {
-                    val bytes = IOUtils.toByteArray(in)
-                    found = {
-                      bytes.containsSlice(blackboxReference) ||
-                      bytes.containsSlice(whiteboxReference)
-                    }
-                  } catch {
-                    case t: Throwable => println(s"Error in ${t}")
-                  }
-                }
-              }
-            }
-          )
-          found
-        }
-
-        val attrs = Files.readAttributes(jar.toPath, classOf[BasicFileAttributes])
-        val currentMetadata = (FileTime.fromMillis(IO.getModifiedTimeOrZero(jar)), attrs.size())
+      hash(path).timeoutTo(
+        duration = timeoutSeconds.seconds,
+        backup = timeoutFallback
+      )
+    }
 
-        Option(definedMacrosJarCache.get(jar)) match {
-          case Some((metadata, hit)) if metadata == currentMetadata => jar -> hit
-          case _ =>
-            val detected = detectMacro(jar)
-            definedMacrosJarCache.put(jar, (currentMetadata, detected))
-            jar -> detected
+    val onCancel = Task(isCancelled.compareAndSet(false, true)).void
+
+    val tasks = classpath.toVector.map(path => makeHashingTask(path.underlying))
+    Task
+      .parSequenceN(parallelUnits)(tasks)
+      .map { result =>
+        if (isCancelled.get || cancelCompilation.isCompleted) {
+          cancelCompilation.trySuccess(())
+          Left(())
+        } else {
+          Right(result.toVector)
         }
       }
-    }
-
-    Task.gatherUnordered(classpath.map(readJar(_)))
+      .doOnCancel(onCancel)
   }
 }
diff --git a/backend/src/main/scala/bloop/task/Task.scala b/backend/src/main/scala/bloop/task/Task.scala
index e1d37b4d5f..9137bdd102 100644
--- a/backend/src/main/scala/bloop/task/Task.scala
+++ b/backend/src/main/scala/bloop/task/Task.scala
@@ -160,6 +160,8 @@ sealed trait Task[+A] { self =>
   def as[B](b: => B): Task[B] =
     self.map(_ => b)
 
+  @inline def void(): Task[Unit] = as(())
+
   def timeoutTo[B >: A](duration: FiniteDuration, backup: Task[B]): Task[B] = {
     Task
       .chooseFirstOf(
@@ -168,9 +170,10 @@ sealed trait Task[+A] { self =>
       )
       .flatMap {
         case Left((a, _)) =>
+          // there's no need to cancel fb - it's just sleeping
          Task.now(a)
-        case Right((a, _)) =>
-          a.cancel()
+        case Right((fa, _)) =>
+          fa.cancel()
           backup
       }
   }
diff --git a/backend/src/test/scala/bloop/task/TaskSpec.scala b/backend/src/test/scala/bloop/task/TaskSpec.scala
index ceb9d687f4..4e001b452d 100644
--- a/backend/src/test/scala/bloop/task/TaskSpec.scala
+++ b/backend/src/test/scala/bloop/task/TaskSpec.scala
@@ -180,7 +180,6 @@ class TaskSpec {
     val (one, two) = Await.result(future, 1.second)
     assertEquals(one, two)
-
   }
 
   @Test
@@ -190,7 +189,8 @@ class TaskSpec {
     val t2 = Task(ref.set(false))
     val withTimeout = t1.timeoutTo(1.second, t2)
 
-    Await.result((withTimeout *> Task.sleep(2.seconds)).runAsync, 3.second)
+    // await just a bit longer than the timeout value to see if t2 was cancelled (not executed)
+    Await.result((withTimeout *> Task.sleep(1.seconds)).runAsync, 3.second)
 
     assertEquals(true, ref.get())
   }
diff --git a/build.sbt b/build.sbt
index 8cb2dfd056..49b3289186 100644
--- a/build.sbt
+++ b/build.sbt
@@ -159,7 +159,8 @@ lazy val frontend: Project = project
       Dependencies.caseApp,
       Dependencies.scalaDebugAdapter,
       Dependencies.bloopConfig,
-      Dependencies.logback
+      Dependencies.logback,
+      Dependencies.oslib % Test,
     ),
     // needed for tests and to be automatically updated
     Test / libraryDependencies += Dependencies.semanticdb intransitive (),
diff --git a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala
index 89e695c540..76c296bf39 100644
--- a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala
+++ b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala
@@ -179,7 +179,7 @@ object CompileBundle {
     import compileDependenciesData.dependencyClasspath
     val out = options.ngout
     val classpathHashesTask = bloop.io.ClasspathHasher
-      .hash(dependencyClasspath, 10, cancelCompilation, ioScheduler, logger, tracer, out)
+      .hash(dependencyClasspath, 10, cancelCompilation, logger, tracer, out)
       .executeOn(ioScheduler)
 
     val sourceHashesTask = tracer.traceTaskVerbose("discovering and hashing sources") { _ =>
diff --git a/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala b/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala
index c00a7327bd..841fa3403f 100644
--- a/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala
+++ b/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala
@@ -1,12 +1,10 @@
 package bloop.io
 
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.Await
 import scala.concurrent.Promise
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
 
 import bloop.DependencyResolution
+import bloop.logging.Logger
 import bloop.logging.RecordingLogger
 import bloop.task.Task
 import bloop.tracing.BraveTracer
@@ -14,83 +12,251 @@ import bloop.tracing.TraceProperties
 import bloop.util.TestUtil
 
 import sbt.internal.inc.bloop.internal.BloopStamps
+import java.nio.file.Files
+import java.nio.file.attribute.BasicFileAttributes
+import sbt.io.IO
 
 object ClasspathHasherSpec extends bloop.testing.BaseSuite {
-  ignore("cancellation works OK") {
-    import bloop.engine.ExecutionContext.ioScheduler
-    val logger = new RecordingLogger()
+  val testTimeout = 10.seconds
+
+  val monix = DependencyResolution.Artifact("io.monix", "monix_2.13", "3.4.0")
+  val spark = DependencyResolution.Artifact("org.apache.spark", "spark-core_2.13", "3.3.0")
+  val hadoop = DependencyResolution.Artifact("org.apache.hadoop", "hadoop-common", "3.3.4")
+
+  // an arbitrary jar was picked to test whether hashing returns consistent results across test runs
+  val monixJar = "monix_2.13-3.4.0.jar"
+  val expectedHash = 581779648
+
+  testAsyncT("hash deps", testTimeout) {
     val cancelPromise = Promise[Unit]()
-    val cancelPromise2 = Promise[Unit]()
-    val tracer = BraveTracer("cancels-correctly-test", TraceProperties.default)
-    val jars =
-      Array(
-        DependencyResolution.Artifact("org.apache.spark", "spark-core_2.11", "2.4.4"),
-        DependencyResolution.Artifact("org.apache.hadoop", "hadoop-main", "3.2.1"),
-        DependencyResolution.Artifact("io.monix", "monix_2.12", "3.0.0")
-      ).flatMap(a =>
-        // Force independent resolution for every artifact
-        DependencyResolution.resolve(List(a), logger)
-      )
+    val logger = new RecordingLogger()
+    val tracer = BraveTracer("hashes-correctly", TraceProperties.default)
+    val jars = resolveArtifacts(logger, monix)
+
     val hashClasspathTask =
-      ClasspathHasher.hash(jars, 2, cancelPromise, ioScheduler, logger, tracer, System.out)
-    val competingHashClasspathTask =
-      ClasspathHasher.hash(jars, 2, cancelPromise2, ioScheduler, logger, tracer, System.out)
-    val running = hashClasspathTask.runAsync(ioScheduler)
-
-    Thread.sleep(10)
-    val running2 = competingHashClasspathTask.runAsync(ioScheduler)
-
-    Thread.sleep(5)
-    running.cancel()
-
-    val result = Await.result(running, FiniteDuration(20, "s"))
-    TestUtil.await(FiniteDuration(1, "s"), ioScheduler)(Task.fromFuture(cancelPromise.future))
-    assert(!cancelPromise2.isCompleted, result.isLeft)
-
-    // Cancelling the first result doesn't affect the results of the second
-    val competingResult = Await.result(running2, FiniteDuration(20, "s"))
-    assert(
-      competingResult.isRight,
-      competingResult.forall(s => s != BloopStamps.cancelledHash),
-      !cancelPromise2.isCompleted
-    )
+      ClasspathHasher.hash(
+        classpath = jars,
+        parallelUnits = 2,
+        cancelCompilation = cancelPromise,
+        logger = logger,
+        tracer = tracer,
+        serverOut = System.out
+      )
+
+    val task = for {
+      result <- hashClasspathTask
+    } yield {
+      val fileHashes = result.orFail(_ => "Obtained empty result from hashing")
+      assertEquals(
+        obtained = cancelPromise.isCompleted,
+        expected = false,
+        "Cancel promise shouldn't be completed if hashing wasn't cancelled"
+      )
+      assertEquals(
+        obtained = fileHashes.forall(hash =>
+          hash != BloopStamps.cancelledHash && hash != BloopStamps.emptyHash(hash.file)
+        ),
+        expected = true,
+        hint = s"All hashes should be computed correctly, but found cancelled hash in $fileHashes"
+      )
+      fileHashes.find(_.file.toString.contains(monixJar)) match {
+        case None => fail(s"There is no $monixJar among hashed jars, although it should be")
+        case Some(fileHash) =>
+          assertEquals(
+            obtained = fileHash.hash,
+            expected = expectedHash
+          )
+      }
+    }
+
+    task *> Task.sleep(5.second)
   }
 
-  ignore("detect macros in classpath") {
+  testAsyncT("results are cached", testTimeout) {
+    val cancelPromise = Promise[Unit]()
     val logger = new RecordingLogger()
-    val jars = DependencyResolution
-      .resolve(
-        List(DependencyResolution.Artifact("ch.epfl.scala", "zinc_2.12", "1.2.1+97-636ca091")),
"zinc_2.12", "1.2.1+97-636ca091")), - logger + val tracer = BraveTracer("hashes-correctly", TraceProperties.default) + val jars = resolveArtifacts(logger, monix) + + val hashClasspathTask = + ClasspathHasher.hash( + classpath = jars, + parallelUnits = 2, + cancelCompilation = cancelPromise, + logger = logger, + tracer = tracer, + serverOut = System.out ) - .filter(_.syntax.endsWith(".jar")) - - Timer.timed(logger) { - val duration = FiniteDuration(7, TimeUnit.SECONDS) - TestUtil.await(duration) { - ClasspathHasher.containsMacroDefinition(jars.map(_.toFile).toSeq).map { jarsCount => - jarsCount.foreach { - case (jar, detected) => - if (detected) - println(s"Detect macros in jar ${jar.getName}") - } - } + + for { + _ <- hashClasspathTask + cachedResult <- hashClasspathTask + } yield { + val fileHashes = cachedResult.orFail(_ => "Obtained empty result from hashing") + assertEquals( + obtained = fileHashes.forall(_ != BloopStamps.cancelledHash), + expected = true, + hint = s"All hashes should be computed correctly, but found cancelled hash in $fileHashes" + ) + + val debugOutput = logger.debugs.toSet + + val nonCached = jars.toList.filter { path => + !debugOutput.contains(s"Using cached hash for $path") + } + + if (nonCached.nonEmpty) { + pprint.log(debugOutput) + fail( + s"Hashing should used cached results for when computing hashes, but $nonCached were computed" + ) + } + + fileHashes.find(_.file.toString.contains(monixJar)) match { + case None => fail(s"There is no $monixJar among hashed jars, although it should be") + case Some(fileHash) => + assertEquals( + obtained = fileHash.hash, + expected = expectedHash + ) } } + } + + testAsyncT("hash is recalculated if file's metadata changed", testTimeout) { + TestUtil.withinWorkspaceT { workspace => + val cancelPromise = Promise[Unit]() + val logger = new RecordingLogger() + val tracer = BraveTracer("hashes-correctly", TraceProperties.default) - Timer.timed(logger) { - val duration = FiniteDuration(7, TimeUnit.SECONDS) - TestUtil.await(duration) { - ClasspathHasher.containsMacroDefinition(jars.map(_.toFile).toSeq).map { jarsCount => - jarsCount.foreach { - case (jar, detected) => - if (detected) - println(s"Detect macros in jar ${jar.getName}") - } + val file = os.Path(workspace.underlying) / "a.txt" + + val writeFileContent = Task { + os.write.over( + target = file, + data = "This is very huge Scala jar :)" + ) + } + + val filesToHash = Array(AbsolutePath(file.toNIO)) + + val hashClasspathTask = + ClasspathHasher.hash( + classpath = filesToHash, + parallelUnits = 2, + cancelCompilation = cancelPromise, + logger = logger, + tracer = tracer, + serverOut = System.out + ) + + val readAttributes = Task(Files.readAttributes(file.toNIO, classOf[BasicFileAttributes])) + val lastModifiedTime = Task(IO.getModifiedTimeOrZero(file.toIO)) + for { + _ <- writeFileContent + attributes <- readAttributes + time <- lastModifiedTime + _ <- hashClasspathTask + // make sure that at least one milli has ticked + _ <- Task.sleep(1.milli) + // write file's content once again to change lastModifiedTime + _ <- writeFileContent + newAttributes <- readAttributes + newTime <- lastModifiedTime + _ = assertNotEquals( + attributes.lastModifiedTime().toMillis, + newAttributes.lastModifiedTime().toMillis, + "Modified change should change" + ) + _ = assertNotEquals( + time, + newTime, + "Modified change should change" + ) + cachedResult <- hashClasspathTask + } yield { + val fileHashes = cachedResult.orFail(_ => "Obtained empty result from hashing") + assertEquals( + obtained = 
+          expected = true,
+          hint = s"All hashes should be computed correctly, but found cancelled hash in $fileHashes"
+        )
+
+        val debugOutput = logger.debugs
+
+        val cached = filesToHash.toList.filterNot { path =>
+          debugOutput.exists(_.contains(s"$path has different metadata"))
+        }
+
+        cached match {
+          case Nil => ()
+          case nonempty =>
+            fail(
+              s"Hashing should recompute hashes if file's metadata has changed, but $nonempty were taken from cache"
+            )
         }
       }
     }
+  }
+
+  testAsyncT("cancellation", 10.second) {
+    import bloop.engine.ExecutionContext.ioScheduler
+
+    val logger = new RecordingLogger()
+    val cancelPromise = Promise[Unit]()
+    val tracer = BraveTracer("cancels-correctly-test", TraceProperties.default)
+    val jars = resolveArtifacts(logger, monix, spark, hadoop)
 
-    logger.dump()
+    val hashClasspathTask =
+      ClasspathHasher.hash(
+        jars,
+        2,
+        cancelPromise,
+        logger,
+        tracer,
+        System.out
+      )
+
+    // start hashing, wait a small amount of time and then cancel the task
+    val cancelRunningTask = Task.defer {
+      for {
+        running <- Task(hashClasspathTask.runAsync)
+        _ <- Task.sleep(10.millis)
+        _ <- Task(running.cancel())
+        result <- Task.fromFuture(running)
+      } yield result
+    }
+
+    for {
+      result <- cancelRunningTask
+    } yield {
+      assertEquals(
+        obtained = cancelPromise.isCompleted,
+        expected = true,
+        hint = "Cancellation promise should be completed"
+      )
+
+      result match {
+        case Left(_) => ()
+        case Right(hashes) => fail(s"Cancelled hash task should return an empty result, got $hashes")
+      }
+    }
+  }
+
+  private def resolveArtifacts(
+      logger: Logger,
+      deps: DependencyResolution.Artifact*
+  ): Array[AbsolutePath] = {
+    deps.toArray.flatMap(a =>
+      // Force independent resolution for every artifact
+      DependencyResolution.resolve(List(a), logger)
+    )
+  }
+
+  private implicit class EitherSyntax[A, B](private val either: Either[A, B]) extends AnyVal {
+    def orFail(msg: A => String): B = either match {
+      case Left(value) => fail(msg(value))
+      case Right(value) => value
+    }
   }
 }
diff --git a/frontend/src/test/scala/bloop/testing/BaseSuite.scala b/frontend/src/test/scala/bloop/testing/BaseSuite.scala
index 3bb9aa6414..3f102c7c29 100644
--- a/frontend/src/test/scala/bloop/testing/BaseSuite.scala
+++ b/frontend/src/test/scala/bloop/testing/BaseSuite.scala
@@ -533,6 +533,16 @@ abstract class BaseSuite extends TestSuite with BloopHelpers {
     )
   }
 
+  @nowarn("msg=parameter value run|maxDuration in method ignore is never used")
+  def ignore(name: String, maxDuration: Duration)(
+      run: => Task[Unit]
+  ): Unit = {
+    myTests += FlatTest(
+      utest.ufansi.Color.LightRed(s"IGNORED - $name").toString(),
+      () => ()
+    )
+  }
+
   def testOnlyOnJava8(name: String)(fun: => Any): Unit = {
     if (TestUtil.isJdk8) test(name)(fun)
     else ignore(name, label = s"IGNORED ON JAVA v${TestUtil.jdkVersion}")(fun)
@@ -571,6 +581,33 @@ abstract class BaseSuite extends TestSuite with BloopHelpers {
     )
   }
 
+  def testAsync(name: String, maxDuration: Duration = Duration("10s"))(
+      run: => Unit
+  ): Unit = {
+    test(name) {
+      Await.result(Task { run }.runAsync(ExecutionContext.scheduler), maxDuration)
+    }
+  }
+
+  def testAsyncT(name: String, maxDuration: Duration)(
+      run: => Task[Unit]
+  ): Unit = {
+    test(name) {
+      Await.result(run.runAsync(ExecutionContext.scheduler), maxDuration)
+    }
+  }
+
+  /*
+  def testAsync(name: String, maxDuration: Duration = Duration("10min"))(
+      run: => Future[Unit]
+  ): Unit = {
+    test(name) {
+      val fut = run
+      Await.result(fut, maxDuration)
+    }
+  }
+   */
+
   def fail(msg: String, stackBump: Int = 0): Nothing = {
     val ex = new DiffAssertions.TestFailedException(msg)
     ex.setStackTrace(ex.getStackTrace.slice(1 + stackBump, 5 + stackBump))
diff --git a/frontend/src/test/scala/bloop/util/TestUtil.scala b/frontend/src/test/scala/bloop/util/TestUtil.scala
index 4d54475290..f1d45d07ae 100644
--- a/frontend/src/test/scala/bloop/util/TestUtil.scala
+++ b/frontend/src/test/scala/bloop/util/TestUtil.scala
@@ -480,11 +480,12 @@ object TestUtil {
     finally delete(AbsolutePath(temp))
   }
 
-  /** Creates an empty workspace where operations can happen. */
-  def withinWorkspace[T](op: AbsolutePath => Task[T]): Task[T] = {
-    val temp = Files.createTempDirectory("bloop-test-workspace").toRealPath()
-    op(AbsolutePath(temp)).doOnFinish(_ => Task(delete(AbsolutePath(temp))))
-  }
+  /** Creates an empty workspace where operations described as Tasks can happen. */
+  def withinWorkspaceT[T](fun: AbsolutePath => Task[T]): Task[T] = for {
+    temp <- Task(Files.createTempDirectory("bloop-test-workspace").toRealPath())
+    tempDirectory = AbsolutePath(temp)
+    result <- fun(tempDirectory).doOnFinish(_ => Task(delete(tempDirectory)))
+  } yield result
 
   def withTemporaryFile[T](op: Path => T): T = {
     val temp = Files.createTempFile("tmp", "")
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 0ec62e46e3..c5556f64e3 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -73,6 +73,9 @@ object Dependencies {
   val utest = "com.lihaoyi" %% "utest" % "0.8.2"
   val pprint = "com.lihaoyi" %% "pprint" % "0.8.1"
+  val oslib = "com.lihaoyi" %% "os-lib" % "0.8.1"
+
+  val scalacheck = "org.scalacheck" %% "scalacheck" % "1.13.4"
   val junit = "com.github.sbt" % "junit-interface" % junitVersion
   val directoryWatcher = "ch.epfl.scala" % "directory-watcher" % directoryWatcherVersion
   val difflib = "com.googlecode.java-diff-utils" % "diffutils" % difflibVersion

From 5798261ca7b5e19ce57f8f65a8cd35c979b2bba2 Mon Sep 17 00:00:00 2001
From: Kamil Podsiadlo
Date: Fri, 14 Oct 2022 19:16:33 +0200
Subject: [PATCH 2/5] bring back workload sharing

---
 .../main/scala/bloop/io/ClasspathHasher.scala | 161 +++++++++++----
 backend/src/main/scala/bloop/task/Task.scala  |   2 +-
 .../bloop/engine/caches/ResultsCache.scala    |   6 +-
 .../tasks/compilation/CompileBundle.scala     |   2 +-
 .../test/scala/bloop/bsp/BspBaseSuite.scala   |   1 +
 .../scala/bloop/io/ClasspathHasherSpec.scala  | 190 +++++++++++++++---
 6 files changed, 293 insertions(+), 69 deletions(-)

diff --git a/backend/src/main/scala/bloop/io/ClasspathHasher.scala b/backend/src/main/scala/bloop/io/ClasspathHasher.scala
index c416e0631a..05a76569eb 100644
--- a/backend/src/main/scala/bloop/io/ClasspathHasher.scala
+++ b/backend/src/main/scala/bloop/io/ClasspathHasher.scala
@@ -11,6 +11,7 @@ import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
+import bloop.logging.DebugFilter
 import bloop.logging.Logger
 import bloop.task.Task
 import bloop.tracing.BraveTracer
@@ -19,31 +20,34 @@ import monix.execution.atomic.AtomicBoolean
 import sbt.internal.inc.bloop.internal.BloopStamps
 import sbt.io.IO
 import xsbti.compile.FileHash
-import bloop.logging.DebugFilter
 
-private final case class JarMetadata(
-    modifiedTime: FileTime,
-    size: Long
-)
-
-object ClasspathHasher {
+class ClasspathHasher {
+  import ClasspathHasher._
 
+  private[this] val hashingPromises = new ConcurrentHashMap[Path, Promise[FileHash]]()
   private[this] val cacheMetadataJar = new ConcurrentHashMap[Path, (JarMetadata, FileHash)]()
 
   /**
    * Hash the classpath in parallel
    *
-   * NOTE: When the task returned by this method is cancelled, the promise
-   * `cancelCompilation` will be completed and the returned value will be
-   * empty. The call-site needs to handle the case where cancellation happens.
+   * NOTE: When the task returned by this method is cancelled, the promise `cancelCompilation` will
+   * be completed and the returned value will be empty. The call-site needs to handle the case where
+   * cancellation happens.
    *
-   * @param classpath The list of files to be hashed (if they exist).
-   * @param parallelUnits The amount of classpath entries we can hash at once.
-   * @param cancelCompilation A promise that will be completed if task is cancelled.
-   * @param scheduler The scheduler that should be used for internal Monix usage.
-   * @param logger The logger where every action will be logged.
-   * @param tracer A tracer to keep track of timings in Zipkin.
-   * @return A task returning an error if the task was cancelled or a complete list of hashes.
+   * @param classpath
+   *   The list of files to be hashed (if they exist).
+   * @param parallelUnits
+   *   The amount of classpath entries we can hash at once.
+   * @param cancelCompilation
+   *   A promise that will be completed if task is cancelled.
+   * @param logger
+   *   The logger where every action will be logged.
+   * @param tracer
+   *   A tracer to keep track of timings in Zipkin.
+   * @return
+   *   A task returning an error if the task was cancelled or a complete list of hashes.
    */
   def hash(
       classpath: Array[AbsolutePath],
@@ -53,13 +57,12 @@
   ): Task[Either[Unit, Vector[FileHash]]] = {
-    val timeoutSeconds: Long = 2L
+    val timeoutSeconds: Long = 10L
 
     implicit val debugFilter: DebugFilter = DebugFilter.Compilation
 
     val isCancelled = AtomicBoolean(false)
-    def makeHashingTask(path: Path): Task[FileHash] = {
-      val isHashingComplete = AtomicBoolean(false)
+    def makeHashingTask(path: Path, promise: Promise[FileHash]): Task[FileHash] = {
 
       def hashFile(): FileHash = {
         val attrs = Files.readAttributes(path, classOf[BasicFileAttributes])
@@ -77,8 +80,10 @@
               logger.debug(
                 s"Cached entry for $file has different metadata, hash will be recomputed"
               )
+            } else {
+              logger.debug(s"Cache miss for $file, hash will be computed")
             }
-            tracer.traceVerbose(s"computing hash $file") { _ =>
+            tracer.traceVerbose(s"computing hash for $file") { _ =>
               val newHash =
                 FileHash.of(path, ByteHasher.hashFileContents(path.toFile))
               cacheMetadataJar.put(path, (currentMetadata, newHash))
@@ -89,21 +94,21 @@
       }
 
       def hash(path: Path): Task[FileHash] = Task {
-        try {
-          if (cancelCompilation.isCompleted) {
-            BloopStamps.cancelledHash(path)
-          } else if (isCancelled.get) {
-            cancelCompilation.trySuccess(())
-            BloopStamps.cancelledHash(path)
-          } else {
-            val hash = hashFile()
-            isHashingComplete.set(true)
-            hash
+        val hash =
+          try {
+            if (isCancelled.get) {
+              cancelCompilation.trySuccess(())
+              BloopStamps.cancelledHash(path)
+            } else {
+              hashFile()
+            }
+          } catch {
+            // Can happen when a file doesn't exist, for example
+            case NonFatal(_) => BloopStamps.emptyHash(path)
           }
-        } catch {
-          // Can happen when a file doesn't exist, for example
-          case NonFatal(_) => BloopStamps.emptyHash(path)
-        }
+        hashingPromises.remove(path, promise)
+        promise.trySuccess(hash)
+        hash
       }
 
       /*
       * As a protective measure, set up a timeout for hashing which completes hashing with an empty value to unlock
       * downstream clients on the assumption that the hashing of this
       * entry is too slow because of something that happened to this
       * process. The completion of the downstream promise will also log a
       * warning to the downstream users so that they know that a hashing
       * process is unusually slow.
       *
       * There is no need for specifying a `doOnCancel` callback, this task won't start before the timeout (see the timeoutTo implementation)
       */
       val timeoutFallback: Task[FileHash] = Task {
+        val cancelledHash = BloopStamps.emptyHash(path)
         val msg =
           s"Hashing ${path} is taking more than ${timeoutSeconds}s"
         try {
-          pprint.log(msg)
+          hashingPromises.remove(path, promise)
+          promise.trySuccess(cancelledHash)
           logger.warn(msg)
           serverOut.println(msg)
         } catch { case NonFatal(_) => () }
-        BloopStamps.emptyHash(path)
+        cancelledHash
       }
 
       hash(path).timeoutTo(
@@ -133,12 +140,60 @@
-    val onCancel = Task(isCancelled.compareAndSet(false, true)).void
+    def acquireHashingEntry(entry: Path): FileHashingResult = {
+      if (isCancelled.get) Computed(Task.now(FileHash.of(entry, BloopStamps.cancelledHash)))
+      else {
+        val entryPromise = Promise[FileHash]()
+        val promise = Option(hashingPromises.putIfAbsent(entry, entryPromise))
+
+        promise match {
+          // The hashing is done by this process, compute hash and complete the promise
+          case None =>
+            Computed(makeHashingTask(entry, entryPromise))
+          // The hashing is acquired by another process, wait on its result
+          case Some(promise) =>
+            logger.debug(s"Wait for hashing of $entry to complete")
+            FromPromise(waitForAnotherTask(entry, promise))
+        }
+      }
+    }
+
+    def waitForAnotherTask(entry: Path, promise: Promise[FileHash]): Task[FileHash] =
+      Task.fromFuture(promise.future).flatMap { hash =>
+        hash.hash() match {
+          case BloopStamps.cancelledHash =>
+            if (cancelCompilation.isCompleted)
+              Task.now(FileHash.of(entry, BloopStamps.cancelledHash))
+            else {
+              // If the process that acquired it cancels the computation, try acquiring it again
+              logger
+                .warn(s"Unexpected hash computation of $entry was cancelled, restarting...")
+              acquireHashingEntry(entry).task
+            }
+          case _ =>
+            Task.now(hash)
+        }
+      }
+
+    val onCancel = Task(isCancelled.compareAndSet(false, true)).unit
 
-    val tasks = classpath.toVector.map(path => makeHashingTask(path.underlying))
-    Task
-      .parSequenceN(parallelUnits)(tasks)
+    val (computing, fromAnotherTask) =
+      classpath.toVector.map(path => acquireHashingEntry(path.underlying)).partition {
+        case _: Computed => true
+        case _: FromPromise => false
+      }
+
+    // first compute all the entries that are computed by this run
+    // then start waiting for entries that are computed by other tasks
+    val hashes = for {
+      computed <- Task.parSequenceN(parallelUnits)(computing.map(_.task).toList)
+      fromPromises <- Task.parSequenceN(parallelUnits)(fromAnotherTask.map(_.task).toList)
+    } yield computed ++ fromPromises
+
+    hashes
       .map { result =>
+        pprint.log(this.cacheMetadataJar.size())
+        pprint.log(this.hashingPromises.size())
         if (isCancelled.get || cancelCompilation.isCompleted) {
           cancelCompilation.trySuccess(())
           Left(())
@@ -149,3 +204,27 @@
         }
       }
       .doOnCancel(onCancel)
   }
 }
+
+object ClasspathHasher {
+  final val global: ClasspathHasher = new ClasspathHasher
+
+  private final case class JarMetadata(
+      modifiedTime: FileTime,
+      size: Long
+  )
+
+  /**
+   * File hash can be computed in 2 ways:
+   *   1) `Computed` - The file content is going to be hashed
+   *   2) `FromPromise` - The hash result is going to be obtained from the promise (another running
+   *      session of hashing is computing it)
+   *
+   * It's important to distinguish between them, because the `Computed` case should be treated with
+   * higher priority as it needs CPU and `FromPromise` is just waiting on the result.
+   */
+  private sealed trait FileHashingResult {
+    def task: Task[FileHash]
+  }
+  private final case class Computed(task: Task[FileHash]) extends FileHashingResult
+  private final case class FromPromise(task: Task[FileHash]) extends FileHashingResult
+}
diff --git a/backend/src/main/scala/bloop/task/Task.scala b/backend/src/main/scala/bloop/task/Task.scala
index 9137bdd102..3c899d63ed 100644
--- a/backend/src/main/scala/bloop/task/Task.scala
+++ b/backend/src/main/scala/bloop/task/Task.scala
@@ -160,7 +160,7 @@ sealed trait Task[+A] { self =>
   def as[B](b: => B): Task[B] =
     self.map(_ => b)
 
-  @inline def void(): Task[Unit] = as(())
+  @inline def unit(): Task[Unit] = as(())
 
   def timeoutTo[B >: A](duration: FiniteDuration, backup: Task[B]): Task[B] = {
     Task
diff --git a/frontend/src/main/scala/bloop/engine/caches/ResultsCache.scala b/frontend/src/main/scala/bloop/engine/caches/ResultsCache.scala
index b0b15efb68..63f9bd09a8 100644
--- a/frontend/src/main/scala/bloop/engine/caches/ResultsCache.scala
+++ b/frontend/src/main/scala/bloop/engine/caches/ResultsCache.scala
@@ -4,6 +4,7 @@ import java.nio.file.NoSuchFileException
 import java.nio.file.Path
 import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.concurrent.Await
@@ -159,7 +160,10 @@ object ResultsCache {
       logger: Logger
   ): ResultsCache = {
     val handle = loadAsync(build, cwd, cleanOrphanedInternalDirs, logger)
-    Await.result(handle.runAsync(ExecutionContext.ioScheduler), Duration.Inf)
+    Await.result(
+      handle.runAsync(ExecutionContext.ioScheduler),
+      Duration.apply(30, TimeUnit.SECONDS)
+    )
   }
 
   def loadAsync(
diff --git a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala
index 76c296bf39..e0e6d4ef9b 100644
--- a/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala
+++ b/frontend/src/main/scala/bloop/engine/tasks/compilation/CompileBundle.scala
@@ -178,7 +178,7 @@ object CompileBundle {
     import bloop.engine.ExecutionContext.ioScheduler
     import compileDependenciesData.dependencyClasspath
     val out = options.ngout
-    val classpathHashesTask = bloop.io.ClasspathHasher
+    val classpathHashesTask = bloop.io.ClasspathHasher.global
       .hash(dependencyClasspath, 10, cancelCompilation, logger, tracer, out)
       .executeOn(ioScheduler)
 
diff --git a/frontend/src/test/scala/bloop/bsp/BspBaseSuite.scala b/frontend/src/test/scala/bloop/bsp/BspBaseSuite.scala
index a474a5e8de..1eb8ea766f 100644
--- a/frontend/src/test/scala/bloop/bsp/BspBaseSuite.scala
+++ b/frontend/src/test/scala/bloop/bsp/BspBaseSuite.scala
@@ -593,6 +593,7 @@ abstract class BspBaseSuite extends BaseSuite with BspClientTest {
       // https://github.com/scalacenter/bloop/issues/281
       super.ignore(name, "DISABLED")(fun)
     } else {
+      pprint.log(name)
       super.test(name)(fun)
     }
   }
diff --git a/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala b/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala
index 841fa3403f..ca4f67b037 100644
--- a/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala
+++ b/frontend/src/test/scala/bloop/io/ClasspathHasherSpec.scala
@@ -19,6 +19,11 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite {
 
   val testTimeout = 10.seconds
 
+  val jsoniter = DependencyResolution.Artifact(
+    "com.github.plokhotnyuk.jsoniter-scala",
+    "jsoniter-scala-core_2.13",
+    "2.17.5"
+  )
   val monix = DependencyResolution.Artifact("io.monix", "monix_2.13", "3.4.0")
"3.4.0") val spark = DependencyResolution.Artifact("org.apache.spark", "spark-core_2.13", "3.3.0") val hadoop = DependencyResolution.Artifact("org.apache.hadoop", "hadoop-common", "3.3.4") @@ -27,14 +32,16 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { val monixJar = "monix_2.13-3.4.0.jar" val expectedHash = 581779648 + def makeClasspathHasher: ClasspathHasher = new ClasspathHasher + testAsyncT("hash deps", testTimeout) { val cancelPromise = Promise[Unit]() val logger = new RecordingLogger() val tracer = BraveTracer("hashes-correctly", TraceProperties.default) - val jars = resolveArtifacts(logger, monix) + val jars = resolveArtifacts(monix) val hashClasspathTask = - ClasspathHasher.hash( + makeClasspathHasher.hash( classpath = jars, parallelUnits = 2, cancelCompilation = cancelPromise, @@ -43,7 +50,7 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { serverOut = System.out ) - val task = for { + for { result <- hashClasspathTask } yield { val fileHashes = result.orFail(_ => "Obtained empty result from hashing") @@ -59,6 +66,23 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { expected = true, hint = s"All hashes should be computed correctly, but found cancelled hash in $fileHashes" ) + + val dependencies = jars.toVector + val cacheMisses = dependencies.flatMap { path => + logger.debugs.find(entry => + entry.contains(path.toString) && entry.startsWith("Cache miss for") + ) + } + + // check if all jars hit cache miss and were computed + assertEquals( + obtained = cacheMisses.size, + expected = dependencies.size, + hint = s"Not everyone entry missed the cache when hashing ${dependencies + .mkString("\n")} (${fileHashes.mkString("\n")})" + ) + + // check if hash is computed in a stable way across test runs fileHashes.find(_.file.toString.contains(monixJar)) match { case None => fail(s"There is no $monixJar among hashed jars, although it should be") case Some(fileHash) => @@ -68,18 +92,17 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { ) } } - - task *> Task.sleep(5.second) } testAsyncT("results are cached", testTimeout) { val cancelPromise = Promise[Unit]() - val logger = new RecordingLogger() val tracer = BraveTracer("hashes-correctly", TraceProperties.default) - val jars = resolveArtifacts(logger, monix) + val jars = resolveArtifacts(monix) - val hashClasspathTask = - ClasspathHasher.hash( + val classpathHasher = makeClasspathHasher + + def hashClasspathTask(logger: Logger) = + classpathHasher.hash( classpath = jars, parallelUnits = 2, cancelCompilation = cancelPromise, @@ -89,9 +112,12 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { ) for { - _ <- hashClasspathTask - cachedResult <- hashClasspathTask + logger1 <- Task.now(new RecordingLogger()) + _ <- hashClasspathTask(logger1) + logger = new RecordingLogger() + cachedResult <- hashClasspathTask(logger) } yield { + val fileHashes = cachedResult.orFail(_ => "Obtained empty result from hashing") assertEquals( obtained = fileHashes.forall(_ != BloopStamps.cancelledHash), @@ -106,19 +132,57 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { } if (nonCached.nonEmpty) { - pprint.log(debugOutput) fail( s"Hashing should used cached results for when computing hashes, but $nonCached were computed" ) } + } + } - fileHashes.find(_.file.toString.contains(monixJar)) match { - case None => fail(s"There is no $monixJar among hashed jars, although it should be") - case Some(fileHash) => - assertEquals( - obtained = fileHash.hash, - expected = expectedHash - ) + 
testAsyncT("work is shared accros tasks", testTimeout) { + import bloop.engine.ExecutionContext.ioScheduler + + val cancelPromise = Promise[Unit]() + val tracer = BraveTracer("results are cached", TraceProperties.default) + + // use small library to not pollute test output + val jars = resolveArtifacts(jsoniter) + + val classpathHasher = makeClasspathHasher + + def hashClasspathTask(logger: Logger) = + classpathHasher.hash( + classpath = jars, + parallelUnits = 2, + cancelCompilation = cancelPromise, + logger = logger, + tracer = tracer, + serverOut = System.out + ) + + for { + _ <- Task.now(hashClasspathTask(new RecordingLogger()).runAsync) + logger2 = new RecordingLogger() + cachedResult <- Task.fromFuture(hashClasspathTask(logger2).runAsync) + } yield { + + val fileHashes = cachedResult.orFail(_ => "Obtained empty result from hashing") + assertEquals( + obtained = fileHashes.forall(_ != BloopStamps.cancelledHash), + expected = true, + hint = s"All hashes should be computed correctly, but found cancelled hash in $fileHashes" + ) + + val debugOutput = logger2.debugs.toSet + + val nonCached = jars.toList.filterNot { path => + debugOutput.contains(s"Wait for hashing of $path to complete") + } + + if (nonCached.nonEmpty) { + fail( + s"Hashing should share workload when computing hashes at the same time, but $nonCached were computed" + ) } } } @@ -140,8 +204,10 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { val filesToHash = Array(AbsolutePath(file.toNIO)) + val classpathHasher = makeClasspathHasher + val hashClasspathTask = - ClasspathHasher.hash( + classpathHasher.hash( classpath = filesToHash, parallelUnits = 2, cancelCompilation = cancelPromise, @@ -199,16 +265,22 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { } } - testAsyncT("cancellation", 10.second) { + testAsyncT("cancellation of single task", testTimeout) { import bloop.engine.ExecutionContext.ioScheduler val logger = new RecordingLogger() val cancelPromise = Promise[Unit]() - val tracer = BraveTracer("cancels-correctly-test", TraceProperties.default) - val jars = resolveArtifacts(logger, monix, spark, hadoop) + val tracer = BraveTracer("cancel-single-task", TraceProperties.default) + + // use big libraries with a lot of dependencies, some of them have common deps which will + // result in situation where dep A is being computed by task1 and at the same time by task2 + // in such case task2 is waiting for task1 to be be finished + val jars = resolveArtifacts(monix, spark, hadoop) + + val classpathHasher = makeClasspathHasher val hashClasspathTask = - ClasspathHasher.hash( + classpathHasher.hash( jars, 2, cancelPromise, @@ -243,10 +315,78 @@ object ClasspathHasherSpec extends bloop.testing.BaseSuite { } } + testAsyncT("cancel one task, other one should work just fine", testTimeout) { + import bloop.engine.ExecutionContext.ioScheduler + + val logger1 = new RecordingLogger() + val cancelPromise1 = Promise[Unit]() + + val logger2 = new RecordingLogger() + val cancelPromise2 = Promise[Unit]() + + val tracer = BraveTracer("cancel-single-task", TraceProperties.default) + + // use big libraries with a lot of dependencies, some of them have common deps which will + // result in situation where dep A is being computed by task1 and at the same time by task2 + // in such case task2 is waiting for task1 to be be finished + val jars = resolveArtifacts(monix, spark, hadoop) + + val classpathHasher = makeClasspathHasher + + def hashClasspathTask(logger: Logger, cancelPromise: Promise[Unit]) = + 
+      classpathHasher.hash(
+        jars,
+        2,
+        cancelPromise,
+        logger,
+        tracer,
+        System.out
+      )
+
+    // start hashing, wait a small amount of time and then cancel the task
+    val startAndCancelTask = Task.defer {
+      for {
+        running <-
+          Task(hashClasspathTask(logger1, cancelPromise1).runAsync)
+        _ <- Task.sleep(20.millis)
+        _ <- Task(running.cancel())
+      } yield ()
+    }
+
+    for {
+      _ <- startAndCancelTask
+      result <- hashClasspathTask(logger2, cancelPromise2)
+    } yield {
+      assertEquals(
+        obtained = cancelPromise1.isCompleted,
+        expected = true,
+        hint = "Cancellation promise of task1 should be completed"
+      )
+
+      assertEquals(
+        obtained = cancelPromise2.isCompleted,
+        expected = false,
+        hint = "Cancellation promise of task2 should not be completed"
+      )
+
+      assertEquals(
+        obtained = result.isRight,
+        expected = true,
+        hint = "Task2 should return a valid result"
+      )
+
+      assertEquals(
+        obtained = logger2.warnings.exists(_.startsWith("Unexpected hash computation of")),
+        expected = true,
+        hint = "Task2 should recover from task1 cancellation"
+      )
+    }
+  }
+
   private def resolveArtifacts(
-      logger: Logger,
       deps: DependencyResolution.Artifact*
   ): Array[AbsolutePath] = {
+    val logger = new RecordingLogger()
     deps.toArray.flatMap(a =>
       // Force independent resolution for every artifact
       DependencyResolution.resolve(List(a), logger)

From 169d0a7521c17d109e6bdecc3404131cdff8ab89 Mon Sep 17 00:00:00 2001
From: Kamil Podsiadlo
Date: Wed, 27 Mar 2024 14:57:19 +0100
Subject: [PATCH 3/5] try to make parSequenceN a bit faster

---
 .../main/scala/bloop/io/ClasspathHasher.scala |   2 -
 .../main/scala/bloop/task/ParSequenceN.scala  |  63 ++++++
 backend/src/main/scala/bloop/task/Task.scala  |   9 +-
 .../scala/bloop/task/ParSequenceNSpec.scala   | 180 ++++++++++++++++++
 4 files changed, 248 insertions(+), 6 deletions(-)
 create mode 100644 backend/src/main/scala/bloop/task/ParSequenceN.scala
 create mode 100644 backend/src/test/scala/bloop/task/ParSequenceNSpec.scala

diff --git a/backend/src/main/scala/bloop/io/ClasspathHasher.scala b/backend/src/main/scala/bloop/io/ClasspathHasher.scala
index 05a76569eb..006e58fd63 100644
--- a/backend/src/main/scala/bloop/io/ClasspathHasher.scala
+++ b/backend/src/main/scala/bloop/io/ClasspathHasher.scala
@@ -192,8 +192,6 @@ class ClasspathHasher {
 
     hashes
       .map { result =>
-        pprint.log(this.cacheMetadataJar.size())
-        pprint.log(this.hashingPromises.size())
         if (isCancelled.get || cancelCompilation.isCompleted) {
           cancelCompilation.trySuccess(())
           Left(())
diff --git a/backend/src/main/scala/bloop/task/ParSequenceN.scala b/backend/src/main/scala/bloop/task/ParSequenceN.scala
new file mode 100644
index 0000000000..a4bd827fee
--- /dev/null
+++ b/backend/src/main/scala/bloop/task/ParSequenceN.scala
@@ -0,0 +1,63 @@
+package bloop.task
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.concurrent.Promise
+
+/**
+ * Implementation is based on https://github.com/monix/monix/blob/2faa2cf7425ab0b88ea57b1ea193bce16613f42a/monix-eval/shared/src/main/scala/monix/eval/internal/TaskParSequenceN.scala
+ */
+private[task] object ParSequenceN {
+  def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[Vector[A]] = {
+    if (in.isEmpty) {
+      Task.now(Vector.empty)
+    } else {
+      // val isCancelled = new AtomicBoolean(false)
+      Task.defer {
+        val queue = new java.util.concurrent.ConcurrentLinkedQueue[(Promise[A], Task[A])]()
+        val pairs = in.map(t => (Promise[A](), t))
+        pairs.foreach(queue.add)
+        val errorPromise = Promise[Throwable]()
+        val workDone = new AtomicBoolean(false)
+
+        val singleJob: Task[Unit] = Task
+          .defer {
+            queue.poll() match {
+              case null =>
+                Task(workDone.set(true))
+              case (p, t) =>
+                t.transform(
+                  value => p.trySuccess(value),
+                  error => errorPromise.tryFailure(error)
+                )
+            }
+          }
+          .map(_ => ())
+
+        lazy val thunkOfWork: Task[Unit] = Task.defer {
+          if (workDone.get()) Task.unit
+          else {
+            singleJob.flatMap(_ => thunkOfWork)
+          }
+        }
+
+        val workers = Task.parSequence {
+          List.fill(n)(thunkOfWork)
+        }
+
+        Task.chooseFirstOf(Task.fromFuture(errorPromise.future), workers).flatMap {
+          case Left((err, fb)) =>
+            Task.raiseError {
+              fb.cancel()
+              err
+            }
+          case Right((fa, _)) =>
+            fa.cancel()
+            val values = pairs.unzip._1.toVector.map(p => Task.fromFuture(p.future))
+            Task.sequence(values)
+        }
+      }
+    }
+
+  }
+}
diff --git a/backend/src/main/scala/bloop/task/Task.scala b/backend/src/main/scala/bloop/task/Task.scala
index 3c899d63ed..27260c85e4 100644
--- a/backend/src/main/scala/bloop/task/Task.scala
+++ b/backend/src/main/scala/bloop/task/Task.scala
@@ -160,6 +160,9 @@ sealed trait Task[+A] { self =>
   def as[B](b: => B): Task[B] =
     self.map(_ => b)
 
+  def void: Task[Unit] =
+    self.map(_ => ())
+
   @inline def unit(): Task[Unit] = as(())
 
   def timeoutTo[B >: A](duration: FiniteDuration, backup: Task[B]): Task[B] = {
@@ -486,10 +489,8 @@ object Task {
     }
   }
 
-  def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[List[A]] = {
-    val chunks = in.grouped(n).toList.map(group => Task.parSequence(group))
-    Task.sequence(chunks).map(_.flatten)
-  }
+  def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[Vector[A]] =
+    ParSequenceN.parSequenceN(n)(in)
 
   def fromFuture[A](f: Future[A]): Task[A] =
     Wrap(MonixTask.fromFuture(f), List.empty)
diff --git a/backend/src/test/scala/bloop/task/ParSequenceNSpec.scala b/backend/src/test/scala/bloop/task/ParSequenceNSpec.scala
new file mode 100644
index 0000000000..3fe513622a
--- /dev/null
+++ b/backend/src/test/scala/bloop/task/ParSequenceNSpec.scala
@@ -0,0 +1,180 @@
+package bloop.task
+
+import scala.concurrent.duration._
+
+import org.junit.Assert
+import org.junit.Test
+import monix.execution.schedulers.TestScheduler
+import java.util.concurrent.atomic.AtomicLong
+import scala.util.Success
+import scala.util.Failure
+import scala.concurrent.Await
+import scala.concurrent.Promise
+
+/**
+ * Test cases are from https://github.com/monix/monix/blob/2faa2cf7425ab0b88ea57b1ea193bce16613f42a/monix-eval/shared/src/test/scala/monix/eval/TaskParSequenceNSuite.scala
+ */
+class ParSequenceNSpec {
+
+  private def assertEquals[A](actual: A, expected: A): Unit = {
+    Assert.assertEquals(expected, actual)
+  }
+
+  @Test
+  def empty: Unit = {
+    implicit val sh: TestScheduler = TestScheduler()
+    val future = Task.parSequenceN(1)(Vector.empty).runAsync
+    // check that it completes
+    sh.tickOne()
+    assertEquals(future.value, Some(Success(Vector.empty)))
+  }
+
+  @Test
+  def Task_parSequenceN_should_execute_in_parallel_bounded_by_parallelism: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+
+    val num = new AtomicLong(0)
+    val task = Task(num.incrementAndGet()).flatMap(_ => Task.sleep(2.seconds))
+    val seq = List.fill(100)(task)
+
+    Task.parSequenceN(5)(seq).runAsync
+
+    s.tick()
+    assertEquals(num.get(), 5)
+    s.tick(2.seconds)
+    assertEquals(num.get(), 10)
+    s.tick(4.seconds)
+    assertEquals(num.get(), 20)
+    s.tick(34.seconds)
+    assertEquals(num.get(), 100)
+  }
+
+  @Test
+  def Task_parSequenceN_should_return_result_in_order: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val task = 1.until(10).toList.map(Task.eval(_))
+    val res = Task.parSequenceN(2)(task).runAsync
+
+    s.tick()
+    assertEquals(res.value, Some(Success(List(1, 2, 3, 4, 5, 6, 7, 8, 9))))
+  }
+
+  @Test
+  def Task_parSequenceN_should_return_empty_list: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val res = Task.parSequenceN(2)(List.empty).runAsync
+
+    s.tick()
+    assertEquals(res.value, Some(Success(List.empty)))
+  }
+
+  @Test
+  def Task_parSequenceN_should_handle_single_elem: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val task = 1.until(5).toList.map(Task.eval(_))
+    val res = Task.parSequenceN(10)(task).runAsync
+
+    s.tick()
+    assertEquals(res.value, Some(Success(List(1, 2, 3, 4))))
+  }
+
+  @Test
+  def Task_parSequenceN_should_on_error_when_one_elem_fail: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val ex = new Exception("dummy")
+    val seq = Seq(
+      Task(3).delayExecution(3.seconds),
+      Task(2).delayExecution(1.second),
+      Task(throw ex).delayExecution(1.seconds),
+      Task(3).delayExecution(5.seconds)
+    )
+
+    val f = Task.parSequenceN(2)(seq).runAsync
+
+    assertEquals(f.value, None)
+    s.tick(1.seconds)
+    assertEquals(f.value, None)
+    s.tick(2.seconds)
+    assertEquals(f.value, Some(Failure(ex)))
+  }
+
+  @Test
+  def Task_parSequenceN_should_be_stack_safe: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val count: Int = 200000
+    val tasks = for (_ <- 0 until count) yield Task.now(1)
+    val composite = Task.parSequenceN(count)(tasks).map(_.sum)
+    val result = composite.runAsync
+    s.tick()
+    assertEquals(result.value, Some(Success(count)))
+  }
+
+  @Test
+  def Task_parSequenceN_runAsync_multiple_times: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val state = new AtomicLong(0)
+    val task1 = Task { state.incrementAndGet(); 3 }.memoize
+    val task2 = task1.map { x =>
+      state.incrementAndGet(); x + 1
+    }
+    val task3 = Task.parSequenceN(2)(List(task2, task2, task2))
+
+    val result1 = task3.runAsync
+    s.tick()
+    assertEquals(result1.value, Some(Success(List(4, 4, 4))))
+    assertEquals(state.get(), 1 + 3)
+
+    val result2 = task3.runAsync
+    s.tick()
+    assertEquals(result2.value, Some(Success(List(4, 4, 4))))
+    assertEquals(state.get(), 1 + 3 + 3)
+  }
+
+  /**
+   * Cancellation semantics are the major difference between Monix 2 and 3.
+   * In Monix 2 cancellation is a mere signal that can be ignored, in Monix 3 it's a hard stop.
+   * Here we test whether `Task.parSequenceN` behaves according to the Monix 2 semantics.
+   */
+  @Test
+  def Task_parSequenceN_should_NOT_be_canceled: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val num = new AtomicLong(0)
+    val canceled = Promise[Boolean]()
+    val seq = Seq(
+      Task.unit
+        .delayExecution(4.seconds)
+        .doOnCancel(Task.eval(canceled.success(true)).void),
+      Task(num.compareAndSet(0, 10)).delayExecution(1.second)
+    )
+    val f = Task.parSequenceN(1)(seq).runAsync
+
+    s.tick(1.second)
+    f.cancel()
+    s.tick(2.second)
+
+    // doOnCancel uses global scheduler, so we need to wait for it with await rather than tick
+    Await.ready(canceled.future, 5.second)
+
+    s.tick(1.day)
+    assertEquals(num.get(), 10)
+  }
+
+  @Test
+  def Task_parSequenceN_workers_dont_wait_for_each_other: Unit = {
+    implicit val s: TestScheduler = TestScheduler()
+    val seq = Seq(
+      Task.sleep(4.seconds).map(_ => 1),
+      Task.sleep(1.second).map(_ => 2),
+      Task.sleep(2.second).map(_ => 3)
+    )
+    val f = Task.parSequenceN(2)(seq).runAsync
+
+    s.tick(2.seconds)
+    assertEquals(f.value, None)
+
+    s.tick(2.seconds)
+    Await.ready(f, 1.second)
+    assertEquals(f.value, Some(Success(Vector(1, 2, 3))))
+  }
+
+}

From 9edc0fff7473e67d51a6afe28eda81c5274fb040 Mon Sep 17 00:00:00 2001
From: Kamil Podsiadlo
Date: Wed, 27 Mar 2024 16:44:52 +0100
Subject: [PATCH 4/5] fix formatting

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index 49b3289186..0ef49e3063 100644
--- a/build.sbt
+++ b/build.sbt
@@ -160,7 +160,7 @@ lazy val frontend: Project = project
       Dependencies.scalaDebugAdapter,
       Dependencies.bloopConfig,
       Dependencies.logback,
-      Dependencies.oslib % Test,
+      Dependencies.oslib % Test
     ),
     // needed for tests and to be automatically updated
     Test / libraryDependencies += Dependencies.semanticdb intransitive (),

From cf77788c63dbe6a2b5c6a2c7d087d4809cae81e8 Mon Sep 17 00:00:00 2001
From: Kamil Podsiadlo
Date: Wed, 27 Mar 2024 16:48:27 +0100
Subject: [PATCH 5/5] extract cancellation outside of raiseError

---
 backend/src/main/scala/bloop/task/ParSequenceN.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/backend/src/main/scala/bloop/task/ParSequenceN.scala b/backend/src/main/scala/bloop/task/ParSequenceN.scala
index a4bd827fee..f5e4ce9398 100644
--- a/backend/src/main/scala/bloop/task/ParSequenceN.scala
+++ b/backend/src/main/scala/bloop/task/ParSequenceN.scala
@@ -47,10 +47,8 @@ private[task] object ParSequenceN {
         Task.chooseFirstOf(Task.fromFuture(errorPromise.future), workers).flatMap {
           case Left((err, fb)) =>
-            Task.raiseError {
-              fb.cancel()
-              err
-            }
+            fb.cancel()
+            Task.raiseError(err)
           case Right((fa, _)) =>
             fa.cancel()
             val values = pairs.unzip._1.toVector.map(p => Task.fromFuture(p.future))