本文概述
挑战
编写并发程序很难。必须处理线程, 锁, 争用条件等等, 这很容易出错, 并且可能导致难以阅读, 测试和维护的代码。
因此, 许多人宁愿完全避免多线程。取而代之的是, 它们仅依靠外部服务(例如数据库, 队列等)来使用单线程进程来处理任何所需的并发或异步操作。尽管此方法在某些情况下是一种合法的选择, 但在许多情况下, 它根本不是可行的选择。许多实时系统(例如交易或银行应用程序或实时游戏)没有等待单线程进程完成的奢望(他们现在需要答案!)。其他系统是如此耗费计算或资源, 以至于在不将并行化引入其代码的情况下, 它们将花费大量时间(在某些情况下为数小时甚至数天)来运行。
一种相当普遍的单线程方法(例如, 在Node.js世界中广泛使用)是使用基于事件的非阻塞范例。尽管通过避免上下文切换, 锁定和阻塞确实有助于提高性能, 但仍不能解决同时使用多个处理器的问题(这样做将需要启动多个独立进程并在多个独立进程之间进行协调)。
那么, 这是否意味着你别无选择, 只能深入了解线程, 锁和竞争条件, 以构建并发应用程序?
感谢Akka框架, 答案是否定的。本教程介绍了Akka示例, 并探讨了它促进和简化并发分布式应用程序实现的方式。
什么是Akka框架?
这篇文章介绍了Akka, 并探讨了它促进和简化并发分布式应用程序实现的方式。
Akka是一个工具包和运行时, 用于在JVM上构建高度并发, 分布式和容错的应用程序。 Akka用Scala编写, 并为Scala和Java提供了语言绑定。
Akka处理并发的方法基于Actor模型。在基于参与者的系统中, 所有事物都是参与者, 就像在面向对象设计中, 一切都是对象一样。但是, 与我们的讨论特别相关的一个关键区别是, Actor模型是经过专门设计和设计的, 可以用作并发模型, 而面向对象模型则不是。更具体地说, 在Scala actor系统中, actor可以交互和共享信息, 而无需任何顺序性的前提。参与者之间共享信息并彼此执行任务的机制是消息传递。
创建和调度线程, 接收和分发消息以及处理竞争条件和同步的所有复杂性都归于该框架以透明方式进行处理。
Akka在参与者和底层系统之间创建一个层, 以便参与者仅需要处理消息。创建和调度线程, 接收和分发消息以及处理竞争条件和同步的所有复杂性都归于该框架以透明方式进行处理。
Akka严格遵守《反应式宣言》。响应式应用程序旨在用满足以下一项或多项要求的体系结构替换传统的多线程应用程序:
- 事件驱动。使用Actors, 可以编写异步处理请求的代码, 并专门采用非阻塞操作。
- 可扩展在Akka中, 由于消息传递和位置透明, 都可以添加节点而无需修改代码。
- 弹性强。任何应用程序都会遇到错误, 并在某个时间点失败。 Akka提供”监督”(容错)策略以促进自我修复系统。
- 反应灵敏。当今许多高性能和快速响应的应用程序需要向用户提供快速反馈, 因此需要非常及时地对事件做出反应。 Akka基于消息的非阻塞策略有助于实现这一目标。
什么是Akka中的Actors?
参与者本质上不过是一个接收消息并采取措施处理消息的对象。它与消息的源分离, 它的唯一职责是正确识别已收到的消息的类型并采取相应的措施。
收到消息后, Actors可以采取以下一项或多项措施:
- 本身执行一些操作(例如执行计算, 持久化数据, 调用外部Web服务等)
- 将消息或派生消息转发给另一个参与者
- 实例化新Actors并将消息转发给它
作为选择, 如果行动者认为适当, 则可以选择完全忽略该消息(即, 它可以选择不采取行动)。
要实现一个actor, 有必要扩展akka.actor.Actor特性并实现receive方法。当消息发送给Actors时, Actors的接收方法(由Akka)被调用。它的典型实现包括模式匹配, 如下面的Akka示例所示, 以识别消息类型并做出相应的反应:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
def receive = {
case value: String => doSomething(value)
case _ => println("received unknown message")
}
}
模式匹配是一种用于处理消息的相对优雅的技术, 与基于回调的可比实现相比, 该技术倾向于生成”更干净”且易于导航的代码。例如, 考虑一个简单的HTTP请求/响应实现。
首先, 让我们使用JavaScript中基于回调的范例来实现此目的:
route(url, function(request){
var query = buildQuery(request);
dbCall(query, function(dbResponse){
var wsRequest = buildWebServiceRequest(dbResponse);
wsCall(wsRequest, function(wsResponse) {
sendReply(wsResponse);
});
});
});
现在, 将其与基于模式匹配的实现进行比较:
msg match {
case HttpRequest(request) => {
val query = buildQuery(request)
dbCall(query)
}
case DbResponse(dbResponse) => {
var wsRequest = buildWebServiceRequest(dbResponse);
wsCall(dbResponse)
}
case WsResponse(wsResponse) => sendReply(wsResponse)
}
尽管基于回调的JavaScript代码虽然很紧凑, 但是肯定很难阅读和导航。相比之下, 基于模式匹配的代码使你可以更直接地了解正在考虑哪些情况以及如何处理每种情况。
Actors系统
通常, 解决复杂的问题并将其递归地分解为较小的子问题是一种合理的问题解决技术。这种方法在计算机科学中(与”单一职责原则”一致)特别有益, 因为它倾向于生成干净, 模块化的代码, 几乎没有冗余, 也没有冗余, 而且相对易于维护。
在基于角色的设计中, 此技术的使用有助于将角色的逻辑组织简化为称为”角色系统”的分层结构。参与者系统提供了基础结构, 参与者可以通过该基础结构进行交互。
在Akka中, 与Actors交流的唯一方法是通过ActorRef。 ActorRef代表对Actors的引用, 该Actors阻止其他对象直接访问或操纵该Actors的内部和状态。可以使用以下语法协议之一通过ActorRef将消息发送给actor:
- ! (“告诉”)–发送消息并立即返回
- ? (“询问”)–发送消息并返回表示可能的答复的未来
每个参与者都有一个邮箱, 其传入消息将传递到该邮箱。有多种邮箱实现可供选择, 默认实现为FIFO。
一个actor包含许多实例变量, 以在处理多个消息时保持状态。 Akka确保角色的每个实例都在其自己的轻量级线程中运行, 并确保一次处理一条消息。通过这种方式, 可以可靠地维护每个参与者的状态, 而开发人员无需明确担心同步或竞赛条件。
通过Akka Actor API为每个actor提供以下有用信息, 以执行其任务:
- 发件人:Actor引用当前正在处理的邮件的发件人
- 上下文:与参与者在其中运行的上下文有关的信息和方法(例如, 包括用于实例化新参与者的actorOf方法)
- monitoringStrategy:定义用于从错误中恢复的策略
- 自我:Actors本身的ActorRef
Akka确保角色的每个实例都在其自己的轻量级线程中运行, 并确保一次处理一条消息。这样, 可以可靠地维护每个参与者的状态, 而开发人员无需明确担心同步或竞争条件。
为了帮助将这些教程结合在一起, 让我们考虑一个简单的示例, 该示例计算文本文件中的单词数。
就我们的Akka示例而言, 我们将问题分解为两个子任务:即, (1)一个”子”任务, 对一行中的单词数进行计数;(2)一个”父”任务, 对每行中的单词计数进行求和, 以获得文件中的单词总数。
父角色将加载文件中的每一行, 然后将计数该行中单词的任务委派给子角色。当孩子完成后, 它将把结果发送给父母。父级将接收带有单词计数(每行)的消息, 并为整个文件中的单词总数保留一个计数器, 计数器将在完成后返回给调用者。
(请注意, 下面提供的Akka教程代码示例仅供参考, 因此不一定涉及所有边缘条件, 性能优化等。此外, 下面提供的代码示例的完整可编译版本可在以下位置获得:这个要点。)
我们首先来看一下StringCounterActor子类的示例实现:
case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Integer)
class StringCounterActor extends Actor {
def receive = {
case ProcessStringMsg(string) => {
val wordsInLine = string.split(" ").length
sender ! StringProcessedMsg(wordsInLine)
}
case _ => println("Error: message not recognized")
}
}
这个参与者的任务非常简单:使用ProcessStringMsg消息(包含一行文本), 计算指定行上的单词数, 然后通过StringProcessedMsg消息将结果返回给发送方。请注意, 我们已经实现了使用!的类。 (” tell”)方法发送StringProcessedMsg消息(即发送消息并立即返回)。
好的, 现在让我们将注意力转向父级WordCounterActor类:
1. case class StartProcessFileMsg()
2.
3. class WordCounterActor(filename: String) extends Actor {
4.
5. private var running = false
6. private var totalLines = 0
7. private var linesProcessed = 0
8. private var result = 0
9. private var fileSender: Option[ActorRef] = None
10.
11. def receive = {
12. case StartProcessFileMsg() => {
13. if (running) {
14. // println just used for example purposes;
15. // Akka logger should be used instead
16. println("Warning: duplicate start message received")
17. } else {
18. running = true
19. fileSender = Some(sender) // save reference to process invoker
20. import scala.io.Source._
21. fromFile(filename).getLines.foreach { line =>
22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
23. totalLines += 1
24. }
25. }
26. }
27. case StringProcessedMsg(words) => {
28. result += words
29. linesProcessed += 1
30. if (linesProcessed == totalLines) {
31. fileSender.map(_ ! result) // provide result to process invoker
32. }
33. }
34. case _ => println("message not recognized!")
35. }
36. }
这里发生了很多事情, 因此让我们更详细地检查每件事(请注意, 下面的讨论中引用的行号基于上述代码示例)…
首先, 请注意, 要处理的文件名将传递给WordCounterActor构造函数(第3行)。这表明actor仅用于处理单个文件。由于实例仅使用一次(即处理单个文件), 因此避免了在完成工作时重置状态变量(运行, totalLines, linesProcessed和结果)的需要, 从而简化了开发人员的编码工作。然后丢弃。
接下来, 观察WordCounterActor处理两种类型的消息:
- StartProcessFileMsg(第12行)
- 从最初启动WordCounterActor的外部actor接收。
- 收到后, WordCounterActor首先检查它是否未收到冗余请求。
- 如果请求是多余的, 则WordCounterActor会生成警告, 并且仅执行其他操作(第16行)。
- 如果请求不是多余的:
- WordCounterActor将对发件人的引用存储在fileSender实例变量中(请注意, 这是一个Option [ActorRef]而不是Option [Actor]-参见第9行)。在处理最后的StringProcessedMsg(如下所述, 它是从StringCounterActor子级接收到的)时, 需要该ActorRef以便以后访问并对其作出响应。
- 然后, WordCounterActor读取文件, 并在加载文件的每一行时, 创建一个StringCounterActor子级, 并将包含要处理的行的消息传递给它(第21-24行)。
- StringProcessedMsg(第27行)
- 在完成处理分配给它的行时从子StringCounterActor接收。
- 收到后, WordCounterActor会为文件增加行计数器, 如果文件中的所有行均已处理(即, totalLines和linesProcessed相等时), 它将最终结果发送到原始fileSender(第28-31行)。
再次注意, 在Akka中, 角色间通信的唯一机制是消息传递。消息是行为者共享的唯一内容, 并且由于行为者可以潜在地并发访问相同的消息, 因此对于他们而言, 保持不变是很重要的, 以避免竞争条件和意外行为。
Scala中的Case类是常规类, 它们通过模式匹配提供递归分解机制。
因此, 通常以案例类的形式传递消息, 因为它们在默认情况下是不可变的, 并且由于它们与模式匹配的无缝集成。
让我们以运行整个应用程序的代码示例结束该示例。
object Sample extends App {
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.dispatch.ExecutionContexts._
implicit val ec = global
override def main(args: Array[String]) {
val system = ActorSystem("System")
val actor = system.actorOf(Props(new WordCounterActor(args(0))))
implicit val timeout = Timeout(25 seconds)
val future = actor ? StartProcessFileMsg()
future.map { result =>
println("Total number of words " + result)
system.shutdown
}
}
}
在并发编程中, “未来”本质上是一个占位符对象, 其结果尚不清楚。
请注意这次如何?方法用于发送消息。这样, 调用方可以使用返回的Future来打印最终结果(如果有), 并通过关闭ActorSystem退出程序。
Akka容错和主管策略
在Actors系统中, 每个Actors都是其子代的主管。如果Actors无法处理消息, 它将中止其自身及其所有子级, 并将消息(通常以异常形式)发送给其主管。
在Akka中, 主管策略是定义系统容错行为的主要直接方法。
在Akka中, 管理者对从其子级渗透到其上的异常做出反应并处理异常的方式称为”管理者策略”。主管策略是定义系统的容错行为的主要直接方法。
当表示失败的消息到达主管时, 可以采取以下措施之一:
- 恢复孩子(及其孩子), 保持其内部状态。当子状态未因错误而损坏并且可以继续正常运行时, 可以应用此策略。
- 重新启动子代(及其子代), 清除其内部状态。可以在与上述策略相反的场景中使用此策略。如果子状态已被错误破坏, 则必须重置其状态, 然后才能在将来使用它。
- 永久停止孩子(及其孩子)。如果认为错误情况无法纠正, 但不会危及正在执行的其余操作(可以在没有失败的孩子的情况下完成)的其余部分, 则可以采用此策略。
- 自行停止并逐步升级错误。在主管不知道如何处理故障并将其升级为自己的主管时使用。
此外, Actors可以决定只对失败的孩子或其兄弟姐妹采取该行动。为此有两种预定义的策略:
- OneForOneStrategy:将指定的操作仅应用于失败的孩子
- AllForOneStrategy:将指定的操作应用于其所有子级
这是一个使用OneForOneStrategy的简单示例:
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
override val supervisorStrategy =
OneForOneStrategy() {
case _: ArithmeticException => Resume
case _: NullPointerException => Restart
case _: IllegalArgumentException => Stop
case _: Exception => Escalate
}
如果未指定任何策略, 则采用以下默认策略:
- 如果在初始化角色时出错, 或者如果角色被杀死, 则角色会被停止。
- 如果存在任何其他类型的异常, 则只需重新启动参与者即可。
Akka提供的默认策略的实现如下:
final val defaultStrategy: SupervisorStrategy = {
def defaultDecider: Decider = {
case _: ActorInitializationException ⇒ Stop
case _: ActorKilledException ⇒ Stop
case _: Exception ⇒ Restart
}
OneForOneStrategy()(defaultDecider)
}
Akka允许实施自定义主管策略, 但是正如Akka文档警告的那样, 请谨慎行事, 因为不正确的实施可能会导致诸如参与者系统被阻塞(例如, 永久暂停的参与者)等问题。
位置透明
Akka架构支持位置透明性, 使参与者可以完全不知道他们收到的消息的来源。消息的发件人可以与执行者位于同一JVM中, 也可以位于单独的JVM中(在同一节点或不同节点上运行)。 Akka允许以对参与者(因此对开发者)完全透明的方式处理这些情况。唯一的警告是, 跨多个节点发送的消息必须可序列化。
Akka架构支持位置透明性, 使参与者可以完全不知道他们收到的消息的来源。
Actor系统被设计为可以在分布式环境中运行, 而无需任何专门的代码。 Akka仅需要存在一个配置文件(application.conf), 该文件指定要将消息发送到的节点。这是一个配置文件的简单示例:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
port = 2552
}
}
}
一些分开的技巧…
我们已经看到了Akka框架如何帮助实现并发和高性能。但是, 正如本教程所指出的那样, 在设计和实现系统以充分利用Akka的功能时, 需要牢记以下几点:
在最大可能范围内, 应为每个行为者分配最小的任务(如前所述, 遵循”单一责任原则”)
参与者应异步处理事件(即处理消息), 并且不应阻塞, 否则会发生上下文切换, 这可能会对性能产生不利影响。具体而言, 最好在Future中执行阻止操作(IO等), 以免阻止参与者。即:
case evt => blockingCall() // BAD
case evt => Future {
blockingCall() // GOOD
}
确保你的消息都是不可变的, 因为将它们传递给彼此的参与者将同时在各自的线程中运行。可变消息可能会导致意外行为。
由于节点之间发送的消息必须可序列化, 因此请务必记住, 消息越大, 序列化, 发送和反序列化它们所花费的时间就越长, 这会对性能产生负面影响。
总结
用Scala编写的Akka简化并促进了高并发, 分布式和容错应用程序的开发, 从而使开发人员避免了许多复杂性。要使Akka完全公正, 将需要的不仅是本教程, 而且还希望本介绍及其示例引人入胜, 以使你能够阅读更多内容。
亚马逊, VMWare和CSC只是积极使用Akka的领先公司的几个例子。访问Akka官方网站以了解更多信息, 并探索Akka是否也可能是你的项目的正确答案。
评论前必须登录!
注册