feat: wordcount full code

This commit is contained in:
fly6516 2025-02-28 01:23:23 +00:00
commit ff3e4c7b1b
7 changed files with 168 additions and 0 deletions

3
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

1
data/file1.txt Normal file
View File

@ -0,0 +1 @@
Hello World

1
data/file2.txt Normal file
View File

@ -0,0 +1 @@
Hello MapReduce

16
pom.xml Normal file
View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>WordCount</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

37
src/main/java/WcMap.java Normal file
View File

@ -0,0 +1,37 @@
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/***
*
* @author Administrator
* 14个泛型中前两个是指定mapper输入数据的类型KEYIN是输入的key的类型VALUEIN是输入的value的值
* KEYOUT是输入的key的类型VALUEOUT是输入的value的值
* 2map和reduce的数据输入和输出都是以key-value的形式封装的
* 3默认情况下框架传递给我们的mapper的输入数据中key是要处理的文本中一行的起始偏移量这一行的内容作为value
* 4key-value数据是在网络中进行传递节点和节点之间互相传递在网络之间传输就需要序列化但是jdk自己的序列化很冗余
* 所以使用hadoop自己封装的数据类型而不要使用jdk自己封装的数据类型
* Long--->LongWritable
* String--->Text
*/
public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
//重写map这个方法
//mapreduce框架每读一行数据就调用一次该方法
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//具体业务逻辑就写在这个方法体中而且我们业务要处理的数据已经被框架传递进来在方法的参数中key-value
//key是这一行数据的起始偏移量value是这一行的文本内容
//1:
String str = value.toString();
//2:切分单词,空格隔开,返回切分开的单词
String[] words = StringUtils.split(str," ");
//3:遍历这个单词数组输出为key-value的格式将单词发送给reduce
for(String word : words){
//输出的key是Text类型的value是LongWritable类型的
context.write(new Text(word), new LongWritable(1));
}
}
}

View File

@ -0,0 +1,31 @@
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/***
*
* @author Administrator
* 1:reduce的四个参数第一个key-value是map的输出作为reduce的输入,第二个key-value是输出单词和次数所以
* 是Text,LongWritable的格式
*/
public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
//继承Reducer之后重写reduce方法
//第一个参数是key第二个参数是集合
//框架在map处理完成之后将所有key-value对缓存起来进行分组然后传递一个组<key,valus{}>调用一次reduce方法
//<hello,{1,1,1,1,1,1.....}>
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
//将values进行累加操作进行计数
long count = 0;
//遍历value的list进行累加求和
for (LongWritable value : values) {
count += value.get();
}
//输出这一个单词的统计结果
//输出放到hdfs的某一个目录上面输入也是在hdfs的某一个目录
context.write(key, new LongWritable(count));
}
}

View File

@ -0,0 +1,79 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
/***
* 1:用来描述一个特定的作业
* 比如,该作业使用哪个类作为逻辑处理中的map,那个作为reduce
* 2:还可以指定该作业要处理的数据所在的路径
* 还可以指定改作业输出的结果放到哪个路径
* @author Administrator
*
*/
public class WcRunner{
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(WcRunner.class);
//本job使用的mapper和reducer的类
job.setMapperClass(WcMap.class);
job.setReducerClass(WcReduce.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
Scanner sc = new Scanner(System.in);
System.out.print("inputPath:");
String inputPath = sc.next();
System.out.print("outputPath:");
String outputPath = sc.next();
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));
//将job提交给集群运行
job.waitForCompletion(true);
//输出结果
try {
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
Path srcPath = new Path(outputPath+"/part-r-00000");
FSDataInputStream is = fs.open(srcPath);
System.out.println("Results:");
while(true) {
String line = is.readLine();
if(line == null) {
break;
}
System.out.println(line);
}
is.close();
}catch(Exception e) {
e.printStackTrace();
}
}
}