Welcome加拿大28为梦而年轻!

好程序员-千锋教育旗下高端IT职业教育品牌

400-811-9990
  • 客服QQ
  • 官方微信

    好程序员

    专注高端IT职业培训

[BigData] 好程序员大数据培训教程分享Actor学习笔记

[复制链接]
29 0
叶子老师 发表于 昨天 11:43 | 只看该作者 |阅读模式 打印 上一主题 下一主题
  好程序员大数据培训教程分享Actor学习笔记,scala中她能实现很强大的功能,他是基于并发机制的一个事件模型
加拿大28现在学的scala2.10.x版本就是之前的Actor
同步:在主程序上排队执行的任务,只有前一个任务执行完毕后,才能执行下一个任务
异步:指不进入主程序,而进入"任务对列"的任务,只有等主程序任务执行完毕,"任务对列"开始请求主程序,请求任务执行,该任务会进入主程序
java
共享变量 -- 加锁
会出现锁死问题
scala
Actor不共享数据
没有锁的概念
Actor通信之间需要message(通信)
Aactor执行顺序
1.首先调用start()方法启动Actor
2.调用start()方法后act()方法会被执行
3.Actor之间进行发送消息
Actor发送消息的三种方式
! -> 发送异步消息,没有返回值
!? -> 发送同步消息,有返回值,会有线程等待
!! -> 发送异步消息,有返回值,返回值类型Future[Any](用来获取异步操作结果)
Actor并行执行
//注意,这两个actor会并行执行,当其中一个for循环结束后,actor结束
object ActorDemo01 {
  def main(args: Array[String]): Unit = {
    MyActor1.start()
    MyActor2.start()
  }
}
object MyActor1 extends Actor{
  override def act(): Unit = {
    for (i <- 1 to 10){
      println(s"actor => $i")
      Thread.sleep(2000)
    }
  }
  object MyActor2 extends Actor{
    override def act(): Unit = {
      for (i <- 1 to 5){
        println(s"actor2 => $i")
        Thread.sleep(2000)
      }
    }
  }
}
Actor不断接受消息
执行第一种方式,异步
object ActorDemo02 {
  def main(args: Array[String]): Unit = {
    val actor: MyActor = new MyActor
    actor.start()
    //并行执行
    actor ! "start"  // !->异步
    actor ! "stop"
    println("发送完成")
  }
}
class MyActor extends Actor{
  override def act(): Unit = {
    while (true){   //死循环
      receive {   //接收
        case "start" => {
          println("starting")
          Thread.sleep(1000)
          println("started")
        }
        case "stop" => {
          println("stopping")
          Thread.sleep(1000)
          println("stopped")
        }
      }
    }
  }
}
第二种方式:利用react来代替receive,也就是说react线程可复用,比receive更高效
object ActorDemo03 {
  def main(args: Array[String]): Unit = {
    val actor: MyActor3 = new MyActor3
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("成功了")
  }
}
class MyActor3 extends Actor{
  override def act(): Unit = {
    loop {
      react{
        case "start" =>{
          println("starting")
          Thread.sleep(1000)
          println("sarted")
        }
        case "stop" =>{
          println("stoppting")
          Thread.sleep(1000)
          println("stopped")
        }
      }
    }
  }
}
结合样例类练习Actor发送消息
//创建样例类
case class AsyncMsg(id: Int, msg: String)
case class SyncMsg(id: Int, msg: String)
case class ReplyMsg(id: Int, msg: String)
object ActorDemo01 extends Actor {
  override def act(): Unit = {
    while (true) {
      receive {
        case "start" => println("starting...")
        case AsyncMsg(id, msg) =>
        {
          println(s"id:$id,msg:$msg")
          sender ! ReplyMsg(1,"sucess")  //接收到消息后返回响应消息
        }
        case SyncMsg(id,msg) => {
          println(s"id:$id,msg:$msg")
          sender ! ReplyMsg(2,"sucess")
        }
      }
    }
  }
}
object ActorTest{
  def main(args: Array[String]): Unit = {
    val actor: Actor = ActorDemo01.start()
//    //异步发送消息,没有返回值
//    actor ! AsyncMsg(3,"heihei")
//    println("异步消息发送完成,没有返回值")
//    //同步发送消息,有返回值
//    val text: Any = actor !? SyncMsg(4,"OK")
//    println(text)
//    println("同步消息发送成功")
    //异步发送消息,有返回值,返回类型为Future[Any]
    val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的")
    Thread.sleep(2000)
    if (reply.isSet){
      val applyMsg: Any = reply.apply()
      println(applyMsg)
    }else{
      println("Nothing")
    }
  }
}
Actor并行化的wordcount
class Task extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) => {
          val contents = Source.fromFile(new File(fileName)).mkString
          val arr = contents.split("\r\n")
          val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
          //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
        case StopTask => {
          exit()
        }
      }
    }
  }
}
object WorkCount {
  def main(args: Array[String]) {
    val files = Array("c://words.txt", "c://words.log")
    val replaySet = new mutable.HashSet[Future[Any]]
    val resultList = new mutable.ListBuffer[ResultTask]
    for(f <- files) {
      val t = new Task
      val replay = t.start() !! SubmitTask(f)
      replaySet += replay
    }
    while(replaySet.size > 0){
      val toCumpute = replaySet.filter(_.isSet)
      for(r <- toCumpute){
        val result = r.apply()
        resultList += result.asInstanceOf[ResultTask]
        replaySet.remove(r)
      }
      Thread.sleep(100)
    }
    val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
    println(finalResult)
  }
}
case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])
好程序员大数据培训官网:http://www.uvidit.com/

精彩内容,一键分享给更多人!
收藏
收藏0
转播
转播
分享
淘帖0
支持
支持0
反对
反对0
您需要登录后才可以回帖

本版积分规则

关注加拿大28
好程序员
千锋好程序员

北京校区(总部):北京市海淀区宝盛北里西区28号中关村智诚科创大厦

深圳西部硅谷校区:深圳市宝安区宝安大道5010号深圳西部硅谷B座A区605-619

杭州龙驰智慧谷校区:浙江省杭州市下沙经济技术开发区元成路199号龙驰智慧谷B座7层

郑州校区:郑州市二七区航海中路60号海为科技园C区10层、12层

Copyright 2007-2019 北京千锋互联科技加拿大28 .All Right

京ICP备12003911号-5 京公安网11010802011455号

请您保持通讯畅通1对1咨询马上开启