小程序开发定制【Spark实训】--竞赛网站访问日志分析

 目录


实训题目:小程序开发定制竞赛网站访问日志分析

一. 训练要点

(1)搭建Spurk工程环境。

(2) 编程。

(3)通过spark-submit提交应用。

二.需求说明

     小程序开发定制某竞赛网站每年都会开小程序开发定制展数据挖据的竞赛,小程序开发定制在竞赛期间网站会有大小程序开发定制量人群访问,小程序开发定制生成了大量的用户访向记录。现在提供2016年10月到2017年6小程序开发定制月的部分脱敏访问日志数据。小程序开发定制日志数据的基本内容如图所示,仅提供以下6个字段。

属性名称

属性解析

Id

序号

Content_id

网页ID

Page_path

网址

Userid

用户ID

Sessionid

缓存生成ID

Date_time

访问时间

     要求根据提供的用户访问日志数据,利用Spark技术统计访向的用户数、被访问的不同网页个数以及每月的访问量,并将结果保存到HDFS上。

文章所用文档以及目录等等说明:

(点击可免费下载)访问日志数据:    

IDEA内实现代码存储路径与名字:LogCount.scala

  jc_content_viewlog.txt   内部分数据如下图:

三.关键实现思路及步骤

(1)配置好Spark的IntelliJ IDEA开发环境。

(2)启动IntelliJ IDEA,并进行Spark编程。

(3)对访向记录中的网页去重,统计本周期内被访问网页的个数。

  1. val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
  2. val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
  3. val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))

 (4) userid为用户注册登录的标识,对userid去重,统计登录用户的数量。

  1. val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
  2. val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))

 (5)按月统计访问记录数。

  1. val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
  2. val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)

 (6)将结果保存到不同文件中。

  1. wy_count.repartition(1).saveAsTextFile(args(1))
  2. user_count.repartition(1).saveAsTextFile(args(2))
  3. ny_count.repartition(1).saveAsTextFile(args(3))

 (7)打包Spark工程,在集群提交应用程序。

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

注:jc.jar是上面文件生成的jar包改名并上传而来;

hdfs://node1:8020/user/root/jc_content_viewlog.txt  是hdfs里面jc_content_viewlog.txt存储路径,也需要自己上传,目录自己决定;

hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3  是设置它的输出存储路径,因为会输出三个不同数据,需要三个目录,不然会报错。

 四、LogCount.scala文件完整代码实现:

  1. package net
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object LogCount {
  5. def main(args: Array[String]): Unit = {
  6. if(args.length < 2){
  7. println("请指定input和output")
  8. System.exit(1)//非0表示非正常退出程序
  9. }
  10. //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
  11. val conf: SparkConf = new SparkConf().setAppName("wc")
  12. val sc: SparkContext = new SparkContext(conf)
  13. sc.setLogLevel("WARN")
  14. //TODO 2.source/读取数据
  15. //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
  16. //RDD[就是一行行的数据]
  17. val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
  18. //TODO 3.transformation/数据操作/转换
  19. //对访问记录中的网页去重,统计本周期内被访问网页的个数
  20. val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
  21. val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
  22. //userid为用户注册登录的标识,对userid去重,统计登录用户的数量
  23. val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
  24. val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
  25. //按月统计访问记录数
  26. val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
  27. val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
  28. //TODO 4.sink/输出
  29. //输出到指定path(可以是文件/夹)
  30. wy_count.repartition(1).saveAsTextFile(args(1))
  31. user_count.repartition(1).saveAsTextFile(args(2))
  32. ny_count.repartition(1).saveAsTextFile(args(3))
  33. //为了便于查看Web-UI可以让程序睡一会
  34. Thread.sleep(1000 * 60)
  35. //TODO 5.关闭资源
  36. sc.stop()
  37. }
  38. //获取年月,时间段作为输入参数
  39. def date_time(date:String):String={
  40. val nianye =date.trim.substring(0,7)
  41. nianye
  42. }
  43. }

五、运行过程与结果截图:

 


 六、具体实现步骤

1、修改打包好的jar名字,并把jar上传到node1结点

2、开启一系列集群:

start-dfs.sh   //一键开启
start-yarn.sh  //开启
cd /myserver/
 mr-jobhistory-daemon.sh start historyserver
 /myserver/spark301/sbin/start-history-server.sh
 jps  //查看

这里不再具体说明如何开启。

3、上传jc_content_viewlog.txt到node1节点,并上传到hdfs

  1. [root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt  /user/root/

 4、在集群提交应用程序

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

 七、相关知识点 

进入spark-shell

[root@node1 bin]# /myserver/spark301/bin/spark-shell

 1、过滤出访问次数在 50 次以上的用户记录

(1)统计用户访问次数并筛选出访问次数在50次以上的用户ID

scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}

data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24

scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect

 (2)根据过滤后的用户ID,在原数据中筛选出这一部分用户的访问记录

scala> val valib_data=data.filter(x=>userid.contains(x(3)))

valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27

scala> valib_data.take(2)   //查看

res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))

 2、统计访问 50 次以上的用户主要访问的前 5 类网页

 scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)

web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25

scala> web.take(5)

res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))

 3. 合并部分网页

(URL 后面带有_1、_2 字样的翻页网址,统一为一个网址)通过字符串截取的方法,对网页网址字符串进行截取,只截取“_”前面的字符串

 scala> val data2=data.filter(_.length>=6).map{

    x=>

      var page="";

      if(x(2).contains("_"))

        { page=x(2).substring(0,x(2).lastIndexOf("_")) }

      else

        { page=x(2) };

      (x(0),x(1),page,x(3),x(4),x(5))

      }

data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25

 4.根据访问时间加入对应时段:

6:30~11:30 为上午,11:30~14:00 为中午,14:00~17:30为下午,17:30~19:00 为傍晚,19:00~23:00 为晚上,23:00~6:30 为深夜,统计所有用户各时段访问情况

(1)首先定义一个函数,用于匹配时间段并返回相应的字段值

  1. scala> def date_time(date:String):String={
  2. val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
  3. val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
  4. if(hour<6 && hour>=23) "深夜"
  5. else if(hour==6 && min<=30) "深夜"
  6. else if(hour<11 && hour>=6) "上午"
  7. else if(hour==11 && min<=30) "上午"
  8. else if(hour<14 && hour>=11) "中午"
  9. else if(hour>=14 && hour<17) "下午"
  10. else if(hour==17 && hour<=30) "下午"
  11. else if(hour>=17 && hour<19) "傍晚"
  12. else if(hour==19 && min<=30) "傍晚"
  13. else "晚上"
  14. }
  15. date_time: (date: String)String

(2)通过map方法对每一条记录的时间进行匹配,增加一个时间段的值到记录中

 scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}

data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27

(3)将时段值作为键,值为1,利用reduceByKey的方法统计各时段访问情况 

scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)

date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25

scala> date_count.take(10)

res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发