Akka是一个工具,用来在JVM上构建高并发、分布式、容错的事件驱动的应用。Akka支持java和scala两种语言。Akka最强大的一个特性是使用了并发Actor模型。
这一次本文会同时使用java和scala来进行说明。
示例代码
下面是一个非常简单使用akka的的例子。
java(做了折叠):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
package akka; import akka.actor.*; import scala.concurrent.duration.Duration; import java.io.Serializable; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class HelloAkka { public static class Greet implements Serializable {} public static class WhoToGreet implements Serializable { public final String who; public WhoToGreet(String who) { this.who = who; } } public static class Greeting implements Serializable { public final String message; public Greeting(String message) { this.message = message; } } public static class Greeter extends UntypedActor { String greeting = ""; public void onReceive(Object message) { if (message instanceof WhoToGreet) greeting = "hello, " + ((WhoToGreet) message).who; else if (message instanceof Greet) // 发送当前的greeting消息给Sender getSender().tell(new Greeting(greeting), getSelf()); else unhandled(message); } } public static class GreetPrinter extends UntypedActor { public void onReceive(Object message) { if (message instanceof Greeting) System.out.println(((Greeting) message).message); } } public static void main(String[] args) { try { // 创建'helloakka' actor系统 final ActorSystem system = ActorSystem.create("helloakka"); // 创建一个'greeter' actor final ActorRef greeter = system.actorOf(Props.create(Greeter.class), "greeter"); // 创建actor收件箱 final Inbox inbox = Inbox.create(system); // 告诉'greeter'改变它的greeting消息 greeter.tell(new WhoToGreet("akka"), ActorRef.noSender()); // 向greeter请求最新的greeting消息 // 返回的消息会被放到actor收件箱里 inbox.send(greeter, new Greet()); // 等待5秒钟接收反馈的greeting消息 final Greeting greeting1 = (Greeting) inbox.receive(Duration.create(5, TimeUnit.SECONDS)); System.out.println("Greeting: " + greeting1.message); // 改变greeting消息并再次请求 greeter.tell(new WhoToGreet("typesafe"), ActorRef.noSender()); inbox.send(greeter, new Greet()); final Greeting greeting2 = (Greeting) inbox.receive(Duration.create(5, TimeUnit.SECONDS)); System.out.println("Greeting: " + greeting2.message); // 在0秒后,由一个新的actor greetPrinter每秒发送一条Greet消息给greeter。 final ActorRef greetPrinter = system.actorOf(Props.create(GreetPrinter.class)); system.scheduler().schedule(Duration.Zero(), Duration.create(1, TimeUnit.SECONDS), greeter, new Greet(), system.dispatcher(), greetPrinter); } catch (TimeoutException ex) { System.out.println("Got a timeout waiting for reply from an actor"); ex.printStackTrace(); } } } |
scala(做了折叠):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
import akka.actor.{ActorRef, ActorSystem, Props, Actor, Inbox} import scala.concurrent.duration._ case object Greet case class WhoToGreet(who: String) case class Greeting(message: String) class Greeter extends Actor { var greeting = "" def receive = { case WhoToGreet(who) => greeting = s"hello, $who" case Greet => sender ! Greeting(greeting) // Send the current greeting back to the sender } } object HelloAkka extends App { // Create the 'helloakka' actor system val system = ActorSystem("helloakka") // Create the 'greeter' actor val greeter = system.actorOf(Props[Greeter], "greeter") // Create an "actor-in-a-box" val inbox = Inbox.create(system) // Tell the 'greeter' to change its 'greeting' message greeter.tell(WhoToGreet("akka"), ActorRef.noSender) // Ask the 'greeter for the latest 'greeting' // Reply should go to the "actor-in-a-box" inbox.send(greeter, Greet) // Wait 5 seconds for the reply with the 'greeting' message val Greeting(message1) = inbox.receive(5.seconds) println(s"Greeting: $message1") // Change the greeting and ask for it again greeter.tell(WhoToGreet("typesafe"), ActorRef.noSender) inbox.send(greeter, Greet) val Greeting(message2) = inbox.receive(5.seconds) println(s"Greeting: $message2") val greetPrinter = system.actorOf(Props[GreetPrinter]) // after zero seconds, send a Greet message every second to the greeter with a sender of the greetPrinter system.scheduler.schedule(0.seconds, 1.second, greeter, Greet)(system.dispatcher, greetPrinter) } // prints a greeting class GreetPrinter extends Actor { def receive = { case Greeting(message) => println(message) } } |
在上面的例子中定义了一个Greetor Actor,这个Actor可以获取最新的greeting消息,并对两种行为作出响应:设置一个新的greeting字符串;返回最新的greeting字符串。
接下来会详细解释下这个例子。
定义消息类
Actor并没有提供让开发者调用的公共API。它是通过Actor处理的消息来提供公共API的。消息可以是任意类型(java中指的是继承了Object的类,在scala中指的是继承了Any的类)的对象。也就是说我们可以使用包装类来发送直接类型的消息,也可以发送一些数据结构比如数组或者集合类的对象。然而因为消息是Actor的公共API,所以定义消息类的时候需要保证消息类的名称有一定的语义,或者在指定领域内有意义,尽管有时仅仅是将已有的类简单封装了下。不过这样可以构建简单易懂的Actor系统,在调试的时候也可以更容易一些。
在代码中我们定义了三个消息类:
- WhoToGreet,重新定义新的greeting消息;
- Greet,向Actor请求最新的消息;
- Greeting,返回最新的greeting消息。
在示例中这三个消息类定义在外部类HelloAkka中。这里要注意在消息对象中保存的消息应该是不可变的,否则的话就可能会出现两个不同的Actor共享状态的危险,这违反了Actor模型的设计原则。
虽然在这个示例中我们没有使用远程连接,但是在定义消息类时实现序列化是个好习惯——这样我们使用akka扩展到多个节点时就不需要再回过来修改代码。
消息类的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public static class Greet implements Serializable {} public static class WhoToGreet implements Serializable { public final String who; public WhoToGreet(String who) { this.who = who; } } public static class Greeting implements Serializable { public final String message; public Greeting(String message) { this.message = message; } } |
下面是使用scala定义消息类的代码。scala的case类和case对象对Actor消息的支持非常好,因为它们本就是不可变的且支持模式匹配(这在对Actor接收的消息进行匹配时很有用)。使用case类的另一个好处就是其默认支持序列化。
1 2 3 4 5 |
case object Greet case class WhoToGreet(who: String) case class Greeting(message: String) |
定义Actor
Actor是Akka的执行单元。Actor是面向对象的,因为它也封装了状态和行为。但是它要比Java或Scala中的普通对象有更强的隔离性。Actor模型禁止两个或多个Actor间共享状态的行为。一个Actor观察另一个Actor的状态的方式就是向其发送一条消息来请求状态。Actor是非常轻量级的——它只受到内存的限制。每个Actor只会消耗几百个字节的内存,这意味着在一个应用中可以轻松地创建数百万个并发的Actor。Actor模型强大的隔离原则以及事件驱动模型(稍后会说到)和位置透明等特性让我们可以用直观的方式轻松解决并发问题和伸缩性问题。
在java中创建Actor类需要继承抽象类UntypedActor并实现onReceive方法。使用scala也是差不多的方式,需要继承Actor trait并实现receive方法。在onReceive方法中定义了Actor的行为,在这个方法中Actor可以对它收到的不同的消息做出不同的反馈。一个Actor可以有或者说是通常都会有状态。访问或者是改变一个Actor的状态是完全线程安全的,因为这受到Actor模型的保护。
现在开始创建一个Greeter Actor,并使用一个变量greeting作为这个Actor的状态。greeting代表获取的最新定义的greeting消息。在Greeter的onReceive方法里面我们添加了一些行为用来对Greet和WhoToGreet两种消息分别作出反馈。先来看Java的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public static class Greeter extends UntypedActor { String greeting = ""; public void onReceive(Object message) { if (message instanceof WhoToGreet) greeting = "hello, " + ((WhoToGreet) message).who; else if (message instanceof Greet) // 发送当前的greeting消息给Sender getSender().tell(new Greeting(greeting), getSelf()); else unhandled(message); } } |
代码中定义的Actor继承了抽象类UntypedActor,意味着其接收的消息是没有类型约束的,如代码中就是Object。此外也有类限制的Actor,不过目前暂时不关注这些。通常使用的Actor都是没有类型约束的。
暂时先不要考虑代码中定义的getSender()、tell(…)和getSelf()等API。稍后说到发送和响应消息时会详细解释这些API。
现在再看一下scala的实现。如代码中所示,scala case类的模式匹配的特性可以很大程度地简化Actor的receive方法。不过除此以外的内容还是还是和java版本很相似的:
1 2 3 4 5 6 7 8 |
class Greeter extends Actor { var greeting = "" def receive = { case WhoToGreet(who) => greeting = s"hello, $who" case Greet => sender ! Greeting(greeting) // Send the current greeting back to the sender } } |
有没有注意到scala版本的Greetor和java版本的一处不同:在scala版本的代码中并没有将未知类型的消息传递给unhandled方法。在scala中这不是必需的,因为scala将receive方法的行为解释为一个偏应用函数,也就是说匹配不上的语句会被默认为不处理,并由Akka自动将之传递给unhandled()方法。
另外一个不同就是Scala版本中继承的trait是Actor,而非是UntypedActor。因为这是scala的API,而不是Java的API,尽管二者本质上是同一种Actor。
创建Actor
到现在我们已经说过了如何创建Actor和消息。接下来我们会说一下如何创建Actor的实例。在Akka中创建Actor实例不能像平常一样直接使用new关键字,需要通过一个工厂来创建。这个工厂也不会直接返回一个目标Actor的实例,而是返回一个指向Actor实例的ActorRef对象。使用ActorRef看起来像是隔了一层,但是却增加了许多功能和灵活性。比如说位置透明:在相同语义的情况下,ActorRef表示的正在运行的实例既可以是在当前进程下也可以是在远端机器上。也就是说,位置并不重要。这也意味着,如果需要的话,可以在运行时改变Actor的位置或者调整应用的拓扑结构来优化系统。ActorRef这种间隔带来的另一个特性是使用“let it crash”模型来进行故障管理:系统可以主动crash故障的Actor并重启以实现自我治愈。
Akka中的这个工厂是ActorSystem。ActorSystem在某种程度上类似于Spring的BeanFactory,它也可以作为所有Actor的容器,执行管理这些Actor的生命周期等工作。可以通过一个名为actorOf的工厂方法创建Actor实例。这个方法需要一个Props的配置实例和一个名称。Actor(和ActorSystem)的名称在Akka中很重要,可以在查看Actor信息和在配置文件中添加配置时使用它们。因此完全有必要花些时间为Actor和ActorSystem起一个好名称。
下面是用java写的代码:
1 2 3 |
final ActorSystem system = ActorSystem.create("helloakka"); final ActorRef greeter = system.actorOf(Props.create(Greeter.class), "greeter"); |
scala的代码也没有太多不同:
1 2 3 |
val system = ActorSystem("helloakka") val greeter = system.actorOf(Props[Greeter], "greeter") |
现在我们已经创建了一个Greeter Actor的运行实例。接下来我们要看一下如何与之进行通信。
告诉Actor去做一些事情
和Actor的所有通信都是通过异步消息传递完成的。这也是如何使Actor做出反应以及事件驱动的方式。Actor不会主动做任何事情,除非它被通知做某事。开发者可以通过发送消息通知Actor做某些事。异步发送消息意味着发件方不会坚持等待接收方处理完消息。相反的,发件方只是将消息发送到接收方的收件箱里,然后就可以自由地去做一些比等待接收方处理消息更重要的事情了。接收方的收件箱本质上是一个队列,且是有序的。这保证了同一个Actor发送的多条消息的排序会被保留,不过却有可能会与另一个Actor发送的消息交治。
你可能会想知道当一个Actor在不处理任何消息时会做些什么事情。会做一些其他的具体的工作么?其实不会,此时Actor处于完全暂停的状态,它不会消耗除了内存以外的任何资源。就像牵线木偶一样。
我们可以通过传递消息到ActorRef的tell方法中告诉Actor去做一些事情。这个方法将消息放到Actor的收件箱以后就会立即退回。
java代码:
1 |
greeter.tell(new WhoToGreet("akka"), ActorRef.noSender()); |
scala代码:
1 |
greeter.tell(WhoToGreet("akka"), ActorRef.noSender) |
使用scala还可以写得更简洁一些:
1 |
greeter ! WhoToGreet("akka") |
这里使用的“!”是一个绑定操作。
向Actor做出回复
发件方的自引用
在上面的代码里,没有等待Actor作出回复。有时候通信不是简单地单向通信模式,而是倾向于请求应答模式。这时一个直接的方式是添加一个对发件方的引用作为消息的一部分,以便接收方可以通过这个引用发送回复给发件方。这是一个常见的情况,它由Akka直接支持。对于发送的每一条消息,开发者都可以选择是否传递发件方的引用(Actor对应的ActorRef)。如果是从一个Actor内发送消息,那么就可以用过该Actor的自引用访问到这个Actor的ActorRef。然而请注意,不应该这样使用。在Java中可以通过getSelf()访问自引用,在scala中则可以通过self()方法。
java代码:
1 |
greeter.tell(new Greeting(greeting), getSelf()); |
scala的代码就会简单一些了。scala有一个隐式参数列表的特性,它允许自动透明地将参数传递给方法。我们在向另一个Actor发送消息时,可以利用这个特性自动传递对发件方的引用。
下面这段代码如果是在Actor A内部调用的,会在发送消息时将Actor A的ActorRef作为消息的发件方一起传递出去:
1 |
greeter ! Greet |
如果选择在tell方法中不传递对发件方的引用,或者说是忘记了,就会默认使用一个被称为“dead-letter”的Actor的引用。“dead-letter”是所有未处理的消息的结尾标识,可以使用Akka的事件总线( Event Bus )来订阅这种消息。
引用发件方
发件方的引用将会在接收方Actor处理消息时可用。因为每条消息都有一个与之唯一配对的发件方引用,也就是说接收方处理的每条消息的发件方引用是一直在变化的。因此,如果开发者出于某种原因想要在处理消息后继续使用某个特定的发件方引用,就需要保证持有它——可以考虑将之保存在一个成员变量或类似的结构中。要访问发件方引用,在java中可以使用getSender()方法,在scala中则可以直接使用sender:
1 |
getSender().tell(new Greeting(greeting), getSelf()); |
scala代码如下:
1 |
sender ! Greeting(greeting) |
使用收件箱
当前大部分实际应用的Actor应用都会使用不止一个Actor。Actor模型的发明者,Carl Hewitt,最近在一个采访中说道:“One Actor is no Actor. Actors come in systems”。这是很重要且富有智慧的一个评论。要真正利用Actor模型,就应该使用大量的Actor。Actor编程中的每一个难题都可以通过添加更多的Actor来解决——通过将这个难题拆分成更细的子任务并将之委托给新的Actor。
为了简单起见,我们在这个示例中只使用了一个Actor。这意味着如果我们是从主程序与这唯一的一个Actor进行通信,我们就没有发件方,因为我们没有从另一个Actor的内部发送消息。幸运的是Akka提供了一个很好地解决方案:Inbox(收件箱)。
Inbox允许开发者创建一个“actor-in-a-box”。也就是说在Inbox中可以包含一个傀儡式的Actor,通过这个傀儡Actor可以向其他Actor发送消息并接收它们的回复。可以使用Inbox.create()方法创建一个Inbox实例,并使用inbox.send()方法从中发送消息。Inbox内置的傀儡Actor会把收到的所有消息放到一个队列里面,而后可以使用inbox.receive()方法将消息取出来。如果取消息的时候队列为空,那么调用的receive方法将会阻塞——直到有一条消息可以取出位置。很简单是吧。
开发者应该都知道:阻塞非常容易影响性能和扩展性,使用阻塞应当慎之又慎。我们在这实例中使用阻塞方法是因为它可以简化消息流,方便大家理解Actor模型。
现在我们将通过编写Greeter Actor的驱动程序代码来结束这篇文章。
java版本:
1 2 3 4 5 6 7 8 |
final Inbox inbox = Inbox.create(system); greeter.tell(new WhoToGreet("akka"), ActorRef.noSender()); inbox.send(greeter, new Greet()); final Greeting greeting1 = (Greeting) inbox.receive(Duration.create(5, TimeUnit.SECONDS)); System.out.println("Greeting: " + greeting1.message); |
scala版本:
1 2 3 4 5 6 7 8 |
val inbox = Inbox.create(system) greeter.tell(WhoToGreet("akka"), ActorRef.noSender) inbox.send(greeter, Greet) val Greeting(message1) = inbox.receive(5.seconds) println(s"Greeting: $message1") |
就这样!
发表评论