實(shí)戰(zhàn)中的 Promise 和 Future

2018-02-24 16:00 更新

上一章介紹了 Future 類(lèi)型,以及如何用它來(lái)編寫(xiě)高可讀性、高組合性的異步執(zhí)行代碼。

Future 只是整個(gè)謎團(tuán)的一部分:它是一個(gè)只讀類(lèi)型,允許你使用它計(jì)算得到的值,或者處理計(jì)算中出現(xiàn)的錯(cuò)誤。但是在這之前,必須得有一種方法把這個(gè)值放進(jìn)去。這一章里,你將會(huì)看到如何通過(guò) Promise 類(lèi)型來(lái)達(dá)到這個(gè)目的。

類(lèi)型 Promise

之前,我們把一段順序執(zhí)行的代碼塊傳遞給了 scala.concurrent 里的 future 方法,并且在作用域中給出了一個(gè) ExecutionContext,它神奇地異步調(diào)用代碼塊,返回一個(gè) Future 類(lèi)型的結(jié)果。

雖然這種獲得 Future 的方式很簡(jiǎn)單,但還有其他的方法來(lái)創(chuàng)建 Future 實(shí)例,并填充它,這就是 Promise。Promise 允許你在 Future 里放入一個(gè)值,不過(guò)只能做一次,F(xiàn)uture 一旦完成,就不能更改了。

一個(gè) Future 實(shí)例總是和一個(gè)(也只能是一個(gè))Promise 實(shí)例關(guān)聯(lián)在一起。如果你在 REPL 里調(diào)用 future 方法,你會(huì)發(fā)現(xiàn)返回的也是一個(gè) Promise:

import concurrent.Future
import concurrent.Future

scala> import concurrent.future
import concurrent.future

scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global

scala> val f: Future[String] = future { "Hello World!" }
f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249

你得到的對(duì)象是一個(gè) DefaultPromise ,它實(shí)現(xiàn)了 FuturePromise 接口,不過(guò)這就是具體的實(shí)現(xiàn)細(xì)節(jié)了(譯注,有興趣的讀者可翻閱其實(shí)現(xiàn)的源碼),使用者只需要知道代碼實(shí)現(xiàn)把 Future 和對(duì)應(yīng)的 Promise 之間的聯(lián)系分的很清晰。

這個(gè)小例子說(shuō)明了:除了通過(guò) Promise,沒(méi)有其他方法可以完成一個(gè) Future,future 方法也只是一個(gè)輔助函數(shù),隱藏了具體的實(shí)現(xiàn)機(jī)制。

現(xiàn)在,讓我們動(dòng)動(dòng)手,看看怎樣直接使用 Promise 類(lèi)型。

給出承諾

當(dāng)我們談?wù)撈鸪兄Z能否被兌現(xiàn)時(shí),一個(gè)很熟知的例子是那些政客的競(jìng)選諾言。

假設(shè)被推選的政客給他的投票者一個(gè)減稅的承諾。這可以用 Promise[TaxCut] 表示:

import concurrent.Promise
case class TaxCut(reduction: Int)
// either give the type as a type parameter to the factory method:
val taxcut = Promise[TaxCut]()
// or give the compiler a hint by specifying the type of your val:
val taxcut2: Promise[TaxCut] = Promise()
// taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
// taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6

一旦創(chuàng)建了這個(gè) Promise,就可以在它上面調(diào)用 future 方法來(lái)獲取承諾的未來(lái):

 val taxCutF: Future[TaxCut] = taxcut.future
 // `> scala.concurrent.Future[TaxCut] `  scala.concurrent.impl.Promise$DefaultPromise@66ae2a84

返回的 Future 可能并不和 Promise 一樣,但在同一個(gè) Promise 上調(diào)用 future 方法總是返回同一個(gè)對(duì)象,以確保 Promise 和 Future 之間一對(duì)一的關(guān)系。

結(jié)束承諾

一旦給出了承諾,并告訴全世界會(huì)在不遠(yuǎn)的將來(lái)兌現(xiàn)它,那最好盡力去實(shí)現(xiàn)。在 Scala 中,可以結(jié)束一個(gè) Promise,無(wú)論成功還是失敗。

兌現(xiàn)承諾

