上一章介紹了 Future 類(lèi)型,以及如何用它來(lái)編寫(xiě)高可讀性、高組合性的異步執(zhí)行代碼。
Future 只是整個(gè)謎團(tuán)的一部分:它是一個(gè)只讀類(lèi)型,允許你使用它計(jì)算得到的值,或者處理計(jì)算中出現(xiàn)的錯(cuò)誤。但是在這之前,必須得有一種方法把這個(gè)值放進(jìn)去。這一章里,你將會(huì)看到如何通過(guò) Promise 類(lèi)型來(lái)達(dá)到這個(gè)目的。
之前,我們把一段順序執(zhí)行的代碼塊傳遞給了 scala.concurrent
里的 future
方法,并且在作用域中給出了一個(gè) ExecutionContext
,它神奇地異步調(diào)用代碼塊,返回一個(gè) Future 類(lèi)型的結(jié)果。
雖然這種獲得 Future 的方式很簡(jiǎn)單,但還有其他的方法來(lái)創(chuàng)建 Future 實(shí)例,并填充它,這就是 Promise。Promise 允許你在 Future 里放入一個(gè)值,不過(guò)只能做一次,F(xiàn)uture 一旦完成,就不能更改了。
一個(gè) Future 實(shí)例總是和一個(gè)(也只能是一個(gè))Promise 實(shí)例關(guān)聯(lián)在一起。如果你在 REPL 里調(diào)用 future
方法,你會(huì)發(fā)現(xiàn)返回的也是一個(gè) Promise:
import concurrent.Future
import concurrent.Future
scala> import concurrent.future
import concurrent.future
scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global
scala> val f: Future[String] = future { "Hello World!" }
f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249
你得到的對(duì)象是一個(gè) DefaultPromise
,它實(shí)現(xiàn)了 Future
和 Promise
接口,不過(guò)這就是具體的實(shí)現(xiàn)細(xì)節(jié)了(譯注,有興趣的讀者可翻閱其實(shí)現(xiàn)的源碼),使用者只需要知道代碼實(shí)現(xiàn)把 Future 和對(duì)應(yīng)的 Promise 之間的聯(lián)系分的很清晰。
這個(gè)小例子說(shuō)明了:除了通過(guò) Promise,沒(méi)有其他方法可以完成一個(gè) Future,future
方法也只是一個(gè)輔助函數(shù),隱藏了具體的實(shí)現(xiàn)機(jī)制。
現(xiàn)在,讓我們動(dòng)動(dòng)手,看看怎樣直接使用 Promise 類(lèi)型。
當(dāng)我們談?wù)撈鸪兄Z能否被兌現(xiàn)時(shí),一個(gè)很熟知的例子是那些政客的競(jìng)選諾言。
假設(shè)被推選的政客給他的投票者一個(gè)減稅的承諾。這可以用 Promise[TaxCut]
表示:
import concurrent.Promise
case class TaxCut(reduction: Int)
// either give the type as a type parameter to the factory method:
val taxcut = Promise[TaxCut]()
// or give the compiler a hint by specifying the type of your val:
val taxcut2: Promise[TaxCut] = Promise()
// taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
// taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6
一旦創(chuàng)建了這個(gè) Promise,就可以在它上面調(diào)用 future
方法來(lái)獲取承諾的未來(lái):
val taxCutF: Future[TaxCut] = taxcut.future
// `> scala.concurrent.Future[TaxCut] ` scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
返回的 Future 可能并不和 Promise 一樣,但在同一個(gè) Promise 上調(diào)用 future
方法總是返回同一個(gè)對(duì)象,以確保 Promise 和 Future 之間一對(duì)一的關(guān)系。
一旦給出了承諾,并告訴全世界會(huì)在不遠(yuǎn)的將來(lái)兌現(xiàn)它,那最好盡力去實(shí)現(xiàn)。在 Scala 中,可以結(jié)束一個(gè) Promise,無(wú)論成功還是失敗。
為了成功結(jié)束一個(gè) Promise,你可以調(diào)用它的 success
方法,并傳遞一個(gè)大家期許的結(jié)果:
taxcut.success(TaxCut(20))
這樣做之后,Promise 就無(wú)法再寫(xiě)入其他值了,如果偏要再寫(xiě),會(huì)產(chǎn)生異常。
此時(shí),和 Promise 關(guān)聯(lián)的 Future 也成功完成,注冊(cè)的回調(diào)會(huì)開(kāi)始執(zhí)行,或者說(shuō)對(duì)這個(gè) Future 進(jìn)行了映射,那這個(gè)時(shí)候,映射函數(shù)也該執(zhí)行了。
一般來(lái)說(shuō),Promise 的完成和對(duì)返回的 Future 的處理發(fā)生在不同的線(xiàn)程。很可能你創(chuàng)建了 Promise,并立即返回和它關(guān)聯(lián)的 Future 給調(diào)用者,而實(shí)際上,另外一個(gè)線(xiàn)程還在計(jì)算它。
為了說(shuō)明這一點(diǎn),我們拿減稅來(lái)舉個(gè)例子:
object Government {
def redeemCampaignPledge(): Future[TaxCut] = {
val p = Promise[TaxCut]()
Future {
println("Starting the new legislative period.")
Thread.sleep(2000)
p.success(TaxCut(20))
println("We reduced the taxes! You must reelect us!!!!1111")
}
p.future
}
}
這個(gè)例子中使用了 Future 伴生對(duì)象,不過(guò)不要被它搞混淆了,這個(gè)例子的重點(diǎn)是:Promise 并不是在調(diào)用者的線(xiàn)程里完成的。
現(xiàn)在我們來(lái)兌現(xiàn)當(dāng)初的競(jìng)選宣言,在 Future 上添加一個(gè) onComplete
回調(diào):
import scala.util.{Success, Failure}
val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge()
println("Now that they're elected, let's see if they remember their promises...")
taxCutF.onComplete {
case Success(TaxCut(reduction)) =>
println(s"A miracle! They really cut our taxes by $reduction percentage points!")
case Failure(ex) =>
println(s"They broke their promises! Again! Because of a ${ex.getMessage}")
}
多次運(yùn)行這個(gè)例子,會(huì)發(fā)現(xiàn)顯示屏輸出的結(jié)果順序是不確定的,而且,最終回調(diào)函數(shù)會(huì)執(zhí)行,進(jìn)入成功的那個(gè) case 。
政客習(xí)慣違背諾言,Scala 程序員有時(shí)候也只能這樣做。調(diào)用 failure
方法,傳遞一個(gè)異常,結(jié)束 Promise:
case class LameExcuse(msg: String) extends Exception(msg)
object Government {
def redeemCampaignPledge(): Future[TaxCut] = {
val p = Promise[TaxCut]()
Future {
println("Starting the new legislative period.")
Thread.sleep(2000)
p.failure(LameExcuse("global economy crisis"))
println("We didn't fulfill our promises, but surely they'll understand.")
}
p.future
}
}
這個(gè) redeemCampaignPledge
實(shí)現(xiàn)最終會(huì)違背承諾。一旦用 failure
結(jié)束這個(gè) Promise,也無(wú)法再次寫(xiě)入了,正如 success
方法一樣。相關(guān)聯(lián)的 Future 也會(huì)以 Failure
收?qǐng)觥?/p>
如果已經(jīng)有了一個(gè) Try,那可以直接把它傳遞給 Promise 的 complete
方法,以此來(lái)結(jié)束這個(gè)它。如果這個(gè) Try 是一個(gè) Success,關(guān)聯(lián)的 Future 會(huì)成功完成,否則,就失敗。
如果想使用基于 Future 的編程范式以增加應(yīng)用的擴(kuò)展性,那應(yīng)用從下到上都必須被設(shè)計(jì)成非阻塞模式。這意味著,基本上應(yīng)用層所有的函數(shù)都應(yīng)該是異步的,并且返回 Future。
當(dāng)下,一個(gè)可能的使用場(chǎng)景是開(kāi)發(fā) Web 應(yīng)用。流行的 Scala Web 框架,允許你將響應(yīng)作為 Future[Response]
返回,而不是等到你完成響應(yīng)再返回。這個(gè)非常重要,因?yàn)樗试S Web 服務(wù)器用少量的線(xiàn)程處理更多的連接。通過(guò)賦予服務(wù)器 Future[Response]
的能力,你可以最大化服務(wù)器線(xiàn)程池的利用率。
而且,應(yīng)用的服務(wù)可能需要多次調(diào)用數(shù)據(jù)庫(kù)層以及(或者)某些外部服務(wù),這時(shí)候可以獲取多個(gè) Future,用 for 語(yǔ)句將它們組合成新的 Future,簡(jiǎn)單可讀!最終,Web 層再將這樣的一個(gè) Future 變成 Future[Response]
。
但是該怎樣在實(shí)踐中實(shí)現(xiàn)這些呢?需要考慮三種不同的場(chǎng)景:
應(yīng)用很可能涉及到大量的 IO 操作。比如,可能需要和數(shù)據(jù)庫(kù)交互,還可能作為客戶(hù)端去調(diào)用其他的 Web 服務(wù)。
如果是這樣,可以使用一些基于 Java 非阻塞 IO 實(shí)現(xiàn)的庫(kù),也可以直接或通過(guò) Netty 這樣的庫(kù)來(lái)使用 Java 的 NIO API。這樣的庫(kù)可以用定量的線(xiàn)程池處理大量的連接。
但如果是想開(kāi)發(fā)這樣的一個(gè)庫(kù),直接和 Promise 打交道更為合適。
有時(shí)候,并沒(méi)有基于 NIO 的庫(kù)可用。比如,Java 世界里大多數(shù)的數(shù)據(jù)庫(kù)驅(qū)動(dòng)都是使用阻塞 IO。在 Web 應(yīng)用中,如果用這樣的驅(qū)動(dòng)發(fā)起大量訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)的調(diào)用,要記得這些調(diào)用是發(fā)生在服務(wù)器線(xiàn)程里的。為了避免這個(gè)問(wèn)題,可以將所有需要和數(shù)據(jù)庫(kù)交互的代碼都放入 future
代碼塊里,就像這樣:
// get back a Future[ResultSet] or something similar:
Future {
queryDB(query)
}
到現(xiàn)在為止,我們都是使用隱式可用的全局 ExecutionContext
來(lái)執(zhí)行這些代碼塊。通常,更好的方式是創(chuàng)建一個(gè)專(zhuān)用的 ExecutionContext
放在數(shù)據(jù)庫(kù)層里??梢詮?Java的 ExecutorService
來(lái)它,這也意味著,可以異步的調(diào)整線(xiàn)程池來(lái)執(zhí)行數(shù)據(jù)庫(kù)調(diào)用,應(yīng)用的其他部分不受影響。
import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)
取決于應(yīng)用的本質(zhì)特點(diǎn),一個(gè)應(yīng)用偶爾還會(huì)調(diào)用一些長(zhǎng)時(shí)間運(yùn)行的任務(wù),它們完全不涉及 IO(CPU 密集的任務(wù))。這些任務(wù)也不應(yīng)該在服務(wù)器線(xiàn)程中執(zhí)行,因此需要將它們變成 Future:
Future {
longRunningComputation(data, moreData)
}
同樣,最好有一些專(zhuān)屬的 ExecutionContext
來(lái)處理這些 CPU 密集的計(jì)算。怎樣調(diào)整這些線(xiàn)程池大小取決于應(yīng)用的特征,這些已經(jīng)超過(guò)了本文的范圍。
這一章里,我們學(xué)習(xí)了 Promise - 基于 Future 的并發(fā)范式的可寫(xiě)組件,以及怎樣用它來(lái)完成一個(gè) Future;同時(shí),還給出了一些在實(shí)踐中使用它們的建議。
下一章會(huì)討論 Scala 函數(shù)式編程是如何增加代碼可用性(一個(gè)長(zhǎng)久以來(lái)和面向?qū)ο缶幊滔嚓P(guān)聯(lián)的概念)的。
更多建議: