Изучаем английский с Scala на Future и Actor +11


Решил тут я подтянуть свой английский язык. В частности, захотелось значительно расширить словарный запас. Я знаю, что существует масса программ, которые в игровой форме помогают это сделать. Загвоздка в том, что я не люблю геймфикацию. Предпочитаю по старинке. Листочек бумаги где таблица со словами, транскрипцией и переводом. И учим его учим. И проверяем свои знания, например, закрывая столбец с переводом. В общем, как я учил это в университете.

Прослышал я про то, что существует 3000 наиболее часто используемых слов, подобранных на OxfordDictionary сайте. Вот тут этот список слов: www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B Ну а перевод на русский я решил брать отсюда: www.translate.ru/dictionary/en-ru Одна только проблема, все находиться на этих сайтах ну совсем не в том формате, который можно распечатать и учить. В итоге родилась идея это все запрограммировать. Но сделать это не как последовательный алгоритм, а все распаралелить. Что бы выкачивание и парсинг всех слов занял не (3000 слов * 2 сайта) / 60 секунд = 100 минут. Это если давать по 1 секунде на выкачивание и распарсивание страницы для извлечения перевода и транскрипции (в реальности думаю это в 3 раза дольше, пока соединение откроем, пока закроем и тд и тп).

image


Задачу я разбил сразу на два крупных блока. Первый блок, это операции ввода/вывода блокирующие — выкачивание страницы с сайта. Второй блок, это операции вычислительные, не блокирующие, но нагружающие CPU: парсинг страницы для извлечения перевода и транскрипции и добавление в словарь результатов парсинга.

Блокирующие операции я решил делать в пуле нитей, используя Future от Scala. Вычислительные задачи, решил раскидать на 3 актера Akka. Применяя методику TDD, cначала я написал тест к своим кирпичикам будущего приложения.

class Test extends FlatSpec with Matchers {

  "Table Of Content extractor" should "download and extract content from Oxford Site" in {
    val content:List[String] = OxfordSite.getTableOfContent
    content.size should be (10)
    content.find(_ == "A-B") should be (Some("A-B"))
    content.find(_ == "U-Z") should be (Some("U-Z"))
  }

  "Words list extractor" should "download words from page" in {
    val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 1)
    val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)
    wordsTry should be a 'success
    val words = wordsTry.get
    words.get.find(_ == "abandon") should be (Some("abandon"))

  }
  "Words list extractor" should "return None from empty page" in {
    val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 999)
    val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)
    wordsTry should be a 'success
    val words = wordsTry.get
    words should be(None)

  }

  "Russian Translation" should "download translation and parse" in {
    val page: Future[Try[String]] =  LingvoSite.getPage("test")
    val pageResultTry: Try[String]= Await.result(page,60 seconds)
    pageResultTry should be a 'success
    val pageResult = pageResultTry.get
    pageResult.contains("тест") should be(true)
    LingvoSite.parseTranslation(pageResult).get should be("тест")
  }



  "English Translation" should "download translation and parse" in {
    val page: Future[Try[String]] =  OxfordSite.getPage("test")
    val pageResultTry: Try[String] = Await.result(page,60 seconds)
    pageResultTry should be a 'success
    val pageResult = pageResultTry.get
    pageResult.contains("examination") should be(true)
    OxfordSite.parseTranslation(pageResult).get should be(("test", "an examination of somebody’s knowledge or ability, consisting of questions for them to answer or activities for them to perform"))

  }

}


Обратите внимание. Функции, которые могут вернуть результат вычислений имеют Try[…]. Те либо Success результат или Failure и эксепшен. Функции, которые будут часто вызывать и имеют блокирующие i/o операции имеют результат, как Future[Try[…]]. Те при вызове функции сразу возвращается Future в которой идут долгие i/o операции. Притом они идут внутри Try и могут завершится с ошибок (например соединение порвалось).

Само приложение инициализируется в Top3000WordsApp.scala. Поднимается система актеров. Создаются актеры. Запускается парсинг списка слов, который в параллель запускает выкачивание английских и русских страниц с транскрипцией и переводом. В случае успешной скачки страниц, срабатывает передача содержимое страниц актерам для парсинга, извлекающих перевод и транскрипцию. Результат перевода актеры передают конечному актеру-словарю, который акамулирует все результаты в одном месте. И по нажатию enter, система актеров идет в shutdown. И актер DictionaryActor, получая сигнал об этом, сохраняет собраный словарь в файл dictionaty.txt

object Top3000WordsApp extends App {


