博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Akka Actor和Java 8构建反应式应用
阅读量:6688 次
发布时间:2019-06-25

本文共 13440 字,大约阅读时间需要 44 分钟。

\

本文要点

\\
  • Actor模型为编写并发和分布式的系统提供了高层次的抽象,为开发人员屏蔽了显式锁定和线程管理的工作;\\t
  • Actor模型为反应式系统提供了核心功能,这些功能在反应式宣言中定义为响应性、弹性、扩展性以及消息驱动;\\t
  • Akka是一个基于Actor的框架,借助Java 8的Lambda支持它非常易于实现;\\t
  • 通过使用Actor模型,开发人员在设计和实现系统时,能够更加关注于核心功能,忽略其他业务不相关的冗余内容;\\t
  • 基于Actor的系统非常适合快速演化的微服务架构。\
\\

尽管“反应式(reactive)”这个术语已经存在很长时间了,但是只有到最近它才被行业实际应用到系统设计之中,并得到了主流的采纳。在2014年Gartner就写到,过去非常流行的三层架构已经日薄西山。随着企业在推进现代化方面的努力,这一点已经越发明晰了,企业必须要重新思考他们十多年来构建应用的方式。

\\

微服务席卷了软件行业,它所带来的冲击波正在从根本上动摇传统开发流程。我们看到软件设计范式发生了变化,项目管理的方法论也随之发生了演化。我们正在向新的应用设计和实现方式转变,它以前所未有的势头在IT系统中实现。即便微服务这个术语不是全新的概念,我们的行业也正在意识到它不仅仅是解耦RESTful端点和拆分单体应用,它真正的价值在于更好的资源利用效率以及面对不可预知工作负载时更强的扩展性。的原则很快变成了微服务架构的圣经,因为它们本质上就是分布式的反应式应用。

\\

如今应用中的Actor模型

\\

为了保持用户的兴趣,应用必须要保持很高的响应性,同时,为了满足受众不断变化的需求和预期,应用必须要快速演化。用于构建应用的技术在不断地快速演进;科学在不断发展,持续涌现的新需求不能依赖于昨天的工具和方法论。Actor模型正在不断发展起来,它是一种构建应用的高效工具,能够充分发挥多核、内存以及集群环境所带来的强大处理能力。

\\

Actor提供了一种简单却强大的模型,通过该模型设计和实现的应用可以分布式的,并且能够跨系统中所有的资源共享工作任务,这些资源可以从线程和核心级别一直到服务器集群和数据中心级别。它提供了一个高效的框架来构建应用,所构建出的应用具有较高的并发性,并且能够提升资源的利用率。另外很重要的一点,Actor模型还提供了定义良好的方式来优雅地处理错误和故障,确保应用的可靠性级别,它能够隔离问题,防止级联故障和长时间的宕机。

\\

在过去,构建高并发的系统通常涉及到大量的装配和非常技术化的编程,它们都是非常难以掌握的。这些技术方面的挑战会抢占我们对系统核心业务功能的注意力,因为很大一部分的工作都集中在业务细节上,这需要花费很多的时间和精力用于搭建处理管道和功能装配。如果我们使用Actor来构建系统的话,就能在一个较高的抽象层级完成这些任务,因为处理管道和功能装配已经内置在了Actor模型之中。这不仅能够将我们从繁琐的传统系统实现的细节中解放出来,还能让我们更加关注于系统的核心功能和创新。

\\

使用Java 8和Akka实现Actor模型

\\

是一个在JVM上构建高并发、分布式、有弹性的消息驱动应用的工具集。Akka “actor”只是Akka工具集中一部分,它能够让我们在编写并发代码时,不用去思考低层级的线程和锁。Akka中其他的工具还包括Akka Streams和Akka http。尽管Akka是使用Scala编写的,但是它也有,如下的样例运行在2.4.9版本以上(目前Akka的最新版本为2.5.7,但核心API与本文基本相同——译者注)。

\\

