From e490bfb3b76435c2123bdc6a56bc32012e79d4e6 Mon Sep 17 00:00:00 2001
From: canghaihongxin
Date: Tue, 8 Jun 2021 15:53:06 +0800
Subject: [PATCH] Add a MapReduce example
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../hadoopdemo/HadoopDemoApplication.java     |  2 +-
 .../hadoopdemo/config/HadoopConfig.java       |  2 +-
 .../controller/MapReduceController.java       | 18 +++++
 .../hadoopdemo/mapReduce/MapReduceClient.java | 15 ++++
 .../hadoopdemo/mapReduce/ReduceJobsUtil.java  | 44 ++++++++++-
 .../mapReduce/phoneCount/FlowBean.java        | 75 +++++++++++++++++++
 .../mapReduce/phoneCount/FlowCountMapper.java | 43 +++++++++++
 .../phoneCount/FlowCountReducer.java          | 36 +++++++++
 src/main/resources/application.yml            |  6 +-
 9 files changed, 234 insertions(+), 7 deletions(-)
 create mode 100644 src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowBean.java
 create mode 100644 src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountMapper.java
 create mode 100644 src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountReducer.java

diff --git a/src/main/java/com/example/hadoopdemo/HadoopDemoApplication.java b/src/main/java/com/example/hadoopdemo/HadoopDemoApplication.java
index cc394ce..ca6ff5c 100644
--- a/src/main/java/com/example/hadoopdemo/HadoopDemoApplication.java
+++ b/src/main/java/com/example/hadoopdemo/HadoopDemoApplication.java
@@ -15,7 +15,7 @@ public class HadoopDemoApplication implements CommandLineRunner {
     private final FsShell fsShell;
 
     public static void main(String[] args) {
-        System.setProperty("hadoop.home.dir", "D:\\IDEA\\hadoop-3.1.2");
+//        System.setProperty("hadoop.home.dir", "D:\\IDEA\\hadoop-3.1.2");
         SpringApplication.run(HadoopDemoApplication.class, args);
     }
 
diff --git a/src/main/java/com/example/hadoopdemo/config/HadoopConfig.java b/src/main/java/com/example/hadoopdemo/config/HadoopConfig.java
index 42f75eb..d398f12 100644
--- a/src/main/java/com/example/hadoopdemo/config/HadoopConfig.java
+++ b/src/main/java/com/example/hadoopdemo/config/HadoopConfig.java
@@ -38,7 +38,7 @@ public class HadoopConfig {
         FileSystem fs = null;
         try {
             URI uri = new URI(hadoopProperties.getDirectoryPath().trim());
-            fs = FileSystem.get(uri, this.getConfiguration(hadoopProperties));
+            fs = FileSystem.get(uri, this.getConfiguration(hadoopProperties), "root");
         } catch (Exception e) {
             log.error("【FileSystem配置初始化失败】", e);
         }
diff --git a/src/main/java/com/example/hadoopdemo/controller/MapReduceController.java b/src/main/java/com/example/hadoopdemo/controller/MapReduceController.java
index e71f83e..2aa27bc 100644
--- a/src/main/java/com/example/hadoopdemo/controller/MapReduceController.java
+++ b/src/main/java/com/example/hadoopdemo/controller/MapReduceController.java
@@ -9,6 +9,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.print.attribute.standard.JobName;
+import java.io.IOException;
+
 /**
  * MapReduce操作相关接口
  * @author Ruison
@@ -37,4 +40,19 @@ public class MapReduceController {
         mapReduceClient.wordCount(jobName, inputPath);
         return BaseResponse.ok("单词统计成功");
     }
+
+
+    @PostMapping("flowCount")
+    public BaseResponse phoneCount(@RequestParam("jobName") String jobName,
+                                   @RequestParam("inputPath") String inputPath) throws IOException, ClassNotFoundException, InterruptedException {
+
+        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
BaseResponse.error("请求参数为空"); + } + + mapReduceClient.phoneCount(jobName,inputPath); + return BaseResponse.ok("手机流量统计成功"); + + + } } diff --git a/src/main/java/com/example/hadoopdemo/mapReduce/MapReduceClient.java b/src/main/java/com/example/hadoopdemo/mapReduce/MapReduceClient.java index 7906987..6b8ba8b 100644 --- a/src/main/java/com/example/hadoopdemo/mapReduce/MapReduceClient.java +++ b/src/main/java/com/example/hadoopdemo/mapReduce/MapReduceClient.java @@ -6,6 +6,8 @@ import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; +import java.io.IOException; + /** * @author Ruison * on 2019/7/6 - 15:18 @@ -34,4 +36,17 @@ public class MapReduceClient { hadoopClient.rmdir(outputPath, null); reduceJobsUtils.getWordCountJobsConf(jobName, inputPath, outputPath); } + + public void phoneCount(String jobName, String inputPath) throws IOException, InterruptedException, ClassNotFoundException { + + if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) { + return; + } + // 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的 + String outputPath = OUTPUT_PATH + "/" + jobName; + hadoopClient.rmdir(outputPath, null); + + reduceJobsUtils.getPhoneCountJobsConf(jobName, inputPath, outputPath); + + } } diff --git a/src/main/java/com/example/hadoopdemo/mapReduce/ReduceJobsUtil.java b/src/main/java/com/example/hadoopdemo/mapReduce/ReduceJobsUtil.java index 0c5a4b7..3558de9 100644 --- a/src/main/java/com/example/hadoopdemo/mapReduce/ReduceJobsUtil.java +++ b/src/main/java/com/example/hadoopdemo/mapReduce/ReduceJobsUtil.java @@ -2,6 +2,9 @@ package com.example.hadoopdemo.mapReduce; import com.example.hadoopdemo.HadoopDemoApplication; import com.example.hadoopdemo.config.HadoopConfig; +import com.example.hadoopdemo.mapReduce.phoneCount.FlowBean; +import com.example.hadoopdemo.mapReduce.phoneCount.FlowCountMapper; +import com.example.hadoopdemo.mapReduce.phoneCount.FlowCountReducer; import com.example.hadoopdemo.props.HadoopProperties; import lombok.AllArgsConstructor; import org.apache.hadoop.conf.Configuration; @@ -30,8 +33,8 @@ public class ReduceJobsUtil { /** * 获取单词统计的配置信息 * - * @param jobName 名称 - * @param inputPath 分词地址 + * @param jobName 名称 + * @param inputPath 分词地址 * @param outputPath 输出地址 * @throws Exception 异常 */ @@ -61,4 +64,41 @@ public class ReduceJobsUtil { job.waitForCompletion(true); } + /** + * 按照手机号统计流量 + * + * @param jobName 名称 + * @param inputPath 分词地址 + * @param outputPath 输出地址 + * @throws Exception 异常 + */ + public void getPhoneCountJobsConf(String jobName, String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException { + HadoopConfig hadoopConfig = new HadoopConfig(); + Job job = Job.getInstance(hadoopConfig.getConfiguration(hadoopProperties), jobName); + job.setJarByClass(HadoopDemoApplication.class); + + + job.setMapperClass(FlowCountMapper.class); + job.setReducerClass(FlowCountReducer.class); + + + // 3 指定mapper输出数据的kv类型 + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(FlowBean.class); + + // 4 指定最终输出的数据的kv类型 + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(FlowBean.class); + + // 5 指定job的输入原始文件所在目录 + FileInputFormat.addInputPath(job, new Path(inputPath)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + + // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 + job.waitForCompletion(true); + + } + + } diff --git a/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowBean.java 
new file mode 100644
index 0000000..a87a306
--- /dev/null
+++ b/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowBean.java
@@ -0,0 +1,75 @@
+package com.example.hadoopdemo.mapReduce.phoneCount;
+
+import lombok.Data;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author 田培融
+ * @date 2021/6/8 14:47
+ */
+@Data
+public class FlowBean implements Writable {
+
+    private long upFlow;
+    private long downFlow;
+    private long sumFlow;
+
+
+    // A no-arg constructor is required: the framework instantiates the bean by reflection during deserialization
+    public FlowBean() {
+        super();
+    }
+
+
+    public FlowBean(long upFlow, long downFlow) {
+        super();
+        this.upFlow = upFlow;
+        this.downFlow = downFlow;
+        this.sumFlow = upFlow + downFlow;
+    }
+
+    public void set(long upFlow, long downFlow) {
+        this.upFlow = upFlow;
+        this.downFlow = downFlow;
+        this.sumFlow = upFlow + downFlow;
+    }
+
+    /**
+     * Serialization method
+     * @param output output
+     * @throws IOException IOException
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeLong(upFlow);
+        output.writeLong(downFlow);
+        output.writeLong(sumFlow);
+    }
+
+    /**
+     * Deserialization method.
+     * The fields must be read in exactly the same order in which they were written.
+     * @param dataInput dataInput
+     * @throws IOException IOException
+     */
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        this.upFlow = dataInput.readLong();
+        this.downFlow = dataInput.readLong();
+        this.sumFlow = dataInput.readLong();
+
+    }
+
+    @Override
+    public String toString() {
+        return upFlow + "\t" + downFlow + "\t" + sumFlow;
+    }
+
+
+
+
+
+}
diff --git a/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountMapper.java b/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountMapper.java
new file mode 100644
index 0000000..240a050
--- /dev/null
+++ b/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountMapper.java
@@ -0,0 +1,43 @@
+package com.example.hadoopdemo.mapReduce.phoneCount;
+
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * @author 田培融
+ * @date 2021/6/8 15:23
+ */
+
+public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
+
+    FlowBean v = new FlowBean();
+    Text k = new Text();
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+        String line = value.toString();
+
+        // Split the line into tab-separated fields
+        String[] fields = line.split("\t");
+
+        // Build the output key and value
+        // Extract the phone number
+        String phoneNum = fields[1];
+
+        // Extract the upstream and downstream traffic
+        long upFlow = Long.parseLong(fields[fields.length - 3]);
+        long downFlow = Long.parseLong(fields[fields.length - 2]);
+
+        k.set(phoneNum);
+        v.set(upFlow, downFlow);
+        // Write out
+        context.write(k, v);
+
+
+    }
+}
diff --git a/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountReducer.java b/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountReducer.java
new file mode 100644
index 0000000..4f96b88
--- /dev/null
+++ b/src/main/java/com/example/hadoopdemo/mapReduce/phoneCount/FlowCountReducer.java
@@ -0,0 +1,36 @@
+package com.example.hadoopdemo.mapReduce.phoneCount;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * @author 田培融
+ * @date 2021/6/8 15:35
+ */
+
+public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
+
+    @Override
+    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
+
+        long sum_upFlow = 0;
+        long sum_downFlow = 0;
+
+        // Iterate over all the beans and accumulate their upstream and downstream traffic separately
+        for (FlowBean flowBean : values) {
+            sum_upFlow += flowBean.getUpFlow();
+            sum_downFlow += flowBean.getDownFlow();
+        }
+
+        // Wrap the totals in a result bean
+        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
+        // Write the result out
+
+        System.out.println("============ " + key + " traffic summary: " + resultBean + " ============");
+
+        context.write(key, resultBean);
+
+    }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 18220ac..7532e95 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,9 +1,9 @@
 # spring boot Hadoop
 spring:
   hadoop:
-    fs-uri: hdfs://127.0.0.1:9000
+    fs-uri: hdfs://192.168.159.130:9000
 
 # Hadoop client
 hadoop:
-  name-node: hdfs://127.0.0.1:9000/
-  directory-path: /data
\ No newline at end of file
+  name-node: hdfs://192.168.159.130:9000/
+  directory-path: /data
-- 
Gitee
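
Usage note: FlowCountMapper expects tab-separated input lines in the layout used by the common Hadoop flow-count tutorial data set, with the phone number in the second field and the upstream traffic, downstream traffic and status code in the last three fields. The sample line, the expected reducer output and the request below are illustrative only; the field values, the file name and the /mapReduce base path are assumptions, not part of this patch.

    # hypothetical input line (fields are tab-separated; the last three are upFlow, downFlow, status)
    1   13726230503   120.196.100.82   i02.c.aliimg.com   24   27   2481   24681   200

    # expected reducer output for that phone number: upFlow, downFlow, sumFlow
    13726230503   2481   24681   27162

    # hypothetical request, assuming the controller is mapped under /mapReduce and the data sits in HDFS at /data/flow.txt
    curl -X POST "http://localhost:8080/mapReduce/flowCount?jobName=flowCount&inputPath=/data/flow.txt"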