Book Recommendation System Based on Big Data (5): Offline Recommendation
Offline Recommender
Note: when I ran the offline recommendation module on the full original dataset, it ran out of memory. If your VM does not have much memory, it is better to train on only a subset of the data.
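One way to do that is to sample the ratings right after they are loaded, before ALS is trained. The sketch below mirrors the ratingRDD definition from OfflineRecommender.scala further down, with a sample call added; the 10% fraction and the seed are assumptions you can tune to your VM's memory:

val ratingRDD = spark.read
  .option("uri", mongoConfig.uri)
  .option("collection", MONGODB_RATING_COLLECTION)
  .format("com.mongodb.spark.sql")
  .load()
  .as[BookRating]
  .rdd
  .sample(withReplacement = false, fraction = 0.1, seed = 42L) // keep roughly 10% of the ratings
  .map(rating => (rating.userId, rating.bookId, rating.score))
  .cache()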
Create a new submodule, OfflineRecommender
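If you create the submodule by hand rather than through the IDE, make sure the parent book-recommender pom lists it (IDEA normally adds this entry automatically when you create the module):

<modules>
    <module>OfflineRecommender</module>
</modules>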
Add the dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>book-recommender</artifactId>
        <groupId>com.wan</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>OfflineRecommender</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>jblas</artifactId>
            <version>${jblas.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
        </dependency>
        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- MongoDB driver -->
        <!-- Used to connect to MongoDB from code -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <!-- Used to bridge Spark and MongoDB -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>offlinerecommender</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.wan.offline.OfflineRecommender</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
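The three ${...} version placeholders above are assumed to be declared as properties in the parent book-recommender pom (set up in the earlier parts of this series), along with the managed Spark and Scala versions. If they are not, add something like the following to the parent pom; the versions below are only illustrative:

<properties>
    <jblas.version>1.2.1</jblas.version>
    <casbah.version>3.1.1</casbah.version>
    <mongodb-spark.version>2.0.0</mongodb-spark.version>
</properties>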
Configure the log file
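The module only needs a simple console logger. A minimal log4j.properties placed under src/main/resources is enough; the layout pattern below is just an example and can be adjusted:

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p --- [%50t] %-80c(line:%5L) : %m%n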
Code
OfflineRecommender.scala
package com.wan.offline

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
 * @author wanfeng
 * @date 2021/3/13 10:47
 */
case class BookRating(userId: Int, bookId: Int, score: Int)

case class MongoConfig(uri: String, db: String)

case class Recommenderation(bookId: Int, score: Double)

case class UserRecs(userId: Int, recs: Seq[Recommenderation])

case class BookRecs(bookId: Int, recs: Seq[Recommenderation])

object OfflineRecommender {

  val MONGODB_RATING_COLLECTION = "Rating"
  // Names of the output recommendation collections
  val USER_RECS = "UserRecs"
  val BOOK_RECS = "BookRecs"
  // Maximum number of recommendations per user
  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://192.168.2.88:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load the rating data from MongoDB and cache it, since it is reused several times below
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[BookRating]
      .rdd
      .map(rating => (rating.userId, rating.bookId, rating.score))
      .cache()

    val userRDD = ratingRDD.map(_._1).distinct()
    val bookRDD = ratingRDD.map(_._2).distinct()

    // 1. Train the latent factor model with ALS
    val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
    // rank = number of latent factors, iterations = ALS iterations, lambda = regularization
    val (rank, iterations, lambda) = (50, 5, 0.01)
    val model = ALS.train(trainData, rank, iterations, lambda)

    // 2. Predict a rating for every (user, book) pair and build each user's recommendation list
    val userBooks = userRDD.cartesian(bookRDD)
    val preRating = model.predict(userBooks)

    val userRecs = preRating.filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (userId, recs) =>
          UserRecs(
            userId,
            recs.toList.sortWith(_._2 > _._2)
              .take(USER_MAX_RECOMMENDATION)
              .map(x => Recommenderation(x._1, x._2))
          )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 3. Use the book feature vectors to compute a similarity list for every book
    val productFeatures = model.productFeatures.map {
      case (bookId, features) => (bookId, new DoubleMatrix(features))
    }

    val bookRecs = productFeatures.cartesian(productFeatures)
      .filter {
        case (a, b) => a._1 != b._1 // drop pairs of a book with itself
      }
      .map {
        case (a, b) =>
          val simScore = consinSim(a._2, b._2)
          (a._1, (b._1, simScore))
      }
      .filter(_._2._2 > 0.4) // keep only sufficiently similar pairs
      .groupByKey()
      .map {
        case (bookId, recs) =>
          BookRecs(bookId, recs.toList.map(x => Recommenderation(x._1, x._2)))
      }
      .toDF()

    bookRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", BOOK_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  // Cosine similarity between two feature vectors
  def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
    product1.dot(product2) / (product1.norm2() * product2.norm2())
  }
}

Project structure and run result:
Done!