在Akka中,Actor是基本的工作单元。Actor是状态和行为的一个容器,它可以创建和监管子Actor。Actor之间通过异步的消息实现相互的通信。这个模型保护了Actor的内部状态,使其能够实现线程安全,该模型还实现了事件驱动的行为,从而不会阻塞其他的Actor。作为开始,我们所需要知道的只是akka的Maven依赖。

\\
\\u0026lt;dependency\u0026gt;\   \u0026lt;groupId\u0026gt;com.typesafe.akka\u0026lt;/groupId\u0026gt;\   \u0026lt;artifactId\u0026gt;akka-actor_2.11\u0026lt;/artifactId\u0026gt;\   \u0026lt;version\u0026gt;2.4.9\u0026lt;/version\u0026gt;\\u0026lt;/dependency\u0026gt;
\\

改变Actor的状态

\\

就像通过移动设备收发短信一样,我们需要使用消息来调用Actor。与短信类似,Actor之间的消息也必须是不可变的。在使用Actor的时候,最重要的就是定义它所能接受的消息。(这种消息通常被称为协议,因为它定义了Actor之间的交互点。)Actor接收消息,然后以各种方式对其作出反应,它们可以发送其他的消息、修改自己的状态或行为、创建其他的Actor。

\\

Actor的初始行为是通过实现receive()方法来定义的,在实现这个方法时,可以在默认的constructor. receive()中借助ReceiveBuilder匹配传入的消息并执行相关的行为。每条信息的行为通过一个Lambda表达式来定义。在下面的样例中,ReceiveBuilder使用了对接口方法“onMessage”的引用。onMessage方法增加了一个计数器(内部状态)并通过AbstractLoggingActor.log方法记录了一条info级别的日志信息。

