Ever since I first met Scala, I have thought that the language was designed to work in a concurrent environment. I wasn’t familiar with actors for a long time, but a few months ago I got a task to write a website scraper. It’s a typical story and there are a lot of nice existing solutions, but I felt it was the right time to try Akka in action. This article is the result of that work. I don’t want to describe the basics, so you should be familiar with the main Akka concepts: actors and messages.

Here is the GitHub repo: https://github.com/sap1ens/scraper.

The Task

So, my task was to find some ads on the ***** website, extract data from these ads and save all the results to an XLS file. I also had a list of cities in the US and Canada (472 in total). This work can be done in a few steps:

  1. Generate a list of URLs for all cities.
  2. The results for a city can be paginated, so we should fetch every page.
  3. Then we should extract the URLs of the ads and parse the data.
  4. Once all ads have been fetched and parsed, we need to combine the results (for grouping, sorting, etc.) and save them to the file.

Start with Activator

Typesafe Activator is a beautiful tool to start Typesafe stack projects. It’s very easy to install.

So, I started the project with Activator (I just chose the Scala + Akka template) and got a ready-to-go application with sbt, Scala and Akka, plus the ability to run it with another beautiful tool, Typesafe Console.

Keep your stuff in Config

I also put all my configuration into a Typesafe Config file (I chose the JSON format). It holds the list of countries and cities, the search query and some settings related to saving the results.
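
To make the shape of that file more concrete, here is a minimal sketch. Only the key names (search, results.folder, results.mode, profiles, country, pattern, cities) come from the code in this article; every value, including the URL pattern and the mode name, is a made-up placeholder. It assumes the Typesafe Config library is on the classpath.

```scala
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

object ConfigSketch {
  // Hypothetical values; only the key names are taken from the article's code.
  val config = ConfigFactory.parseString(
    """
      |{
      |  "search": "scala developer",
      |  "results": { "folder": "results", "mode": "single" },
      |  "profiles": [
      |    {
      |      "country": "Canada",
      |      "pattern": "http://{city}.example.com/search?query={search}",
      |      "cities": ["vancouver", "toronto"]
      |    }
      |  ]
      |}
    """.stripMargin)

  val search: String = config.getString("search")
  val resultsFolder: String = config.getString("results.folder")

  // toConfig turns a ConfigObject into a Config, which has typed getters.
  val firstProfile = config.getObjectList("profiles").asScala.head.toConfig
  val cities: List[String] = firstProfile.getStringList("cities").asScala.toList
}
```

Reading a profile through toConfig and getStringList is an alternative to calling unwrapped() and casting; which one reads better is a matter of taste.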

ActorSystem

Scraper.scala

scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.{ConfigFactory, ConfigObject}
import scala.collection.JavaConverters._

import CollectorService.StartScraper

object Scraper extends App {

    val config = ConfigFactory.load()
    val system = ActorSystem("scraper-system")

    // Each entry of "profiles" describes one country: a URL pattern and its cities
    val profiles = for {
        profile: ConfigObject <- config.getObjectList("profiles").asScala
    } yield Profile(
        profile.get("country").unwrapped().toString,
        profile.get("pattern").unwrapped().toString,
        // the cast yields a Java list, so convert it before calling toList
        profile.get("cities").unwrapped().asInstanceOf[java.util.ArrayList[String]].asScala.toList
    )

    val searchString = config.getString("search")
    val resultsFolder = config.getString("results.folder")
    val resultsMode = config.getString("results.mode")

    val collectorService = system.actorOf(Props(new CollectorService(
        profiles.toList,
        searchString,
        resultsFolder,
        resultsMode)
    ), "CollectorService")

    // Kick off the whole process
    collectorService ! StartScraper
}

Here I read some config data, then create the ActorSystem and the root actor, named CollectorService. The app starts by sending the StartScraper message to the root actor.

Actors

CollectorService.scala

scala
import java.net.URLEncoder

import akka.actor.{Actor, ActorLogging, Props}
import akka.routing.SmallestMailboxRouter

case class Profile(country: String, pattern: String, cities: List[String])

object CollectorService {
    import PageParser._

    case object StartScraper
    case object SaveResults
    case class PagesResults(results: List[PageResult])
    case class AddListUrl(url: String)
    case class RemoveListUrl(url: String)
}

class CollectorService(profiles: List[Profile], search: String, resultsFolder: String, resultsMode: String) extends Actor with ActorLogging with CollectionImplicits {

    import CollectorService._
    import ListParser._
    import PageParser._

    // Router in front of 5 ListParser actors
    val lists = context.actorOf(Props(new ListParser(self)).withRouter(SmallestMailboxRouter(5)), name = "AdvertisementList")

    // Results collected so far and list URLs that are still being processed
    var pageResults = List[PageResult]()
    var listUrls = List[String]()

    def receive = {
        case StartScraper => {
            val searchEncoded = URLEncoder.encode(search, "UTF-8")

            for(profile <- profiles; city <- profile.cities) {
                self ! AddListUrl(createCityUrl(profile.pattern, searchEncoded, city))
            }
        }
        case AddListUrl(url) => {
            listUrls = url :: listUrls

            lists ! StartListParser(url)
        }
        case RemoveListUrl(url) => {
            listUrls = listUrls.copyWithout(url)

            // Nothing left to process: time to save
            if(listUrls.isEmpty) self ! SaveResults
        }
        case PagesResults(results) => {
            pageResults = results ::: pageResults
        }
        case SaveResults => {
            log.info(s"Total results: ${pageResults.size}")

            ExcelFileWriter.write(pageResults, resultsFolder, resultsMode)

            context.system.shutdown()
        }
    }

    def createCityUrl(pattern: String, search: String, city: String) = {
        pattern
            .replace("{search}", search)
            .replace("{city}", city)
    }
}

CollectorService is the root actor; it coordinates all the work.

It contains a value named lists, which holds a reference to the next level of the hierarchy: the ListParser actors. The ListParser constructor accepts one argument, the CollectorService itself. It also uses routing to balance requests between 5 actors: SmallestMailboxRouter sends each message to the routee with the fewest messages in its mailbox.

scala
val lists = context.actorOf(Props(new ListParser(self)).withRouter(SmallestMailboxRouter(5)), name = "AdvertisementList")
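
Newer Akka releases express the same kind of router as a pool. The following is only a sketch under assumptions: it targets Akka 2.4 or later with classic actors, and Worker is a hypothetical stand-in for ListParser, not code from the repo.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.SmallestMailboxPool
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical worker, standing in for ListParser.
class Worker extends Actor {
  def receive: Receive = {
    case url: String => println(s"${self.path.name} handles $url")
  }
}

object RouterSketch extends App {
  val system = ActorSystem("router-sketch")

  // Pool of 5 routees; each message goes to the routee with the fewest queued messages.
  val workers = system.actorOf(SmallestMailboxPool(5).props(Props(new Worker)), "workers")

  (1 to 10).foreach(i => workers ! s"http://example.com/list/$i")

  Thread.sleep(500) // crude: give the routees time to drain their mailboxes
  Await.result(system.terminate(), 5.seconds)
}
```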

CollectorService also contains two variables: pageResults and listUrls. The first one stores the final ad results, the second one holds the URLs that are still being processed.

CollectorService starts with the StartScraper message: it sends every generated URL to itself in an AddListUrl message. You probably think that sending messages to yourself is a strange practice, but I don’t agree :) It’s a good pattern for decoupling and reusing logic, and you’ll see it reused below.
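
As a worked example of the URL generation, here is a small Akka-free sketch. createCityUrl is the same substitution as in CollectorService; the cityUrls helper, the pattern and the city names are made up for illustration.

```scala
import java.net.URLEncoder

object CityUrls {
  // Same substitution as createCityUrl in CollectorService.
  def createCityUrl(pattern: String, search: String, city: String): String =
    pattern
      .replace("{search}", search)
      .replace("{city}", city)

  // One URL per city, with the search query URL-encoded once up front.
  def cityUrls(pattern: String, search: String, cities: List[String]): List[String] = {
    val searchEncoded = URLEncoder.encode(search, "UTF-8")
    cities.map(city => createCityUrl(pattern, searchEncoded, city))
  }

  def main(args: Array[String]): Unit =
    // Hypothetical pattern and cities.
    cityUrls(
      "http://{city}.example.com/search?query={search}",
      "scala developer",
      List("vancouver", "toronto")
    ).foreach(println)
}
```

With this pattern the first generated URL is http://vancouver.example.com/search?query=scala+developer, since URLEncoder turns the space into a plus sign.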

The next step is to start fetching these URLs. CollectorService delegates this job to the ListParser actors, and here you can see the second pattern: every actor should have its own task.

So, the AddListUrl message adds the URL to the listUrls variable and also sends the StartListParser message to the ListParser router.

The RemoveListUrl message removes the specified URL from listUrls. If the list becomes empty, we consider the job done and the results should be persisted (so the actor sends SaveResults to itself).

The PagesResults message adds the extracted page data to the pageResults variable. We’ll use it later, when saving.
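
The bookkeeping behind these messages can be modelled without Akka. This is only a sketch: CollectorState and its method names are hypothetical, and plain strings stand in for PageResult.

```scala
// Hypothetical, Akka-free model of the state CollectorService keeps.
final case class CollectorState(pending: List[String] = Nil, results: List[String] = Nil) {
  // AddListUrl: remember that this URL is still being processed.
  def add(url: String): CollectorState = copy(pending = url :: pending)

  // RemoveListUrl: drop one occurrence of the URL (diff removes only the first match).
  def remove(url: String): CollectorState = copy(pending = pending.diff(List(url)))

  // PagesResults: prepend a batch of results.
  def addResults(batch: List[String]): CollectorState = copy(results = batch ::: results)

  // When nothing is pending, the actor would send SaveResults to itself.
  def isDone: Boolean = pending.isEmpty
}

object CollectorStateDemo {
  def run(): CollectorState =
    CollectorState()
      .add("list-1")
      .add("list-2")
      .addResults(List("ad-a", "ad-b"))
      .remove("list-1")
      .addResults(List("ad-c"))
      .remove("list-2")
}
```

After the last remove the pending list is empty, which is exactly the moment the real actor decides to save.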

ListParser.scala

scala
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.pipe
import akka.routing.SmallestMailboxRouter
import scala.concurrent.Future

object ListParser {
    import PageParser._

    case class StartListParser(listUrl: String)
    case class ListResult(listUrl: String, pageUrls: List[String], nextPage: Option[String] = None)
    case class AddPageUrl(listUrl: String, url: String)
    case class RemovePageUrl(listUrl: String, url: String)
    case class SavePageResult(listUrl: String, result: Option[PageResult])
}

class ListParser(collectorService: ActorRef) extends Actor with ActorLogging with CollectionImplicits with ParserUtil with ParserImplicits {

    import ListParser._
    import PageParser._
    import CollectorService._

    // Router in front of 10 PageParser actors
    val pages = context.actorOf(Props(new PageParser(self)).withRouter(SmallestMailboxRouter(10)), name = "Advertisement")

    // Both maps are keyed by list URL
    var pageUrls = Map[String, List[String]]()
    var pageResults = Map[String, List[Option[PageResult]]]()

    def receive = {
        case StartListParser(listUrl) => {
            val future = parseAdvertisementList(listUrl)

            future onFailure {
                case e: Exception => {
                    log.warning(s"Can't process $listUrl, cause: ${e.getMessage}")
                    collectorService ! RemoveListUrl(listUrl)
                }
            }

            future pipeTo self
        }
        case AddPageUrl(listUrl, url) => {
            pageUrls = pageUrls.updatedWith(listUrl, List.empty) {url :: _}

            pages ! StartPageParser(listUrl, url)
        }
        case RemovePageUrl(listUrl, url) => {
            pageUrls = pageUrls.updatedWith(listUrl, List.empty) { urls =>
                val updatedUrls = urls.copyWithout(url)

                if(updatedUrls.isEmpty) {
                    // getOrElse: if every page of this list failed, nothing was saved,
                    // but the list must still be reported as finished
                    val results = pageResults.getOrElse(listUrl, List.empty)

                    collectorService ! PagesResults(results.flatten)
                    collectorService ! RemoveListUrl(listUrl)
                }

                updatedUrls
            }
        }
        case SavePageResult(listUrl, result) => {
            pageResults = pageResults.updatedWith(listUrl, List.empty) {result :: _}
        }
        case ListResult(listUrl, urls, Some(nextPage)) => {
            // Schedule the next page of results as a new list URL
            collectorService ! AddListUrl(nextPage)

            self ! ListResult(listUrl, urls, None)
        }
        case ListResult(listUrl, urls, None) => {
            log.debug(s"${urls.size} pages were extracted")

            if(urls.isEmpty) collectorService ! RemoveListUrl(listUrl)

            urls foreach { url =>
                self ! AddPageUrl(listUrl, url)
            }
        }
    }

    def parseAdvertisementList(listUrl: String): Future[ListResult] = Future {
        // skip
    }
}

As you can see, ListParser also contains a few variables: pages, pageUrls and pageResults. pages is similar to lists from CollectorService: it holds the next level of actors, PageParser, behind the same kind of router, SmallestMailboxRouter.

pageUrls and pageResults keep intermediate results. They are pretty similar to listUrls and pageResults from CollectorService, except that they are maps keyed by listUrl.

ListParser starts with the StartListParser message. It passes the URL to the parseAdvertisementList method (which I want to skip here; you can find it in the GitHub repo). As a result, it gets a Future of the ListResult case class. This class contains a list of page-level URLs and, optionally, a URL to the next page of results for this city.

scala
future pipeTo self

This line sends the result of the Future to the actor itself. There are two possible paths after that:

  1. If there is a next page in the message, it matches case ListResult(listUrl, urls, Some(nextPage)). The actor sends the AddListUrl message to the CollectorService and a ListResult message without the next page to itself.
  2. If there is no next page (or it’s the message sent in step 1), it matches the other case, case ListResult(listUrl, urls, None).

The second ListResult case checks the list of URLs. If it’s empty, RemoveListUrl is sent to the CollectorService. If it’s not empty, the actor sends itself an AddPageUrl message for every URL.
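
The two ListResult cases can be read as a pure function from a result to a list of actions. A minimal sketch: ListResult has the same shape as in ListParser, while Action and its subclasses are hypothetical names that only describe which message the real actor would send.

```scala
object ListResultSketch {
  final case class ListResult(listUrl: String, pageUrls: List[String], nextPage: Option[String] = None)

  // Hypothetical descriptions of what the actor does; the real code sends messages instead.
  sealed trait Action
  final case class FetchList(url: String) extends Action                   // AddListUrl to CollectorService
  final case class FetchPage(listUrl: String, url: String) extends Action  // AddPageUrl to self
  final case class ListDone(listUrl: String) extends Action                // RemoveListUrl to CollectorService

  def plan(result: ListResult): List[Action] = result match {
    // Next page present: schedule it, then handle the same URLs as a result without a next page.
    case ListResult(listUrl, urls, Some(nextPage)) =>
      FetchList(nextPage) :: plan(ListResult(listUrl, urls, None))
    // No ads on this page: the list is finished immediately.
    case ListResult(listUrl, Nil, None) =>
      List(ListDone(listUrl))
    // Otherwise fetch every ad page.
    case ListResult(listUrl, urls, None) =>
      urls.map(url => FetchPage(listUrl, url))
  }
}
```

Keeping the decision in a function like this would also make it easy to unit-test without starting an actor system.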

In the AddPageUrl case, the actor saves the specified URL to pageUrls and sends the StartPageParser message to PageParser, the next actor in the hierarchy.

In the RemovePageUrl case, it removes the specified URL from pageUrls. If no URLs are left for that list, it sends PagesResults and then RemoveListUrl to the CollectorService.

ListParser also has the SavePageResult case, which just saves the received data to pageResults.
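
Here is an Akka-free sketch of the same per-list bookkeeping, to show how the two maps work together. ListState and its methods are hypothetical names, and strings stand in for PageResult.

```scala
// Hypothetical, Akka-free model of ListParser's two maps, keyed by list URL.
final case class ListState(
    pageUrls: Map[String, List[String]] = Map.empty,
    pageResults: Map[String, List[Option[String]]] = Map.empty) {

  // AddPageUrl: track a page that is still being fetched.
  def addPage(listUrl: String, url: String): ListState =
    copy(pageUrls = pageUrls.updated(listUrl, url :: pageUrls.getOrElse(listUrl, Nil)))

  // SavePageResult: keep the (possibly empty) result of one page.
  def save(listUrl: String, result: Option[String]): ListState =
    copy(pageResults = pageResults.updated(listUrl, result :: pageResults.getOrElse(listUrl, Nil)))

  // RemovePageUrl: stop tracking the page; when none are left for this list,
  // return the collected results that would go out as PagesResults.
  def removePage(listUrl: String, url: String): (ListState, Option[List[String]]) = {
    val remaining = pageUrls.getOrElse(listUrl, Nil).diff(List(url))
    val next = copy(pageUrls = pageUrls.updated(listUrl, remaining))
    val finished =
      if (remaining.isEmpty) Some(pageResults.getOrElse(listUrl, Nil).flatten) else None
    (next, finished)
  }
}
```

Pages that produced no data are stored as None and disappear in flatten, so only real results travel up to the CollectorService.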

PageParser.scala

scala
import akka.actor.{Actor, ActorLogging, ActorRef}
import scala.concurrent.Future
import scala.util.{Failure, Success}

object PageParser {
    case class StartPageParser(listUrl: String, pageUrl: String)
    case class PageResult(
        url: String,
        title: String,
        description: String,
        date: Option[(String, String)] = None,
        email: Option[String] = None,
        phone: Option[String] = None
    )
}

class PageParser(listParser: ActorRef) extends Actor with ActorLogging with ParserUtil with ParserImplicits {

    import PageParser._
    import ListParser._

    def receive = {
        case StartPageParser(listUrl, pageUrl) => {
            val future = parseAdvertisement(pageUrl).mapTo[Option[PageResult]]

            future onComplete {
                case Success(result) => {
                    listParser ! SavePageResult(listUrl, result)
                    listParser ! RemovePageUrl(listUrl, pageUrl)
                }
                case Failure(e) => {
                    // Still report the page as processed, otherwise the list never completes
                    log.warning(s"Can't process $pageUrl, cause: ${e.getMessage}")
                    listParser ! RemovePageUrl(listUrl, pageUrl)
                }
            }
        }
    }

    def parseAdvertisement(url: String): Future[Option[PageResult]] = Future {
        // skip
    }
}

The PageParser actor is pretty straightforward. It uses the parseAdvertisement method to get a Future with the extracted data and then sends the SavePageResult and RemovePageUrl messages to its parent actor (ListParser). If the Future fails, it sends only RemovePageUrl.

Important things

  1. Error handling is very important. That’s why every Future has a failure handler (onFailure, or the Failure case of onComplete) that sends clean-up messages to the parent actor. Without them, a failed URL would never be removed from the processing queue and the scraper would never finish.
  2. You may notice that CollectorService and ListParser are similar: both keep the same two kinds of state and handle the same actions (add an item to the processing queue, remove an item from the processing queue, save results). It means we can extend the actor hierarchy as many times as needed. Still, it’s good practice to have a different actor for every level of the hierarchy, because we can set up different supervisors for each. So it’s worth thinking about how to reuse this behaviour.
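
The first point can be sketched with plain Scala futures. Everything here is hypothetical (the message types, fetch and its failure rule); the only thing it is meant to show is that the clean-up message is produced on both the success and the failure path.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object CleanupSketch {
  // Hypothetical stand-ins for the messages an actor would send to its parent.
  sealed trait Message
  final case class Save(url: String, data: String) extends Message
  final case class Remove(url: String) extends Message

  // Turns the outcome of a fetch into the messages to send: Remove is always present.
  def messagesFor(url: String, outcome: Try[String]): List[Message] = outcome match {
    case Success(data) => List(Save(url, data), Remove(url))
    case Failure(_)    => List(Remove(url))
  }

  // Fake fetch: fails for URLs containing "broken", succeeds otherwise.
  def fetch(url: String): Future[String] = Future {
    if (url.contains("broken")) throw new RuntimeException("boom") else s"content of $url"
  }

  // Blocks only to keep the sketch short; an actor would use onComplete instead.
  def run(url: String): List[Message] = {
    val outcome = Await.ready(fetch(url), 5.seconds).value.get
    messagesFor(url, outcome)
  }
}
```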

Summary

I like the results: it takes about 4 minutes on my MBP to find, fetch, parse and save about 10k ads.

Bonus: Immutable data structures

Maybe you didn’t notice, but all the state inside the actors is kept in immutable data structures: the vars are reassigned, but the lists and maps themselves are never modified. It’s not a requirement, because an actor processes only one message from its mailbox at a time, so it can’t corrupt its own mutable data. I used immutable data structures just as an exercise, and it’s also good style in Scala. That’s why we have these implicits for working with List and Map.

scala
trait CollectionImplicits {
    implicit class ListExtensions[K](val list: List[K]) {
        def copyWithout(item: K) = {
            val (left, right) = list span (_ != item)
            left ::: right.drop(1)
        }
    }

    implicit class MapExtensions[K, V](val map: Map[K, V]) {
        def updatedWith(key: K, default: V)(f: V => V) = {
            map.updated(key, f(map.getOrElse(key, default)))
        }
    }
}
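
To see what the two helpers do, here is a small worked example written against the standard library only, so it runs on its own. The names without and updatedWithDefault are made up for this sketch; they mirror copyWithout and updatedWith.

```scala
object ImmutableUpdates {
  // list.diff(List(item)) drops only the first occurrence, like copyWithout.
  def without[A](list: List[A], item: A): List[A] = list.diff(List(item))

  // The same expression updatedWith wraps: apply f to the current value or to the default.
  def updatedWithDefault[K, V](map: Map[K, V], key: K, default: V)(f: V => V): Map[K, V] =
    map.updated(key, f(map.getOrElse(key, default)))

  def main(args: Array[String]): Unit = {
    val urls = List("a", "b", "a")
    println(without(urls, "a")) // a new list; urls itself is unchanged
    println(updatedWithDefault(Map("x" -> List(1)), "y", List.empty[Int])(2 :: _))
  }
}
```

Both functions return a new collection and leave their input untouched, which is why the actors have to reassign their vars after every update.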

Further Reading

To continue learning about Akka, I can recommend the excellent book Akka Concurrency by Derek Wyatt.