/**
 * Ranks the best-selling items per category and across all categories.
 *
 * Events whose item is not present in `itemStringIntMap` are ignored. Counts
 * are summed per (item index, category) pair; the "ALL" ranking uses those same
 * per-pair totals and does not merge one item's totals across categories.
 *
 * @param events           buy events to aggregate
 * @param n                maximum number of items kept per ranking; values
 *                         below zero behave like zero
 * @param itemStringIntMap mapping from item id to integer item index
 * @return a map from category name to its top `n` (item index, total count)
 *         pairs in descending count order, plus an "ALL" entry ranking every
 *         pair regardless of category. "ALL" is absent when no event matched,
 *         and it replaces a real category that happens to be named "ALL".
 */
def trainBestSeller(events: RDD[BuyEvent], n: Int, itemStringIntMap: BiMap[String, Int]): Map[String, Array[(Int, Int)]] = {
  // Array.take accepts a negative size (yielding nothing); clamp so that
  // RDD.top is given the same effective limit.
  val limit = math.max(n, 0)

  // Aggregate once and cache the *aggregated* totals, so both rankings below
  // reuse a single shuffle instead of each recomputing reduceByKey.
  val itemTotals = events
    .flatMap {
      case BuyEvent(_, item, category, count) if itemStringIntMap.contains(item) =>
        // Explicit tuple: ((itemIndex, category), count).
        Some(((itemStringIntMap(item), category), count))
      case _ => None
    }
    .reduceByKey(_ + _)
    .cache()

  try {
    // Top sellers within each category.
    val bestSellerByCategory: Map[String, Array[(Int, Int)]] = itemTotals
      .map { case ((itemIndex, category), total) => (category, (itemIndex, total)) }
      .groupByKey
      .mapValues(_.toArray.sortBy(_._2)(Ordering.Int.reverse).take(limit))
      .collectAsMap
      .toMap

    if (bestSellerByCategory.isEmpty) {
      // No aggregated rows at all: there is nothing to rank, so no "ALL" entry.
      bestSellerByCategory
    } else {
      // Top sellers overall. RDD.top keeps only `limit` candidates per
      // partition rather than gathering every row under one grouped key.
      val bestSellerAll: Array[(Int, Int)] = itemTotals
        .map { case ((itemIndex, _), total) => (itemIndex, total) }
        .top(limit)(Ordering.by[(Int, Int), Int](_._2))

      bestSellerByCategory + ("ALL" -> bestSellerAll)
    }
  } finally {
    // Release the cached totals once both rankings have been collected.
    itemTotals.unpersist()
  }
}
See Questions & Answers for more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…