Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

0 votes, 189 views — in Technique
scala - ALS training data with case class rating

I am using the Amazon consumer reviews dataset, and my objective is to apply collaborative filtering. I have already added myself as a user along with my own ratings.

I want to train a model with ALS, but I have a problem with ALS.train(): it expects the default Rating(Int, Int, Double), while I am using case class Rating(String, String, Int) instead. I tried converting my String values to Int and the rating value to Double, but I ran into a problem converting userID to Int, because Amazon's userIDs look like "AVpgNzjwLJeJML43Kpxn" and productIDs look like "B00QWO9P0O,B00LH3DMUO" (quotes included). How can I overcome this problem?

Code of CollabarativeFiltering:

import java.io.FileWriter

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random.shuffle
import scala.util.control.Breaks.{break, breakable}

object CollabarativeFiltering {

  case class Product(prooductID: String, prodcutName: String, productCat: String)

  // Helper used when computing the mean square error below
  def sqr(x: Double): Double = x * x

  def parseProduct(fields: Row): Product = {
    // Columns 4, 3 and 5 hold the product ID, name and category
    Product(fields(4).toString, fields(3).toString, fields(5).toString)
  }

  def readProduct(location: String, spark: SparkSession): RDD[Product] = {
    spark.read.option("header", "true").csv(location).rdd.map(parseProduct)
  }

  def topRatedProducts(products: RDD[Product], ratings: RDD[User_Ratings.Rating], i: Int): Map[Int, String] = {
    // (productID, number of ratings) for the 100 most rated products
    val mostRatedProducts = ratings.groupBy(_.productID)
      .map(f => (f._1, f._2.size))
      .takeOrdered(100)(Ordering[Int].reverse.on(_._2))

    // Pick i of the 100 top rated products at random and look up their names
    shuffle(mostRatedProducts.toSeq)
      .map(f => (f._2, products.filter(_.prooductID == f._1).map(_.prodcutName).take(1)(0)))
      .take(i)
      .toMap
  }

  def getRatings(topRatedProduct: Map[Int, String], spark: SparkSession): RDD[User_Ratings.Rating] = {
    val ourId = "A/E"
    val ourRatings = ArrayBuffer.empty[User_Ratings.Rating]
    var i = 1
    for (product <- topRatedProduct) {
      breakable {
        while (true) {
          try {
            println(i.toString + ") Your rating for: " + product._2 + ",")
            val rating = scala.io.StdIn.readInt()
            if (rating < 5.1 && rating > 0) {
              ourRatings += User_Ratings.Rating(ourId, product._2, rating)
              i += 1
              break
            }
          } catch {
            case e: Exception => println("Invalid Rating")
          }
        }
      }
    }
    spark.sparkContext.parallelize(ourRatings)
  }
  def main(args: Array[String]): Unit = {
    var spark: SparkSession = null
    var fw: FileWriter = null
    try {
      spark = SparkSession.builder.appName("Spark SQL").config("spark.master", "local[*]").getOrCreate()
      val csv_file = "Datafiniti_Amazon_Consumer_Reviews_of_Amazon_Products_May19.csv"
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)

      // Loading products
      val products = CollabarativeFiltering.readProduct(csv_file, spark)
      products.cache()
      products.take(10).foreach(println)

      // Loading ratings
      val ratings = User_Ratings.readRatings(csv_file, spark)
      ratings.take(10).foreach(println)

      // Checking the top rated products
      val topRatedProduct = topRatedProducts(products, ratings, 10)
      topRatedProduct.take(10).foreach(println)

      // Ask the user to rate the 10 top rated products
      val ourRatings = getRatings(topRatedProduct, spark)

      // Add the user's ratings
      val editedRatings = ratings.union(ourRatings)

      // Normalize the ratings
      val normalizedRatings = User_Ratings.normalizingRatings(editedRatings)

      // Train the model
      val Array(train, test) = normalizedRatings.randomSplit(Array(0.8, 0.2))
      train.cache()
      test.cache()
      val ranks = Array(8, 12)
      val numIterations = Array(1, 5, 10)
      //val alpha = 0.01
      val lambdas = Array(10, 20)

      fw = new FileWriter("Results.txt", true)
      println("RANK ---- LAMBDA --- ITERATION ---- MSE")
      fw.write("RANK ---- LAMBDA --- ITERATION ---- MSE\n")

      for (i <- ranks) {
        for (j <- lambdas) {
          for (k <- numIterations) {
            // Statistics about the runtime of training
            val start = System.nanoTime()

            val als_model = ALS.train(train, i, k, j)

            // Drop the ratings so that we can predict them for the test pairs
            val usersProducts = test.map(f => (f.userID, f.productID))

            // Predict
            val predictions = als_model.predict(usersProducts).map(f => ((f.user, f.product), f.rating))

            // Hold (user, product) as key and (real rating, predicted rating) as value
            val real_and_predictions = test.map(f => ((f.userID, f.productID), f.rating)).join(predictions)

            // Calculate the mean square error
            val mean_square_err = real_and_predictions.map(f => sqr(f._2._1 - f._2._2)).mean()

            print(i + " -------- " + j + " -------- " + k + " -------- ")
            println(mean_square_err + "\n")
            println("Time elapsed: " + (System.nanoTime() - start) / 1e9)

            fw.write(i + " -------- " + j + " -------- " + k + " -------- ")
            fw.write(mean_square_err + "\n")
            fw.write("Time elapsed: " + (System.nanoTime() - start) / 1e9 + "\n")
          }
        }
      }
    } catch {
      case e: Exception => throw e
    } finally {
      // Guard against NPEs: spark/fw may never have been assigned
      if (spark != null) spark.stop()
      if (fw != null) fw.close()
    }
    println("done")
  }
}

Code of User_Ratings:

import java.io.FileWriter

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object User_Ratings {

  case class Rating(userID: String, productID: String, rating: Int)

  // Create a Rating object from a Row
  def parseRating(fields: Row): Rating = {
    Rating(fields(0).toString, fields(4).toString, fields(18).toString.toInt)
  }

  // Read ratings from the csv file and create an RDD[Rating]
  def readRatings(location: String, spark: SparkSession): RDD[Rating] = {
    spark.read.option("header", "true").csv(location).rdd.map(parseRating)
  }

  // Normalize the ratings by dividing each user's rating by the average of that user's ratings
  def normalizingRatings(ratings: RDD[User_Ratings.Rating]): RDD[User_Ratings.Rating] = {
    // Average rating per user (note: integer division, since rating is an Int)
    val ratingsofUsers = ratings.groupBy(_.userID).map(x => (x._1, x._2.map(_.rating).sum / x._2.size))

    // Collect as a Map
    val userMap = ratingsofUsers.collect().toMap

    // Normalize the ratings
    ratings.map(f => Rating(f.userID, f.productID, f.rating / userMap(f.userID)))
  }

  def main(args: Array[String]): Unit = {
    var spark: SparkSession = null
    var fw: FileWriter = null

    try {
      spark = SparkSession.builder.appName("Spark SQL").config("spark.master", "local[*]").getOrCreate()
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)
    } catch {
      case e: Exception => throw e
    } finally {
      // Guard against NPEs: spark/fw may never have been assigned
      if (spark != null) spark.stop()
      if (fw != null) fw.close()
    }
    println("done")
  }
}

/*
       Reference : https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
 */

The issue is that when I use:

val als_model = ALS.train(train, i, k, j)

it gives:

expected org.apache.spark.mllib.recommendation.RDD[Rating], found RDD[User_Ratings.Rating]

I want to use ALS to train on my RDD but could not. If that is not possible, is there another way to train on my data so that I can recommend similar products to a user?

question from: https://stackoverflow.com/questions/65641355/als-training-data-with-case-class-rating


1 Reply


The basic solution I applied was to run the hash() function over my String-typed userID and productID, so that the format matched MLlib's Rating class.
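For reference, here is a minimal sketch of that approach, assuming the `User_Ratings.Rating(String, String, Int)` case class from the question. It uses `String.hashCode` to produce the Int keys that `org.apache.spark.mllib.recommendation.Rating` expects, and builds reverse-lookup maps so predictions can be translated back to the original IDs. The helper names (`toMllibRatings`, `idLookup`) are illustrative, not from the original code:

    import org.apache.spark.mllib.recommendation.{ALS, Rating => MllibRating}
    import org.apache.spark.rdd.RDD

    // Map String IDs to Ints via hashCode so that ALS.train accepts the RDD.
    def toMllibRatings(ratings: RDD[User_Ratings.Rating]): RDD[MllibRating] =
      ratings.map(r => MllibRating(r.userID.hashCode, r.productID.hashCode, r.rating.toDouble))

    // Reverse maps from the hashed Int back to the original String IDs,
    // so predicted (user, product) pairs can be reported in the original form.
    def idLookup(ratings: RDD[User_Ratings.Rating]): (Map[Int, String], Map[Int, String]) = {
      val users    = ratings.map(r => (r.userID.hashCode, r.userID)).distinct().collect().toMap
      val products = ratings.map(r => (r.productID.hashCode, r.productID)).distinct().collect().toMap
      (users, products)
    }

    // Usage: val model = ALS.train(toMllibRatings(train), rank, numIterations, lambda)

One caveat: `hashCode` values can collide (two different product IDs mapping to the same Int), so a more robust variant is to `zipWithIndex` over the distinct IDs and use the index as the key.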
