Statistics Recommender

When I wrote up the first module, the Dataloader, I did not capture screenshots of the process, so below I document the creation of the offline statistics module with screenshots.

Prerequisites for this module:

  1. The first step, the Dataloader, has successfully stored its data
  2. MongoDB and Spark are installed on the VM and running
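
Both prerequisites can be checked quickly from code. Below is a minimal Casbah sketch; the `PrereqCheck` object is a hypothetical helper, not part of the module, and the VM address matches the `mongo.uri` configuration used later in this write-up (adjust it to your own setup):

    import com.mongodb.casbah.Imports._

    // hypothetical helper, not part of the module itself
    object PrereqCheck extends App {
      // connect to the MongoDB instance on the VM
      val client = MongoClient("192.168.2.88", 27017)
      // the Rating collection should be non-empty if the Dataloader step succeeded
      println(s"Rating documents: ${client("recommender")("Rating").count()}")
      client.close()
    }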
  1. Create the submodule

    (screenshots: creating the submodule)

  2. Add the dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>book-recommender</artifactId>
            <groupId>com.wan</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>StatisticsRecommender</artifactId>

        <dependencies>
            <!-- Spark dependencies (versions managed by the parent pom) -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
            </dependency>
            <!-- Scala -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
            </dependency>
            <!-- MongoDB driver -->
            <!-- Casbah: connect to MongoDB directly from code -->
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>casbah-core_2.11</artifactId>
                <version>${casbah.version}</version>
            </dependency>
            <!-- connector between Spark and MongoDB -->
            <dependency>
                <groupId>org.mongodb.spark</groupId>
                <artifactId>mongo-spark-connector_2.11</artifactId>
                <version>${mongodb-spark.version}</version>
            </dependency>
        </dependencies>

        <build>
            <finalName>statisticsrecommender</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.wan.statistics.StatisticsRecommender</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
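
    The `${casbah.version}` and `${mongodb-spark.version}` placeholders (and the version-less Spark and Scala dependencies) resolve against the parent `book-recommender` pom. As a sketch, the parent's `<properties>` block might look like the following; the exact versions here are assumptions and have to match your own Spark and Scala setup:

    <!-- assumed entries in the parent pom; versions are illustrative -->
    <properties>
        <scala.version>2.11.8</scala.version>
        <casbah.version>3.1.1</casbah.version>
        <mongodb-spark.version>2.0.0</mongodb-spark.version>
    </properties>
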
  3. Configure the log file
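
    Spark 2.x bundles Log4j 1.x, so logging for the module is typically configured with a `log4j.properties` file under `src/main/resources`. A minimal sketch (the console pattern is just one common choice):

    log4j.rootLogger=info, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p --- [%t] %c(line:%L) : %m%n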

  4. Code StatisticsRecommender.scala

    package com.wan.statistics

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
     * @author wanfeng
     * @date 2021/3/13 10:36
     */

    case class Rating(userId: Int, bookId: Int, score: Int)

    case class MongoConfig(uri: String, db: String)

    object StatisticsRecommender {

      val MONGODB_RATING_COLLECTION = "Rating"

      // names of the output statistics collections
      val RATE_MORE_BOOKS = "RateMoreBooks"
      val AVERAGE_BOOKS = "AverageBooks"

      def main(args: Array[String]): Unit = {

        val config = Map(
          "spark.cores" -> "local[*]",
          "mongo.uri" -> "mongodb://192.168.2.88:27017/recommender",
          "mongo.db" -> "recommender"
        )
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._

        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the Rating collection from MongoDB as a DataFrame
        val ratingDF = spark.read
          .option("uri", mongoConfig.uri)
          .option("collection", MONGODB_RATING_COLLECTION)
          .format("com.mongodb.spark.sql")
          .load()
          .as[Rating]
          .toDF()

        ratingDF.createTempView("ratings")

        // 1. historically popular books, ranked by number of ratings
        val rateMoreBooksDF = spark.sql("select bookId, count(bookId) as countNum from ratings group by bookId order by countNum desc")
        storeDFInMongoDB(rateMoreBooksDF, RATE_MORE_BOOKS)

        // 2. top-rated books, ranked by average score
        val averageBooksDF = spark.sql("select bookId, avg(score) as avgScore from ratings group by bookId order by avgScore desc")
        storeDFInMongoDB(averageBooksDF, AVERAGE_BOOKS)

        spark.stop()
      }

      // write a DataFrame to the given MongoDB collection, overwriting any existing data
      def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
        df.write
          .option("uri", mongoConfig.uri)
          .option("collection", collection_name)
          .mode("overwrite")
          .format("com.mongodb.spark.sql")
          .save()
      }

    }
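
    Once the job finishes, the statistics can be read back through the same connector as a quick sanity check. A minimal sketch (the `VerifyStatistics` object is hypothetical, not part of the module):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object VerifyStatistics extends App {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("VerifyStatistics")
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()

      // read the RateMoreBooks statistics back from MongoDB
      spark.read
        .option("uri", "mongodb://192.168.2.88:27017/recommender")
        .option("collection", "RateMoreBooks")
        .format("com.mongodb.spark.sql")
        .load()
        .show(10)  // the 10 most-rated books

      spark.stop()
    }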

  5. Project structure and run results:

    (screenshots: project structure and run results)

  6. Done!