\\
\static class Counter extends AbstractLoggingActor {\    static class Message { }\\    private int counter = 0;\\    {\      receive(ReceiveBuilder\        .match(Message.class, this::onMessage)\        .build()\      );\    }\\    private void onMessage(Message message) {\      counter++;\      log().info(\"Increased counter \" + counter);\    }\}
\\

Actor就绪之后,还需要启动它。这需要通过ActorSystem实现,它控制着Actor的生命周期。但是,我们首先需要提供一些关于如何启动这个Actor所需的额外信息。akka.actor.Props是一个配置对象,能够将上下文范围内的配置暴露给框架的各个地方。它用来创建我们的Actor,这个对象是不可变的,因此线程安全,完全可以共享。

\\

return Props.create(Counter.class);

\\

Props对象描述了Actor的构造器参数。将其封装到一个工厂函数中并放到Actor的构造器附近通常是一种好的实践。ActorSystem本身是在main方法中创建的。

\\
\public static void main(String[] args) {\    ActorSystem system = ActorSystem.create(\"sample1\");\    ActorRef counter = system.actorOf(Counter.props(), \"counter\");\\}
\\

ActorSystem (“sample1”)和它所包含的Actor(“counter”)都可以给定名称,这样便于在Actor层级结构中进行导航,这个话题稍后会进行讨论。现在,ActorRef可以发送一条消息给Actor,如样例所示:

\\

counter.tell(new Counter.Message(), ActorRef.noSender());

\\

在这里,使用两个参数定义了要发送的Message以及消息的发送者。(顾名思义,noSender表明在本例中,并没有使用发送者。)如果运行上述样例的话,我们就能得到预期的输出:

\\

[01/10/2017 10:15:15.400] [sample1-akka.actor.default-dispatcher-4] [akka://sample1/user/counter] Increased counter 1

\\

这是一个非常简单的样例,但是它提供了我们所需的线程安全性。发送给Actor的消息来源于不同的线程,这些消息屏蔽了并发的问题,因为Actor框架会进行消息的序列化处理。读者可以查看完整的样例。

\\

修改Actor的行为

\\

读者可能已经注意到,我们的简单样例修改了Actor的状态,但是它并没有改变Actor行为,也没有发送消息给其他Actor。我们接下来考虑一个防盗报警系统,它可以通过密码来启用或禁用,它的传感器会探测活动。如果有人试图通过不正确的密码禁用告警的话,它就会发出声音。Actor能够响应三种消息,分别是通过密码(以负载的形式提供该值)进行禁用和启用的消息以及盗窃活动的消息。这三种消息都包含在了下面的协议中:

\\
\static class Alarm extends AbstractLoggingActor {\    // contract\    static class Activity {}\    static class Disable {\      private final String password;\      public Disable(String password) {\        this.password = password;\      }\    }\    static class Enable {\      private final String password;\      public Enable(String password) {\        this.password = password;\      }\    }\\    // ...\}
\\

Actor有一个针对密码的预置属性,它也会传入到构造器中:

\\
\private final String password;\public Alarm(String password) {\      this.password = password;\     // ...\}
\\

前面提到的akka.actor.Props配置对象也需要知道password属性,这样的话,才能在Actor系统启动的时候将其传递给实际的构造器。

\\
\public static Props props(String password) {\      return Props.create(Alarm.class, password);\}
\\

针对每种可能的消息,Alarm还需要对应的行为。这些行为是AbstractActorreceive方法的实现。receive方法应该定义一系列的match语句(每个都是PartialFunction\u0026lt;Object, BoxedUnit\u0026gt;类型),它定义了Actor能够处理的消息,另外还包含消息如何进行处理的实现。

\\
\private final PartialFunction\u0026lt;Object, BoxedUnit\u0026gt; enabled;\private final PartialFunction\u0026lt;Object, BoxedUnit\u0026gt; disabled;
\\

如果这个签名看上去令人望而生畏的话,那么我们的代码可以通过前面所使用的ReceiveBuilder将细节隐藏起来。

\\
\public Alarm(String password) {\      this.password = password;\      \     enabled = ReceiveBuilder\        .match(Activity.class, this::onActivity)\        .match(Disable.class, this::onDisable)\        .build();\\      disabled = ReceiveBuilder\        .match(Enable.class, this::onEnable)\        .build();\\      receive(disabled);\    }\}
\\

需要注意最后对receive的调用,将默认行为设置为“disabled”。这三个行为是使用已有的三个方法(onActivity、onDisable、onEnable)来实现的。这些方法中最简单的是onActivity。如果接收到activity的话,报警会在控制台记录一条日志。在这里需要注意activity没有消息负载,所以我们将其命名为ignored。  

\\
\private void onActivity(Activity ignored) {\    log().warning(\"oeoeoeoeoe, alarm alarm!!!\");\}
\\

如果Actor接收到一条enable消息的话,新的状态将会记录下来并且状态将会变更为enabled。如果密码不匹配的话,会记录一条简短的警告日志。消息负载现在包含了密码,所以我们可以通过访问它来校验密码。

\\
\private void onEnable(Enable enable) {\   if (password.equals(enable.password)) {\     log().info(\"Alarm enable\");\     getContext().become(enabled);\   } else {\     log().info(\"Someone failed to enable the alarm\");\   }\}
\\

当收到一条disable消息时,Actor需要检查密码,记录一条关于状态变化的简短消息然后将状态修改为disabled或者在密码不匹配的情况下记录一条警告信息。

\\
\private void onDisable(Disable disable) {\  if (password.equals(disable.password)) {\    log().info(\"Alarm disabled\");\    getContext().become(disabled);\  } else {\    log().warning(\"Someone who didn't know the password tried to disable it\");\  }\}
\\

这样就完成了Actor的逻辑,我们接下来可以启动Actor系统并向其发送一些消息。注意,我们的正确密码“cats”是作为一个属性传递给Actor系统的。

\\
\ActorSystem system = ActorSystem.create();\    final ActorRef alarm = system.actorOf(Alarm.props(\"cat\"), \"alarm\");
\\

消息:

\\
\    alarm.tell(new Alarm.Activity(), ActorRef.noSender());\    alarm.tell(new Alarm.Enable(\"dogs\"), ActorRef.noSender());\    alarm.tell(new Alarm.Enable(\"cat\"), ActorRef.noSender());\    alarm.tell(new Alarm.Activity(), ActorRef.noSender());\    alarm.tell(new Alarm.Disable(\"dogs\"), ActorRef.noSender());\    alarm.tell(new Alarm.Disable(\"cat\"), ActorRef.noSender());\    alarm.tell(new Alarm.Activity(), ActorRef.noSender());
\\

产生的输出如下所示:

\\
\[01/10/2017 10:15:15.400] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Someone failed to enable the alarm\[01/10/2017 10:15:15.401] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Alarm enable\[WARN] [01/10/2017 10:15:15.403] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] oeoeoeoeoe, alarm alarm!!!\[WARN] [01/10/2017 10:15:15.404] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Someone who didn't know the password tried to disable it\[01/10/2017 10:15:15.404] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Alarm disabled
\\

你可以在GitHub上找到完整的。到目前为止,我们只使用了一个Actor来处理消息。不过就像在业务组织中一样,Actor也能形成自然的层级结构。

\\

Actor的层级结构

\\

Actor可能会创建其他的Actor。当一个Actor创建另外一个Actor时,创建者也被称为监管者(supervisor),而被创建的Actor也被称为工作者(worker)。我们可能基于很多原因需要创建工作者Actor,最常见的原因是工作的委托。监管者创建一个或多个工作者Actor,然后将工作委托给它们。

\\

监管者同时会成为工作者的看守人。就像父母会时刻关注孩子的行为那样,监管者也会照顾它的工作者Actor。如果Actor遇到问题的话,它会将自己挂起(也就是说在恢复之前,它不会处理正常的消息),并且会通知其监管者自己发生了故障。

\\

到目前为止,我们创建了多个Actor并为其分配了名字。Actor的名字用来在层级结构中识别Actor。与Actor交互的一般都是用户所创建Actor的父Actor,也就是带有\"/user\"路径的guardian。使用原始system.actorOf()创建的Actor是该guardian的直接子Actor,如果它终止的话,系统中所有正常的Actor也都会关闭。在上面的alarm样例中,我们创建的是/user/alarm路径的用户Actor。因为Actor是按照严格的层级方式来创建的,所以Actor会存在一个由Actor名称组成的唯一序列,这个序列会从Actor系统的根逐级往下,按照父子关系形成。这个序列类似于文件系统中的封闭文件夹,因此采用了“路径(path)”这个名称来代指它,当然Actor层级结构与文件系统的层级结构还有一些基础的差异。

\\

在Actor内部,我们可以调用getContext().actorOf(props, “alarm-child”)创建名为“alarm-child”的新Actor,它会作为alarm Actor的子Actor。子Actor的生命周期是绑定在父Actor之上的,这意味着如果我们停止“alarm” Actor的话,也会停掉其子Actor:

\\

8ccad4aec9ef8d7cd589fac11d302ff5.jpg

\\

这种层级结构对于基于Actor系统的故障处理也有着直接影响。Actor系统的典型特点就是将任务进行分解和委托,直到它被拆分得足够小,能够从一个地方进行处理。通过这种方式,不仅任务本身能够非常清晰地进行结构化,所形成的Actor也能在如下方面变得非常明确:

\\
  • 应该处理哪些消息\\t
  • 应该怎样正确地应对接收到的消息\\t
  • 应该如何处理故障。\

如果某个Actor无法处理特定情景的话,它会发送对应的故障消息给它的监管者,请求帮助。面对故障,监管者有四种不同的可选方案:

\\
  • 恢复(Resume)子Actor,保持其已有的内部状态,但是忽略掉导致故障的消息;\\t
  • 重启(Restart)子Actor,通过启动新的实例,清理其已有的状态;\\t
  • 永久停止(Stop)子Actor,将子Actor未来所有的消息发送至;\\t
  • 将故障传递至更高的层级(Escalate),这需要让监管者本身也发生故障。\

接下来,我们将上面学到的所有内容通过一个样例来具体讲解一下:NonTrustWorthyChild接收Command消息,每当收到该消息时,会增加一个内部的计数器。如果消息数能够被4整除的话,会抛出一个RuntimeException,这个异常会向上传递给Supervisor。这里并没有什么新东西,Command消息本身并没有负载。

\\
\public class NonTrustWorthyChild extends AbstractLoggingActor {\\  public static class Command {}\  private long messages = 0L;\\  {\    receive(ReceiveBuilder\      .match(Command.class, this::onCommand)\      .build()\    );\  }\\  private void onCommand(Command c) {\    messages++;\    if (messages % 4 == 0) {\      throw new RuntimeException(\"Oh no, I got four commands, can't handle more\");\    } else {\      log().info(\"Got a command \" + messages);\    }\  }\\  public static Props props() {\    return Props.create(NonTrustWorthyChild.class);\  }\}
\\

Supervisor 在它的构造器中启动NonTrustWorthyChild,并将它所接收到的command消息直接转发给子Actor。

\\
\public class Supervisor extends AbstractLoggingActor {\{\    final ActorRef child = getContext().actorOf(NonTrustWorthyChild.props(), \"child\");\\    receive(ReceiveBuilder\      .matchAny(command -\u0026gt; child.forward(command, getContext()))\      .build()\    );\\  }\  //…\}
\\

Supervisor实际启动之后,所形成的层级结构将会是“/user/supervisor/child”。在我们完成该任务之前,需要预先定义所谓的监管策略(supervision strategy)。Akka提供了两种类型的监管策略:OneForOneStrategyAllForOneStrategy。它们之间的差异在于前者会将指令应用于发生故障的子Actor,而后者则会将指令同时应用于子Actor的兄弟节点。正常情况下,我们应该使用OneForOneStrategy,如果没有明确声明的话,它也是默认方案。监管策略需要通过覆盖SupervisorStrategy方法来定义。

\\
\@Override\public SupervisorStrategy supervisorStrategy() {\   return new OneForOneStrategy(\      10,\      Duration.create(10, TimeUnit.SECONDS),\      DeciderBuilder\          .match(RuntimeException.class, ex -\u0026gt; stop())\          .build()\   );\}
\\

第一个参数定义了maxNrOfRetries,它指定了子Actor在停止之前允许尝试重启的次数。(如果设置为负数值,则代表没有限制)。withinTimeRange参数定义了maxNrOfRetries的持续时间窗口。按照上面的定义,该策略会在10秒钟之内尝试10次。DeciderBuilder的工作方式与ReceiveBuilder完全类似,它定义了要匹配的异常以及如何应对。在本例中,如果在10秒钟内尝试了10次的话,Supervisor会停止掉NonTrustWorthyChild,所有剩余的消息将会发送至dead letter box。

\\

Actor系统是通过Supervisor Actor来启动的。

\\
\ActorSystem system = ActorSystem.create();\final ActorRef supervisor = system.actorOf(Supervisor.props(), \"supervisor\");
\\

当系统启动之后,我们发送10条command信息到Supervisor。需要注意,“Command”消息是定义在NonTrustWorthyChild中的。

\\
\for (int i = 0; i \u0026lt; 10; i++) {\  supervisor.tell(new NonTrustWorthyChild.Command(), ActorRef.noSender());\}
\\

输出的内容显示,在四条消息之后,异常传递到了Supervisor中,剩下的消息发送到了deadLetters收件箱中。如果SupervisorStrategy被定义为restart()而不是stop()的话,那么将会启动一个新的NonTrustWorthyChild Actor实例。

\\
\[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 1\[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 2\[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 3\[01/10/2017 12:33:47.548] [default-akka.actor.default-dispatcher-4] [akka://default/user/supervisor] Oh no, I got four commands, I can't handle any more\java.lang.RuntimeException: Oh no, I got four commands, I can't handle any more\\t...\[01/10/2017 12:33:47.556] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Message [com.lightbend.akkasample.sample3.NonTrustWorthyChild$Command] from Actor[akka://default/deadLetters] to Actor[akka://default/user/supervisor/child#-1445437649] was not delivered. [1] dead letters encountered.
\\

这个日志可以关闭或者进行调整,这需要修改 “akka.log-dead-letters”“akka.log-dead-letters-during-shutdown”的配置。

\\

读者可以,并尝试调整SupervisorStrategy

\\

总结

\\

借助Akka和Java 8,我们能够创建分布式和基于微服务的系统,这在几年前还是一种梦想。现在,所有行业的企业都迫切希望系统的演化速度能够跟上业务的速度和用户的需求。如今,我们能够弹性的扩展系统,使其支持大量的用户和庞大的数据。我们创建的系统有望具备一定级别的弹性,使停机时间不再是按照小时来计算,而是按照秒来计算。基于Actor的系统能够让我们创建快速演进的微服务架构,它可以进行扩展并且能够不停机运行。

\\

Actor模型提供了反应式系统的核心功能,也就是反应式宣言所定义的响应性、弹性、扩展性以及消息驱动。

\\

Actor系统可以进行水平扩展,从一个节点扩展到具有众多节点的集群,这样的话就为我们提供了灵活性以应对大规模的负载。除此之外,还可能实现有弹性的扩展,也就是扩展系统的处理能力,不管是手动的还是自动的,都能充分支持系统活动所出现的高峰和低谷状态。

\\

借助Actor和Actor系统,故障探测和恢复就成为一种架构上的特性,而不是事后才考虑增补上去的功能。我们可以使用内置的Actor监管策略来处理下属工作者Actor遇到的问题,能够一直向上追溯到Actor系统层级,集群节点会积极监控集群的状态,在这种环境中处理故障已经植入到了Actor和Actor系统的DNA之中了。这其实始于Actor之间异步交换消息的最基础层级:如果你给我发送一条消息的话,你必须要考虑可能的输出,如果得到了预期的答复该怎么办,没有收到预期答复又该怎么办?这种处理策略会一直延伸到集群中节点的加入和离开。

\\

在设计系统时,按照Actor的方式思考在很多方面都会更加直观。Actor的交互方式对我们来说会更加自然,因为简单来讲,它的交互方式与人类之间的交互方式更加接近。这样的话,我们在设计和实现系统时,能够更加关注核心功能,忽略其他业务不相关的冗余内容。

\\

关于作者

\\

a873fd1004946712845be0b4a33391bb.jpgMarkus Eisele 是一位Java Champion、前Java EE专家组成员、JavaLand的创始人,在世界范围内的Java会议上是享有盛誉的讲师,在企业级Java领域颇为知名。他在Lightbend担任开发人员,读者可以在Twitter上联系到他。

\\

查看英文原文:

转载地址:http://suuoo.baihongyu.com/

你可能感兴趣的文章
Kali Linux 网络扫描秘籍 第三章 端口扫描(一)
查看>>
共享单车步入物联网军备战
查看>>
PHP 魔术变量
查看>>
推荐的PHP编码规范
查看>>
Gartner报告:东方金信进入Hadoop世界厂商名录
查看>>
Python_(1)数据类型及其常见使用方法(图文)
查看>>
如何查看WWN号
查看>>
主页被劫持问题
查看>>
linux中awk学习小结
查看>>
WCF分布式开发常见错误(23):the fact that the server certificate isn't configured with HTTP.SYS...
查看>>
第一个Indigo Service
查看>>
《Pro ASP.NET MVC 3 Framework》学习笔记之三十二 【无入侵的Ajax】
查看>>
监听启动报TNS-12537、TNS-12560错误
查看>>
XXX管理平台系统——项目教训
查看>>
会写代码的项目经理
查看>>
通过Lua解释器来扩展丰富nginx功能,实现复杂业务的处理
查看>>
禁用WPF窗体的最大化按钮
查看>>
玩转React样式
查看>>
TinyHttpd中sockaddr与struct sockaddr_in的区别
查看>>
嘉峪关市与甘肃省广电网络公司对接智慧城市建设项目
查看>>