diff --git a/.idea/encodings.xml b/.idea/encodings.xml
new file mode 100644
index 0000000..aa00ffa
--- /dev/null
+++ b/.idea/encodings.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/AnnualIndustrialPowerMapper.java b/src/main/java/AnnualIndustrialPowerMapper.java
index 2f96abb..9d33745 100644
--- a/src/main/java/AnnualIndustrialPowerMapper.java
+++ b/src/main/java/AnnualIndustrialPowerMapper.java
@@ -1,3 +1,4 @@
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
diff --git a/src/main/java/Main.java b/src/main/java/Main.java
index 4ab814e..369292e 100644
--- a/src/main/java/Main.java
+++ b/src/main/java/Main.java
@@ -1,63 +1,107 @@
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import java.net.URI;
+import java.util.Scanner;
+
 public class Main {
     public static void main(String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("Usage: Main <inputPath> <outputPath>");
+//        if (args.length < 2) {
+//            System.err.println("Usage: Main <inputPath> <outputPath>");
+//            System.exit(-1);
+//        }
+
+        Scanner sc = new Scanner(System.in);
+        System.out.print("inputPath:");
+        //String inputPath = sc.next();
+        String inputPath = "/mapreduce/BD-exp-2/input";
+        System.out.print("outputPath:");
+        //String outputPathBase = sc.next();
+        String outputPathBase = "/mapreduce/BD-exp-2/output";
+
+        // Check whether the input path exists
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf);
+        if (!fs.exists(new Path(inputPath))) {
+            System.err.println("Input path does not exist: " + inputPath);
             System.exit(-1);
         }
-        String inputPath = args[0];
-        String outputPathBase = args[1];
-
         runMonthlyResidentialPowerJob(inputPath, outputPathBase + "/monthly_residential");
         runAnnualIndustrialPowerJob(inputPath, outputPathBase + "/annual_industrial");
         runProvinceMaxDailyPowerJob(inputPath, outputPathBase + "/province_max_daily");
+
+        // Print the results
+        printResults(outputPathBase + "/monthly_residential");
+        printResults(outputPathBase + "/annual_industrial");
+        printResults(outputPathBase + "/province_max_daily");
+    }
+
+    private static void printResults(String outputPath) throws Exception {
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf);
+        Path srcPath = new Path(outputPath + "/part-r-00000");
+
+        if (fs.exists(srcPath)) {
+            FSDataInputStream is = fs.open(srcPath);
+            System.out.println("Results from " + outputPath + ":");
+            while (true) {
+                String line = is.readLine();
+                if (line == null) {
+                    break;
+                }
+                System.out.println(line);
+            }
+            is.close();
+        } else {
+            System.err.println("Output file does not exist: " + srcPath);
+        }
     }
 
     private static void runMonthlyResidentialPowerJob(String inputPath, String outputPath) throws Exception {
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf, "Monthly Residential Power Usage");
-        job.setJarByClass(Main.class);
+        job.setJarByClass(Main.class); // Explicitly set the jar file
         job.setMapperClass(MonthlyResidentialPowerMapper.class);
         job.setReducerClass(MonthlyResidentialPowerReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(DoubleWritable.class);
-        FileInputFormat.addInputPath(job, new Path(inputPath));
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        FileInputFormat.addInputPath(job, new Path("hdfs://master:9000" + inputPath));
+        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
         job.waitForCompletion(true);
     }
 
     private static void runAnnualIndustrialPowerJob(String inputPath, String outputPath) throws Exception {
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf, "Annual Industrial Power Usage");
-        job.setJarByClass(Main.class);
+        job.setJarByClass(Main.class); // Explicitly set the jar file
         job.setMapperClass(AnnualIndustrialPowerMapper.class);
         job.setReducerClass(AnnualIndustrialPowerReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(DoubleWritable.class);
-        FileInputFormat.addInputPath(job, new Path(inputPath));
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        FileInputFormat.addInputPath(job, new Path("hdfs://master:9000" + inputPath));
+        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
         job.waitForCompletion(true);
     }
 
     private static void runProvinceMaxDailyPowerJob(String inputPath, String outputPath) throws Exception {
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf, "Province Max Daily Power Usage");
-        job.setJarByClass(Main.class);
+        job.setJarByClass(Main.class); // Explicitly set the jar file
         job.setMapperClass(ProvinceMaxDailyPowerMapper.class);
         job.setReducerClass(ProvinceMaxDailyPowerReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
-        FileInputFormat.addInputPath(job, new Path(inputPath));
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        FileInputFormat.addInputPath(job, new Path("hdfs://master:9000" + inputPath));
+        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
         job.waitForCompletion(true);
     }
 }
diff --git a/src/main/java/MonthlyResidentialPowerMapper.java b/src/main/java/MonthlyResidentialPowerMapper.java
index 0ee0cd7..e6d6ea5 100644
--- a/src/main/java/MonthlyResidentialPowerMapper.java
+++ b/src/main/java/MonthlyResidentialPowerMapper.java
@@ -1,3 +1,4 @@
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
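
A minimal alternative sketch for the new printResults helper, not part of the commit above: FSDataInputStream.readLine() is inherited from DataInputStream and is deprecated, so this variant wraps the HDFS stream in a BufferedReader and decodes UTF-8 explicitly. The namenode URI hdfs://master:9000 and the part-r-00000 file name are carried over from the diff; the class name PrintResultsSketch is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    public class PrintResultsSketch {

        // Hypothetical variant of printResults: streams one part file of a finished
        // job and prints each tab-separated key/value line to stdout.
        static void printResults(String outputPath) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf); // namenode assumed, as in the diff
            Path srcPath = new Path(outputPath + "/part-r-00000");

            if (!fs.exists(srcPath)) {
                System.err.println("Output file does not exist: " + srcPath);
                return;
            }

            System.out.println("Results from " + outputPath + ":");
            // try-with-resources closes both the reader and the underlying HDFS stream
            try (FSDataInputStream in = fs.open(srcPath);
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }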