Offline Recommender

Note: when I ran the offline recommender module on the full original dataset, the job ran out of memory. If your virtual machine is not configured with much memory, consider training on a sample of the dataset instead, as shown in the sketch below.

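A minimal sketch of that sampling, assuming the `ratingRDD` of (userId, bookId, score) triples built in OfflineRecommender.scala below; the 30% fraction and the seed are arbitrary example values:

    // Hypothetical: keep ~30% of the ratings to reduce memory pressure.
    // ratingRDD is the (userId, bookId, score) RDD loaded from MongoDB
    // in OfflineRecommender.scala below.
    val sampledRatings = ratingRDD
      .sample(withReplacement = false, fraction = 0.3, seed = 42L)
      .cache()

    // Train ALS on the sample instead of the full dataset.
    val trainData = sampledRatings.map(x => Rating(x._1, x._2, x._3))
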
  1. Create the OfflineRecommender sub-module.

  2. Add the dependencies to the module's pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>book-recommender</artifactId>
            <groupId>com.wan</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>OfflineRecommender</artifactId>

        <dependencies>

            <!-- jblas: linear algebra library used for the similarity computation -->
            <dependency>
                <groupId>org.scalanlp</groupId>
                <artifactId>jblas</artifactId>
                <version>${jblas.version}</version>
            </dependency>

            <!-- Spark dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
            </dependency>
            <!-- Scala -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
            </dependency>

            <!-- MongoDB drivers -->
            <!-- Casbah: connect to MongoDB from code -->
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>casbah-core_2.11</artifactId>
                <version>${casbah.version}</version>
            </dependency>
            <!-- Connector between Spark and MongoDB -->
            <dependency>
                <groupId>org.mongodb.spark</groupId>
                <artifactId>mongo-spark-connector_2.11</artifactId>
                <version>${mongodb-spark.version}</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>offlinerecommender</finalName>
            <plugins>
                <!-- Assembly plugin: build a fat jar with OfflineRecommender as the main class -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.wan.offline.OfflineRecommender</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>

    </project>
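
    The Spark and Scala dependencies above carry no `<version>` tag, and jblas, Casbah and the MongoDB Spark connector use `${...}` placeholders; all of these are expected to be resolved by the parent book-recommender pom. A sketch of what that parent `<properties>` block might look like; the concrete version numbers here are assumptions, not taken from this project:

        <!-- Assumed properties in the parent book-recommender pom; adjust to your setup -->
        <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.1.1</spark.version>
            <mongodb-spark.version>2.0.0</mongodb-spark.version>
            <casbah.version>3.1.1</casbah.version>
            <jblas.version>1.2.1</jblas.version>
        </properties>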
  3. Configure the log file; a sketch follows below.

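    The tutorial does not reproduce the log configuration itself. A minimal log4j.properties sketch, assuming the log4j 1.x that Spark 2.x ships with, placed under src/main/resources:

        # Assumed minimal log4j 1.x config: INFO and above to the console.
        log4j.rootLogger=info, stdout
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p --- [%t] %c : %m%n
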
  4. Write OfflineRecommender.scala:

    package com.wan.offline

    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.sql.SparkSession
    import org.jblas.DoubleMatrix

    /**
     * @author wanfeng
     * @date 2021/3/13 10:47
     */

    case class BookRating(userId: Int, bookId: Int, score: Int)

    case class MongoConfig(uri: String, db: String)

    case class Recommendation(bookId: Int, score: Double)

    case class UserRecs(userId: Int, recs: Seq[Recommendation])

    case class BookRecs(bookId: Int, recs: Seq[Recommendation])


    object OfflineRecommender {

      val MONGODB_RATING_COLLECTION = "Rating"

      // Names of the recommendation collections
      val USER_RECS = "UserRecs"
      val BOOK_RECS = "BookRecs"

      // Maximum number of recommendations per user
      val USER_MAX_RECOMMENDATION = 20

      def main(args: Array[String]): Unit = {

        val config = Map(
          "spark.cores" -> "local[*]",
          "mongo.uri" -> "mongodb://192.168.2.88:27017/recommender",
          "mongo.db" -> "recommender"
        )
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._

        implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // Load the ratings from MongoDB as (userId, bookId, score) triples
        val ratingRDD = spark.read
          .option("uri", mongoConfig.uri)
          .option("collection", MONGODB_RATING_COLLECTION)
          .format("com.mongodb.spark.sql")
          .load()
          .as[BookRating]
          .rdd
          .map(
            rating => (rating.userId, rating.bookId, rating.score)
          ).cache()

        val userRDD = ratingRDD.map(_._1).distinct()
        val bookRDD = ratingRDD.map(_._2).distinct()


        // 1. Train the latent-factor model
        val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
        val (rank, iterations, lambda) = (50, 5, 0.01)
        val model = ALS.train(trainData, rank, iterations, lambda)


        // 2. Predict the full user-book rating matrix and build each user's recommendation list
        val userBooks = userRDD.cartesian(bookRDD)
        val preRating = model.predict(userBooks)

        val userRecs = preRating.filter(_.rating > 0)
          .map(
            rating => (rating.user, (rating.product, rating.rating))
          )
          .groupByKey()
          .map {
            case (userId, recs) =>
              UserRecs(
                userId,
                recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2))
              )
          }
          .toDF()

        userRecs.write
          .option("uri", mongoConfig.uri)
          .option("collection", USER_RECS)
          .mode("overwrite")
          .format("com.mongodb.spark.sql")
          .save()

        // 3. Use the books' feature vectors to compute each book's similarity list
        val productFeatures = model.productFeatures.map {
          case (bookId, features) => (bookId, new DoubleMatrix(features))
        }

        val bookRecs = productFeatures.cartesian(productFeatures)
          .filter {
            case (a, b) => a._1 != b._1 // drop a book paired with itself
          }
          .map {
            case (a, b) =>
              val simScore = cosineSim(a._2, b._2)
              (a._1, (b._1, simScore))
          }
          .filter(_._2._2 > 0.4) // keep only reasonably similar pairs
          .groupByKey()
          .map {
            case (bookId, recs) =>
              BookRecs(
                bookId,
                recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2))
              )
          }
          .toDF()

        bookRecs
          .write
          .option("uri", mongoConfig.uri)
          .option("collection", BOOK_RECS)
          .mode("overwrite")
          .format("com.mongodb.spark.sql")
          .save()

        spark.stop()
      }

      // Cosine similarity between two feature vectors
      def cosineSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
        product1.dot(product2) / (product1.norm2() * product2.norm2())
      }

    }

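    After `mvn clean package`, the assembly plugin configured above produces `target/offlinerecommender-jar-with-dependencies.jar` with `com.wan.offline.OfflineRecommender` as its main class, which can be submitted with `spark-submit`. To verify the run results, a hedged sketch that reads the freshly written `UserRecs` collection back out of MongoDB, reusing the same connector options as above:

        // Hypothetical sanity check: read UserRecs back and print a few rows.
        val userRecsDF = spark.read
          .option("uri", mongoConfig.uri)
          .option("collection", USER_RECS)
          .format("com.mongodb.spark.sql")
          .load()
        userRecsDF.show(5, truncate = false)
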
  5. Project structure and run results:

    [screenshot: project structure]

    [screenshot: run results]

  6. Done!