Scala.Actor实践心得与设计思想

xiaoxiao2021-02-28  19

这段时间系统的学习了 Scala ,编程思想上可谓收获不少,想从里面挑些值得写的东西分享给大家, Scala 的Actor

可谓这门语言的一个亮点,函数式编程核心价值在于多核编程,所以就打算说说这个Actor,总结一下学习心得。先很俗

套的简单谈谈概念,然后会拿四个例子做补充。主要内容包括基本原理,设计思想,单机环境并发测试。

            Actor是一种基于事件的轻量级线程,在以前的并发模型中,我们需要关注共享的数据结构,而使用Actor则需要

关注操作数据的代码结构,因为减少了数据的共享。Actor的主要能力来源于消息传递,而不是采用阻塞调用的处理形式。

如果创建直接或间接扩展 Actor的类,要确保对对象的所有调用都通过消息传递进行。

     我把Actor的设计思想归为两类,使用目的归为两类。从设计思想上来说Scala推荐的是以消息传递为核心的设计

思想,由于Scala可以无缝使用Java类库,所以也可以采用以共享数据为核心的设计,当然也可以写出混合式设计风格的

。使用目的主要有两种,一种是Scala提供APIJava调用,另一种就是Scala自给自足。举三个例子,例子很简单,是一

个累加器。

1、以消息传递为核心的设计:使用Actoractor方法,使用不可变对象,不考虑数据共享问题,以消息传递为设计核心。

 1  import  actors._, Actor._  2  /*  3   * Author:ShiYang  4   * Blog: http://shiyangxt.cnblogs.com  5   *  */  6  object SendMessageStyle {  7   8    def main(args: Array[String]): Unit  =  {  9      val caller  =  self 10      val accumulator  =  actor { 11        var  continue   =   true 12        var sum  =   0 13        loopWhile( continue ) { 14          reactWithin( 500 ) { 15             case  number: Int  =>  sum  +=  number 16             case  TIMEOUT  => 17               continue   =   false 18              caller  !  sum 19          } 20        } 21      } 22      accumulator  !   1 23      accumulator  !   2 24      accumulator  !   3 25      receiveWithin( 1000 ) { 26         case  result  =>  println( " Total is  "   +  result) 27      } 28    } 29  }

2、以共享数据为核心的设计:构建由Actor继承共享数据操作类,以共享数据为核心。

 1  import  actors._, Actor._  2   3  /*  4   * Author:ShiYang  5   * Blog: http://shiyangxt.cnblogs.com  6   *  */  7  object SharedDataStyle {  8     case   class  Add(number: Int)  9     case   class  GetResult(sender: Actor) 10  11     class  AddActor  extends  Actor { 12      override def act(): Unit  =  process( 0 ) 13      def process(value: Int): Unit  =  { 14        reactWithin( 500 ) { 15           case  Add(number)  =>  process(value  +  number) 16           case  GetResult(a)  =>  a  !  value; process(value) 17           case  _  =>  process(value) 18        } 19      } 20    } 21  22    def main(args: Array[String]): Unit  =  { 23      val addActor  =   new  AddActor 24      addActor.start() 25      addActor  !  Add( 1 ) 26      addActor  !  Add( 2 ) 27      addActor  !  Add( 3 ) 28      addActor  !  GetResult(self) 29      receiveWithin( 1000 ) { 30         case  result  =>  println( " Total is  "   +  result) 31      } 32    } 33  }

3、以API形式提供给Java程序使用:由于Java不能直接向Actor发消息,所以需要对Scala!()方法进行封装。

 1  import  actors._, Actor._  2  /*  3   * Author:ShiYang  4   * Blog: http://shiyangxt.cnblogs.com  5   *  */  6  object ForJavaStyle {  7     case   class  Add(number: Int)  8     case   class  GetResult(sender: Actor)  9     private   class  AddActor  extends  Actor { 10      override def act(): Unit  =  process( 0 ) 11      def process(value: Int): Unit  =  { 12        reactWithin( 500 ) { 13           case  Add(number)  =>  process(value  +  number) 14           case  GetResult(a)  =>  a  !  value; process(value) 15           case  _  =>  process(value) 16        } 17      } 18    } 19     private  val addActor  =   new  AddActor 20    addActor.start() 21     private  def add(sender: Actor, num: Int): Unit  =  { 22      sender  !  Add(num) 23    } 24     private  def getResult(sender: Actor): Int  =  { 25      sender  !  GetResult(self) 26      receiveWithin( 1000 ) { 27         case  result: Int  =>  result 28      } 29    } 30    def addForJava(num: Int): Unit  =  { 31      add(addActor, num) 32    } 33    def getResultForJava(): Int  =  { 34      getResult(addActor) 35    } 36  }

