I am using the Amazon consumer reviews dataset, and my objective is to apply collaborative filtering. I have already managed to add myself as a user together with my own ratings.
Now I want to build a model with ALS, but I have a problem with ALS.train(): it expects the default Rating(Int, Int, Double), whereas my case class is Rating(String, String, Int). I tried converting my String values to Int and the rating value to Double, but I ran into trouble converting the userID, because Amazon's userID looks like "AVpgNzjwLJeJML43Kpxn" and the productID looks like "B00QWO9P0O,B00LH3DMUO" (with the quotation marks included), so they cannot simply be parsed as integers. How can I overcome this problem? The only workaround I can think of is sketched below.
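The idea (a rough, untested sketch; toMllibRatings is just a name I made up, nothing like it exists in my code yet) is to index the distinct string IDs myself and keep the lookup maps so predictions can be translated back:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{Rating => MllibRating}

// Sketch: give every distinct userID/productID its own Int and convert the
// ratings to the mllib Rating(Int, Int, Double) that ALS.train expects.
// Collecting the maps on the driver should be fine for a dataset of this size.
def toMllibRatings(ratings: RDD[User_Ratings.Rating])
    : (RDD[MllibRating], Map[String, Int], Map[String, Int]) = {
  val userIdToInt = ratings.map(_.userID).distinct()
    .zipWithIndex().mapValues(_.toInt).collect().toMap
  val productIdToInt = ratings.map(_.productID).distinct()
    .zipWithIndex().mapValues(_.toInt).collect().toMap
  val converted = ratings.map { r =>
    MllibRating(userIdToInt(r.userID), productIdToInt(r.productID), r.rating.toDouble)
  }
  (converted, userIdToInt, productIdToInt)
}

Is something along these lines the usual way to handle non-numeric IDs with mllib ALS, or is there a cleaner approach?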
Code of CollabarativeFiltering:
object CollabarativeFiltering {

  case class Product(prooductID: String, prodcutName: String, productCat: String)

  def parseProduct(fields: Row): Product = {
    //4,3,7,6
    Product(fields(4).toString(), fields(3).toString(), fields(5).toString())
  }

  def readProduct(location: String, spark: SparkSession): RDD[Product] = {
    val product = spark.read.option("header", "true").csv(location).rdd.map(parseProduct)
    return product
  }

  def topRatedProducts(products: RDD[Product], ratings: RDD[User_Ratings.Rating], i: Int): Map[Int, String] = {
    // Create mostRatedProducts(productID, Number_of_Product)
    val mostRatedProducts = ratings.groupBy(_.productID).map(f => (f._1, f._2.size)).takeOrdered(100)(Ordering[Int].reverse.on(_._2))
    // Randomly pick i of the 100 top rated products
    val selectedProdcut = shuffle(mostRatedProducts).map(f => (f._2, products.filter(_.prooductID == f._1)
      .map(p => p.prodcutName)
      .take(1)(0))).take(i).toMap
    return selectedProdcut
  }

  def getRatings(topRatedProduct: Map[Int, String], spark: SparkSession): RDD[User_Ratings.Rating] = {
    var ourId = "A/E"
    var ourRatings = ArrayBuffer.empty[User_Ratings.Rating]
    var i = 1
    for (product <- topRatedProduct) {
      breakable {
        while (true) {
          try {
            println(i.toString + ") Your rating for: " + product._2 + ",")
            val rating = scala.io.StdIn.readInt()
            if (rating < 5.1 && rating > 0) {
              ourRatings += User_Ratings.Rating("A/E", product._2, rating)
              i += 1
              break
            }
          } catch {
            case e: Exception => println("Invalid Rating")
          }
        }
      }
    }
    return spark.sparkContext.parallelize(ourRatings)
  }

  def main(args: Array[String]) {
    var spark: SparkSession = null
    var fw: FileWriter = null
    try {
      spark = SparkSession.builder.appName("Spark SQL").config("spark.master", "local[*]").getOrCreate()
      val sc = spark.sparkContext
      var csv_file = "Datafiniti_Amazon_Consumer_Reviews_of_Amazon_Products_May19.csv"
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)

      // Loading Products
      val products = CollabarativeFiltering.readProduct(csv_file, spark)
      products.cache()
      products.take(10).foreach(println)

      // Loading Ratings
      val ratings = User_Ratings.readRatings(csv_file, spark)
      ratings.take(10).foreach(println)

      // Checking Top Rated Products
      val topRatedProduct = topRatedProducts(products, ratings, 10)
      topRatedProduct.take(10).foreach(println)

      // Ask the user to rate 10 top rated products
      val ourRatings = getRatings(topRatedProduct, spark)

      // Add User Ratings
      val editedRatings = ratings.union(ourRatings)

      // Normalizing the Ratings
      val normalizedRatings = User_Ratings.normalizingRatings(editedRatings)

      // Training the model
      val Array(train, test) = normalizedRatings.randomSplit(Array(0.8, 0.2))
      train.cache()
      test.cache()

      val ranks = Array(8, 12)
      val numIterations = Array(1, 5, 10)
      //val alpha = 0.01
      val lambdas = Array(10, 20)

      fw = new FileWriter("Results.txt", true)
      println("RANK ---- LAMBDA --- ITERATION ---- MSE")
      fw.write("RANK ---- LAMBDA --- ITERATION ---- MSE\n")

      for (i <- ranks) {
        for (j <- lambdas) {
          for (k <- numIterations) {
            // Statistics about the runtime of training
            val start = System.nanoTime()
            val als_model = ALS.train(train, i, k, j)

            // Shape our data by removing the rating so that we will predict the ratings for them
            val usersProducts = test.map(f => (f.userID, f.productID))

            // Predict
            val predictions = als_model.predict(usersProducts).map(f => ((f.user, f.product), f.rating))

            // We hold (user, product) as the key and the (real rating, predicted rating) pair as the value
            val real_and_predictions = test.map(f => ((f.userID, f.productID), f.rating)).join(predictions)

            // Calculate Mean Square Error
            val mean_square_err = real_and_predictions.map(f => sqr(f._2._1 - f._2._2)).mean()

            print(i.toString + " -------- " + j.toString + " -------- " + k.toString + " -------- ")
            println(mean_square_err.toString + "\n")
            println("Time elapsed: " + (System.nanoTime() - start) / 1e9)
            fw.write(i.toString + " -------- " + j.toString + " -------- " + k.toString + " -------- ")
            fw.write(mean_square_err.toString + "\n")
            fw.write("Time elapsed: " + ((System.nanoTime() - start) / 1e9).toString + "\n")
          }
        }
      }
    } catch {
      case e: Exception => throw e
    } finally {
      if (fw != null) fw.close()
      spark.stop()
    }
    println("done")
  }
}
Code of User_Ratings:
object User_Ratings {

  case class Rating(userID: String, productID: String, rating: Int)

  // Create a Rating object from a Row
  def parseRating(fields: Row): Rating = {
    Rating(fields(0).toString, fields(4).toString, fields(18).toString().toInt)
  }

  // Read ratings from the csv file and create an RDD[Rating]
  def readRatings(location: String, spark: SparkSession): RDD[Rating] = {
    val ratings = spark.read.option("header", "true").csv(location).rdd.map(parseRating)
    return ratings
  }

  // Normalize the ratings by dividing each user's rating by the average of that user's ratings
  def normalizingRatings(ratings: RDD[User_Ratings.Rating]): RDD[User_Ratings.Rating] = {
    // Grouping according to user
    val ratingsofUsers = ratings.groupBy(f => f.userID).map(x => (x._1, x._2.map(r => r.rating).sum / x._2.size))
    // Collecting as Map
    val userMap = ratingsofUsers.collect().toMap
    // Normalizing the Ratings
    val normalizedRatings = ratings.map(f => Rating(f.userID, f.productID, f.rating / userMap(f.userID)))
    return normalizedRatings
  }

  def main(args: Array[String]): Unit = {
    var spark: SparkSession = null
    var fw: FileWriter = null
    try {
      spark = SparkSession.builder.appName("Spark SQL").config("spark.master", "local[*]").getOrCreate()
      val sc = spark.sparkContext
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)
    } catch {
      case e: Exception => throw e
    } finally {
      val end = System.nanoTime()
      spark.stop()
      if (fw != null) fw.close() // fw is never assigned in this main, so guard against a NullPointerException
    }
    println("done")
  }
}
/*
Reference : https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
*/
The issue is when I use:
val als_model = ALS.train(train, i, k, j)
It gives:
expected org.apache.spark.mllib.recommendation.RDD[Rating], found RDD[User_Ratings.Rating]
I want to use ALS to train on my RDD, but I can't. If that is not possible, is there another way to train my data so that I can recommend similar products to a user?
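Alternatively, would the DataFrame-based API from the Spark docs I referenced above be the better route? This is roughly what I picture (only a sketch, not tested; trainWithMlAls is a made-up name, and the column names just mirror my Rating case class):

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.DataFrame

// Sketch: turn the string IDs into numeric index columns with StringIndexer,
// then train the DataFrame-based ALS on those index columns.
def trainWithMlAls(ratingsDF: DataFrame): ALSModel = {
  val withUserIndex = new StringIndexer()
    .setInputCol("userID").setOutputCol("userIndex")
    .fit(ratingsDF).transform(ratingsDF)
  val indexed = new StringIndexer()
    .setInputCol("productID").setOutputCol("productIndex")
    .fit(withUserIndex).transform(withUserIndex)

  val als = new ALS()
    .setUserCol("userIndex")
    .setItemCol("productIndex")
    .setRatingCol("rating")
    .setRank(8)
    .setMaxIter(10)
    .setRegParam(0.1)
  als.fit(indexed)
}

Here ratingsDF would be something like spark.createDataFrame(normalizedRatings). Is that the recommended way, or is there a more direct path from my RDD[User_Ratings.Rating] to ALS?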