Hive in Practice: GuliVideo
Published: 2019-06-08


10 Hive in Practice: GuliVideo

10.1 Requirements

Compute the standard metrics and various Top-N statistics for the GuliVideo (Silicon Valley video) site:

--Top 10 videos by view count

--Top 10 video categories by popularity

--The categories of the Top 20 most-viewed videos

--The category rank of the videos related to the Top 50 most-viewed videos

--Top 10 most popular videos in each category

--Top 10 videos by traffic in each category

--Top 10 users by number of uploaded videos, and the videos they uploaded

--Top 10 most-viewed videos in each category

10.2 Project

10.2.1 Data Structures

1. Video table

Table 6-13 Video table

Field        Comment               Description
videoId      unique video id       an 11-character string
uploader     video uploader        username of the uploading user (string)
age          video age             integer number of days the video has been on the site
category     video category        the categories assigned to the video at upload
length       video length          the video's length as an integer
views        view count            the number of times the video has been viewed
rate         video rating          a score out of 5
ratings      traffic               the video's traffic, an integer
comments     comment count         the integer number of comments on a video
relatedId    related video ids     ids of related videos, at most 20
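For reference, one raw line (taken from the ETL test further below) looks like the following, with every field, including each related video id, separated by a tab (tabs shown here as spaces):

SDNkMu8ZT68    w00dy911    630    People & Blogs    186    10181    3.49    494    257    rjnbgpPJUks    kdjhkdfhsgksd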

2. User table

Table 6-14 User table

Field        Comment                      Type
uploader     username of the uploader     string
videos       number of uploaded videos    int
friends      number of friends            int

10.2.2 ETL on the Raw Data

Looking at the raw data, a video can belong to several categories, separated by "&" with a space on each side, and a video can also have several related videos, which are separated by "\t". To make multi-valued fields easier to work with during analysis, we first clean and restructure the data: keep "&" as the category separator but strip the surrounding spaces, and join the multiple related video ids with "&" as well.

 

 

A quick local test of the cleaning logic (ETLUtil is defined below):

import com.atlxl.ETLUtil;

/**
 * @author LXL
 * @create 2019-06-05 23:07
 */
public class Test {
    public static void main(String[] args) {
        System.out.println(ETLUtil.etlStr("SDNkMu8ZT68\tw00dy911\t630\tPeople & Blogs\t186\t10181\t3.49\t494\t257\trjnbgpPJUks\tkdjhkdfhsgksd"));
    }
}
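For this input, etlStr strips the spaces around "&" in the category field and joins the two related ids with "&", so the test prints (tabs shown as spaces):

SDNkMu8ZT68    w00dy911    630    People&Blogs    186    10181    3.49    494    257    rjnbgpPJUks&kdjhkdfhsgksd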

 

 

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atlxl</groupId>
    <artifactId>guliETL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
</project>

 

 

 

 

1. The ETL utility class: ETLUtil

 

package com.atlxl;

/**
 * @author LXL
 * @create 2019-06-05 22:27
 *
 * 1. Filter out dirty data
 * 2. Remove the spaces in the category field
 * 3. Replace the separator between related video ids
 */
public class ETLUtil {
    public static String etlStr(String line) {
        // Split the line into fields
        String[] split = line.split("\t");
        // 1. Filter out dirty data: a valid record has at least 9 fields
        if (split.length < 9) return null;
        // 2. Remove the spaces in the category field
        split[3] = split[3].replaceAll(" ", "");
        // 3. Rebuild the line: the first 9 fields stay tab-separated,
        //    the related video ids (field 9 onwards) are joined with "&"
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < split.length; i++) {
            if (i < 9) {
                if (i == split.length - 1) {
                    sb.append(split[i]);
                } else {
                    sb.append(split[i]).append("\t");
                }
            } else {
                if (i == split.length - 1) {
                    sb.append(split[i]);
                } else {
                    sb.append(split[i]).append("&");
                }
            }
        }
        return sb.toString();
    }
}

 

 

 

Original version:

public class ETLUtil {
    public static String oriString2ETLString(String ori) {
        StringBuilder etlString = new StringBuilder();
        String[] splits = ori.split("\t");
        if (splits.length < 9) return null;
        splits[3] = splits[3].replace(" ", "");
        for (int i = 0; i < splits.length; i++) {
            if (i < 9) {
                if (i == splits.length - 1) {
                    etlString.append(splits[i]);
                } else {
                    etlString.append(splits[i] + "\t");
                }
            } else {
                if (i == splits.length - 1) {
                    etlString.append(splits[i]);
                } else {
                    etlString.append(splits[i] + "&");
                }
            }
        }
        return etlString.toString();
    }
}

2. The ETL mapper: ETLMapper

 

package com.atlxl;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author LXL
 * @create 2019-06-05 21:26
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line of input
        String line = value.toString();
        // 2. Clean the line
        String etlStr = ETLUtil.etlStr(line);
        // 3. Drop dirty lines, write out the cleaned ones
        if (StringUtils.isBlank(etlStr)) {
            return;
        }
        k.set(etlStr);
        context.write(k, NullWritable.get());
    }
}

 

 

 

Original version:

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.atguigu.util.ETLUtil;

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text> {
    Text text = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String etlString = ETLUtil.oriString2ETLString(value.toString());

        if (StringUtils.isBlank(etlString)) return;

        text.set(etlString);
        context.write(NullWritable.get(), text);
    }
}

3. The driver: ETLDriver

 

package com.atlxl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author LXL
 * @create 2019-06-05 23:22
 */
public class ETLDriver implements Tool {

    private Configuration configuration;

    public int run(String[] args) throws Exception {
        // 1. Get the job instance
        Job job = Job.getInstance(getConf());
        // 2. Set the driver class
        job.setJarByClass(ETLDriver.class);
        // 3. Set the mapper class
        job.setMapperClass(ETLMapper.class);
        // 4. Mapper output KV types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Map-only job: no reducers
        job.setNumReduceTasks(0);
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ETLDriver(), args);
        System.out.println(run);
    }
}

 

 

 

Original version:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {
    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        conf = this.getConf();
        conf.set("inpath", args[0]);
        conf.set("outpath", args[1]);

        Job job = Job.getInstance(conf);

        job.setJarByClass(VideoETLRunner.class);

        job.setMapperClass(VideoETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        this.initJobInputPath(job);
        this.initJobOutputPath(job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    private void initJobOutputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String outPathString = conf.get("outpath");

        FileSystem fs = FileSystem.get(conf);

        Path outPath = new Path(outPathString);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileOutputFormat.setOutputPath(job, outPath);
    }

    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String inPathString = conf.get("inpath");

        FileSystem fs = FileSystem.get(conf);

        Path inPath = new Path(inPathString);
        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("Input directory does not exist on HDFS: " + inPathString);
        }
    }

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new VideoETLRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

4. Run the ETL job # original course command; skip this one — the command actually used here is in 10.3

$ bin/yarn jar ~/softwares/jars/gulivideo-0.0.1-SNAPSHOT.jar \
com.atguigu.etl.ETLVideosRunner \
/gulivideo/video/2008/0222 \
/gulivideo/output/video/2008/0222

 

10.3 Preparation

Upload the data to the cluster:

[root@node01 datas]# hadoop fs -put user /

[root@node01 datas]# hadoop fs -put video /

 

Run the jar:

[root@node01 datas]# yarn jar etl.jar com.atlxl.ETLDriver /video/2008/0222 /output

 

10.3.1 Create the Tables

Create the text-format tables: gulivideo_ori, gulivideo_user_ori

Create the ORC tables: gulivideo_orc, gulivideo_user_orc

gulivideo_ori:

create table gulivideo_ori(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as textfile;

 

 

gulivideo_user_ori:

 

create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited
fields terminated by "\t"
stored as textfile;

 

 

 

Then insert the raw data into the ORC tables.

gulivideo_orc:

create table gulivideo_orc(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
clustered by (uploader) into 8 buckets
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as orc;

 

gulivideo_user_orc:

create table gulivideo_user_orc(
    uploader string,
    videos int,
    friends int)
row format delimited
fields terminated by "\t"
stored as orc;

 

10.3.2 Load the ETL Output

gulivideo_ori:

load data inpath "/gulivideo/output/video/2008/0222" into table gulivideo_ori;

gulivideo_user_ori:

load data inpath "/gulivideo/user/2008/0903" into table gulivideo_user_ori;
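As a quick sanity check after loading (a minimal sketch; the expected counts depend on the dataset):

select count(*) from gulivideo_ori;
select count(*) from gulivideo_user_ori;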

10.3.3 Insert Data into the ORC Tables

gulivideo_orc:

 

insert into table gulivideo_orc select * from gulivideo_ori;

or:

insert overwrite table gulivideo_orc  select * from gulivideo_ori;
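Note that gulivideo_orc is bucketed (clustered by (uploader) into 8 buckets). On older Hive releases bucketing is not enforced on insert by default, so the following session setting may be needed before running the insert above (the property is standard Hive; whether it is required depends on your version):

set hive.enforce.bucketing = true;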

 

gulivideo_user_orc:

insert into table gulivideo_user_orc select * from gulivideo_user_ori;

or:

insert overwrite table gulivideo_user_orc select * from gulivideo_user_ori;

10.4 Business Analysis

10.4.1 Top 10 Videos by View Count

Approach: a global sort on the views column with order by, limited to the first 10 rows.

Final query:

select * from gulivideo_orc order by views desc limit 10;

This query may fail with an out-of-memory error; check the logs if it does. If it is indeed OOM, see the troubleshooting chapter that follows.
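One possible session-level mitigation, assuming the failure is YARN container memory (a hedged sketch, not the next chapter's fix: these Hadoop 2.x property names exist, but the values are only illustrative and must fit your cluster):

set mapreduce.map.memory.mb=2048;
set mapreduce.map.java.opts=-Xmx1536m;
set mapreduce.reduce.memory.mb=2048;
set mapreduce.reduce.java.opts=-Xmx1536m;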

Original version:

select
    videoId,
    uploader,
    age,
    category,
    length,
    views,
    rate,
    ratings,
    comments
from
    gulivideo_orc
order by
    views
desc limit
    10;

10.4.2 Top 10 Video Categories by Popularity

Approach:

1) Count how many videos each category contains and show the 10 categories with the most videos.

2) Group by category and count the videoId values within each group.

3) Because one video can belong to one or more categories, the category array must first be exploded into one row per category before grouping and counting (see the small illustration after this list).

4) Finally, sort by popularity and show the top 10.
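A minimal illustration of what explode does (runnable in any Hive session; the literal array here is just an example, not project data):

select explode(array('People', 'Blogs'));
-- returns two rows:
-- People
-- Blogs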

 

a. Explode the categories
select videoId, category_name
from gulivideo_orc
lateral view explode(category) category_t as category_name;  -- t1

b. Count the videos in each category
select category_name, count(*) hot
from t1
group by category_name;  -- t2

c. Top 10 categories by popularity
select category_name, hot
from t2
order by hot desc
limit 10;

 

 

 

Final query:

SELECT category_name, hot
FROM (
    SELECT category_name, COUNT(*) AS hot
    FROM (
        SELECT videoId, category_name
        FROM gulivideo_orc
            LATERAL VIEW explode(category) category_t AS category_name
    ) t1
    GROUP BY category_name
) t2
ORDER BY hot DESC
LIMIT 10;

 

or:

select
    category_name as category,
    count(t1.videoId) as hot
from (
    select
        videoId,
        category_name
    from
        gulivideo_orc lateral view explode(category) t_catetory as category_name) t1
group by
    t1.category_name
order by
    hot
desc limit
    10;

 

or:

SELECT category_name, COUNT(*) AS hot
FROM (
    SELECT videoId, category_name
    FROM gulivideo_orc
        LATERAL VIEW explode(category) category_t AS category_name
) t1
GROUP BY category_name
ORDER BY hot DESC
LIMIT 10;

 

 

10.4.3 Categories of the Top 20 Most-Viewed Videos, and How Many Top-20 Videos Each Category Contains

Approach:

1) Find the full records of the 20 most-viewed videos, sorted in descending order.

2) Explode the category field of those 20 records (one row per category).

3) Query the category names and how many Top-20 videos fall into each category.

 

a. The 20 most-viewed videos
select videoId, views, category
from gulivideo_orc
order by views desc
limit 20;  -- t1

b. Categories of those 20 videos
select videoId, category_name
from t1
lateral view explode(category) category_t as category_name;  -- t2

c. Deduplicate
select distinct videoId, category_name
from t2;

 

 

 

Final query:

select
    category_name as category,
    count(t2.videoId) as hot_with_views
from (
    select
        videoId,
        category_name
    from (
        select
            *
        from
            gulivideo_orc
        order by
            views
        desc limit
            20) t1 lateral view explode(category) t_catetory as category_name) t2
group by
    category_name
order by
    hot_with_views
desc;

 

10.4.4 Category Rank of the Videos Related to the Top 50 Most-Viewed Videos

Approach:

1) Query the full records of the 50 most-viewed videos (which include each video's related videos); call this temporary table t1.

t1: the 50 most-viewed videos

select
    *
from
    gulivideo_orc
order by
    views
desc limit
    50;

2) Explode the relatedId column of those 50 records; call this temporary table t2.

t2: the related video ids, one per row

select
    explode(relatedId) as videoId
from
    t1;

3) Inner join the related video ids with the gulivideo_orc table.

t5: two columns, one the category and the other the related video ids found above

(select
    distinct(t2.videoId),
    t3.category
from
    t2
inner join
    gulivideo_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name;

4) Group by category, count the videos in each group, then rank the categories.

 

a. The 50 most-viewed videos and their related videos
select
    videoId, views, category, relatedId
from
    gulivideo_orc
order by
    views DESC
limit 50;  -- t1

b. Explode and deduplicate the related video ids
select DISTINCT(relatedId_name)
from t1
lateral view explode(relatedId) relatedId_t as relatedId_name;  -- t2

c. Categories of the related videos
select *
from t2 join gulivideo_orc t3
on t2.relatedId_name = t3.videoId;  -- t4

d. Explode the categories of the related videos
select *
from t4
lateral view explode(category) category_t as category_name;  -- t5

e. Count per category
SELECT category_name, count(*) hot
from t5
GROUP BY category_name;  -- t6

f. Rank the categories
SELECT * FROM t6 order by hot desc;

 

 

 

Final query:

SELECT *
FROM (
    SELECT category_name, COUNT(*) AS hot
    FROM (
        SELECT *
        FROM (
            SELECT *
            FROM (
                SELECT DISTINCT relatedId_name
                FROM (
                    SELECT videoId, views, category, relatedId
                    FROM gulivideo_orc
                    ORDER BY views DESC
                    LIMIT 50
                ) t1
                    LATERAL VIEW explode(relatedId) relatedId_t AS relatedId_name
            ) t2
                JOIN gulivideo_orc t3 ON t2.relatedId_name = t3.videoId
        ) t4
            LATERAL VIEW explode(category) category_t AS category_name
    ) t5
    GROUP BY category_name
) t6
ORDER BY hot DESC;

 

 

Original version:

select
    category_name as category,
    count(t5.videoId) as hot
from (
    select
        videoId,
        category_name
    from (
        select
            distinct(t2.videoId),
            t3.category
        from (
            select
                explode(relatedId) as videoId
            from (
                select
                    *
                from
                    gulivideo_orc
                order by
                    views
                desc limit
                    50) t1) t2
        inner join
            gulivideo_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5
group by
    category_name
order by
    hot
desc;

 

10.4.5 Top 10 Most Popular Videos in Each Category, Using Music as an Example

Approach:

1) To get the Top 10 most popular videos in the Music category we first need to isolate that category, which requires exploding category; so create a table that stores the data with category exploded into a categoryId column.

2) Insert the exploded data into that table.

3) Compute the video popularity within the target category (Music).

Final code:

Create the category table:

create table gulivideo_category(
    videoId string,
    uploader string,
    age int,
    categoryId string,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;

 

Insert data into the category table:

insert into table gulivideo_category
    select
        videoId,
        uploader,
        age,
        categoryId,
        length,
        views,
        rate,
        ratings,
        comments,
        relatedId
    from
        gulivideo_orc lateral view explode(category) catetory as categoryId;

 

Top 10 in the Music category (any other category works the same way):

select
    videoId,
    views
from
    gulivideo_category
where
    categoryId = "Music"
order by
    views
desc limit
    10;

 

10.4.6 Top 10 Videos by Traffic in Each Category, Using Music as an Example

Approach:

1) Use the table with the category column exploded into categoryId (created above).

2) Sort by ratings.

Final query:

select
    videoId,
    views,
    ratings
from
    gulivideo_category
where
    categoryId = "Music"
order by
    ratings
desc limit
    10;

 

10.4.7 Top 10 Users by Number of Uploaded Videos, and Their 20 Most-Viewed Videos

Approach:

1) First find the 10 users who uploaded the most videos:

select
    *
from
    gulivideo_user_orc
order by
    videos
desc limit
    10;

 

2) Join them with the gulivideo_orc table on the uploader column, then sort the result by views.

Final query:

select
    t2.videoId,
    t2.views,
    t2.ratings,
    t1.videos,
    t1.friends
from (
    select
        *
    from
        gulivideo_user_orc
    order by
        videos desc
    limit
        10) t1
join
    gulivideo_orc t2
on
    t1.uploader = t2.uploader
order by
    views desc
limit
    20;

a. The 10 users who uploaded the most videos
SELECT uploader FROM gulivideo_user_orc ORDER BY videos DESC LIMIT 10;  -- t1

b. The videos uploaded by those 10 users
SELECT * FROM t1
join gulivideo_orc t2
on t1.uploader = t2.uploader;  -- t3

c. The 20 most-viewed of those videos
SELECT *
FROM t3
ORDER BY views DESC
LIMIT 20;

Combined:

SELECT *
FROM (SELECT * FROM (SELECT uploader, videos FROM gulivideo_user_orc ORDER BY videos DESC LIMIT 10) t1
join gulivideo_orc t2
on t1.uploader = t2.uploader) t3
ORDER BY views DESC
LIMIT 20;

or:

SELECT *
FROM (
    SELECT views, videoId, t1.uploader
    FROM (
        SELECT uploader, videos
        FROM gulivideo_user_orc
        ORDER BY videos DESC
        LIMIT 10
    ) t1
        JOIN gulivideo_orc t2 ON t1.uploader = t2.uploader
) t3
ORDER BY views DESC
LIMIT 20;

 

10.4.8 Top 10 Most-Viewed Videos in Each Category

Approach:

1) Start from the table with categoryId already exploded.

2) In a subquery, partition by categoryId, sort within each partition by views, and generate an increasing row number; name that column rank.

3) From that subquery, select the rows where rank is at most 10.

Final query:

select
    t1.*
from (
    select
        videoId,
        categoryId,
        views,
        row_number() over(partition by categoryId order by views desc) rank
    from gulivideo_category) t1
where
    rank <= 10;
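row_number() breaks ties arbitrarily. If videos with equal view counts should share a rank, dense_rank() can be swapped in; a sketch of the same query (the alias rk is mine, not from the original):

select
    t1.*
from (
    select
        videoId,
        categoryId,
        views,
        dense_rank() over(partition by categoryId order by views desc) rk
    from gulivideo_category) t1
where
    rk <= 10;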

 

 

 

10.4.9 Categories of the Top 20 Most-Viewed Videos

 

Count, for the 20 most-viewed videos, which categories they belong to and how many Top-20 videos each category contains:

select
    category_name as category,
    count(t2.videoId) as hot_with_views
from (
    select
        videoId,
        category_name
    from (
        select
            *
        from
            gulivideo_orc
        order by
            views
        desc limit
            20) t1 lateral view explode(category) t_catetory as category_name) t2
group by
    category_name
order by
    hot_with_views
desc;

 

Reposted from: https://www.cnblogs.com/LXL616/p/10981641.html
