Actor(執行者)
概念
Actor(執行者)是獨立隔離的活動物件,彼此不共享任何資料,僅透過訊息傳遞進行溝通。避免共享可變狀態可讓開發人員免於許多典型的並行問題,例如活鎖或競爭條件。每個 actor 的主體(程式碼)由執行緒池中的隨機執行緒執行,因此 actor 可以並行且獨立地進行。由於 Actors 可以共享一個相對較小的執行緒池,因此它們避免了 JVM 的執行緒限制,即使在您的應用程式包含數千個 actor 的情況下,也不需要過多的系統資源。
Actors(執行者)通常在其常規任務之上執行三種基本類型的操作
-
建立新的 actor
-
傳送訊息給另一個 actor
-
接收訊息
Actors(執行者)可以建立為特定 actor 類別的子類別,或使用工廠方法,將 actor 的主體作為閉包參數提供。有多種方式可以傳送訊息,可以使用 >> 運算符,或使用任何 send() 、 sendAndWait() 或 sendAndContinua() 方法。
接收訊息可以以阻塞或非阻塞方式執行,當沒有訊息可用時,實體執行緒會返回到池中。
Actors(執行者)可以協調成各種演算法,並有可能利用類似於企業訊息系統中已知的架構模式。 |
生命週期
使用工廠方法建立 Actor(執行者)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Actors.actor {
println "actor1 has started"
delegate.metaClass {
afterStop = {List undeliveredMessages ->
println "actor1 has stopped"
}
onInterrupt = {InterruptedException e ->
println "actor1 has been interrupted"
}
onTimeout = {->
println "actor1 has timed out"
}
onException = {Exception e ->
println "actor1 threw an exception"
}
}
println("Running actor1")
...
}
子類別化 DefaultActor 類別
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PooledLifeCycleSampleActor extends DefaultActor {
protected void act() {
println("Running actor2")
...
}
private void afterStart() {
println "actor2 has started"
}
private void afterStop(List undeliveredMessages) {
println "actor2 has stopped"
}
private void onInterrupt(InterruptedException e) {
println "actor2 has been interrupted"
}
private void onTimeout() {
println "actor2 has timed out"
}
private void onException(Exception e) {
println "actor2 threw an exception"
}
}
用法
使用工廠方法建立 Actor(執行者)
1
2
3
4
5
6
7
8
9
10
import static groovyx.gpars.actor.Actors.actor
def console = actor {
loop {
react {
println it
}
}
...
}
子類別化 DefaultActor 類別
1
2
3
4
5
6
7
8
9
10
11
12
class CustomActor extends DefaultActor {
@Override protected void act() {
loop {
react {
println it
}
}
}
}
def console=new CustomActor()
console.start()
傳送訊息
1
2
3
4
console.send('Message')
console << 'Message'
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}
console.sendAndWait 'Message'
逾時
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.actor.Actors.actor
def me = actor {
friend.send('Hi')
react(30.seconds) {msg ->
if (msg == Actor.TIMEOUT) {
friend.send('I see, busy as usual. Never mind.')
stop()
} else {
//continue conversation
}
}
}
me.join()
當等待訊息時逾時到期時,會收到 Actor.TIMEOUT 訊息。此外,如果 actor 上存在 onTimeout() 處理常式,也會被呼叫。
1
2
3
4
5
6
7
8
9
10
11
12
13
import static groovyx.gpars.actor.Actors.actor
def me = actor {
delegate.metaClass.onTimeout = {->
friend.send('I see, busy as usual. Never mind.')
stop()
}
friend.send('Hi')
react(30.seconds) {
// Continue conversation.
}
}
me.join()
Actor 群組
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def coreActors = new NonDaemonPGroup(5) //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1) //1 daemon thread pool
def priceCalculator = coreActors.actor {
...
}
def paymentProcessor = coreActors.actor {
...
}
def emailNotifier = helperActors.actor {
...
}
def cleanupActor = helperActors.actor {
...
}
// Increase size of the core actor group.
coreActors.resize 6
// Shutdown the group's pool once you no longer need the group to release resources.
helperActors.shutdown()
DynamicDispatchActor(動態分派執行者)
1
2
3
4
5
6
final Actor actor = new DynamicDispatchActor({
when {String msg -> println 'A String'; reply 'Thanks'}
when {Double msg -> println 'A Double'; reply 'Thanks'}
when {msg -> println 'A something ...'; reply 'What was that?'}
})
actor.start()
Reactor(反應器)
import groovyx.gpars.actor.Actors final def doubler = Actors.reactor { 2 * it }.start()
Agent(代理人)
概念
在 Clojure 程式語言中,您可以找到 Agents(代理人)的概念,其本質上就像接收程式碼(函式)作為訊息的 actor。接收後,接收到的函式會針對 Agent 的內部狀態執行,並且函式的回傳值被視為 Agent 的新內部狀態。基本上,agents(代理人)透過僅允許單個 代理人管理的執行緒 修改它們來保護可變值。可變值無法從外部直接存取,而是必須將請求傳送給 agent(代理人),並且 agent(代理人)保證代表呼叫者依序處理請求。Agents(代理人)保證所有請求的循序執行,因此保證值的ㄧ致性。
用法
Agent(代理人)實作類似 Clojure 的 Agent(代理人)概念
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.agent.Agent
def jugMembers = new Agent<List>(['Me']) // Add Me.
jugMembers.send {it.add 'James'} // Add James.
final Thread t1 = Thread.start{
jugMembers {it.add 'Jo'} // Add Jo --- using the implicit call() method to send the function.
}
final Thread t2 = Thread.start{
jugMembers << {it.add 'Dave'} // Add Dave.
jugMembers << {it.add 'Alice'} // Add Alice.
}
[t1, t2]*.join()
println jugMembers.val
jugMembers.valAsync {println "Current members: $it"}
System.in.read()
jugMembers.stop()
溝通循序程序
概念
CSP(Communicating Sequential Processes,溝通循序程序)並行概念提供了一種具有同步會合類型通訊的訊息傳遞模型。
它主要因其高程度的確定性和組合並行程序的能力而受到重視。
GPars GroovyCSP 包裝了 坎特伯雷大學的 JCSP 函式庫,並以 Jon Kerridge 的工作為基礎。 請在這裡檢閱他的作品。
有關 CSP 並行模型的更多資訊,請查閱使用者指南的 CSP 章節或參考以下連結
-
CSP 定義:Wiki CSP
-
Google 的 Go 程式語言具有 CSP 風格的並行:Google 的 Go
用法
GroovyCSP
看看這個 Groovy API 用於 CSP 風格並行的範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import groovyx.gpars.csp.PAR
import org.jcsp.lang.CSProcess
import org.jcsp.lang.Channel
import org.jcsp.lang.ChannelOutput
import org.jcsp.lang.One2OneChannel
import groovyx.gpars.csp.plugAndPlay.GPrefix
import groovyx.gpars.csp.plugAndPlay.GPCopy
import groovyx.gpars.csp.plugAndPlay.GPairs
import groovyx.gpars.csp.plugAndPlay.GPrint
class FibonacciV2Process implements CSProcess {
ChannelOutput outChannel
void run() {
One2OneChannel a = Channel.createOne2One()
One2OneChannel b = Channel.createOne2One()
One2OneChannel c = Channel.createOne2One()
One2OneChannel d = Channel.createOne2One()
new PAR([
new GPrefix(prefixValue: 0, inChannel: d.in(), outChannel: a.out()),
new GPrefix(prefixValue: 1, inChannel: c.in(), outChannel: d.out()),
new GPCopy(inChannel: a.in(), outChannel0: b.out(), outChannel1: outChannel),
new GPairs(inChannel: b.in(), outChannel: c.out()),
]).run()
}
}
One2OneChannel N2P = Channel.createOne2One()
new PAR([
new FibonacciV2Process(outChannel: N2P.out()),
new GPrint(inChannel: N2P.in(), heading: "Fibonacci Numbers")
]).run()
資料流並行
概念
Dataflow Concurrency(資料流並行)提供了一種替代的並行模型,它本質上是安全且穩健的。它強調資料及其在您的程序中的流動,而不是實際操作資料的程序。 Dataflow(資料流)演算法可讓開發人員免於處理活鎖、競爭條件,並使死鎖具有確定性,因此可 100% 重複產生。如果您在測試中沒有遇到死鎖,那麼在生產環境中也不會遇到死鎖。
Dataflow Variable(資料流變數)
一種單一賦值的多讀變數,可提供執行緒之間安全的資料交換。
Dataflows 類別
一個虛擬無限的 Dataflow Variables(資料流變數)映射,具有按需建立原則。
Dataflow Stream(資料流串流)
一個具有 Dataflow Variable(資料流變數)相容介面的執行緒安全、無限制的確定性阻塞串流。
Dataflow Queue(資料流佇列)
一個具有 Dataflow Variable(資料流變數)相容介面的執行緒安全、無限制的阻塞佇列。
Dataflow Task(資料流任務)
一個輕量級的執行緒,它從執行緒池中分配到一個實體執行緒以執行任務的主體。任務通常應使用*資料流變數*和*串流*交換資料。
Dataflow Operator(資料流運算子)
更徹底的資料流並行演算法的基石。此類演算法通常定義許多運算子,並使用由資料流串流、佇列或變數表示的通道將它們連接起來。
每個運算子都指定其輸入和輸出通道,以與其他運算子通訊。重複地,每當特定運算子的所有輸入通道都包含資料時,就會執行運算子的主體,並將產生的輸出傳送到輸出通道中。
用法
資料流變數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import static groovyx.gpars.dataflow.Dataflow.task
final def x = new DataflowVariable()
final def y = new DataflowVariable()
final def z = new DataflowVariable()
task{
z << x.val + y.val
println "Result: ${z.val}"
}
task{
x << 10
}
task{
y << 5
}
資料流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import static groovyx.gpars.dataflow.Dataflow.task
final def df = new Dataflows()
task{
df.z = df.x + df.y
println "Result: ${df.z}"
}
task{
df.x = 10
}
task{
df.y = 5
}
資料流佇列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.dataflow.Dataflow.task
def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow']
final def buffer = new DataflowQueue()
task{
for (word in words) {
buffer << word.toUpperCase() // Add to the buffer.
}
}
task{
while(true) println buffer.val // Read from the buffer in a loop.
}
綁定處理常式
1
2
3
4
def a = new DataflowVariable()
a >> {println "The variable has just been bound to $it"}
a.whenBound{println "Just to confirm that the variable has been really set to $it"}
資料流運算子
1
2
3
4
operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
...
bindOutput 0, x + y + z
}
Fork/Join(分叉/合併)
概念
Fork/Join(分叉/合併)或分治法是一種非常強大的抽象概念,用於解決階層式問題。在談論階層式問題時,請考慮快速排序、合併排序、檔案系統或通用樹狀導覽等等。
-
Fork / Join(分叉/合併)演算法基本上將手邊的問題分成幾個較小的子問題,並以遞迴方式將相同的演算法應用於每個子問題。
-
一旦子問題夠小,它就會直接解決。
-
所有子問題的解決方案會組合起來以解決其父問題,這反過來又有助於解決其自身的父問題。
用法
使用 Fork-Join(分叉-合併)建構器
請隨意嘗試池中分叉/合併執行緒的數量 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
withPool(1){pool ->
println """Number of files: ${
runForkJoin(new File("./src")) {file ->
long count = 0
file.eachFile {
if (it.isDirectory()) {
println "Forking a child task for $it"
// Fork a child task.
forkOffChild(it)
} else {
count++
}
}
// Use results of children tasks to calculate and store own result.
return count + (childrenResults.sum(0))
}
}"""
}
擴充 AbstractForkJoinWorker 類別
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class FileCounter extends AbstractForkJoinWorker<Long> {
private final File file;
def FileCounter(final File file) {
this.file = file
}
protected void compute() {
long count = 0;
file.eachFile{
if (it.isDirectory()) {
println "Forking a thread for $it"
// Fork a child task.
forkOffChild(new FileCounter(it))
}
else {
count++
}
}
// Use results of children tasks to calculate and store own result.
setResult(count + ((childrenResults)?.sum() ?: 0))
}
}
withPool(1){pool -> // Feel free to experiment with the number of fork/join threads in the pool.
println "Number of files: ${orchestrate(new FileCounter(new File("..")))}"
}
Fork/Join Pool(分叉/合併池)
概念
處理資料通常涉及操作集合。列表、陣列、集合、映射、迭代器、字串和許多其他資料類型可以被視為項目集合。處理此類集合的常見模式是循序一個接一個地取得元素,並針對每個項目執行動作。
例如,假設 min() 函式應傳回集合中的最小元素。當您在數字集合上呼叫 min() 方法時,呼叫執行緒將會建立一個累加器或至今為止最小的值,其初始化為給定類型的最小值,例如為零。然後,執行緒將會迭代集合的元素,並將它們與累加器中的值進行比較。一旦所有元素都被處理完畢,最小值就會儲存在累加器中。
這個演算法有效地浪費了晶片 75% 的運算能力 |
然而,這個演算法雖然簡單,但在多核心硬體上是完全錯誤的。在雙核心晶片上執行 min() 函式最多只能利用晶片 50% 的運算能力。在四核心晶片上,它僅為 25%。正確,這個演算法有效地浪費了晶片 75% 的運算能力。
這裡應該使用樹狀結構
樹狀結構被證明更適合平行處理。我們範例中的 min() 函式不需要迭代所有元素,並將其值與累加器進行比較。相反,它可以做的是依靠您硬體的多核心特性。
例如,parallel_min() 函式可以比較集合中相鄰值的配對(或特定大小的元組),並將元組中的最小值提升到下一輪比較。
在不同元組中搜尋最小值可以安全地平行進行,因此同一輪中的元組可以由不同的核心同時處理,而執行緒之間沒有競爭或衝突。
用法
平行集合處理
目前在 Groovy 中的所有物件上都支援下列方法
-
eachParallel()
-
eachWithIndexParallel()
-
collectParallel()
-
findAllParallel()
-
findParallel()
-
everyParallel()
-
anyParallel()
-
grepParallel()
-
groupByParallel()
-
foldParallel()
-
minParallel()
-
maxParallel()
-
sumParallel()
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{
final AtomicInteger result = new AtomicInteger(0)
[1, 2, 3, 4, 5].eachParallel{result.addAndGet(it)}
assert 15 == result
}
// Multiply numbers asynchronously.
ForkJoinPool.withPool{
final List result = [1, 2, 3, 4, 5].collectParallel{it * 2}
assert ([2, 4, 6, 8, 10].equals(result))
}
元類別增強器
1
2
3
4
5
6
7
import groovyx.gpars.ParallelEnhancer
def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel{it * 2 }
透明的平行集合
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{
assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent())
}
/**
* A function implemented using standard sequential collect() and findAll() methods.
*/
def selectImportantNames(names) {
names.collect{it.toUpperCase()}.findAll{it.size() > 4}
}
Map/Reduce(映射/化簡)
可用的方法
-
map()
-
reduce()
-
filter()
-
size()
-
sum()
-
min()
-
max()
collection 屬性會傳回包裝在 Groovy 集合執行個體中的所有元素。
1
2
3
4
5
6
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
.map{it.toURL().text.toUpperCase()}
.filter{it.contains('GROOVY')}
.map{it.split()}
.map{it.findAll{word -> word.contains 'GROOVY'}.size()}
.sum()
軟體交易記憶體
概念
軟體交易記憶體 或 STM,為開發人員提供交易語意來存取記憶體中的資料。當多個執行緒共享記憶體中的資料時,透過將程式碼區塊標記為交易式(原子),開發人員將資料一致性的責任委派給 Stm 引擎。 GPars 利用 Multiverse STM 引擎。
原子閉包
GPars 允許開發人員將其並行程式碼結構化為原子區塊(閉包),然後將這些區塊作為單個單元執行,以保留交易的 ACI(原子性、一致性、隔離性)屬性。
用法
以原子方式執行一段程式碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.references.TxnInteger
import static org.multiverse.api.StmUtils.newTxnInteger
public class Account {
private final TxnInteger amount = newTxnInteger(0);
public void transfer(final int a) {
GParsStm.atomic {
amount.increment(a);
}
}
public int getCurrentAmount() {
GParsStm.atomicWithInt {
amount.get();
}
}
}
自訂交易屬性
1
2
3
4
5
6
7
8
9
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.AtomicBlock
import org.multiverse.api.PropagationLevel
final TxnExecutor block = GParsStm.createTxnExecutor(maxRetries: 3000, familyName: 'Custom', PropagationLevel: PropagationLevel.Requires, interruptible: false)
assert GParsStm.atomicWithBoolean(block) {
true
}
ThreadPool(執行緒池)
概念
在多核心系統上,您可以從在背景非同步執行某些任務中獲益,因此可以卸載您主要的執行緒。 ThreadPool(執行緒池)類別允許您輕鬆地在背景中啟動要非同步執行的任務,並在稍後收集結果。
用法
使用 ThreadPool(執行緒池) - 基於 Java Executor 的並行集合處理器
閉包增強功能
1
2
3
4
5
6
7
8
9
10
11
12
GParsExecutorsPool.withPool() {
Closure longLastingCalculation = {calculate()}
// Create a new closure, which starts the original closure on a thread pool.
Closure fastCalculation = longLastingCalculation.async()
// Returns almost immediately.
Future result=fastCalculation()
// Do stuff while calculation performs...
println result.get()
}
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool() {
/**
* The callAsync() method is an asynchronous variant of the default call() method
* to invoke a closure. It will return a Future for the result value.
*/
assert 6 == {it * 2}.call(3).get()
assert 6 == {it * 2}.callAsync(3).get()
}
Executor Service(執行器服務)增強功能
1
2
3
GParsExecutorsPool.withPool {ExecutorService executorService ->
executorService << {println 'Inside parallel task'}
}
非同步函式處理
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool {
// Waits for results.
assert [10, 20] == AsyncInvokerUtil.doInParallel({calculateA()}, {calculateB()})
// Returns a Future and doesn't wait for results to be calculated.
assert [10, 20] == AsyncInvokerUtil.executeAsync({calculateA()}, {calculateB()})*.get()
}