  val system = ActorSystem("Top3000Words")
  val dictionatyActor = system.actorOf(Props[DictionaryActor], "dictionatyActor")
  val englishTranslationActor = system.actorOf(Props(classOf[EnglishTranslationActor], dictionatyActor), "englishTranslationActor")
  val russianTranslationActor = system.actorOf(Props(classOf[RussianTranslationActor], dictionatyActor), "russianTranslationActor")
  val mapGetPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
  val mapGetWordsThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))


  start()

  scala.io.StdIn.readLine()
  system.terminate()
  def start() = {

    import concurrent.ExecutionContext.Implicits.global

    Future {
      OxfordSite.getTableOfContent.par.foreach(letterGroup => {
        getWords(letterGroup, 1)
      })

    }
  }


  def getWords(letterGroup: String, pageNum: Int): Unit = {
    implicit val executor = mapGetWordsThreadExecutionContext

    OxfordSite.getWordsFromPage(letterGroup, pageNum).map(tryWords => {
      tryWords match {
        case Success(Some(words)) => words.par.foreach(word => {
            parse(word,letterGroup,pageNum)
        })
        case Success(None) => Unit
        case Failure(ex) => println(ex.getMessage)
      }
    })

  }


  def parse(word: String, letterGroup: String, pageNum: Int)= {

    implicit val executor = mapGetPageThreadExecutionContext
    OxfordSite.getPage(word).map(tryEnglishPage => {
      tryEnglishPage match {
        case Success(englishPage) => {
          englishTranslationActor ! (word, englishPage)
          getWords(letterGroup, pageNum + 1)
        }
        case Failure(ex) => println(ex.getMessage)
      }
    })
    LingvoSite.getPage(word).map(_ match {
      case Success(russianPage) => {
        russianTranslationActor !(word, russianPage)
      }
      case Failure(ex) => println(ex.getMessage)
    })
  }

}



Обратите внимание, что алгоритм разбит на start, getWords, parse функции. Это сделано из-за того, что для каждой фазы задачи требуется свой пул нитей, который передается неявно, как ThreadExecutionContext. Сначала, у меня была всего одна функция getWords, для рекурсивного вызова. Но все работало очень медленно, так как на верхнем уровне алгоритма распаралеливание выжирало весь пул нитей и в самом низу были вечные ожидания, когда же мне дадут свободную нить, что бы поработать. А как раз в низу самое большое число операций.

Вот реализация скачивания и парсинга с сайтов.

object OxfordSite {

  val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))

  def parseTranslation(content: String): Try[(String, String)] = {
    Try {
      val browser = new Browser
      val doc = browser.parseString(content)
      val spanElement: Element = doc >> element(".phon")
      val str = Jsoup.parse(spanElement.toString).text()
      val transcription = str.stripPrefix("BrE//").stripSuffix("//").trim
      val translation = doc >> text(".def")
      (transcription,translation)
    }
  }

  def getPage(word: String): Future[Try[String]] = {
    implicit val executor = getPageThreadExecutionContext
    Future {
      Try {
        val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/definition/english/" + (word.replace(' ','-')) + "_1")
        html.mkString
      }
    }
  }

  def getWordsFromPage(letterGroup: String, pageNum: Int): Future[Try[Option[List[String]]]] = {
    import ExecutionContext.Implicits.global

    Future {
      Try {
        val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com" +
          "/wordlist/english/oxford3000/Oxford3000_" + letterGroup + "/?page=" + pageNum)
        val page = html.mkString
        val browser = new Browser
        val doc = browser.parseString(page)
        val ulElement: Element = doc >> element(".wordlist-oxford3000")
        val liElements: List[Element] = ulElement >> elementList("li")
        if (liElements.size > 0) Some(liElements.map(_ >> text("a")))
        else None
      }
    }
  }

  def getTableOfContent: List[String] = {

    val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B/")
    val page = html.mkString
    val browser = new Browser
    val doc = browser.parseString(page)
    val ulElement: Element = doc >> element(".hide_phone")
    val liElements: List[Element] = ulElement >> elementList("li")
    List(liElements.head >> text("span")) ++ liElements.tail.map(_ >> text("a"))
  }

}


object LingvoSite {
  val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))


  def parseTranslation(content: String): Try[String] = {
    Try {

      val browser = new Browser
      val doc = browser.parseString(content)
      val spanElement: Element = doc >> element(".r_rs")
      spanElement >> text("a")
    }
  }

  def getPage(word: String): Future[Try[String]] = {
    implicit val executor = getPageThreadExecutionContext

    Future {
      Try {

        val html = Source.fromURL("http://www.translate.ru/dictionary/en-ru/" + java.net.URLEncoder.encode(word,"UTF-8"))
        html.mkString
      }
    }
  }

}



Структуры данных с которыми работают актеры.

case class Word (word: String, transcription: Option[String] = None, russianTranslation:Option[String] = None, englishTranslation: Option[String] = None)
case class RussianTranslation(word:String, translation: String)
case class EnglishTranslation(word:String, translation: String)
case class Transcription(word:String, transcription: String)



Актеры, которые принимают на входе скачанные страницы для парсинга и пересылают перевод и транскрипцию актеру DictionaryActor

