fix bugs and add suitable code for testing
This commit is contained in:
parent
2c5af64557
commit
d77b5adbed
7
.idea/encodings.xml
Normal file
7
.idea/encodings.xml
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="Encoding">
|
||||||
|
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
|
||||||
|
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
|
||||||
|
</component>
|
||||||
|
</project>
|
@ -1,3 +1,4 @@
|
|||||||
|
import org.apache.hadoop.io.DoubleWritable;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Mapper;
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
@ -1,63 +1,107 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.io.DoubleWritable;
|
import org.apache.hadoop.io.DoubleWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Scanner;
|
||||||
|
|
||||||
public class Main {
|
public class Main {
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
if (args.length < 2) {
|
// if (args.length < 2) {
|
||||||
System.err.println("Usage: Main <input path> <output path>");
|
// System.err.println("Usage: Main <input path> <output path>");
|
||||||
|
// System.exit(-1);
|
||||||
|
// }
|
||||||
|
|
||||||
|
Scanner sc = new Scanner(System.in);
|
||||||
|
System.out.print("inputPath:");
|
||||||
|
//String inputPath = sc.next();
|
||||||
|
String inputPath = "/mapreduce/BD-exp-2/input";
|
||||||
|
System.out.print("outputPath:");
|
||||||
|
//String outputPathBase = sc.next();
|
||||||
|
String outputPathBase = "/mapreduce/BD-exp-2/output";
|
||||||
|
|
||||||
|
// 检查输入路径是否存在
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf);
|
||||||
|
if (!fs.exists(new Path(inputPath))) {
|
||||||
|
System.err.println("Input path does not exist: " + inputPath);
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
String inputPath = args[0];
|
|
||||||
String outputPathBase = args[1];
|
|
||||||
|
|
||||||
runMonthlyResidentialPowerJob(inputPath, outputPathBase + "/monthly_residential");
|
runMonthlyResidentialPowerJob(inputPath, outputPathBase + "/monthly_residential");
|
||||||
runAnnualIndustrialPowerJob(inputPath, outputPathBase + "/annual_industrial");
|
runAnnualIndustrialPowerJob(inputPath, outputPathBase + "/annual_industrial");
|
||||||
runProvinceMaxDailyPowerJob(inputPath, outputPathBase + "/province_max_daily");
|
runProvinceMaxDailyPowerJob(inputPath, outputPathBase + "/province_max_daily");
|
||||||
|
|
||||||
|
// 打印结果
|
||||||
|
printResults(outputPathBase + "/monthly_residential");
|
||||||
|
printResults(outputPathBase + "/annual_industrial");
|
||||||
|
printResults(outputPathBase + "/province_max_daily");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void printResults(String outputPath) throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf);
|
||||||
|
Path srcPath = new Path(outputPath + "/part-r-00000");
|
||||||
|
|
||||||
|
if (fs.exists(srcPath)) {
|
||||||
|
FSDataInputStream is = fs.open(srcPath);
|
||||||
|
System.out.println("Results from " + outputPath + ":");
|
||||||
|
while (true) {
|
||||||
|
String line = is.readLine();
|
||||||
|
if (line == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
System.out.println(line);
|
||||||
|
}
|
||||||
|
is.close();
|
||||||
|
} else {
|
||||||
|
System.err.println("Output file does not exist: " + srcPath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void runMonthlyResidentialPowerJob(String inputPath, String outputPath) throws Exception {
|
private static void runMonthlyResidentialPowerJob(String inputPath, String outputPath) throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
Job job = Job.getInstance(conf, "Monthly Residential Power Usage");
|
Job job = Job.getInstance(conf, "Monthly Residential Power Usage");
|
||||||
job.setJarByClass(Main.class);
|
job.setJarByClass(Main.class); // 显式设置Jar文件
|
||||||
job.setMapperClass(MonthlyResidentialPowerMapper.class);
|
job.setMapperClass(MonthlyResidentialPowerMapper.class);
|
||||||
job.setReducerClass(MonthlyResidentialPowerReducer.class);
|
job.setReducerClass(MonthlyResidentialPowerReducer.class);
|
||||||
job.setOutputKeyClass(Text.class);
|
job.setOutputKeyClass(Text.class);
|
||||||
job.setOutputValueClass(DoubleWritable.class);
|
job.setOutputValueClass(DoubleWritable.class);
|
||||||
FileInputFormat.addInputPath(job, new Path(inputPath));
|
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000" + inputPath));
|
||||||
FileOutputFormat.setOutputPath(job, new Path(outputPath));
|
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
|
||||||
job.waitForCompletion(true);
|
job.waitForCompletion(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void runAnnualIndustrialPowerJob(String inputPath, String outputPath) throws Exception {
|
private static void runAnnualIndustrialPowerJob(String inputPath, String outputPath) throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
Job job = Job.getInstance(conf, "Annual Industrial Power Usage");
|
Job job = Job.getInstance(conf, "Annual Industrial Power Usage");
|
||||||
job.setJarByClass(Main.class);
|
job.setJarByClass(Main.class); // 显式设置Jar文件
|
||||||
job.setMapperClass(AnnualIndustrialPowerMapper.class);
|
job.setMapperClass(AnnualIndustrialPowerMapper.class);
|
||||||
job.setReducerClass(AnnualIndustrialPowerReducer.class);
|
job.setReducerClass(AnnualIndustrialPowerReducer.class);
|
||||||
job.setOutputKeyClass(Text.class);
|
job.setOutputKeyClass(Text.class);
|
||||||
job.setOutputValueClass(DoubleWritable.class);
|
job.setOutputValueClass(DoubleWritable.class);
|
||||||
FileInputFormat.addInputPath(job, new Path(inputPath));
|
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000" + inputPath));
|
||||||
FileOutputFormat.setOutputPath(job, new Path(outputPath));
|
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
|
||||||
job.waitForCompletion(true);
|
job.waitForCompletion(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void runProvinceMaxDailyPowerJob(String inputPath, String outputPath) throws Exception {
|
private static void runProvinceMaxDailyPowerJob(String inputPath, String outputPath) throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
Job job = Job.getInstance(conf, "Province Max Daily Power Usage");
|
Job job = Job.getInstance(conf, "Province Max Daily Power Usage");
|
||||||
job.setJarByClass(Main.class);
|
job.setJarByClass(Main.class); // 显式设置Jar文件
|
||||||
job.setMapperClass(ProvinceMaxDailyPowerMapper.class);
|
job.setMapperClass(ProvinceMaxDailyPowerMapper.class);
|
||||||
job.setReducerClass(ProvinceMaxDailyPowerReducer.class);
|
job.setReducerClass(ProvinceMaxDailyPowerReducer.class);
|
||||||
job.setOutputKeyClass(Text.class);
|
job.setOutputKeyClass(Text.class);
|
||||||
job.setOutputValueClass(Text.class);
|
job.setOutputValueClass(Text.class);
|
||||||
FileInputFormat.addInputPath(job, new Path(inputPath));
|
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000" + inputPath));
|
||||||
FileOutputFormat.setOutputPath(job, new Path(outputPath));
|
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
|
||||||
job.waitForCompletion(true);
|
job.waitForCompletion(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import org.apache.hadoop.io.DoubleWritable;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Mapper;
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
Loading…
Reference in New Issue
Block a user