From ff3e4c7b1bcf946c591048b744366e6663e5d873 Mon Sep 17 00:00:00 2001
From: fly6516
Date: Fri, 28 Feb 2025 01:23:23 +0000
Subject: [PATCH] feat: wordcount full code

---
 .idea/.gitignore            |  3 ++
 data/file1.txt              |  1 +
 data/file2.txt              |  1 +
 pom.xml                     | 16 ++++++++
 src/main/java/WcMap.java    | 37 +++++++++++++++++
 src/main/java/WcReduce.java | 31 +++++++++++++++
 src/main/java/WcRunner.java | 79 +++++++++++++++++++++++++++++++++++++
 7 files changed, 168 insertions(+)
 create mode 100644 .idea/.gitignore
 create mode 100644 data/file1.txt
 create mode 100644 data/file2.txt
 create mode 100644 pom.xml
 create mode 100644 src/main/java/WcMap.java
 create mode 100644 src/main/java/WcReduce.java
 create mode 100644 src/main/java/WcRunner.java

diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/data/file1.txt b/data/file1.txt
new file mode 100644
index 0000000..557db03
--- /dev/null
+++ b/data/file1.txt
@@ -0,0 +1 @@
+Hello World
diff --git a/data/file2.txt b/data/file2.txt
new file mode 100644
index 0000000..0123650
--- /dev/null
+++ b/data/file2.txt
@@ -0,0 +1 @@
+Hello MapReduce
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7566664
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>WordCount</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/src/main/java/WcMap.java b/src/main/java/WcMap.java
new file mode 100644
index 0000000..19bfc94
--- /dev/null
+++ b/src/main/java/WcMap.java
@@ -0,0 +1,37 @@
+import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+/***
+ *
+ * @author Administrator
+ * 1: Of the four generic parameters, the first two specify the mapper's input types: KEYIN is the type of the input key, VALUEIN the type of the input value;
+ * KEYOUT is the type of the output key, VALUEOUT the type of the output value.
+ * 2: Both map and reduce exchange their input and output as key-value pairs.
+ * 3: By default, the framework hands the mapper the starting byte offset of the current line as the key and the text of that line as the value.
+ * 4: Key-value pairs are shipped between nodes over the network, so they must be serialized; the JDK's built-in serialization is too verbose,
+ * so Hadoop's own writable types are used instead of the plain JDK types:
+ * Long--->LongWritable
+ * String--->Text
+ */
+public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
+    //Override the map method.
+    //The MapReduce framework calls this method once for every line of input it reads.
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+        //The business logic lives in this method body; the framework passes in the data to process as the key-value parameters.
+        //key is the starting byte offset of this line, value is the text of the line.
+
+        //1: Turn the line into a plain String.
+        String str = value.toString();
+        //2: Split the line into words on spaces.
+        String[] words = StringUtils.split(str," ");
+        //3: Iterate over the word array and emit each word as a key-value pair for the reducer.
+        for(String word : words){
+            //The output key is a Text, the output value is a LongWritable.
+            context.write(new Text(word), new LongWritable(1));
+        }
+    }
+}
diff --git a/src/main/java/WcReduce.java b/src/main/java/WcReduce.java
new file mode 100644
index 0000000..399bafa
--- /dev/null
+++ b/src/main/java/WcReduce.java
@@ -0,0 +1,31 @@
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+/***
+ *
+ * @author Administrator
+ * 1: Of the reducer's four type parameters, the first key-value pair is the map output that becomes the reduce input, and the second key-value
+ * pair is the output word and its count, so both are in Text, LongWritable form.
+ */
+public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
+    //After extending Reducer, override the reduce method.
+    //The first parameter is the key, the second is the collection of values grouped under that key.
+    //Once the map phase is done, the framework buffers all key-value pairs, groups them by key, and calls reduce once per group.
+    //
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+            throws IOException, InterruptedException {
+        //Accumulate the values to count how many times this word occurred.
+        long count = 0;
+        //Iterate over the list of values and sum them up.
+        for (LongWritable value : values) {
+
+            count += value.get();
+        }
+
+        //Emit the final count for this word.
+        //The output goes to a directory on HDFS, just as the input is read from a directory on HDFS.
+        context.write(key, new LongWritable(count));
+    }
+}
diff --git a/src/main/java/WcRunner.java b/src/main/java/WcRunner.java
new file mode 100644
index 0000000..7a2c16e
--- /dev/null
+++ b/src/main/java/WcRunner.java
@@ -0,0 +1,79 @@
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import java.util.Scanner;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import java.net.URI;
+/***
+ * 1: Describes one specific job,
+ * for example which class the job uses as its map logic and which as its reduce.
+ * 2: It also specifies the path of the data the job is to process
+ * and the path where the job's results are to be written.
+ * @author Administrator
+ *
+ */
+public class WcRunner{
+    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+        //Create the job configuration.
+        Configuration conf = new Configuration();
+        //Obtain a job instance.
+        Job job = Job.getInstance(conf);
+
+        //Tell Hadoop which jar contains the classes this job uses.
+        job.setJarByClass(WcRunner.class);
+
+        //The mapper and reducer classes used by this job.
+        job.setMapperClass(WcMap.class);
+        job.setReducerClass(WcReduce.class);
+
+        //Specify the key-value types of the reduce output.
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+
+
+        //Specify the key-value types of the map output.
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+
+        Scanner sc = new Scanner(System.in);
+        System.out.print("inputPath:");
+        String inputPath = sc.next();
+        System.out.print("outputPath:");
+        String outputPath = sc.next();
+
+        //Specify the path of the input data to be processed.
+        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));
+
+        //Specify the path where the results are written.
+        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));
+
+        //Submit the job to the cluster and wait for it to finish.
+        job.waitForCompletion(true);
+
+        //Print the results.
+
+        try {
+            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
+            Path srcPath = new Path(outputPath+"/part-r-00000");
+
+            FSDataInputStream is = fs.open(srcPath);
+            System.out.println("Results:");
+            while(true) {
+                String line = is.readLine();
+                if(line == null) {
+                    break;
+                }
+                System.out.println(line);
+            }
+            is.close();
+        }catch(Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
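
Note: the pom.xml added in this patch declares no dependencies, so the org.apache.hadoop.* and
org.apache.commons.lang.StringUtils classes imported above must reach the classpath some other way
(for example, jars provided by the cluster or added manually in the IDE). As a rough sketch only,
not part of this patch, the coordinates one would typically add in a <dependencies> block look like
the following; the versions are assumptions and should be matched to the target cluster:

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <!-- assumed version; align with the Hadoop version running on the cluster -->
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <!-- StringUtils used in WcMap comes from commons-lang 2.x -->
            <version>2.6</version>
        </dependency>
    </dependencies>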