(args.length <
2) {
// Bad invocation: print usage and abort with a non-zero exit code.
// NOTE(review): code further down reads args(2) for the personal-ratings
// file, but this guard only requires 2 arguments — confirm whether the
// intended minimum is actually 3.
System.err.println(
"Usage : <master> <hdfs dir path>")
System.exit(
1)
}
// Quiet noisy framework logging so the application's own output stays readable.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

// Spark context configured from the CLI: args(0) is the master URL.
val conf = new SparkConf()
  .setMaster(args(0))
  .setAppName("Collaborative Filtering")
val sc = new SparkContext(conf)
// Load the ratings file from the HDFS directory given as args(1).
// NOTE(review): rows are assumed to be "user::movie::rating::timestamp"
// (MovieLens-style) — confirm against the actual ratings.dat.
val ratingsList_Tuple = sc.textFile(args(1) + "/ratings.dat").map { line =>
  val cols = line.split("::")
  // Fourth component is timestamp % 10: a 0-9 bucket key used below to
  // split the data into training / validation / test partitions.
  (cols(0).toInt, cols(1).toInt, cols(2).toDouble, cols(3).toLong % 10)
}
// Key each Rating by its 0-9 timestamp bucket for the split below.
val ratingsTrain_KV = ratingsList_Tuple.map(x =>
  (x._4, Rating(x._1, x._2, x._3)))
// BUG FIX: the original message concatenated counts directly against the
// words "users"/"movies" (e.g. "123users on 456movies"); spaces added.
println("get " + ratingsTrain_KV.count()
  + " ratings from " + ratingsTrain_KV.map(_._2.user).distinct().count()
  + " users on " + ratingsTrain_KV.map(_._2.product).distinct().count()
  + " movies")
// The current user's own ratings, same "::"-separated layout (no bucket
// needed — these all go into the training set).
// NOTE(review): args(2) is read here although the usage guard above only
// requires two arguments — verify the intended CLI contract.
val myRatedData_Rating = sc.textFile(args(2)).map { line =>
  val cols = line.split("::")
  Rating(cols(0).toInt, cols(1).toInt, cols(2).toDouble)
}
val numPartitions = 3
// Split by timestamp bucket: 0-5 train, 6-7 validate, 8-9 test.
// BUG FIX: training previously used `_._1 < 8`, which overlapped the
// validation bucket (6-7) — the model was selected on data it had already
// seen. `< 6` makes the three partitions disjoint.
val traningData_Rating = ratingsTrain_KV.filter(_._1 < 6)
  .values
  .union(myRatedData_Rating) // fold the current user's ratings into training
  .repartition(numPartitions)
  .cache()
val validateData_Rating = ratingsTrain_KV.filter(x => x._1 >= 6 && x._1 < 8)
  .values
  .repartition(numPartitions)
  .cache()
val testData_Rating = ratingsTrain_KV.filter(_._1 >= 8)
  .values
  .cache()
// (also repaired mis-encoded curly quotes in the original message text)
println("training data's num : " + traningData_Rating.count()
  + " validate data's num : " + validateData_Rating.count()
  + " test data's num : " + testData_Rating.count())
// Hyper-parameter grid for ALS: 2 ranks x 2 regularization strengths x
// 2 iteration counts = 8 candidate models, selected by validation RMSE.
val ranks = List(8, 22)
val lambdas = List(0.1, 10.0)
val iters = List(5, 7)
var bestModel: MatrixFactorizationModel = null
var bestValidateRnse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestIter = -1
for (rank <- ranks; lam <- lambdas; iter <- iters) {
  val model = ALS.train(traningData_Rating, rank, iter, lam)
  val validateRnse = rnse(model, validateData_Rating, validateData_Rating.count())
  // BUG FIX: the original printed `" and numIter" + iter`, fusing the label
  // and value into e.g. "numIter5"; " = " inserted to match the other fields.
  println("validation = " + validateRnse
    + " for the model trained with rank = " + rank
    + " lambda = " + lam
    + " and numIter = " + iter)
  // Keep the model with the lowest validation error seen so far.
  if (validateRnse < bestValidateRnse) {
    bestModel = model
    bestValidateRnse = validateRnse
    bestRank = rank
    bestLambda = lam
    bestIter = iter
  }
}
// Final evaluation of the selected model on the held-out test bucket.
val testDataRnse = rnse(bestModel, testData_Rating, testData_Rating.count())
println(s"the best model was trained with rank = $bestRank and lambda = $bestLambda"
  + s" and numIter = $bestIter and Rnse on the test data is $testDataRnse")

// Naive baseline: always predict the mean rating of train + validation;
// its RMSE gives a floor to measure the model's improvement against.
val meanRating = traningData_Rating.union(validateData_Rating).map(_.rating).mean()
val baseLineRnse = math.sqrt(
  testData_Rating.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean())