Java端调用代码:

 1  /*  2   * Author:ShiYang  3   * Blog: http://shiyangxt.cnblogs.com  4   *  */  5  public   class  GetFromScala {  6   7       public   static   void  main(String[] args) {  8          ForJavaStyle$.MODULE$.addForJava( 1 );  9          ForJavaStyle$.MODULE$.addForJava( 2 ); 10          ForJavaStyle$.MODULE$.addForJava( 3 ); 11          System.out.println( " Total is  " 12                   +  ForJavaStyle$.MODULE$.getResultForJava()); 13      } 14  }

      通过上面的例子可见ScalaJava语言有非常大的补充,提高了生产力。为Java提供了轻松实现多核并行编程的能

力。为了进一步测试Actor的并发性能,于是做了一个简单的单机环境并发测试。程序是构建一个Actor动态有序数组,

并发创建NActor对象,为了证明这些对象全都可用,顺序从数组的第一个Actor发消息到最后一个Actor,只有当一个

Actor接收到前一个Actor发送的消息后,才向后一个Actor发送消息。当最后一个数组元素接收到消息后,再把消息从数组

尾部用同样处理过程逆序发送到数组头部。这个消息发送过程不是并发处理,是顺序处理。这里只是为了证明这些对象全都

可用。如果为了测试并发处理,可以修改程序,让每个数组元素给后一位数组元素发消息。这样就会看到输出混乱的发送信

息,因为并发是无序的。

测试环境:双核4G内存,Windows XPSun JVM1.6,单机环境,Scala版本2.9.0.1

测试结果:当使用Receive方法接收消息时,由于Receive会在结束任务前一直持有线程,而Scala在后台默认只给Receive

方法启动256个线程,我的程序又是顺序的发消息,而且不是临时接收器(只处理一次消息),所以Receive在这种情况下,

只有255个并发。React接收器由于不需要长期持有线程,空闲即释放线程。所以React在本程序中可以跑20w的并发,如果

简单优化一下JVM,就可以达到100w的并发量。默认React接收器后台会调用4个线程组成的线程池。如果修改程序让每个数

组元素给后一位数组元素并发的发消息,那么在不阻塞线程的情况下,Receive方法也可以达到和React一样的并发量。因为

这个测试程序是顺序发送消息,所以就没有设置超时,如果是并发环境,建议加上超时,避免线程阻塞。

下面是测试程序:

 1  import  actors._, Actor._, java.util._  2  /*  3   * Author:ShiYang  4   * Blog: http://shiyangxt.cnblogs.com  5   *  */  6  object ConcurrentTest {  7   8    val actors  =   new  ArrayList[Actor]  9    val length  =   1000000 10    var startTime  =  System.nanoTime 11  12    def main(args: Array[String]): Unit  =  { 13       for  (i  <-   0  to length) 14        actors.add(actor { 15          info( " react:  "   +  i  +   "  actor created " ) 16          reactMessage 17        }) 18      actors.get( 0 !  ( 0 0 ) 19    } 20  21    def info(msg: String)  =  println(msg  +   "  received by  "   + 22      Thread.currentThread) 23  24    def receiveMessage { 25      var  continue   =   true 26       while  ( continue ) { 27        receive { 28           case  (id: Int, direction: Int)  => 29            sendMessage(id: Int, direction: Int) 30           case   " finish "   => 31             continue   =   false 32            val endTime  =  System.nanoTime 33            println( " Finish, spend time: "   + 34              (endTime  -  startTime)  /   1000000000.0   +   "  secs " ) 35           case  _  =>  println( " input error " ) 36        } 37      } 38    } 39  40    def reactMessage { 41      var  continue   =   true 42      loopWhile( continue ) { 43        react { 44           case  (id: Int, direction: Int)  => 45            sendMessage(id: Int, direction: Int) 46           case   " finish "   => 47             continue   =   false 48            val endTime  =  System.nanoTime 49            println( " Finish, spend time: "   + 50              (endTime  -  startTime)  /   1000000000.0   +   "  secs " ) 51           case  _  =>  println( " input error " ) 52        } 53      } 54    } 55  56     // direction=0->sendLatter;direction=1->sendFormer 57    def sendMessage(id: Int, direction: Int) { 58       if  (direction  ==   0   &&  id  !=  length) { 59        info( " Actor "   +  id  +   "  send message to the Actor "   +  (id  +   1 )) 60        actors.get(id  +   1 !  (id  +   1 0 ) 61      }  else   if  (id  !=   0   &&  direction  ==   1 ) { 62        info( " Actor "   +  id  +   "  send message to the Actor "   +  (id  -   1 )) 63        actors.get(id  -   1 !  (id  -   1 1 ) 64      }  else   if  (direction  ==   0   &&  id  ==  length) { 65        actors.get(length)  !  (length,  1 ) 66      }  else   if  (id  ==   0   &&  direction  ==   1 ) { 67        actors.get( 0 !   " finish " 68      } 69    } 70  }
转载请注明原文地址: https://www.6miu.com/read-1700134.html

最新回复(0)