為了成功結(jié)束一個(gè) Promise,你可以調(diào)用它的 success 方法,并傳遞一個(gè)大家期許的結(jié)果:

  taxcut.success(TaxCut(20))

這樣做之后,Promise 就無(wú)法再寫(xiě)入其他值了,如果偏要再寫(xiě),會(huì)產(chǎn)生異常。

此時(shí),和 Promise 關(guān)聯(lián)的 Future 也成功完成,注冊(cè)的回調(diào)會(huì)開(kāi)始執(zhí)行,或者說(shuō)對(duì)這個(gè) Future 進(jìn)行了映射,那這個(gè)時(shí)候,映射函數(shù)也該執(zhí)行了。

一般來(lái)說(shuō),Promise 的完成和對(duì)返回的 Future 的處理發(fā)生在不同的線(xiàn)程。很可能你創(chuàng)建了 Promise,并立即返回和它關(guān)聯(lián)的 Future 給調(diào)用者,而實(shí)際上,另外一個(gè)線(xiàn)程還在計(jì)算它。

為了說(shuō)明這一點(diǎn),我們拿減稅來(lái)舉個(gè)例子:

object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
    val p = Promise[TaxCut]()
    Future {
      println("Starting the new legislative period.")
      Thread.sleep(2000)
      p.success(TaxCut(20))
      println("We reduced the taxes! You must reelect us!!!!1111")
    }
    p.future
  }
}

這個(gè)例子中使用了 Future 伴生對(duì)象,不過(guò)不要被它搞混淆了,這個(gè)例子的重點(diǎn)是:Promise 并不是在調(diào)用者的線(xiàn)程里完成的。

現(xiàn)在我們來(lái)兌現(xiàn)當(dāng)初的競(jìng)選宣言,在 Future 上添加一個(gè) onComplete 回調(diào):

import scala.util.{Success, Failure}
val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge()
println("Now that they're elected, let's see if they remember their promises...")
taxCutF.onComplete {
  case Success(TaxCut(reduction)) =>
    println(s"A miracle! They really cut our taxes by $reduction percentage points!")
  case Failure(ex) =>
    println(s"They broke their promises! Again! Because of a ${ex.getMessage}")
}

多次運(yùn)行這個(gè)例子,會(huì)發(fā)現(xiàn)顯示屏輸出的結(jié)果順序是不確定的,而且,最終回調(diào)函數(shù)會(huì)執(zhí)行,進(jìn)入成功的那個(gè) case 。

違背諾言

政客習(xí)慣違背諾言,Scala 程序員有時(shí)候也只能這樣做。調(diào)用 failure 方法,傳遞一個(gè)異常,結(jié)束 Promise:

case class LameExcuse(msg: String) extends Exception(msg)
object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
     val p = Promise[TaxCut]()
     Future {
       println("Starting the new legislative period.")
       Thread.sleep(2000)
       p.failure(LameExcuse("global economy crisis"))
       println("We didn't fulfill our promises, but surely they'll understand.")
     }
     p.future
   }
}

這個(gè) redeemCampaignPledge 實(shí)現(xiàn)最終會(huì)違背承諾。一旦用 failure 結(jié)束這個(gè) Promise,也無(wú)法再次寫(xiě)入了,正如 success 方法一樣。相關(guān)聯(lián)的 Future 也會(huì)以 Failure 收?qǐng)觥?/p>

如果已經(jīng)有了一個(gè) Try,那可以直接把它傳遞給 Promise 的 complete 方法,以此來(lái)結(jié)束這個(gè)它。如果這個(gè) Try 是一個(gè) Success,關(guān)聯(lián)的 Future 會(huì)成功完成,否則,就失敗。

基于 Future 的編程實(shí)踐

如果想使用基于 Future 的編程范式以增加應(yīng)用的擴(kuò)展性,那應(yīng)用從下到上都必須被設(shè)計(jì)成非阻塞模式。這意味著,基本上應(yīng)用層所有的函數(shù)都應(yīng)該是異步的,并且返回 Future。

