GPars Logo

要下載此文件為 PDF 格式 - 請點擊這裡

Actor(執行者)

概念

Actor(執行者)是獨立隔離的活動物件,彼此不共享任何資料,僅透過訊息傳遞進行溝通。避免共享可變狀態可讓開發人員免於許多典型的並行問題,例如活鎖或競爭條件。每個 actor 的主體(程式碼)由執行緒池中的隨機執行緒執行,因此 actor 可以並行且獨立地進行。由於 Actors 可以共享一個相對較小的執行緒池,因此它們避免了 JVM 的執行緒限制,即使在您的應用程式包含數千個 actor 的情況下,也不需要過多的系統資源。


Actors(執行者)通常在其常規任務之上執行三種基本類型的操作

  • 建立新的 actor

  • 傳送訊息給另一個 actor

  • 接收訊息

Actors(執行者)可以建立為特定 actor 類別的子類別,或使用工廠方法,將 actor 的主體作為閉包參數提供。有多種方式可以傳送訊息,可以使用 >> 運算符,或使用任何 send()sendAndWait()sendAndContinua() 方法。

接收訊息可以以阻塞或非阻塞方式執行,當沒有訊息可用時,實體執行緒會返回到池中。


Actors(執行者)可以協調成各種演算法,並有可能利用類似於企業訊息系統中已知的架構模式。

生命週期

使用工廠方法建立 Actor(執行者)

建立 Actor(執行者)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Actors.actor {
    println "actor1 has started"
    delegate.metaClass {
        afterStop = {List undeliveredMessages ->
            println "actor1 has stopped"
        }
        onInterrupt = {InterruptedException e ->
            println "actor1 has been interrupted"
        }
        onTimeout = {->
            println "actor1 has timed out"
        }
        onException = {Exception e ->
            println "actor1 threw an exception"
        }
    }
    println("Running actor1")
    ...
}

子類別化 DefaultActor 類別

子類別化 Actor(執行者)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PooledLifeCycleSampleActor extends DefaultActor {
    protected void act() {
        println("Running actor2")
        ...
    }
    private void afterStart() {
        println "actor2 has started"
    }
    private void afterStop(List undeliveredMessages) {
        println "actor2 has stopped"
    }
    private void onInterrupt(InterruptedException e) {
        println "actor2 has been interrupted"
    }
    private void onTimeout() {
        println "actor2 has timed out"
    }
    private void onException(Exception e) {
        println "actor2 threw an exception"
    }
}

用法

使用工廠方法建立 Actor(執行者)

來自工廠的 Actor(執行者)
1
2
3
4
5
6
7
8
9
10
import static groovyx.gpars.actor.Actors.actor

def console = actor {
    loop {
        react {
            println it
        }
    }
    ...
}

子類別化 DefaultActor 類別

子類別化 DefaultActor
1
2
3
4
5
6
7
8
9
10
11
12
class CustomActor extends DefaultActor {
    @Override protected void act() {
        loop {
            react {
                println it
            }
        }
    }
}

def console=new CustomActor()
console.start()

傳送訊息

Actors(執行者)的訊息
1
2
3
4
console.send('Message')
console << 'Message'
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}
console.sendAndWait 'Message'

逾時

如何處理計時問題
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.actor.Actors.actor

def me = actor {
    friend.send('Hi')
    react(30.seconds) {msg ->
        if (msg == Actor.TIMEOUT) {
            friend.send('I see, busy as usual. Never mind.')
            stop()
        } else {
            //continue conversation
        }
    }
}
me.join()

當等待訊息時逾時到期時,會收到 Actor.TIMEOUT 訊息。此外,如果 actor 上存在 onTimeout() 處理常式,也會被呼叫。

當 Actor(執行者)逾時時會發生什麼
1
2
3
4
5
6
7
8
9
10
11
12
13
import static groovyx.gpars.actor.Actors.actor

def me = actor {
    delegate.metaClass.onTimeout = {->
        friend.send('I see, busy as usual. Never mind.')
        stop()
    }
    friend.send('Hi')
    react(30.seconds) {
        // Continue conversation.
    }
}
me.join()

Actor 群組

一群 Actors(執行者)稱為什麼?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def coreActors = new NonDaemonPGroup(5)  //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1)  //1 daemon thread pool
def priceCalculator = coreActors.actor {
...
}
def paymentProcessor = coreActors.actor {
...
}
def emailNotifier = helperActors.actor {
...
}
def cleanupActor = helperActors.actor {
...
}

// Increase size of the core actor group.
coreActors.resize 6

// Shutdown the group's pool once you no longer need the group to release resources.
helperActors.shutdown()

DynamicDispatchActor(動態分派執行者)

