Future与Promise

发布时间:2017-7-9 7:33:07编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"Future与Promise ",主要涉及到Future与Promise 方面的内容,对于Future与Promise 感兴趣的同学可以参考一下。

https://code.csdn.NET/DOC_Scala/chinese_scala_offical_document/file/Futures-and-Promises-cn.md#anchor_0

Philipp Haller, Aleksandar Prokopec, Heather Miller, Viktor Klang, Roland Kuhn, and Vojin Jovanovic著

简介

Future提供了一套高效便捷的非阻塞并行操作管理方案。其基本思想很简单,所谓Future,指的是一类占位符对象,用于指代某些尚未完成的计算的结果。一般来说,由Future指代的计算都是并行执行的,计算完毕后可另行获取相关计算结果。以这种方式组织并行任务,便可以写出高效、异步、非阻塞的并行代码。

默认情况下,future和promise并不采用一般的阻塞操作,而是依赖回调进行非阻塞操作。为了在语法和概念层面更加简明扼要地使用这些回调,Scala还提供了flatMap、foreach和filter等算子,使得我们能够以非阻塞的方式对future进行组合。当然,future仍然支持阻塞操作——必要时,可以阻塞等待future(不过并不鼓励这样做)。

Future

所谓Future,是一种用于指代某个尚未就绪的值的对象。而这个值,往往是某个计算过程的结果:

  • 若该计算过程尚未完成,我们就说该Future未就位;
  • 若该计算过程正常结束,或中途抛出异常,我们就说该Future已就位。

Future的就位分为两种情况:

  • 当Future带着某个值就位时,我们就说该Future携带计算结果成功就位。
  • 当Future因对应计算过程抛出异常而就绪,我们就说这个Future因该异常而失败。

Future的一个重要属性在于它只能被赋值一次。一旦给定了某个值或某个异常,future对象就变成了不可变对象——无法再被改写。

创建future对象最简单的方法是调用future方法,该future方法启用异步(asynchronous)计算并返回保存有计算结果的futrue,一旦该future对象计算完成,其结果就变的可用。

注意_Future[T]_ 是表示future对象的类型,而future是方法,该方法创建和调度一个异步计算,并返回随着计算结果而完成的future对象。

这最好通过一个例子予以说明。

假设我们使用某些流行的社交网络的假定API获取某个用户的朋友列表,我们将打开一个新对话(session),然后发送一个请求来获取某个特定用户的好友列表。

import scala.concurrent._
import ExecutionContext.Implicits.global

val session = socialNetwork.createSessionFor("user", credentials)
val session = socialNetwork.createSessionFor("user", credentials)
  session.getFriends()
}

以上,首先导入scala.concurrent 包使得Future类型和future构造函数可见。我们将马上解释第二个导入。

然后我们初始化一个session变量来用作向服务器发送请求,用一个假想的 createSessionFor 方法来返回一个List[Friend]。为了获得朋友列表,我们必须通过网络发送一个请求,这个请求可能耗时很长。这能从调用getFriends方法得到解释。为了更好的利用CPU,响应到达前不应该阻塞(block)程序的其他部分执行,于是在计算中使用异步。future方法就是这样做的,它并行地执行指定的计算块,在这个例子中是向服务器发送请求和等待响应。

一旦服务器响应,future f 中的好友列表将变得可用。

未成功的尝试可能会导致一个异常(exception)。在下面的例子中,session的值未被正确的初始化,于是在future的计算中将抛出NullPointerException,future f 不会圆满完成,而是以此异常失败。

val session = null
val session = socialNetwork.createSessionFor("user", credentials)
  session.getFriends
}

import ExecutionContext.Implicits.global 上面的线条导入默认的全局执行上下文(global execution context),执行上下文执行执行提交给他们的任务,也可把执行上下文看作线程池,这对于future方法来说是必不可少的,因为这可以处理异步计算如何及何时被执行。我们可以定义自己的执行上下文,并在future上使用它,但是现在只需要知道你能够通过上面的语句导入默认执行上下文就足够了。

我们的例子是基于一个假定的社交网络API,此API的计算包含发送网络请求和等待响应。提供一个涉及到你能试着立即使用的异步计算的例子是公平的。假设你有一个文本文件,你想找出一个特定的关键字第一次出现的位置。当磁盘正在检索此文件内容时,这种计算可能会陷入阻塞,因此并行的执行该操作和程序的其他部分是合理的(make sense)。

val firstOccurrence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}

Callbacks(回调函数)

现在我们知道如何开始一个异步计算来创建一个新的future值,但是我们没有展示一旦此结果变得可用后如何来使用,以便我们能够用它来做一些有用的事。我们经常对计算结果感兴趣而不仅仅是它的副作用。