val improvementPct = (baseLineRnse - testDataRnse) / baseLineRnse * 100
println("the best model improves the baseline by " + "%2.2f".format(improvementPct) + "%")
// movies.dat rows: id :: title :: genre (MovieLens-style layout —
// NOTE(review): confirm against the actual file).
val movieList_Tuple = sc.textFile(args(1) + "/movies.dat").map { lines =>
  val fields = lines.split("::")
  (fields(0).toInt, fields(1), fields(2))
}
// Driver-side lookup tables: movie id -> title and movie id -> genre.
val movies_Map = movieList_Tuple.map(x => (x._1, x._2)).collect().toMap
val moviesType_Map = movieList_Tuple.map(x => (x._1, x._3)).collect().toMap
var i = 1
println("movies recommond for you:")
val myRatedMovieIds = myRatedData_Rating.map(_.product).collect().toSet
// BUG FIX: the original used `filter(myRatedMovieIds.contains(_))`, which
// recommended ONLY the movies the user had already rated. Candidates must
// be the movies the user has NOT rated yet.
val recommondList = sc.parallelize(
  movies_Map.keys.filterNot(myRatedMovieIds.contains).toSeq)
// Predict scores for user id 0 (assumes the personal ratings file uses
// user id 0 — TODO confirm) and print the top 10.
bestModel.predict(recommondList.map((0, _))).collect().sortBy(-_.rating).take(10).foreach { r =>
  println("%2d".format(i) + "----------> : \nmovie name --> "
    + movies_Map(r.product) + " \nmovie type --> "
    + moviesType_Map(r.product))
  i += 1
}
println("you may be interested in these people : ")
// SparkSQL section: find users aged 18 with occupation code 15 who rated
// a Drama movie 5 stars.
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// _1 of movieList_Tuple is already Int — redundant .toInt removed.
val movies = movieList_Tuple
  .map(m => Movies(m._1, m._2, m._3))
  .toDF()
// _1/_2 are already Int; _3 is the Double rating, truncated to Int here.
val ratings = ratingsList_Tuple
  .map(r => Ratings(r._1, r._2, r._3.toInt))
  .toDF()
// users.dat: field 0 = id, field 2 = age, field 3 = occupation
// (field 1, gender, is skipped).
val users = sc.textFile(args(1) + "/users.dat").map { lines =>
  val fields = lines.split("::")
  Users(fields(0).toInt, fields(2).toInt, fields(3).toInt)
}.toDF()
// BUG FIX: the original had `‘rating` (a mis-encoded curly quote), which is
// not valid Scala; replaced with the $"..." column interpolator provided by
// sqlContext.implicits._ imported above.
ratings.filter($"rating" >= 5)
  .join(movies, ratings("movieId") === movies("id"))
  .filter(movies("mType") === "Drama")
  .join(users, ratings("userId") === users("id"))
  .filter(users("age") === 18)
  .filter(users("occupation") === 15)
  .select(users("id"))
  .take(10)
  .foreach(println)
}
/**
 * Root-mean-square error of the model's predictions against observed ratings.
 *
 * @param model          trained factorization model used to predict ratings
 * @param predictionData observed (user, product, rating) triples to score against
 * @param n              number of observations (caller passes predictionData.count())
 * @return sqrt(sum((predicted - observed)^2) / n)
 */
def rnse(model: MatrixFactorizationModel, predictionData: RDD[Rating], n: Long): Double = {
  val prediction = model.predict(predictionData.map(x => (x.user, x.product)))
  // Join predicted and observed ratings on the (user, product) key.
  val predictionAndOldRatings = prediction.map(x => ((x.user, x.product), x.rating))
    .join(predictionData.map(x => ((x.user, x.product), x.rating))).values
  // BUG FIX: the original used reduce(_ - _). Subtraction is neither
  // associative nor commutative, so a distributed reduce produced
  // order-dependent garbage; RMSE requires the SUM of squared errors.
  math.sqrt(predictionAndOldRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}
/** One rating row for the SparkSQL section (rating truncated to Int). */
case class Ratings(userId: Int, movieId: Int, rating: Int)

/** One movie row: id, title, and genre string ("mType"). */
case class Movies(id: Int, name: String, mType: String)

/** One user row: id, age, and numeric occupation code. */
case class Users(id: Int, age: Int, occupation: Int)
}
系统在Spark集群上运行的结果如下图:
关于SparkSQL的一些基本操作请看:
Spark(九) – SparkSQL API编程
如果本文中有任何不足或者错误之处,万请指出~
如果你有任何疑问,欢迎联系交流~
基于Spark Mllib,SparkSQL的电影推荐系统
标签:spark mllib 协同过滤 推荐系统