動態分派
1
2
3
4
5
6
final Actor actor = new DynamicDispatchActor({
    when {String msg -> println 'A String'; reply 'Thanks'}
    when {Double msg -> println 'A Double'; reply 'Thanks'}
    when {msg -> println 'A something ...'; reply 'What was that?'}
})
actor.start()

Reactor(反應器)

Actors(執行者)反應時
import groovyx.gpars.actor.Actors

final def doubler = Actors.reactor {
    2 * it
}.start()


Agent(代理人)

概念

Clojure 程式語言中,您可以找到 Agents(代理人)的概念,其本質上就像接收程式碼(函式)作為訊息的 actor。接收後,接收到的函式會針對 Agent 的內部狀態執行,並且函式的回傳值被視為 Agent 的新內部狀態。基本上,agents(代理人)透過僅允許單個 代理人管理的執行緒 修改它們來保護可變值。可變值無法從外部直接存取,而是必須將請求傳送給 agent(代理人),並且 agent(代理人)保證代表呼叫者依序處理請求。Agents(代理人)保證所有請求的循序執行,因此保證值的ㄧ致性。


用法

Agent(代理人)實作類似 Clojure 的 Agent(代理人)概念

一個 Agent(代理人)範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.agent.Agent

def jugMembers = new Agent<List>(['Me'])  // Add Me.
jugMembers.send {it.add 'James'}  // Add James.

final Thread t1 = Thread.start{
    jugMembers {it.add 'Jo'}  // Add Jo --- using the implicit call() method to send the function.
}

final Thread t2 = Thread.start{
    jugMembers << {it.add 'Dave'}  // Add Dave.
    jugMembers << {it.add 'Alice'}  // Add Alice.
}

[t1, t2]*.join()

println jugMembers.val
jugMembers.valAsync {println "Current members: $it"}
System.in.read()
jugMembers.stop()


溝通循序程序

概念

CSPCommunicating Sequential Processes,溝通循序程序)並行概念提供了一種具有同步會合類型通訊的訊息傳遞模型。

它主要因其高程度的確定性和組合並行程序的能力而受到重視。

GPars GroovyCSP 包裝了 坎特伯雷大學的 JCSP 函式庫,並以 Jon Kerridge 的工作為基礎。 請在這裡檢閱他的作品。

有關 CSP 並行模型的更多資訊,請查閱使用者指南的 CSP 章節或參考以下連結


用法

GroovyCSP

看看這個 Groovy API 用於 CSP 風格並行的範例

您如何編寫這個程式碼?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import groovyx.gpars.csp.PAR

import org.jcsp.lang.CSProcess
import org.jcsp.lang.Channel
import org.jcsp.lang.ChannelOutput
import org.jcsp.lang.One2OneChannel

import groovyx.gpars.csp.plugAndPlay.GPrefix
import groovyx.gpars.csp.plugAndPlay.GPCopy
import groovyx.gpars.csp.plugAndPlay.GPairs
import groovyx.gpars.csp.plugAndPlay.GPrint

class FibonacciV2Process implements CSProcess {
    ChannelOutput outChannel

    void run() {
        One2OneChannel a = Channel.createOne2One()
        One2OneChannel b = Channel.createOne2One()
        One2OneChannel c = Channel.createOne2One()
        One2OneChannel d = Channel.createOne2One()
        new PAR([
            new GPrefix(prefixValue: 0, inChannel: d.in(), outChannel: a.out()),
            new GPrefix(prefixValue: 1, inChannel: c.in(), outChannel: d.out()),
            new GPCopy(inChannel: a.in(), outChannel0: b.out(), outChannel1: outChannel),
            new GPairs(inChannel: b.in(), outChannel: c.out()),
        ]).run()
    }
}

One2OneChannel N2P = Channel.createOne2One()

new PAR([
    new FibonacciV2Process(outChannel: N2P.out()),
    new GPrint(inChannel: N2P.in(), heading: "Fibonacci Numbers")
]).run()


資料流並行

概念

Dataflow Concurrency(資料流並行)提供了一種替代的並行模型,它本質上是安全且穩健的。它強調資料及其在您的程序中的流動,而不是實際操作資料的程序。 Dataflow(資料流)演算法可讓開發人員免於處理活鎖、競爭條件,並使死鎖具有確定性,因此可 100% 重複產生。如果您在測試中沒有遇到死鎖,那麼在生產環境中也不會遇到死鎖。

Dataflow Variable(資料流變數)

一種單一賦值的多讀變數,可提供執行緒之間安全的資料交換。

Dataflows 類別

一個虛擬無限的 Dataflow Variables(資料流變數)映射,具有按需建立原則。

Dataflow Stream(資料流串流)

一個具有 Dataflow Variable(資料流變數)相容介面的執行緒安全、無限制的確定性阻塞串流。

Dataflow Queue(資料流佇列)

一個具有 Dataflow Variable(資料流變數)相容介面的執行緒安全、無限制的阻塞佇列。

