Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add FutureOps which with an await style. #1666

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }

import scala.annotation.{ nowarn, tailrec }
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.NonFatal

Expand Down Expand Up @@ -178,7 +178,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
}
}

Await.result(stop(), getShutdownTimeout).foreach {
import pekko.util.Helpers._
stop().await(getShutdownTimeout).foreach {
case task: Scheduler.TaskRunOnClose =>
runTask(task)
case holder: TaskHolder => // don't run
Expand Down
18 changes: 10 additions & 8 deletions actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
import java.util.function.BiFunction
import java.util.function.Consumer
import scala.annotation.nowarn
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
Expand Down Expand Up @@ -446,13 +446,15 @@ class CircuitBreaker(
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
Await.result(
withCircuitBreaker(
try Future.successful(body)
catch { case NonFatal(t) => Future.failed(t) },
defineFailureFn),
callTimeout)
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T = {
import pekko.util.Helpers._
withCircuitBreaker(
try Future.successful(body)
catch {
case NonFatal(t) => Future.failed(t)
},
defineFailureFn).await(callTimeout)
}

/**
* Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ package org.apache.pekko.serialization

import java.util.concurrent.CompletionStage

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.Future

import org.apache.pekko
import pekko.actor.ExtendedActorSystem
Expand Down Expand Up @@ -59,14 +58,16 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem)
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for pekko persistence plugins that support them. Class: {}",
o.getClass)
Await.result(toBinaryAsync(o), Duration.Inf)
import pekko.util.Helpers._
toBinaryAsync(o).await()
}

final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for Pekko persistence plugins that support them. Manifest: [{}]",
manifest)
Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf)
import pekko.util.Helpers._
fromBinaryAsync(bytes, manifest).await()
}
}

Expand Down
19 changes: 19 additions & 0 deletions actor/src/main/scala/org/apache/pekko/util/Helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ import java.time.format.DateTimeFormatter
import java.util.{ Comparator, Locale }
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, FiniteDuration }

import com.typesafe.config.{ Config, ConfigRenderOptions }

import scala.concurrent.{ blocking, Future }

object Helpers {

def toRootLowerCase(s: String): String = s.toLowerCase(Locale.ROOT)
Expand Down Expand Up @@ -195,4 +199,19 @@ object Helpers {
Duration(config.getDuration(path, unit), unit)
}

/**
* INTERNAL API
*/
private[pekko] final implicit class FutureOps[T](val future: Future[T]) extends AnyVal {

/**
* Wait for the future to complete and return the result, or throw an exception if the future failed.
* Optimize for the case when the future is already completed.
* @since 1.2.0
*/
def await(atMost: Duration = Duration.Inf): T = future.value match {
case Some(value) => value.get
case None => blocking(future.result(atMost)(null))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid the double checking now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    @throws(classOf[TimeoutException])
    @throws(classOf[InterruptedException])
    final def resultOption[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match {
      case CompletedFuture(v)  => v.get
      case _ => blocking(awaitable.result(atMost)(AwaitPermission))
    }

    private final object CompletedFuture {
      def unapply[T](f: Future[T]): Option[Try[T]] = f.value
    }

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.util

import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Measurement, Mode, OutputTimeUnit, Scope, State, Warmup }

import java.util.concurrent.TimeUnit
import scala.concurrent.Future

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 1000)
@Measurement(iterations = 10000)
class FutureOpsBenchmark {
private val completedFuture: Future[Int] = Future.successful(1)
// jmh:run -i 11 -wi 11 -f1 -t1 org.apache.pekko.util.FutureOpsBenchmark
// [info] Benchmark Mode Cnt Score Error Units
// [info] FutureOpsBenchmark.awaitWithAwaitable thrpt 11 706198.499 ± 8185.983 ops/ms
// [info] FutureOpsBenchmark.awaitWithFutureOps thrpt 11 766901.781 ± 9741.792 ops/ms
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning Attached.


@Benchmark
def awaitWithFutureOps(): Unit = {
import scala.concurrent.duration._
import org.apache.pekko.util.Helpers._
completedFuture.await(Duration.Inf)
}

@Benchmark
def awaitWithAwaitable(): Unit = {
import scala.concurrent.duration._
import scala.concurrent.Await
Await.result(completedFuture, Duration.Inf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.apache.pekko.stream

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise

import org.apache.pekko
Expand Down Expand Up @@ -77,7 +76,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
private[pekko] def createAdditionalSystemMaterializer(): Materializer = {
val started =
(materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
import pekko.util.Helpers._
started.await(materializerTimeout.duration).materializer
}

/**
Expand All @@ -91,12 +91,14 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
val started =
(materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings))
.mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
import pekko.util.Helpers._
started.await(materializerTimeout.duration).materializer
}

val materializer: Materializer = {
// block on async creation to make it effectively final
Await.result(systemMaterializerPromise.future, materializerTimeout.duration)
import pekko.util.Helpers._
systemMaterializerPromise.future.await(materializerTimeout.duration)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.apache.pekko.stream.impl.io
import java.io.{ IOException, OutputStream }
import java.util.concurrent.{ Semaphore, TimeUnit }

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

Expand Down Expand Up @@ -87,7 +86,8 @@ private[pekko] class OutputStreamAdapter(
}

try {
Await.result(sendToStage.invokeWithFeedback(Send(data)), writeTimeout)
import pekko.util.Helpers._
sendToStage.invokeWithFeedback(Send(data)).await(writeTimeout)
} catch {
case NonFatal(e) => throw new IOException(e)
}
Expand Down Expand Up @@ -115,7 +115,8 @@ private[pekko] class OutputStreamAdapter(
@scala.throws(classOf[IOException])
override def close(): Unit = {
try {
Await.result(sendToStage.invokeWithFeedback(Close), writeTimeout)
import pekko.util.Helpers._
sendToStage.invokeWithFeedback(Close).await(writeTimeout)
} catch {
case NonFatal(e) => throw new IOException(e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import java.io.{ InputStream, OutputStream }
import java.util.Spliterators
import java.util.stream.{ Collector, StreamSupport }

import scala.concurrent.{ Await, Future }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.duration.Duration._

import org.apache.pekko
import pekko.NotUsed
Expand Down Expand Up @@ -197,7 +196,8 @@ object StreamConverters {
var nextElement: Option[T] = _

override def hasNext: Boolean = {
nextElement = Await.result(nextElementFuture, Inf)
import pekko.util.Helpers._
nextElement = nextElementFuture.await()
nextElement.isDefined
}

Expand Down
Loading