@Diego81
Last active December 12, 2021 12:36
Simple Word Counter implemented using Akka
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }

case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Integer)

class StringCounterActor extends Actor {
  def receive = {
    case ProcessStringMsg(string) => {
      val wordsInLine = string.split(" ").length
      sender ! StringProcessedMsg(wordsInLine)
    }
    case _ => println("Error: message not recognized")
  }
}

case class StartProcessFileMsg()

class WordCounterActor(filename: String) extends Actor {
  private var running = false
  private var totalLines = 0
  private var linesProcessed = 0
  private var result = 0
  private var fileSender: Option[ActorRef] = None

  def receive = {
    case StartProcessFileMsg() => {
      if (running) {
        // println just used for example purposes;
        // Akka logger should be used instead
        println("Warning: duplicate start message received")
      } else {
        running = true
        fileSender = Some(sender) // save reference to process invoker
        import scala.io.Source._
        fromFile(filename).getLines.foreach { line =>
          context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
          totalLines += 1
        }
      }
    }
    case StringProcessedMsg(words) => {
      result += words
      linesProcessed += 1
      if (linesProcessed == totalLines) {
        fileSender.map(_ ! result) // provide result to process invoker
      }
    }
    case _ => println("message not recognized!")
  }
}

object Sample extends App {
  import akka.util.Timeout
  import scala.concurrent.duration._
  import akka.pattern.ask
  import akka.dispatch.ExecutionContexts._

  implicit val ec = global

  override def main(args: Array[String]) {
    val system = ActorSystem("System")
    val actor = system.actorOf(Props(new WordCounterActor(args(0))))
    implicit val timeout = Timeout(25 seconds)
    val future = actor ? StartProcessFileMsg()
    future.map { result =>
      println("Total number of words " + result)
      system.shutdown
    }
  }
}
@shehaaz

shehaaz commented Mar 23, 2016

I had to move implicit val ec = global inside the main method to make it work :)
https://www.toptal.com/scala/concurrency-and-fault-tolerance-made-easy-an-intro-to-akka#comment-1776147740
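The fix described above is consistent with how scala.App behaves under Scala 2: the body of an object that extends App is deferred (through DelayedInit) and is only run by App's own main method. If main is overridden, the deferred body never runs, so vals declared in the object body are still uninitialized when main reads them. A minimal sketch of the effect, assuming Scala 2.x; the names Broken and Fixed are made up for illustration:

```scala
// Assumes Scala 2.x, where scala.App defers the object body via DelayedInit.
object Broken extends App {
  val greeting = "hello" // part of the deferred body

  // Overriding main replaces App.main, which is what would run the deferred body,
  // so greeting is expected to still be null here.
  override def main(args: Array[String]): Unit =
    println("Broken sees: " + greeting)
}

object Fixed {
  // The value is created inside the method that uses it,
  // which mirrors moving the implicit val into main.
  def message(): String = {
    val greeting = "hello"
    "Fixed sees: " + greeting
  }

  def main(args: Array[String]): Unit =
    println(message())
}
```

Dropping "extends App" and writing a plain main method, as in Fixed, avoids the problem altogether.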

@DagnyTagg2013

Hi there! Beautiful learning example! THANK YOU so much for sharing this!

A couple of follow-up questions; feel free to point me to further reference links where I can find the answers:

  1. I don't fully understand this line L37:
    context.actorOf(Props[StringCounterActor]) ...
    ie. How is Context Initialized? -- Are you using some external config property file that's not in this GitHub repo?
    How is Props initialized? -- Are you using some external config property file that's not in this GitHub repo?

  2. What's the best-practice for checking for duplicate messages in Actors --
    i.e. Why are you checking for that in the WordCounter Actor; but not in the StringCounter Actor?

  3. What's the best-practice for Parent delegation to Child Actors --
    i.e. Should the WordCounter Parent Actor be saving references to all of its created Children, and control how many instances get created?
    Should WordCounter be messaging its Children to shutdown; instead of calling a system.shutdown?

THANKS in advance for any followup info!
Dagny T

@jducoeur

jducoeur commented Sep 1, 2016

(Kibitzing on the questions, as requested on akka-user.)

How is Context Initialized?

It isn't -- the ActorContext is part and parcel of the Actor itself, and is automatically built by the system without application intervention. (Or do you mean the ExecutionContext? If so, that's pulled in implicitly on line 59.)

How is Props initialized?

Ah -- that signature just isn't obvious. That's actually a call to the apply factory method on the Props companion object, which creates a Props object. Since the Actor doesn't take any constructor parameters, the call needs only the type parameter and can omit the parentheses. It's a little confusing when you're learning, though.
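For comparison, here is a rough sketch of three common ways to build a Props object with the classic akka.actor API. NoArgActor, FileActor and PropsForms are hypothetical names used only for illustration; the gist itself uses the first form for StringCounterActor and the second for WordCounterActor.

```scala
import akka.actor.{ Actor, Props }

// Hypothetical actors, used only to illustrate the Props factory overloads.
class NoArgActor extends Actor {
  def receive: Receive = { case msg => sender() ! msg }
}

class FileActor(filename: String) extends Actor {
  def receive: Receive = { case _ => sender() ! filename }
}

object PropsForms {
  // Type-parameter form: works when the actor has a no-argument constructor.
  // The gist writes this without the trailing parentheses.
  val byType: Props = Props[NoArgActor]()

  // By-name creator form: the expression is evaluated each time an actor is created.
  val byCreator: Props = Props(new FileActor("input.txt"))

  // Class-plus-arguments form: constructor arguments are passed explicitly.
  val byClass: Props = Props(classOf[FileActor], "input.txt")
}
```

None of these reads a configuration file; each one just records how to construct the actor later.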

Why are you checking for that in the WordCounter Actor; but not in the StringCounter Actor?

Hmm. Good question -- it isn't obvious why WordCounter is checking for duplication. In general, Akka is "at most once delivery", so messages can get dropped (particularly when sending between nodes); if you need a higher degree of reliability, you sometimes need to introduce timeouts and retries at the sending end, and that usually results in needing duplication checks at the receiving end.

But that doesn't appear to be the case here, so I suspect this is just defensively coded against a case that doesn't actually happen here.
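To make the retry-plus-duplicate-check idea concrete, here is a minimal sketch of a receiver that ignores repeated requests. It assumes the sender attaches an id to each request and reuses that id when retrying; Work, Ack, GetTotal and DedupingWorker are invented for this example and are not part of the gist.

```scala
import akka.actor.Actor

// Hypothetical protocol: a retry of the same request reuses the same id.
case class Work(id: Long, payload: String)
case class Ack(id: Long)
case object GetTotal

class DedupingWorker extends Actor {
  private var seen = Set.empty[Long] // ids that have already been processed
  private var wordsCounted = 0

  def receive: Receive = {
    case Work(id, payload) =>
      if (!seen.contains(id)) {
        seen += id
        wordsCounted += payload.split("\\s+").count(_.nonEmpty)
      }
      // Acknowledge in both cases so a retrying sender can stop resending.
      sender() ! Ack(id)

    case GetTotal =>
      sender() ! wordsCounted
  }
}
```

The set of seen ids grows without bound here; a longer-lived actor would need some way to expire old ids.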

Should the WordCounter Parent Actor be saving references to all of its created Children?

Doesn't need to -- parents can always access their children. (That's built into ActorContext.)
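A small sketch of what that looks like in practice, using context.children from the classic akka.actor API. Child, Parent and CountChildren are hypothetical names for this illustration.

```scala
import akka.actor.{ Actor, Props }

// Hypothetical child that does nothing useful; it only needs to exist.
class Child extends Actor {
  def receive: Receive = { case _ => () }
}

case object CountChildren

class Parent extends Actor {
  // Children created through context.actorOf are tracked by the ActorContext,
  // so the parent keeps no list of its own.
  (1 to 3).foreach(i => context.actorOf(Props[Child](), s"child-$i"))

  def receive: Receive = {
    case CountChildren =>
      // context.children holds the ActorRefs of the current children.
      sender() ! context.children.size
  }
}
```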

control how many instances get created?

In a more realistic example, probably so.
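One way to cap the number of workers, sketched here under the assumption that a fixed-size pool is acceptable, is to route lines through akka.routing.RoundRobinPool instead of creating one actor per line. Line, LineCount, Start, LineWorker, BoundedCounter and the pool size of 4 are all made up for this example.

```scala
import akka.actor.{ Actor, ActorRef, Props }
import akka.routing.RoundRobinPool

// Hypothetical messages for this sketch.
case class Line(text: String)
case class LineCount(words: Int)
case object Start

class LineWorker extends Actor {
  def receive: Receive = {
    case Line(text) => sender() ! LineCount(text.split("\\s+").count(_.nonEmpty))
  }
}

class BoundedCounter(lines: Seq[String]) extends Actor {
  // A fixed pool of 4 workers, created once, instead of one new actor per line.
  private val workers: ActorRef =
    context.actorOf(RoundRobinPool(4).props(Props[LineWorker]()), "workers")

  private var pending = 0
  private var total = 0
  private var requester: Option[ActorRef] = None

  def receive: Receive = {
    case Start =>
      requester = Some(sender())
      pending = lines.size
      if (pending == 0) sender() ! 0
      else lines.foreach(line => workers ! Line(line))

    case LineCount(words) =>
      total += words
      pending -= 1
      if (pending == 0) requester.foreach(_ ! total)
  }
}
```

This sketch does not guard against a second Start message; the duplicate check from the gist would still be needed for that.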

Should WordCounter be messaging its Children to shutdown; instead of calling a system.shutdown?

In a longer-lived app, yes -- this example just isn't showing that. But yes, in a more realistic example, you might well have the parent shutting down its children, or simply shutting itself down (which will kill its children): the latter would be done by adding

context.stop(self)

after line 46.

The preferred approach depends on the requirements of the application; for example, in my application the children are mostly in charge of their own lifespans, and if they don't get any traffic for a certain period of time they work with their parents to shut down cleanly. But there is no one-size-fits-all answer -- it depends entirely on how your app works.
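As one illustration of a child managing its own lifespan, here is a minimal sketch that stops an actor after a period with no traffic, using the receive timeout built into classic Akka. IdleStoppingWorker and its idleLimit parameter are hypothetical, and this version stops on its own rather than coordinating with its parent.

```scala
import akka.actor.{ Actor, ReceiveTimeout }
import scala.concurrent.duration._

class IdleStoppingWorker(idleLimit: FiniteDuration) extends Actor {
  // Ask Akka to deliver ReceiveTimeout if no message arrives within idleLimit.
  context.setReceiveTimeout(idleLimit)

  def receive: Receive = {
    case ReceiveTimeout =>
      // No traffic for idleLimit: stop this actor (and any children it has).
      context.stop(self)

    case msg =>
      // Placeholder work: echo the message back to the sender.
      sender() ! msg
  }
}
```

A parent that needs to know when the child is gone can call context.watch on it and handle the resulting Terminated message.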

@smishmash

Thanks for the tutorial! Though it didn't run as expected for me. I would recommend giving the precise invocation that would run this code, along with the scala version etc.

I am running with Scala 2.11.8, and I get an NPE in the main method. I see the following in App's documentation:

@deprecatedOverriding( message = "main should not be overridden" , since = "2.11.0" )

I modified the Sample object to look like this:

import scala.concurrent.duration._

import akka.actor.{ActorSystem, Props}
import akka.dispatch.ExecutionContexts._
import akka.pattern.ask
import akka.util.Timeout

object Sample extends App {

  implicit val ec = global

  val system = ActorSystem("System")
  val actor = system.actorOf(Props(new WordCounterActor(args(0))))
  implicit val timeout = Timeout(25.seconds)
  val future = actor ? StartProcessFileMsg()
  future.map { result =>
    println("Total number of words " + result)
    system.shutdown
  }
}

It also isn't obvious that the program needs an argument, the file whose words will be counted.
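A small guard along these lines would make the requirement visible to the user. This is only a sketch: ArgCheck, filenameFrom and the usage text are invented for illustration and are not part of the gist.

```scala
object ArgCheck {
  // Returns the filename if one was passed, or a usage message otherwise.
  def filenameFrom(args: Array[String]): Either[String, String] =
    args.headOption.toRight("usage: Sample <path-to-text-file>")

  def main(args: Array[String]): Unit =
    filenameFrom(args) match {
      case Right(filename) =>
        println("Would count words in " + filename)
      case Left(usage) =>
        Console.err.println(usage)
        sys.exit(1)
    }
}
```

In the real program, the Right branch is where the ActorSystem would be created and the filename passed to WordCounterActor.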

Finally, there's a deprecation warning I haven't yet tracked down:

[warn] The - command is deprecated in favor of onFailure and will be removed in 0.14.0

Again, thanks for the tutorial! Still going over how the program works.

Sunil
