本文概述
什么是并发编程?简而言之, 就是你同时做一件事以上。不要与并行性混淆, 并发是指多个操作序列在重叠的时间段内运行。在编程领域, 并发是一个非常复杂的主题。处理诸如线程和锁之类的构造并避免诸如竞争条件和死锁之类的问题可能非常麻烦, 这使得并发程序难以编写。通过并发, 可以将程序设计为独立的过程, 以特定的组合方式协同工作。这种结构可以平行也可以不平行。但是, 在你的程序中实现这种结构具有许多优点。
在本文中, 我们将研究许多不同的并发模型, 以及如何在为并发设计的各种编程语言中实现它们。
共享可变状态模型
让我们看一个简单的示例, 其中包含一个计数器和两个增加计数器的线程。该程序应该不会太复杂。我们有一个对象, 其中包含一个计数器, 该计数器随着方法的增加而增加, 并通过方法get和两个增加它的线程来对其进行检索。
//
// Counting.java
//
public class Counting {
public static void main(String[] args) throws InterruptedException {
class Counter {
int counter = 0;
public void increment() { counter++; }
public int get() { return counter; }
}
final Counter counter = new Counter();
class CountingThread extends Thread {
public void run() {
for (int x = 0; x < 500000; x++) {
counter.increment();
}
}
}
CountingThread t1 = new CountingThread();
CountingThread t2 = new CountingThread();
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println(counter.get());
}
}
这个幼稚的程序并不像乍看起来那样幼稚。当我多次运行该程序时, 会得到不同的结果。在我的笔记本电脑上执行了三个命令后, 有三个值。
java Counting
553706
java Counting
547818
java Counting
613014
这种不可预测的行为的原因是什么?程序在一处增加计数器, 在使用命令counter ++的方法中增加。如果我们查看命令字节码, 我们将看到它由几部分组成:
- 从内存中读取计数器值
- 在本地增加价值
- 将计数器值存储在内存中
现在我们可以想象在这个序列中会出什么问题。如果我们有两个独立增加计数器的线程, 那么我们可能会遇到这种情况:
- 计数器值为115
- 第一个线程从内存中读取计数器的值(115)
- 第一个线程增加本地计数器值(116)
- 第二个线程从内存中读取计数器的值(115)
- 第二个线程增加本地计数器值(116)
- 第二个线程将本地计数器值保存到内存中(116)
- 第一个线程将本地计数器值保存到内存中(116)
- 计数器的价值是116
在这种情况下, 两个线程交织在一起, 因此计数器值增加了1, 但是计数器值应增加2, 因为每个线程将其增加1。不同的线程交织会影响程序的结果。该程序不可预测的原因是该程序无法控制线程纠缠, 而只能控制操作系统。每次执行程序时, 线程可以以不同的方式缠绕在一起。通过这种方式, 我们向程序引入了意外的不可预测性(不确定性)。
若要解决这种意外的不可预测性(不确定性), 程序必须具有对线程交织的控制。当一个线程在方法中增加时, 另一个线程必须在同一个方法中, 直到第一个线程退出。这样, 我们可以序列化对方法增加的访问。
//
// CountingFixed.java
//
public class CountingFixed {
public static main(String[] args) throws InterruptedException {
class Counter {
int counter = 0;
public synchronized void increase() { counter++; }
public synchronized int get() { return counter; }
}
final Counter counter = new Counter();
class CountingThread extends Thread {
public void run() {
for (int i = 0; i < 500000; i++) {
counter.increment();
}
}
}
CountingThread thread1 = new CountingThread();
CountingThread thread2 = new CountingThread();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println(counter.get());
}
}
另一种解决方案是使用可以自动增加的计数器, 这意味着不能将操作分成多个操作。这样, 我们就不需要具有需要同步的代码块。 Java在java.util.concurrent.atomic命名空间中具有原子数据类型, 我们将使用AtomicInteger。
//
// CountingBetter.java
//
import java.util.concurrent.atomic.AtomicInteger;
class CountingBetter {
public static void main(String[] args) throws InterruptedException {
final AtomicInteger counter = new AtomicInteger(0);
class CountingThread extends Thread {
public viod run() {
for (int i = 0; i < 500000; i++) {
counter.incrementAndGet();
}
}
}
CountingThread thread1 = new CountingThread();
CountingThread thread2 = new CoutningThread();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println(counter.get());
}
}
原子整数具有我们所需的操作, 因此我们可以使用它代替Counter类。有趣的是, atomicinteger的所有方法都不使用锁定, 因此不存在死锁的可能性, 这有利于程序的设计。
使用同步关键字来同步关键方法应该可以解决所有问题, 对吗?假设我们有两个可以存入, 提取和转移到另一个帐户的帐户。如果同时我们想将钱从一个帐户转移到另一个帐户, 反之亦然, 该怎么办?让我们看一个例子。
//
// Deadlock.java
//
public class Deadlock {
public static void main(String[] args) throws InterruptedException {
class Account {
int balance = 100;
public Account(int balance) { this.balance = balance; }
public synchronized void deposit(int amount) { balance += amount; }
public synchronized boolean withdraw(int amount) {
if (balance >= amount) {
balance -= amount;
return true;
}
return false;
}
public synchronized boolean transfer(Account destination, int amount) {
if (balance >= amount) {
balance -= amount;
synchronized(destination) {
destination.balance += amount;
};
return true;
}
return false;
}
public int getBalance() { return balance; }
}
final Account bob = new Account(200000);
final Account joe = new Account(300000);
class FirstTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
bob.transfer(joe, 2);
}
}
}
class SecondTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
joe.transfer(bob, 1);
}
}
}
FirstTransfer thread1 = new FirstTransfer();
SecondTransfer thread2 = new SecondTransfer();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println("Bob's balance: " + bob.getBalance());
System.out.println("Joe's balance: " + joe.getBalance());
}
}
当我在笔记本电脑上运行该程序时, 通常会卡住。为什么会这样?如果仔细观察, 我们可以看到, 当我们进行转账时, 我们将进入同步的转账方法, 并锁定对源帐户上所有同步方法的访问, 然后锁定目标帐户, 从而对它上所有同步方法的访问进行锁定。
想象以下情况:
- 第一个线程通话从Bob的帐户转移到Joe的帐户
- 第二个线程调用从Joe的帐户转移到Bob的帐户
- 第二线程减少了Joe帐户的金额
- 第二个线程将资金存入Bob的帐户, 但等待第一个线程完成转帐。
- 第一线程减少了鲍勃帐户的金额
- 第一个线程将资金存入Joe的帐户, 但等待第二个线程完成转帐。
在这种情况下, 一个线程正在等待另一个线程完成传输, 反之亦然。它们彼此卡住, 程序无法继续。这称为死锁。为避免死锁, 必须以相同顺序锁定帐户。为了解决该问题, 我们将为每个帐户分配一个唯一的编号, 以便我们在转移资金时可以以相同的顺序锁定帐户。
//
// DeadlockFixed.java
//
import java.util.concurrent.atomic.AtomicInteger;
public class DeadlockFixed {
public static void main(String[] args) throws InterruptedException {
final AtomicInteger counter = new AtomicInteger(0);
class Account {
int balance = 100;
int order;
public Account(int balance) {
this.balance = balance;
this.order = counter.getAndIncrement();
}
public synchronized void deposit(int amount) { balance += amount; }
public synchronized boolean withdraw(int amount) {
if (balance >= amount) {
balance -= amount;
return true;
}
return false;
}
public boolean transfer(Account destination, int amount) {
Account first;
Account second;
if (this.order < destination.order) {
first = this;
second = destination;
}
else {
first = destination;
second = this;
}
synchronized(first) {
synchronized(second) {
if (balance >= amount) {
balance -= amount;
destination.balance += amount;
return true;
}
return false;
}
}
}
public synchronized int getBalance() { return balance; }
}
final Account bob = new Account(200000);
final Account joe = new Account(300000);
class FirstTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
bob.transfer(joe, 2);
}
}
}
class SecondTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
joe.transfer(bob, 1);
}
}
}
FirstTransfer thread1 = new FirstTransfer();
SecondTransfer thread2 = new SecondTransfer();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println("Bob's balance: " + bob.getBalance());
System.out.println("Joe's balance: " + joe.getBalance());
}
}
由于此类错误的不可预测性, 有时会发生这些错误, 但并非总是如此, 并且很难重现。如果程序的行为异常, 通常是由于并发引起的, 这种并发会导致意外的不确定性。为了避免意外的不确定性, 我们应该预先设计程序, 以考虑所有相互交织的问题。
带有偶然性不确定性的程序示例。
//
// NonDeteminism.java
//
public class NonDeterminism {
public static void main(String[] args) throws InterruptedException {
class Container {
public String value = "Empty";
}
final Container container = new Container();
class FastThread extends Thread {
public void run() {
container.value = "Fast";
}
}
class SlowThread extends Thread {
public void run() {
try {
Thread.sleep(50);
}
catch(Exception e) {}
container.value = "Slow";
}
}
FastThread fast = new FastThread();
SlowThread slow = new SlowThread();
fast.start(); slow.start();
fast.join(); slow.join();
System.out.println(container.value);
}
}
该程序中包含意外的不确定性。将显示在容器中输入的最后一个值。
java NonDeterminism
Slow
较慢的线程稍后将输入该值, 并且该值将被打印(慢速)。但这不是必须的。如果计算机同时执行另一个需要大量CPU资源的程序该怎么办?我们不能保证它将是最后输入值的慢线程, 因为它是由操作系统而不是程序控制的。我们可能会遇到这样的情况, 该程序在一台计算机上运行而另一台计算机上的行为有所不同。此类错误很难发现, 并且会引起开发人员的头痛。由于所有这些原因, 这种并发模型很难正确执行。
功能方式
平行性
让我们看一下功能语言正在使用的另一个模型。例如, 我们将使用Clojure, 可以使用工具Leiningen进行解释。 Clojure是一种非常有趣的语言, 对并发有很好的支持。先前的并发模型具有共享的可变状态。我们使用的类也可能具有隐藏状态, 该状态会发生我们不知道的变异, 因为从它们的API中看不出来。如我们所见, 如果我们不小心, 此模型可能会导致意外的不确定性和死锁。功能语言的数据类型不会发生变化, 因此可以安全地共享它们, 而不会发生更改的风险。函数具有属性以及其他数据类型。可以在程序执行期间创建函数, 并将其作为参数传递给另一个函数, 或者作为函数调用的结果返回。
并发编程的基本原语是未来和希望。 Future在另一个线程中执行一个代码块, 并为该值返回一个对象, 该对象将在执行该块时输入。
;
; future.clj
;
(let [a (future
(println "Started A")
(Thread/sleep 1000)
(println "Finished A")
(+ 1 2))
b (future
(println "Started B")
(Thread/sleep 2000)
(println "Finished B")
(+ 3 4))]
(println "Waiting for futures")
(+ @a @b))
当我执行此脚本时, 输出为:
Started A
Started B
Waiting for futures
Finished A
Finished B
10
在这个例子中, 我们有两个独立执行的未来块。仅当从将来的对象中读取尚不可用的值时, 程序才阻塞。在我们的例子中, 等待两个将来的结果块相加。行为是可预测的(确定性的), 并且由于没有共享的可变状态, 因此总是会得到相同的结果。
用于并发的另一个原语是promise。 Promise是一个容器, 可以在其中放置一个值。当读取promise时, 线程将等待直到promise的值被填满。
;
; promise.clj
;
(def result (promise))
(future (println "The result is: " @result))
(Thread/sleep 2000)
(deliver result 42)
在此示例中, 只要保证不保存价值, 未来将等待打印结果。两秒钟后, promise中将存储要在将来的线程中打印的值42。使用promise可能导致死锁, 而不是将来, 因此请谨慎使用promise。
;
; promise-deadlock.clj
;
(def promise-result (promise))
(def future-result
(future
(println "The result is: " + @promise-result)
13))
(println "Future result is: " @future-result)
(deliver result 42)
在此示例中, 我们使用的是未来的结果和承诺的结果。设置和读取值的顺序是主线程正在等待来自将来线程的值, 而将来线程正在等待来自主线程的值。此行为将是可预测的(确定性的), 并且每次程序执行时都会播放一次, 这使得查找和消除错误更加容易。
使用Future可以使程序继续进行练习, 直到需要执行Future的结果为止。这样可以加快程序执行速度。如果将来有多个处理器, 则可以并行执行具有可预测(确定性)行为的程序(每次给出的结果相同)。这样, 我们可以更好地利用计算机的功能。
;
; fibonacci.clj
;
(defn fibonacci[a]
(if (<= a 2)
1
(+ (fibonacci (- a 1)) (fibonacci (- a 2)))))
(println "Start serial calculation")
(time (println "The result is: " (+ (fibonacci 36) (fibonacci 36))))
(println "Start parallel calculation")
(defn parallel-fibonacci[]
(def result-1 (future (fibonacci 36)))
(def result-2 (future (fibonacci 36)))
(+ @result-1 @result-2))
(time (println "The result is: " (parallel-fibonacci)))
在此示例中, 你将看到如何利用未来来更好地利用计算机的速度。我们有两个斐波那契数相加。我们可以看到该程序两次计算结果, 第一次是在单个线程中顺序计算, 第二次是在两个线程中并行计算。由于我的笔记本电脑具有多核处理器, 因此并行执行的工作速度是顺序计算的两倍。
在笔记本电脑上执行此脚本的结果:
Start serial calculation
The result is: 29860704
"Elapsed time: 2568.816524 msecs"
Start parallel calculation
The result is: 29860704
"Elapsed time: 1216.991448 msecs"
并发
为了支持Clojure编程语言中的并发性和不可预测性, 我们必须使用可变的数据类型, 以便其他线程可以看到更改。最简单的变量数据类型是atom。 Atom是一个始终具有可以被另一个值替换的值的容器。可以通过输入新值或调用采用旧值并返回更常用的新值的函数来替换该值。有趣的是, 原子是在没有锁定的情况下实现的, 并且可以在线程中安全使用, 这意味着不可能达到死锁。在内部, atom使用java.util.concurrent.AtomicReference库。让我们看一下用atom实现的反例。
;
; atom-counter.clj
;
(def counter (atom 0))
(def attempts (atom 0))
(defn counter-increases[]
(dotimes [cnt 500000]
(swap! counter (fn [counter]
(swap! attempts inc) ; side effect DO NOT DO THIS
(inc counter)))))
(def first-future (future (counter-increases)))
(def second-future (future (counter-increases)))
; Wait for futures to complete
@first-future
@second-future
; Print value of the counter
(println "The counter is: " @counter)
(println "Number of attempts: " @attempts)
在笔记本电脑上执行脚本的结果是:
The counter is: 1000000
Number of attempts: 1680212
在此示例中, 我们使用包含计数器值的原子。计数器增加(swap!counter inc)。交换函数的工作方式如下:1.获取计数器值并将其保存2.为此值调用给定的函数来计算新值3.要保存新值, 它将使用原子操作检查旧值是否已更改3a。如果该值未更改, 则输入新值3b。如果在此期间更改了值, 则转到步骤1。我们看到, 如果在此期间更改了值, 则可以再次调用该函数。该值只能从另一个线程更改。因此, 至关重要的是, 计算新值的函数必须没有副作用, 以便它被调用多次都无关紧要。原子的一个局限性是它会将更改同步到一个值。
;
; atom-acocunts.clj
;
(def bob (atom 200000))
(def joe (atom 300000))
(def inconsistencies (atom 0))
(defn transfer [source destination amount]
(if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc))
(swap! source - amount)
(swap! destination + amount))
(defn first-transfer []
(dotimes [cnt 100000]
(transfer bob joe 2)))
(defn second-transfer []
(dotimes [cnt 100000]
(transfer joe bob 1)))
(def first-future (future (first-transfer)))
(def second-future (future (second-transfer)))
@first-future
@second-future
(println "Bob has in account: " @bob)
(println "Joe has in account: " @joe)
(println "Inconsistencies while transfer: " @inconsistencies)
当我执行此脚本时, 我得到:
Bob has in account: 100000
Joe has in account: 400000
Inconsistencies while transfer: 36525
在此示例中, 我们可以看到如何更改更多原子。一方面, 可能会发生不一致。两个帐户的总和有时不相同。如果必须协调多个值的更改, 则有两种解决方案:
- 在一个原子中放置更多值
- 使用引用和软件事务存储, 我们将在后面看到
;
; atom-accounts-fixed.clj
;
(def accounts (atom {:bob 200000, :joe 300000}))
(def inconsistencies (atom 0))
(defn transfer [source destination amount]
(let [deref-accounts @accounts]
(if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000)
(swap! inconsistencies inc))
(swap! accounts
(fn [accs]
(update (update accs source - amount) destination + amount)))))
(defn first-transfer []
(dotimes [cnt 100000]
(transfer :bob :joe 2)))
(defn second-transfer []
(dotimes [cnt 100000]
(transfer :joe :bob 1)))
(def first-future (future (first-transfer)))
(def second-future (future (second-transfer)))
@first-future
@second-future
(println "Bob has in account: " (get @accounts :bob))
(println "Joe has in account: " (get @accounts :joe))
(println "Inconsistencies while transfer: " @inconsistencies)
在计算机上运行此脚本时, 我得到:
Bob has in account: 100000
Joe has in account: 400000
Inconsistencies while transfer: 0
在该示例中, 协调已解决, 因此我们可以使用地图提高价值。当我们从帐户转移资金时, 我们会同时更改所有帐户, 以免发生总金额不相同的情况。
下一个变量数据类型是代理。 Agent的行为类似于原子, 只是更改值的函数在不同的线程中执行, 因此更改需要花费一些时间才能变得可见。因此, 在读取代理的值时, 有必要调用一个函数, 该函数要等到执行所有更改代理值的函数之后, 才能执行。与原子函数不同, 该值仅更改一次, 因此可能会有副作用。此类型还可以同步一个值, 并且不会死锁。
;
; agent-counter.clj
;
(def counter (agent 0))
(def attempts (atom 0))
(defn counter-increases[]
(dotimes [cnt 500000]
(send counter (fn [counter]
(swap! attempts inc)
(inc counter)))))
(def first-future (future (counter-increases)))
(def second-future (future (counter-increases)))
; wait for futures to complete
@first-future
@second-future
; wait for counter to be finished with updating
(await counter)
; print the value of the counter
(println "The counter is: " @counter)
(println "Number of attempts: " @attempts)
当我在笔记本电脑上运行此脚本时, 我得到:
The counter is: 1000000
Number of attempts: 1000000
该示例与使用原子的计数器的实现相同。唯一的区别是, 这里我们等待所有代理更改完成, 然后再使用await读取最终值。
最后一个变量数据类型是引用。与原子不同, 引用可以将更改同步到多个值。参考上的每个操作都应使用dosync在事务中。这种更改数据的方式称为软件事务存储器或STM。让我们来看一个在帐户中进行转帐的示例。
;
; stm-accounts.clj
;
(def bob (ref 200000))
(def joe (ref 300000))
(def inconsistencies (atom 0))
(def attempts (atom 0))
(def transfers (agent 0))
(defn transfer [source destination amount]
(dosync
(swap! attempts inc) ; side effect DO NOT DO THIS
(send transfers inc)
(when (not= (+ @bob @joe) 500000)
(swap! inconsistencies inc)) ; side effect DO NOT DO THIS
(alter source - amount)
(alter destination + amount)))
(defn first-transfer []
(dotimes [cnt 100000]
(transfer bob joe 2)))
(defn second-transfer []
(dotimes [cnt 100000]
(transfer joe bob 1)))
(def first-future (future (first-transfer)))
(def second-future (future (second-transfer)))
@first-future
@second-future
(await transfers)
(println "Bob has in account: " @bob)
(println "Joe has in account: " @joe)
(println "Inconsistencies while transfer: " @inconsistencies)
(println "Attempts: " @attempts)
(println "Transfers: " @transfers)
运行此脚本时, 我得到:
Bob has in account: 100000
Joe has in account: 400000
Inconsistencies while transfer: 0
Attempts: 330841
Transfers: 200000
有趣的是, 尝试次数多于交易次数。这是因为STM不使用锁, 因此如果发生冲突(例如, 两个线程试图更改相同的值), 则将重新执行事务。因此, 该交易不应有副作用。我们可以看到, 在交易中价值变化的代理行为可预测。更改代理价值的功能将与交易次数一样多地被评估。原因是该代理知道事务。如果交易必须具有副作用, 则应在代理中将其发挥作用。这样, 程序将具有可预测的行为。你可能会认为应该始终使用STM, 但是有经验的程序员经常会使用原子, 因为原子比STM更简单, 更快。当然, 如果可以用这种方式编写程序。如果你有副作用, 那么除了使用STM和代理商外别无选择。
参与者模型
以下并发模型是参与者模型。该模型的原理类似于现实世界。如果我们与许多人达成协议(例如, 建筑物), 那么建筑工地上的每个人都有自己的角色。一群人受到上司的监督。如果工人在工作中受伤, 主管将把受伤男子的工作分配给其他人。如有必要, 他可能会导致一个新人进入现场。在站点上, 我们有更多的人同时(并发)从事这项工作, 但又互相交谈以进行同步。如果我们将在建筑工地上的工作放到程序中, 那么每个人都将是一个拥有状态并在其自己的过程中执行的参与者, 而谈话将被消息替换。基于此模型的流行编程语言是Erlang。这种有趣的语言具有不可变的数据类型和功能, 它们具有与其他数据类型相同的属性。可以在程序执行期间创建函数, 并将其作为参数传递给另一个函数, 或者作为函数调用的结果返回。我将提供使用Erlang虚拟机的Elixir语言示例, 因此我将拥有与Erlang相同的编程模型, 只是语法不同。 Elixir中三个最重要的原语是生成, 发送和接收。 spawn在新进程中执行功能, 发送将消息发送到该进程, 并接收接收发送到当前进程的消息。
使用actor模型的第一个示例将同时增加计数器。要使用此模型制作程序, 必须使一个参与者具有计数器的值, 并接收消息以设置和检索计数器的值, 并让两个参与者同时增加计数器的值。
#
# Counting.exs
#
defmodule Counting do
def counter(value) do
receive do
{:get, sender} ->
send sender, {:counter, value}
counter value
{:set, new_value} -> counter(new_value)
end
end
def counting(sender, counter, times) do
if times > 0 do
send counter, {:get, self}
receive do
{:counter, value} -> send counter, {:set, value + 1}
end
counting(sender, counter, times - 1)
else
send sender, {:done, self}
end
end
end
counter = spawn fn -> Counting.counter 0 end
IO.puts "Starting counting processes"
this = self
counting1 = spawn fn ->
IO.puts "Counting A started"
Counting.counting this, counter, 500_000
IO.puts "Counting A finished"
end
counting2 = spawn fn ->
IO.puts "Counting B started"
Counting.counting this, counter, 500_000
IO.puts "Counting B finished"
end
IO.puts "Waiting for counting to be done"
receive do
{:done, ^counting1} -> nil
end
receive do
{:done, ^counting2} -> nil
end
send counter, {:get, self}
receive do
{:counter, value} -> IO.puts "Counter is: #{value}"
end
当我执行此示例时, 我得到:
Starting counting processes
Counting A started
Waiting for counting to be done
Counting B started
Counting A finished
Counting B finished
Counter is: 516827
我们可以看到最终计数器是516827, 而不是我们预期的1000000。下次运行脚本时, 收到511010。此行为的原因是计数器收到两条消息:检索当前值并设置新值。要增加计数器, 程序需要获取当前值, 将其增加1并设置增加的值。两个进程使用发送到计数器进程的消息同时读取和写入计数器的值。计数器将收到的消息的顺序是不可预测的, 程序无法控制它。我们可以想象这种情况:
- 计数器值为115
- 进程A读取计数器的值(115)
- 进程B读取计数器的值(115)
- 流程B在本地增加值(116)
- 处理B将增加的值设置到计数器(116)
- 处理A增加计数器的值(116)
- 处理A将增加的值设置到计数器(116)
- 计数器值为116
如果我们看这种情况, 则两个过程会将计数器增加1, 最后将计数器增加1, 而不是2。这样的纠缠可能发生不可预测的次数, 因此计数器的值是不可预测的。为避免这种现象, 增加操作必须通过一条消息完成。
#
# CountingFixed.exs
#
defmodule Counting do
def counter(value) do
receive do
:increase -> counter(value + 1)
{:get, sender} ->
send sender, {:counter, value}
counter value
end
end
def counting(sender, counter, times) do
if times > 0 do
send counter, :increase
counting(sender, counter, times - 1)
else
send sender, {:done, self}
end
end
end
counter = spawn fn -> Counting.counter 0 end
IO.puts "Starting counting processes"
this = self
counting1 = spawn fn ->
IO.puts "Counting A started"
Counting.counting this, counter, 500_000
IO.puts "Counting A finished"
end
counting2 = spawn fn ->
IO.puts "Counting B started"
Counting.counting this, counter, 500_000
IO.puts "Counting B finished"
end
IO.puts "Waiting for counting to be done"
receive do
{:done, ^counting1} -> nil
end
receive do
{:done, ^counting2} -> nil
end
send counter, {:get, self}
receive do
{:counter, value} -> IO.puts "Counter is: #{value}"
end
通过运行此脚本, 我得到:
Starting counting processes
Counting A started
Waiting for counting to be done
Counting B started
Counting A finished
Counting B finished
Counter is: 1000000
我们可以看到该计数器具有正确的值。可预测的(确定性)行为的原因是计数器的值增加一条消息, 因此增加计数器的消息序列不会影响其最终值。在使用参与者模型时, 我们必须注意消息如何交织, 并仔细设计消息和消息上的操作, 以避免意外的不可预测性(不确定性)。
使用此模型, 我们如何在两个帐户之间转移资金?
#
# Accounts.exs
#
defmodule Accounts do
def accounts(state) do
receive do
{:transfer, source, destination, amount} ->
accounts %{state | source => state[source] - amount , destination => state[destination] + amount}
{:amounts, accounts, sender } ->
send sender, {:amounts, for account <- accounts do
{account, state[account]}
end}
accounts(state)
end
end
def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do
if times > 0 do
send accounts, {:amounts, [source, destination], self}
receive do
{:amounts, amounts} ->
if amounts[source] + amounts[destination] != 500_000 do
Agent.update(inconsistencies, fn value -> value + 1 end)
end
end
send accounts, {:transfer, source, destination, amount}
transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies)
else
send sender, {:done, self}
end
end
end
accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end
{:ok, inconsistencies} = Agent.start(fn -> 0 end)
this = self
transfer1 = spawn fn ->
IO.puts "Transfer A started"
Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies)
IO.puts "Transfer A finished"
end
transfer2 = spawn fn ->
IO.puts "Transfer B started"
Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies)
IO.puts "Transfer B finished"
end
IO.puts "Waiting for transfers to be done"
receive do
{:done, ^transfer1} -> nil
end
receive do
{:done, ^transfer2} -> nil
end
send accounts, {:amounts, [:bob, :joe], self}
receive do
{:amounts, amounts} ->
IO.puts "Bob has in account: #{amounts[:bob]}"
IO.puts "Joe has in account: #{amounts[:joe]}"
IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}"
end
当我运行此脚本时, 我得到:
Waiting for transfers to be done
Transfer A started
Transfer B started
Transfer B finished
Transfer A finished
Bob has in account: 100000
Joe has in account: 400000
Inconsistencies while transfer: 0
我们可以看到转账的工作没有矛盾, 因为我们选择了消息转账来转账资金和消息金额以获得帐户的价值, 这使我们可以预测程序的行为。每当我们进行转账时, 任何时候的总金额应相同。
Actor模型可能会导致锁定, 从而导致死锁, 因此在设计程序时请格外小心。以下脚本显示了如何模拟锁定和死锁情况。
#
# Deadlock.exs
#
defmodule Lock do
def loop(state) do
receive do
{:lock, sender} ->
case state do
[] ->
send sender, :locked
loop([sender])
_ ->
loop(state ++ [sender])
end
{:unlock, sender} ->
case state do
[] ->
loop(state)
[^sender | []] ->
loop([])
[^sender | [next | tail]] ->
send next, :locked
loop([next | tail])
_ ->
loop(state)
end
end
end
def lock(pid) do
send pid, {:lock, self}
receive do
:locked -> nil # This will block until we receive message
end
end
def unlock(pid) do
send pid, {:unlock, self}
end
def locking(first, second, times) do
if times > 0 do
lock(first)
lock(second)
unlock(second)
unlock(first)
locking(first, second, times - 1)
end
end
end
a_lock = spawn fn -> Lock.loop([]) end
b_lock = spawn fn -> Lock.loop([]) end
this = self
IO.puts "Locking A, B started"
spawn fn ->
Lock.locking(a_lock, b_lock, 1_000)
IO.puts "Locking A, B finished"
send this, :done
end
IO.puts "Locking B, A started"
spawn fn ->
Lock.locking(b_lock, a_lock, 1_000)
IO.puts "Locking B, A finished"
send this, :done
end
IO.puts "Waiting for locking to be done"
receive do
:done -> nil
end
receive do
:done -> nil
End
当我在笔记本电脑上运行此脚本时, 我得到:
Locking A, B started
Locking B, A started
Waiting for locking to be done
从输出中我们可以看到锁定A和B的进程被卡住了。发生这种情况的原因是, 第一个进程等待第二个进程释放B, 而第二个进程等待第一个进程释放A。它们彼此等待, 并且永远被卡住。为了避免这种锁定, 顺序应该始终相同, 或者设计一个程序以使其不使用锁定(这意味着它不等待特定的消息)。以下清单始终首先锁定A, 然后锁定B。
#
# Deadlock fixed
#
defmodule Lock do
def loop(state) do
receive do
{:lock, sender} ->
case state do
[] ->
send sender, :locked
loop([sender])
_ ->
loop(state ++ [sender])
end
{:unlock, sender} ->
case state do
[] ->
loop(state)
[^sender | []] ->
loop([])
[^sender | [next | tail]] ->
send next, :locked
loop([next | tail])
_ ->
loop(state)
end
end
end
def lock(pid) do
send pid, {:lock, self}
receive do
:locked -> nil # This will block until we receive message
end
end
def unlock(pid) do
send pid, {:unlock, self}
end
def locking(first, second, times) do
if times > 0 do
lock(first)
lock(second)
unlock(second)
unlock(first)
locking(first, second, times - 1)
end
end
end
a_lock = spawn fn -> Lock.loop([]) end
b_lock = spawn fn -> Lock.loop([]) end
this = self
IO.puts "Locking A, B started"
spawn fn ->
Lock.locking(a_lock, b_lock, 1_000)
IO.puts "Locking A, B finished"
send this, :done
end
IO.puts "Locking A, B started"
spawn fn ->
Lock.locking(a_lock, b_lock, 1_000)
IO.puts "Locking A, B finished"
send this, :done
end
IO.puts "Waiting for locking to be done"
receive do
:done -> nil
end
receive do
:done -> nil
End
当我在笔记本电脑上运行此脚本时, 我得到:
Locking A, B started
Locking A, B started
Waiting for locking to be done
Locking A, B finished
Locking A, B finished
现在, 不再有僵局。
本文总结
作为并发编程的介绍, 我们介绍了一些并发模型。我们并未涵盖所有模型, 因为本文太大了。仅举几例, 通道和响应流是其他一些流行使用的并发模型。通道和反应流与参与者模型有很多相似之处。它们全部都发送消息, 但是许多线程可以从一个通道接收消息, 而响应流在一个方向上发送消息以形成有向图, 该图形作为处理结果从一端接收消息并从另一端发送消息。
如果我们不事先考虑, 则共享的可变状态模型很容易出错。它具有比赛条件和僵局的问题。如果我们可以在不同的并发编程模型之间进行选择, 则将易于实现和维护, 但否则我们必须非常小心。
功能性的方法更容易推理和实现。它不能有僵局。该模型的性能可能比共享的可变状态模型差, 但是可以正常运行的程序总是比不能正常运行的程序快。
Actor模型是并发编程的不错选择。尽管存在争用条件和死锁的问题, 但是与共享可变状态模型相比, 它们发生的可能性更少, 因为进程进行通信的唯一方法是通过消息。通过在流程之间进行良好的消息设计, 可以避免这种情况。如果出现问题, 那么在流程之间的通信中, 消息的顺序或含义就决定了, 你知道在哪里看。
我希望本文能使你对什么是并发编程以及如何为你编写的程序赋予结构有所了解。
相关:Ruby并发和并行:实用教程
评论前必须登录!
注册