在许多future的实现中,一旦future的client对future的结果感兴趣,它不得不阻塞它自己的计算直到future完成——然后才能使用future的值继续它自己的计算。虽然这在Scala的Future API(在后面会展示)中是允许的,但是从性能的角度来看更好的办法是一种完全非阻塞的方法,即在future中注册一个回调,future完成后这个回调称为异步回调。如果当注册回调时future已经完成,则回调可能是异步执行的,或在相同的线程中循序执行。

注册回调最通常的形式是使用OnComplete方法,即创建一个Try[T] => U类型的回调函数。如果future成功完成,回调则会应用到Success[T]类型的值中,否则应用到Failure[T]类型的值中。

Try[T] 和Option[T]或 Either[T, S]相似,因为它是一个可能持有某种类型值的单子。然而,它是特意设计来保持一个值或某个可抛出(throwable)对象。Option[T] 既可以是一个值(如:Some[T])也可以是完全无值(如:None),如果Try[T]获得一个值则它为Success[T] ,否则为Failure[T]的异常。 Failure[T] 获得更多的关于为什么这儿没值的信息,而不仅仅是None。同时也可以把Try[T]看作一种特殊版本的Either[Throwable, T],专门用于左值为可抛出类型(Throwable)的情形。

回到我们的社交网络的例子,假设我们想要获取我们最近的帖子并显示在屏幕上,我们通过调用getRecentPosts方法获得一个返回值List[String]——一个近期帖子的列表文本:

val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Success(posts) => for (post <- posts) println(post)
}

onComplete方法一般在某种意义上它允许客户处理future计算出的成功或失败的结果。对于仅仅处理成功的结果,onSuccess 回调使用如下(该回调以一个偏函数(partial function)为参数):

val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onSuccess {
  case posts => for (post <- posts) println(post)
}

对于处理失败结果,onFailure回调使用如下:

val f: Future[List[String]] = future {
  session.getRecentPosts
}

f onFailure {
  case t => println("An error has occured: " + t.getMessage)
}

f onSuccess {
  case posts => for (post <- posts) println(post)
}

如果future失败,即future抛出异常,则执行onFailure回调。

因为偏函数具有 isDefinedAt方法, onFailure方法只有在特定的Throwable类型对象中被定义才会触发。下面例子中的onFailure回调永远不会被触发:

val f = future {
  2 / 0
}

f onFailure {
  case npe: NullPointerException =>
    println("I'd be amazed if this printed out.")
}

回到前面查找某个关键字第一次出现的例子,我们想要在屏幕上打印出此关键字的位置:

val firstOccurrence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}

firstOccurrence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}

firstOccurrence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

onComplete,、onSuccess 和 onFailure 方法都具有Unit的结果类型,这意味着不能链接使用这些方法的回调。注意这种设计是为了避免暗示而刻意为之的,因为链接回调也许暗示着按照一定的顺序执行注册回调(回调注册在同一个future中是无序的)。

也就是说,我们现在应讨论论何时调用callback。因为callback需要future的值是可用的,所有回调只能在future完成之后被调用。然而,不能保证callback在完成future的线程或创建callback的线程中被调用。反而, 回调(callback)会在future对象完成之后的一些线程和一段时间内执行。所以我们说回调(callback)最终会被执行。

此外,回调(callback)执行的顺序不是预先定义的,甚至在相同的应用程序中callback的执行顺序也不尽相同。事实上,callback也许不是一个接一个连续的调用,但是可能会在同一时间同时执行。这意味着在下面的例子中,变量totalA也许不能在计算上下文中被设置为正确的大写或者小写字母。

@volatile var totalA = 0

val text = future {
  "na" * 16 + "BATMAN!!!"
}

text onSuccess {
  case txt => totalA += txt.count(_ == 'a')
}

text onSuccess {
  case txt => totalA += txt.count(_ == 'a')
}

以上,这两个回调(callbacks)可能是一个接一个地执行的,这样变量totalA得到的预期值为18。然而,它们也可能是并发执行的,于是totalA最终可能是16或2,因为+= 是一个不可分割的操作符(即它是由一个读和一个写的步骤组成,这样就可能使其与其他的读和写任意交错执行)。

考虑到完整性,回调的使用情景列在这儿:

  • 在future中注册onComplete回调的时候要确保最后future执行完成之后调用相应的终止回调。

  • 注册onSuccess或者onFailure回调时也和注册onComplete一样,不同之处在于future执行成功或失败分别调用onSuccess或onSuccess的对应的闭包。

  • 注册一个已经完成的future的回调最后将导致此回调一直处于执行状态(1所隐含的)。

  • 在future中注册多个回调的情况下,这些回调的执行顺序是不确定的。事实上,这些回调也许是同时执行的,然而,特定的ExecutionContext执行可能导致明确的顺序。

  • 在一些回调抛出异常的情况下,其他的回调的执行不受影响。

  • 在一些情况下,回调函数永远不能结束(例如,这些回调处于无限循环中),其他回调可能完全不会执行。在这