BD-exp-5/src/main/java/HBaseEmpManager.java
2025-03-26 11:28:24 +08:00

214 lines
9.1 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.*;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
/**
 * Loads employee records from a comma-separated text file into the HBase table
 * {@code emp1520} and prints a handful of reports computed by scanning that table.
 *
 * <p>The program keeps one shared {@link Connection} and {@link Admin} in static fields;
 * they are opened by {@code init()} and released by {@code close()}. Every value is stored
 * as the string that was read from the input file.
 */
public class HBaseEmpManager {
    private static Configuration configuration;
    private static Connection connection;
    private static Admin admin;
    private static final String TABLE_NAME = "emp1520";
    private static final String CF_EMPNUM = "empnum";
    private static final String CF_INFO = "info";
    private static final String CF_SALARY = "salary";
    private static final String CF_PERFORMANCE = "performance";
    private static final String CF_TRAINING = "training";

    /** Minimum number of comma-separated fields a line needs in order to be loaded. */
    private static final int FIELD_COUNT = 12;
    /** Number of leading employee-number characters that are hashed to build the row-key salt. */
    private static final int SALT_SOURCE_LENGTH = 3;

    /**
     * Runs the whole workflow: connect, recreate the table, load {@code emp.txt}, run the
     * four reports, and disconnect.
     *
     * @param args unused
     * @throws IOException if connecting to HBase, reading the file, or any table operation fails
     * @throws NoSuchAlgorithmException if the MD5 digest used for row keys is unavailable
     */
    public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
        try {
            init();                      // open the HBase connection
            createTable();               // (re)create the table
            insertData("emp.txt");       // load emp.txt
            queryById();                 // employees with ID > 7500
            queryByPerformance();        // rating > 4 and hired before 2022
            queryRecentPromotion();      // most recently promoted employee
            countTrainingParticipants(); // participants per training course
        } finally {
            // Release the connection even when one of the steps above fails.
            close();
        }
    }

    /**
     * Opens the shared HBase connection and its admin handle.
     *
     * @throws IOException if the connection or the admin handle cannot be created; the
     *     failure is propagated because none of the later steps can run without them
     */
    private static void init() throws IOException {
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
        System.out.println("Connected to HBase Successfully!");
    }

    /**
     * Closes the admin handle and the connection if they were opened. A failure to close one
     * is reported and does not prevent the other from being closed.
     */
    private static void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
                System.out.println("HBase Connection Closed.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates the table with its five column families, dropping any existing table of the
     * same name first.
     *
     * @throws IOException if any admin operation fails
     */
    private static void createTable() throws IOException {
        TableName tableName = TableName.valueOf(TABLE_NAME);
        if (admin.tableExists(tableName)) {
            System.out.println(TABLE_NAME + " 已经存在,准备禁用并删除...");
            // Only disable when needed: a table left disabled by an earlier run would
            // otherwise make disableTable fail before the delete is reached.
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
                System.out.println(TABLE_NAME + " 已禁用。");
            }
            admin.deleteTable(tableName);
            System.out.println(TABLE_NAME + " 已删除。");
        }
        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        String[] families = {CF_EMPNUM, CF_INFO, CF_SALARY, CF_PERFORMANCE, CF_TRAINING};
        for (String family : families) {
            tableDescriptor.addFamily(new HColumnDescriptor(family));
        }
        admin.createTable(tableDescriptor);
        System.out.println("Table " + TABLE_NAME + " created successfully.");
    }

    /**
     * Reads the file line by line and writes one row per line that has at least
     * {@code FIELD_COUNT} comma-separated fields; shorter lines are skipped.
     *
     * @param filePath path of the comma-separated employee file
     * @throws IOException if the file cannot be read or a write to the table fails
     * @throws NoSuchAlgorithmException if the MD5 digest used for row keys is unavailable
     */
    private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException {
        // try-with-resources closes the reader and the table even when a read or put fails.
        try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
                BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // NOTE(review): fields are split on "," here while countTrainingParticipants
                // splits the stored course list on ", " - confirm the input file's separator
                // for multiple courses so that none are dropped at this point.
                String[] emp = line.split(",");
                if (emp.length < FIELD_COUNT) {
                    continue; // incomplete record
                }
                Put put = new Put(Bytes.toBytes(generateRowKey(emp[0])));
                addString(put, CF_EMPNUM, "empno", emp[0]);
                addString(put, CF_INFO, "ename", emp[1]);
                addString(put, CF_INFO, "job", emp[2]);
                addString(put, CF_INFO, "mgr", emp[3]);
                addString(put, CF_INFO, "hiredate", emp[4]);
                addString(put, CF_SALARY, "sal", emp[5]);
                addString(put, CF_SALARY, "credit", emp[6]);
                addString(put, CF_INFO, "deptno", emp[7]);
                addString(put, CF_INFO, "work_hours", emp[8]);
                addString(put, CF_PERFORMANCE, "performance_rating", emp[9]);
                addString(put, CF_PERFORMANCE, "promotion_date", emp[10]);
                addString(put, CF_TRAINING, "training_courses", emp[11]);
                table.put(put);
            }
        }
        System.out.println("Data inserted successfully!");
    }

    /** Adds one string-valued cell to {@code put}. */
    private static void addString(Put put, String family, String qualifier, String value) {
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }

    /**
     * Builds the row key for an employee: four hex characters taken from the MD5 digest of
     * the first (up to) three characters of the employee number, followed by the employee
     * number itself. The hashed prefix is meant to spread rows and avoid region hot-spotting.
     *
     * @param empno employee number as read from the input file; may be shorter than three
     *     characters, in which case the whole value is hashed
     * @return the salted row key
     * @throws NoSuchAlgorithmException if MD5 is not available in this JVM
     */
    private static String generateRowKey(String empno) throws NoSuchAlgorithmException {
        // Clamp the prefix length so a short employee number cannot throw.
        String prefix = empno.substring(0, Math.min(SALT_SOURCE_LENGTH, empno.length()));
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        // Encode with HBase's Bytes helper instead of the platform-default charset so the
        // key does not depend on JVM settings.
        byte[] digest = md5.digest(Bytes.toBytes(prefix));
        // The first two digest bytes are exactly the first four hex characters.
        return String.format("%02x%02x", digest[0], digest[1]) + empno;
    }

    /**
     * Prints the employee number of every row whose {@code empno} is greater than 7500.
     * Rows whose employee number is missing or not an integer are skipped.
     *
     * @throws IOException if the scan fails
     */
    private static void queryById() throws IOException {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"));
        try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
                ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                String empno = getString(result, CF_EMPNUM, "empno");
                OptionalInt id = tryParseInt(empno);
                if (id.isPresent() && id.getAsInt() > 7500) {
                    System.out.println("ID > 7500 Employee: " + empno);
                }
            }
        }
    }

    /**
     * Prints hire date and rating for every row with a performance rating above 4 and a hire
     * date that sorts before {@code 2022-01-01}. Rows with a missing or empty hire date, or a
     * missing or non-numeric rating, are skipped.
     *
     * <p>The date check is a plain string comparison, so it assumes hire dates are stored as
     * {@code yyyy-MM-dd} - TODO confirm against the input file.
     *
     * @throws IOException if the scan fails
     */
    private static void queryByPerformance() throws IOException {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"));
        scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"));
        try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
                ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                String hiredate = getString(result, CF_INFO, "hiredate");
                String rating = getString(result, CF_PERFORMANCE, "performance_rating");
                OptionalDouble score = tryParseDouble(rating);
                if (hiredate == null || hiredate.isEmpty() || !score.isPresent()) {
                    continue; // cannot evaluate the condition for this row
                }
                if (hiredate.compareTo("2022-01-01") < 0 && score.getAsDouble() > 4) {
                    System.out.println("Qualified Employee: Hire Date " + hiredate + ", Rating " + rating);
                }
            }
        }
    }

    /**
     * Prints the row key and promotion date of the row whose {@code promotion_date} string
     * is the greatest. Both printed values are empty when no row has a non-empty date.
     *
     * @throws IOException if the scan fails
     */
    private static void queryRecentPromotion() throws IOException {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"));
        String latestDate = "";
        String latestEmp = "";
        try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
                ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                String promotionDate = getString(result, CF_PERFORMANCE, "promotion_date");
                if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) {
                    latestDate = promotionDate;
                    latestEmp = Bytes.toString(result.getRow());
                }
            }
        }
        System.out.println("Most Recently Promoted Employee: " + latestEmp + ", Promotion Date: " + latestDate);
    }

    /**
     * Counts, per course name, how many rows list that course in {@code training_courses}
     * (course names are separated by {@code ", "}) and prints the totals. Rows without the
     * column and empty course names are ignored.
     *
     * @throws IOException if the scan fails
     */
    private static void countTrainingParticipants() throws IOException {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"));
        Map<String, Integer> countMap = new HashMap<>();
        try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
                ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                String courses = getString(result, CF_TRAINING, "training_courses");
                if (courses == null) {
                    continue;
                }
                for (String course : courses.split(", ")) {
                    if (!course.isEmpty()) {
                        countMap.merge(course, 1, Integer::sum);
                    }
                }
            }
        }
        System.out.println("培训课程统计:");
        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * Reads one cell of {@code result} as a string.
     *
     * @return the cell value, or {@code null} when the row has no such cell
     */
    private static String getString(Result result, String family, String qualifier) {
        return Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)));
    }

    /** Parses {@code text} as an int, returning empty for {@code null} or malformed input. */
    private static OptionalInt tryParseInt(String text) {
        if (text == null) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(text.trim()));
        } catch (NumberFormatException ignored) {
            // Malformed numbers are treated as "no value" so one bad row cannot abort a report.
            return OptionalInt.empty();
        }
    }

    /** Parses {@code text} as a double, returning empty for {@code null} or malformed input. */
    private static OptionalDouble tryParseDouble(String text) {
        if (text == null) {
            return OptionalDouble.empty();
        }
        try {
            return OptionalDouble.of(Double.parseDouble(text.trim()));
        } catch (NumberFormatException ignored) {
            // Malformed numbers are treated as "no value" so one bad row cannot abort a report.
            return OptionalDouble.empty();
        }
    }
}