Dataflow Task(資料流任務)

一個輕量級的執行緒,它從執行緒池中分配到一個實體執行緒以執行任務的主體。任務通常應使用*資料流變數*和*串流*交換資料。

Dataflow Operator(資料流運算子)

更徹底的資料流並行演算法的基石。此類演算法通常定義許多運算子,並使用由資料流串流、佇列變數表示的通道將它們連接起來。

每個運算子都指定其輸入和輸出通道,以與其他運算子通訊。重複地,每當特定運算子的所有輸入通道都包含資料時,就會執行運算子的主體,並將產生的輸出傳送到輸出通道中。


用法

資料流變數

範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import static groovyx.gpars.dataflow.Dataflow.task

final def x = new DataflowVariable()
final def y = new DataflowVariable()
final def z = new DataflowVariable()
task{
    z << x.val + y.val
    println "Result: ${z.val}"
}

task{
    x << 10
}

task{
    y << 5
}

資料流

範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import static groovyx.gpars.dataflow.Dataflow.task
final def df = new Dataflows()

task{
    df.z = df.x + df.y
    println "Result: ${df.z}"
}

task{
    df.x = 10
}

task{
    df.y = 5
}

資料流佇列

範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.dataflow.Dataflow.task

def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow']
final def buffer = new DataflowQueue()

task{
    for (word in words) {
        buffer << word.toUpperCase()  // Add to the buffer.
    }
}

task{
    while(true) println buffer.val  // Read from the buffer in a loop.
}

綁定處理常式

範例
1
2
3
4
def a = new DataflowVariable()
a >> {println "The variable has just been bound to $it"}

a.whenBound{println "Just to confirm that the variable has been really set to $it"}

資料流運算子

範例
1
2
3
4
operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    ...
    bindOutput 0, x + y + z
}


Fork/Join(分叉/合併)

概念

Fork/Join(分叉/合併)或分治法是一種非常強大的抽象概念,用於解決階層式問題。在談論階層式問題時,請考慮快速排序、合併排序、檔案系統或通用樹狀導覽等等。

  • Fork / Join(分叉/合併)演算法基本上將手邊的問題分成幾個較小的子問題,並以遞迴方式將相同的演算法應用於每個子問題。

  • 一旦子問題夠小,它就會直接解決。

  • 所有子問題的解決方案會組合起來以解決其父問題,這反過來又有助於解決其自身的父問題。


用法

使用 Fork-Join(分叉-合併)建構器

請隨意嘗試池中分叉/合併執行緒的數量
範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
withPool(1){pool ->

    println """Number of files: ${

        runForkJoin(new File("./src")) {file ->
            long count = 0

            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    // Fork a child task.
                    forkOffChild(it)
                } else {
                    count++
                }
            }

            // Use results of children tasks to calculate and store own result.
            return count + (childrenResults.sum(0))
        }
    }"""
}

擴充 AbstractForkJoinWorker 類別

範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;

    def FileCounter(final File file) {
        this.file = file
    }

    protected void compute() {
        long count = 0;
        file.eachFile{
            if (it.isDirectory()) {
                println "Forking a thread for $it"
                // Fork a child task.
                forkOffChild(new FileCounter(it))
            }
            else {
                count++
            }
        }

        // Use results of children tasks to calculate and store own result.
        setResult(count + ((childrenResults)?.sum() ?: 0))
    }
}

withPool(1){pool ->  // Feel free to experiment with the number of fork/join threads in the pool.
    println "Number of files: ${orchestrate(new FileCounter(new File("..")))}"
}


Fork/Join Pool(分叉/合併池)

概念

處理資料通常涉及操作集合。列表、陣列、集合、映射、迭代器、字串和許多其他資料類型可以被視為項目集合。處理此類集合的常見模式是循序一個接一個地取得元素,並針對每個項目執行動作。

例如,假設 min() 函式應傳回集合中的最小元素。當您在數字集合上呼叫 min() 方法時,呼叫執行緒將會建立一個累加器或至今為止最小的值,其初始化為給定類型的最小值,例如為零。然後,執行緒將會迭代集合的元素,並將它們與累加器中的值進行比較。一旦所有元素都被處理完畢,最小值就會儲存在累加器中。

這個演算法有效地浪費了晶片 75% 的運算能力

然而,這個演算法雖然簡單,但在多核心硬體上是完全錯誤的。在雙核心晶片上執行 min() 函式最多只能利用晶片 50% 的運算能力。在四核心晶片上,它僅為 25%。正確,這個演算法有效地浪費了晶片 75% 的運算能力。

這裡應該使用樹狀結構

樹狀結構被證明更適合平行處理。我們範例中的 min() 函式不需要迭代所有元素,並將其值與累加器進行比較。相反,它可以做的是依靠您硬體的多核心特性。