當(dāng)下,一個(gè)可能的使用場(chǎng)景是開(kāi)發(fā) Web 應(yīng)用。流行的 Scala Web 框架,允許你將響應(yīng)作為 Future[Response] 返回,而不是等到你完成響應(yīng)再返回。這個(gè)非常重要,因?yàn)樗试S Web 服務(wù)器用少量的線(xiàn)程處理更多的連接。通過(guò)賦予服務(wù)器 Future[Response] 的能力,你可以最大化服務(wù)器線(xiàn)程池的利用率。

而且,應(yīng)用的服務(wù)可能需要多次調(diào)用數(shù)據(jù)庫(kù)層以及(或者)某些外部服務(wù),這時(shí)候可以獲取多個(gè) Future,用 for 語(yǔ)句將它們組合成新的 Future,簡(jiǎn)單可讀!最終,Web 層再將這樣的一個(gè) Future 變成 Future[Response]。

但是該怎樣在實(shí)踐中實(shí)現(xiàn)這些呢?需要考慮三種不同的場(chǎng)景:

非阻塞IO

應(yīng)用很可能涉及到大量的 IO 操作。比如,可能需要和數(shù)據(jù)庫(kù)交互,還可能作為客戶(hù)端去調(diào)用其他的 Web 服務(wù)。

如果是這樣,可以使用一些基于 Java 非阻塞 IO 實(shí)現(xiàn)的庫(kù),也可以直接或通過(guò) Netty 這樣的庫(kù)來(lái)使用 Java 的 NIO API。這樣的庫(kù)可以用定量的線(xiàn)程池處理大量的連接。

但如果是想開(kāi)發(fā)這樣的一個(gè)庫(kù),直接和 Promise 打交道更為合適。

阻塞 IO

有時(shí)候,并沒(méi)有基于 NIO 的庫(kù)可用。比如,Java 世界里大多數(shù)的數(shù)據(jù)庫(kù)驅(qū)動(dòng)都是使用阻塞 IO。在 Web 應(yīng)用中,如果用這樣的驅(qū)動(dòng)發(fā)起大量訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)的調(diào)用,要記得這些調(diào)用是發(fā)生在服務(wù)器線(xiàn)程里的。為了避免這個(gè)問(wèn)題,可以將所有需要和數(shù)據(jù)庫(kù)交互的代碼都放入 future 代碼塊里,就像這樣:

// get back a Future[ResultSet] or something similar:
Future {
  queryDB(query)
}

到現(xiàn)在為止,我們都是使用隱式可用的全局 ExecutionContext 來(lái)執(zhí)行這些代碼塊。通常,更好的方式是創(chuàng)建一個(gè)專(zhuān)用的 ExecutionContext 放在數(shù)據(jù)庫(kù)層里??梢詮?Java的 ExecutorService 來(lái)它,這也意味著,可以異步的調(diào)整線(xiàn)程池來(lái)執(zhí)行數(shù)據(jù)庫(kù)調(diào)用,應(yīng)用的其他部分不受影響。

import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

長(zhǎng)時(shí)間運(yùn)行的計(jì)算

取決于應(yīng)用的本質(zhì)特點(diǎn),一個(gè)應(yīng)用偶爾還會(huì)調(diào)用一些長(zhǎng)時(shí)間運(yùn)行的任務(wù),它們完全不涉及 IO(CPU 密集的任務(wù))。這些任務(wù)也不應(yīng)該在服務(wù)器線(xiàn)程中執(zhí)行,因此需要將它們變成 Future:

Future {
  longRunningComputation(data, moreData)
}

同樣,最好有一些專(zhuān)屬的 ExecutionContext 來(lái)處理這些 CPU 密集的計(jì)算。怎樣調(diào)整這些線(xiàn)程池大小取決于應(yīng)用的特征,這些已經(jīng)超過(guò)了本文的范圍。

總結(jié)

這一章里,我們學(xué)習(xí)了 Promise - 基于 Future 的并發(fā)范式的可寫(xiě)組件,以及怎樣用它來(lái)完成一個(gè) Future;同時(shí),還給出了一些在實(shí)踐中使用它們的建議。

下一章會(huì)討論 Scala 函數(shù)式編程是如何增加代碼可用性(一個(gè)長(zhǎng)久以來(lái)和面向?qū)ο缶幊滔嚓P(guān)聯(lián)的概念)的。

以上內(nèi)容是否對(duì)您有幫助:
在線(xiàn)筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)