descending order
def calculateSimilarity
(video: RDD[(String, Set[String])], myFunc: (Set[String], Set[String]) => Double): RDD[(String, String, Double)] = {
video.cartesian(video).filter(f => f._1._1 != f._2._1)
.map(f => (f._1._1, f._2._1, myFunc(f._1._2, f._2._2)))
.filter(f => f._3 > 0).groupBy(_._1)
.flatMap(f => f._2.toList.sortWith((a, b) => a._3 > b._3).take(100)) //k value
}
ascending order
def itemCFRecommend(
videoSimilarity: RDD[VideoSim],
userVideos: RDD[VideoRating],
r_number: Int): (RDD[UserRecd]) = {
videoSimilarity.map(f
=> (f.video1, (f.video2, f.similar)))
.join(userVideos.map(f
=> (f.videoID, (f.userID, f.score))))
.map(f
=> ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
.reduceByKey
((x, y) => x + y)
.leftOuterJoin(userVideos.map(f => ((f.userID, f.videoID), 1)))
.filter(f => f._2._2.isEmpty)
.map(f => (f._1._1, (f._1._2, f._2._1)))
.groupByKey() // knn part
.map(f => {
val i2 = f._2.toBuffer
val i2_2 = i2.sortBy(_._2)
if (i2_2.length > r_number) i2_2.remove(0, i2_2.length - r_number)
(f._1, i2_2)
}).flatMap(f => {
for (w <- f._2) yield (f._1, w._1, w._2)
}).map(f => UserRecd(f._1, f._2, f._3))
}