例如,parallel_min() 函式可以比較集合中相鄰值的配對(或特定大小的元組),並將元組中的最小值提升到下一輪比較。

在不同元組中搜尋最小值可以安全地平行進行,因此同一輪中的元組可以由不同的核心同時處理,而執行緒之間沒有競爭或衝突。


用法

平行集合處理

目前在 Groovy 中的所有物件上都支援下列方法

  • eachParallel()

  • eachWithIndexParallel()

  • collectParallel()

  • findAllParallel()

  • findParallel()

  • everyParallel()

  • anyParallel()

  • grepParallel()

  • groupByParallel()

  • foldParallel()

  • minParallel()

  • maxParallel()

  • sumParallel()

使用此範例同時總結數字
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{
    final AtomicInteger result = new AtomicInteger(0)
    [1, 2, 3, 4, 5].eachParallel{result.addAndGet(it)}
    assert 15 == result
}

// Multiply numbers asynchronously.
ForkJoinPool.withPool{
    final List result = [1, 2, 3, 4, 5].collectParallel{it * 2}
    assert ([2, 4, 6, 8, 10].equals(result))
}

元類別增強器

範例
1
2
3
4
5
6
7
import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

ParallelEnhancer.enhanceInstance(list)

println list.collectParallel{it * 2 }

透明的平行集合

selectImportantNames() 方法同時處理名稱集合。
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{

    assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent())
}

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect{it.toUpperCase()}.findAll{it.size() > 4}
}

Map/Reduce(映射/化簡)

可用的方法

  • map()

  • reduce()

  • filter()

  • size()

  • sum()

  • min()

  • max()

collection 屬性會傳回包裝在 Groovy 集合執行個體中的所有元素。

範例
1
2
3
4
5
6
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
        .map{it.toURL().text.toUpperCase()}
        .filter{it.contains('GROOVY')}
        .map{it.split()}
        .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
        .sum()


軟體交易記憶體

概念

軟體交易記憶體STM,為開發人員提供交易語意來存取記憶體中的資料。當多個執行緒共享記憶體中的資料時,透過將程式碼區塊標記為交易式(原子),開發人員將資料一致性的責任委派給 Stm 引擎。 GPars 利用 Multiverse STM 引擎

原子閉包

GPars 允許開發人員將其並行程式碼結構化為原子區塊(閉包),然後將這些區塊作為單個單元執行,以保留交易的 ACI(原子性、一致性、隔離性)屬性。


用法

以原子方式執行一段程式碼

一個原子範例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.references.TxnInteger

import static org.multiverse.api.StmUtils.newTxnInteger

public class Account {
    private final TxnInteger amount = newTxnInteger(0);

    public void transfer(final int a) {
        GParsStm.atomic {
            amount.increment(a);
        }
    }

    public int getCurrentAmount() {
        GParsStm.atomicWithInt {
            amount.get();
        }
    }
}

自訂交易屬性

範例
1
2
3
4
5
6
7
8
9
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.AtomicBlock
import org.multiverse.api.PropagationLevel

final TxnExecutor block = GParsStm.createTxnExecutor(maxRetries: 3000, familyName: 'Custom', PropagationLevel: PropagationLevel.Requires, interruptible: false)

assert GParsStm.atomicWithBoolean(block) {
    true
}


ThreadPool(執行緒池)

概念

在多核心系統上,您可以從在背景非同步執行某些任務中獲益,因此可以卸載您主要的執行緒。 ThreadPool(執行緒池)類別允許您輕鬆地在背景中啟動要非同步執行的任務,並在稍後收集結果。


用法

使用 ThreadPool(執行緒池) - 基於 Java Executor 的並行集合處理器

閉包增強功能

範例
1
2
3
4
5
6
7
8
9
10
11
12
GParsExecutorsPool.withPool() {
    Closure longLastingCalculation = {calculate()}

    // Create a new closure, which starts the original closure on a thread pool.
    Closure fastCalculation = longLastingCalculation.async()

    // Returns almost immediately.
    Future result=fastCalculation()

    // Do stuff while calculation performs...
    println result.get()
}
另一個範例
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool() {
    /**
     * The callAsync() method is an asynchronous variant of the default call() method
     * to invoke a closure. It will return a Future for the result value.
     */
    assert 6 == {it * 2}.call(3).get()
    assert 6 == {it * 2}.callAsync(3).get()
}

Executor Service(執行器服務)增強功能

範例
1
2
3
GParsExecutorsPool.withPool {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}

非同步函式處理

範例
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool {

    // Waits for results.
    assert [10, 20] == AsyncInvokerUtil.doInParallel({calculateA()}, {calculateB()})

    // Returns a Future and doesn't wait for results to be calculated.
    assert [10, 20] == AsyncInvokerUtil.executeAsync({calculateA()}, {calculateB()})*.get()
}