class EnglishTranslationActor (dictionaryActor: ActorRef) extends Actor {
  println("EnglishTranslationActor")


  def receive = {
    case (word: String, englishPage: String) => {
      OxfordSite.parseTranslation(englishPage) match {
        case Success((transcription, translation)) => {
          dictionaryActor ! EnglishTranslation(word,translation)
          dictionaryActor ! Transcription(word,transcription)
        }
        case Failure(ex) => {
          println(ex.getMessage)
        }
      }
    }
  }

}


class RussianTranslationActor  (dictionaryActor: ActorRef) extends Actor {
  println("RussianTranslationActor")


  def receive = {
    case (word: String, russianPage: String) => {
      LingvoSite.parseTranslation(russianPage) match {
        case Success(translation) => {
            dictionaryActor ! RussianTranslation(word, translation)
        }
        case Failure(ex) => {
          println(ex.getMessage)
        }
      }
    }
  }



}



Актер который накапливает в себе словарь с переводами и транскрипцией и после shutdown системы актеров записывает весь словарь в dictionary.txt

class DictionaryActor extends Actor {
  println("DictionaryActor")


  override def postStop(): Unit = {
    println("DictionaryActor postStop")
    val fileText = DictionaryActor.words.map{case (_, someWord)=> {
      val transcription = someWord.transcription.getOrElse(" ")
      val russianTranslation = someWord.russianTranslation.getOrElse(" ")
      val englishTranslation = someWord.englishTranslation.getOrElse(" ")
      List(someWord.word, transcription , russianTranslation , englishTranslation).mkString("|")
    }}.mkString("\n")
    scala.tools.nsc.io.File("dictionary.txt").writeAll(fileText)
    println("dictionary.txt saved")
    System.exit(0)

  }

  def receive = {
    case Transcription(wordName, transcription) => {
      val newElement = DictionaryActor.words.get(wordName) match {
        case Some(word) => word.copy(transcription = Some(transcription))
        case None =>  Word(wordName,transcription = Some(transcription))
      }
      DictionaryActor.words += wordName -> newElement
      println(newElement)
    }
    case RussianTranslation(wordName, translation) => {
      val newElement = DictionaryActor.words.get(wordName) match {
        case Some(word) => word.copy(russianTranslation = Some(translation))
        case None =>  Word(wordName,russianTranslation = Some(translation))
      }
      DictionaryActor.words += wordName -> newElement
      println(newElement)
    }
    case EnglishTranslation(wordName, translation) => {
      val newElement = DictionaryActor.words.get(wordName) match {
        case Some(word) => word.copy(englishTranslation = Some(translation))
        case None =>  Word(wordName,englishTranslation = Some(translation))
      }
      DictionaryActor.words += wordName -> newElement
      println(newElement)
    }
  }
}


object DictionaryActor {
  var words = scala.collection.mutable.Map[String, Word]()
}


Какие выводы? На моем Mac Book Pro этот скрипт работал в течение примерно 1 часа, пока я писал эту статью. Я его прервал нажав enter и вот какой результат:

bash-3.2$ cat ./dictionary.txt |wc -l
    1809


Потом, я еще раз запустил скрипт и оставил на несколько часов. Когда вернулся, то у меня был процессор загружен 100% и были ошибки в консоле про гарбаж коллектор, по нажатию на enter моя программа не смогла сохранить результат своей работы в файл. Диагноз такой, писать на Future и par.map или par.foreach, конечно красиво и удобно, но реально очень тяжело понять как на самом деле на уровне нитей это все работает и где же узкое горлышко бутылки. В итоге я планирую все переписать на актеры. Притом, буду использовать пулы актеров. Что бы, например, 4 актера выкачивало и парсило страницы со списками слов, 18 актеров выкачивало страницы с переводами, 4 актера парсило страницы извлекая переводы и транскрипцию, ну и 1 актер складывал все в словарь.

Текущая реализация в бранче v0.1 github.com/evgenyigumnov/top3000words/tree/v0.1 Версия где все переписано на актеры с пулами будет в бранче v0.2, ну и в master, чуть позже. Может есть у кого соображения, что я делал не так, в текущей версии? Ну и может советы подкините по новой версии?

Проект на гитхабе доступен: github.com/evgenyigumnov/top3000words

Запустить тесты проекта: sbt test
Запустить приложение: sbt run
Ну и как надоест ждать, нажать enter и ознакомиться с содержимым файла dictionary.txt в текущей папке

PS
В итоге сделал финальную версию v0.2, которая парсит за 10 минут в 30 потоков. github.com/evgenyigumnov/top3000words/tree/v0.2
По окончанию работы enter нажимать не нужно. Все сделано на актерах. В Future обвернуты только блокирующие i/o тяжелые.




К сожалению, не доступен сервер mySQL