refactor(HBase): 重构 HBaseEmpManager 类

- 重命名 EmpHBaseClient 为 HBaseEmpManager
- 优化类结构和方法命名,提高代码可读性
- 移除冗余代码和不必要的注释
- 使用静态方法替代实例方法,简化资源管理
- 重构数据插入和查询逻辑,提高效率
This commit is contained in:
fly6516 2025-03-26 10:12:17 +08:00
parent c3464a2571
commit 9891f61574
2 changed files with 177 additions and 298 deletions

View File

@ -1,298 +0,0 @@
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
//import org.apache.hadoop.hbase.client.HTableDescriptor;
//import org.apache.hadoop.hbase.client.HColumnDescriptor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
//import org.apache.hadoop.hbase.client.Filter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.commons.codec.digest.DigestUtils;
public class EmpHBaseClient {
private static Configuration conf;
private static Connection connection;
private static Admin admin;
private Table table; // 添加表对象成员变量
// 新增静态初始化方法
public static void init() {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.client.retries.number", "5");
conf.set("hbase.client.pause", "1000");
conf.set("hbase.rpc.timeout", "120000");
conf.set("hbase.zookeeper.connection.timeout", "30000");
try {
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("HBase连接初始化失败", e);
}
}
// 修改构造函数依赖静态连接
public EmpHBaseClient() {
if (connection == null) {
init();
}
try {
table = connection.getTable(TableName.valueOf("emp1520"));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("获取表失败", e);
}
}
// 创建职工表结构
public void createEmpTable() throws IOException {
if (admin == null) {
throw new IOException("Admin未初始化");
}
if (admin.tableExists(TableName.valueOf("emp1520"))) {
System.out.println("Table already exists");
return;
}
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("emp1520"));
tableDesc.addFamily(new HColumnDescriptor("empnum")); // 存储员工ID
tableDesc.addFamily(new HColumnDescriptor("info")); // 存储基本信息
tableDesc.addFamily(new HColumnDescriptor("salary")); // 存储薪资
tableDesc.addFamily(new HColumnDescriptor("performance")); // 存储绩效信息
tableDesc.addFamily(new HColumnDescriptor("training")); // 存储培训课程
admin.createTable(tableDesc);
}
// 新增方法生成RowKey前缀MD5处理
private String generateRowKeyPrefix(String empno) {
String firstThreeDigits = empno.substring(0, 3);
// 使用MD5哈希并截取前4位避免过长 + 分隔符
String md5 = DigestUtils.md5Hex(firstThreeDigits).substring(0, 4);
return md5 + "-";
}
// 新增插入数据方法核心实现
public void insertEmployee(String empno, String ename, String job, String mgr, String hiredate, String sal, String credit, String deptno, String work_hours, String performance_rating, String promotion_date, String training_courses) throws IOException {
// 生成符合要求的RowKeyMD5前缀 + 原始empno
String rowKey = generateRowKeyPrefix(empno) + empno;
Put put = new Put(Bytes.toBytes(rowKey));
// 列族分配逻辑按实验要求字段划分
// empnum列族存储员工ID空间换时间设计
put.addColumn(Bytes.toBytes("empnum"), Bytes.toBytes("empno"), Bytes.toBytes(empno));
// info列族基本信息存储
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(ename));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(job));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(mgr)); // 删除原条件判断
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(hiredate));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(deptno)); // 新增部门号字段存储
// salary列族薪资相关字段存储
put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("sal"), Bytes.toBytes(sal));
put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("credit"), Bytes.toBytes(credit));
// performance列族绩效相关字段存储
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("performance_rating"), Bytes.toBytes(performance_rating));
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"), Bytes.toBytes(promotion_date));
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("work_hours"), Bytes.toBytes(work_hours)); // 新增工作时长字段存储
// training列族培训课程存储逗号分隔的字符串
put.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"), Bytes.toBytes(training_courses));
// 写入表数据
table.put(put);
}
// 新增CSV文件加载方法读取emp.txt并插入数据
public void loadEmployeesFromCSV(String filePath) throws IOException {
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
String line;
while ((line = br.readLine()) != null) {
// 处理CSV行注意处理引号包裹的字段
String[] parts = line.replaceAll("\"", "").split(",", -1);
if (parts.length >= 12) {
insertEmployee(
parts[0], parts[1], parts[2], parts[3], parts[4],
parts[5], parts[6], parts[7], parts[8],
parts[9], parts[10], parts[11]
);
}
}
}
}
// 修改构造函数关闭资源时确保连接释放新增close方法
public static void close() {
try {
if (admin != null) admin.close();
if (connection != null) connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 新增查询ID 7500以上员工方法
public void findEmployeesWithEmpnoAbove7500() throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("empnum"),
Bytes.toBytes("empno"),
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("7500"))
);
filter.setFilterIfMissing(true);
scan.setFilter(filter);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
System.out.println("RowKey: " + Bytes.toString(result.getRow()));
// 可根据需要扩展字段输出
}
}
}
// 新增绩效评分和入职时间联合查询方法
public void findQualifiedEmployees() throws IOException {
Scan scan = new Scan();
// 绩效评分>4
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
Bytes.toBytes("performance"),
Bytes.toBytes("performance_rating"),
CompareFilter.CompareOp.GREATER, // 修改点将GREATER_THAN改为GREATERHBase 1.1.5版本不支持GREATER_THAN
new BinaryComparator(Bytes.toBytes("4"))
);
// 入职时间早于2022年
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(
Bytes.toBytes("info"),
Bytes.toBytes("hiredate"),
CompareFilter.CompareOp.LESS,
new BinaryComparator(Bytes.toBytes("2022-01-01"))
);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(filter1);
filterList.addFilter(filter2);
scan.setFilter(filterList);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
System.out.println("RowKey: " + Bytes.toString(result.getRow()));
}
}
}
// 修改最近晋升查询方法
public void findLatestPromotion() throws IOException {
Scan scan = new Scan();
List<PromotionRecord> records = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
String promotionDateStr = Bytes.toString(
result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"))
);
if (promotionDateStr != null && !promotionDateStr.isEmpty()) { // 新增空值检查
records.add(new PromotionRecord(
Bytes.toString(result.getRow()),
LocalDate.parse(promotionDateStr)
));
}
}
}
records.sort(Comparator.comparing(PromotionRecord::getDate).reversed());
if (!records.isEmpty()) {
PromotionRecord latest = records.get(0);
System.out.println("Latest promotion: RowKey=" + latest.rowKey + ", Date=" + latest.date);
}
}
// 修改培训课程统计方法
public void countTrainingCourses() throws IOException {
Map<String, Integer> courseMap = new HashMap<>();
Scan scan = new Scan();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
String coursesStr = Bytes.toString(
result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses"))
);
if (coursesStr != null) {
for (String course : coursesStr.split(",")) {
courseMap.put(course, courseMap.getOrDefault(course, 0)+1);
}
}
}
}
// 输出统计结果
for (Map.Entry<String, Integer> entry : courseMap.entrySet()) {
System.out.printf("Course: %-20s Count: %d%n", entry.getKey(), entry.getValue());
}
}
// 新增内部类处理晋升记录
private static class PromotionRecord {
private final String rowKey;
private final LocalDate date;
public PromotionRecord(String rowKey, LocalDate date) {
this.rowKey = rowKey;
this.date = date;
}
public LocalDate getDate() {
return date;
}
}
// 新增main方法作为入口
public static void main(String[] args) {
try {
EmpHBaseClient.init(); // 新增静态初始化调用
EmpHBaseClient client = new EmpHBaseClient();
System.out.println("1. 创建表结构...");
client.createEmpTable();
System.out.println("2. 加载CSV数据...");
client.loadEmployeesFromCSV("emp.txt");
System.out.println("\n3. 查询ID≥7500的员工");
client.findEmployeesWithEmpnoAbove7500();
System.out.println("\n4. 查询绩效>4且入职早于2022的员工");
client.findQualifiedEmployees();
System.out.println("\n5. 最近晋升记录:");
client.findLatestPromotion();
System.out.println("\n6. 培训课程统计:");
client.countTrainingCourses();
} catch (IOException e) {
e.printStackTrace();
} finally {
EmpHBaseClient.close(); // 新增静态关闭调用
}
}
}

View File

@ -0,0 +1,177 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.*;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
public class HBaseEmpManager {
private static final String TABLE_NAME = "emp1520";
private static final String CF_EMPNUM = "empnum";
private static final String CF_INFO = "info";
private static final String CF_SALARY = "salary";
private static final String CF_PERFORMANCE = "performance";
private static final String CF_TRAINING = "training";
private static Connection connection;
private static Admin admin;
public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
Configuration config = HBaseConfiguration.create();
connection = ConnectionFactory.createConnection(config);
admin = connection.getAdmin();
createTable();
insertData("emp.txt");
queryById();
queryByPerformance();
queryRecentPromotion();
countTrainingParticipants();
admin.close();
connection.close();
}
// 创建 HBase
private static void createTable() throws IOException {
TableName tableName = TableName.valueOf(TABLE_NAME);
if (admin.tableExists(tableName)) {
System.out.println("" + TABLE_NAME + " 已存在!");
return;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor(CF_EMPNUM));
tableDescriptor.addFamily(new HColumnDescriptor(CF_INFO));
tableDescriptor.addFamily(new HColumnDescriptor(CF_SALARY));
tableDescriptor.addFamily(new HColumnDescriptor(CF_PERFORMANCE));
tableDescriptor.addFamily(new HColumnDescriptor(CF_TRAINING));
admin.createTable(tableDescriptor);
System.out.println("" + TABLE_NAME + " 创建成功!");
}
// 读取并插入数据
private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while ((line = br.readLine()) != null) {
String[] emp = line.split(",");
if (emp.length < 12) continue; // 确保数据完整
String rowKey = generateRowKey(emp[0]); // 使用哈希前缀
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"), Bytes.toBytes(emp[0]));
put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("ename"), Bytes.toBytes(emp[1]));
put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("job"), Bytes.toBytes(emp[2]));
put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("mgr"), Bytes.toBytes(emp[3]));
put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"), Bytes.toBytes(emp[4]));
put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("sal"), Bytes.toBytes(emp[5]));
put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("credit"), Bytes.toBytes(emp[6]));
put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("deptno"), Bytes.toBytes(emp[7]));
put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("work_hours"), Bytes.toBytes(emp[8]));
put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"), Bytes.toBytes(emp[9]));
put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"), Bytes.toBytes(emp[10]));
put.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"), Bytes.toBytes(emp[11]));
table.put(put);
}
br.close();
table.close();
System.out.println("数据插入完成!");
}
// 查询 ID > 7500 的员工
private static void queryById() throws IOException {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
String empno = Bytes.toString(result.getValue(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno")));
if (empno != null && Integer.parseInt(empno) > 7500) {
System.out.println("ID > 7500 员工: " + empno);
}
}
table.close();
}
// 查询绩效评分 > 4 且入职时间 < 2022
private static void queryByPerformance() throws IOException {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"));
scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
String hiredate = Bytes.toString(result.getValue(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate")));
String rating = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating")));
if (hiredate.compareTo("2022-01-01") < 0 && Integer.parseInt(rating) > 4) {
System.out.println("符合条件的员工: 入职时间 " + hiredate + ",绩效评分 " + rating);
}
}
table.close();
}
// 查找最近晋升的员工
private static void queryRecentPromotion() throws IOException {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"));
ResultScanner scanner = table.getScanner(scan);
String latestDate = "";
String latestEmp = "";
for (Result result : scanner) {
String promotionDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date")));
if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) {
latestDate = promotionDate;
latestEmp = Bytes.toString(result.getRow());
}
}
System.out.println("最近晋升的员工: " + latestEmp + ",晋升日期: " + latestDate);
table.close();
}
// 统计每个培训课程的参与人数
private static void countTrainingParticipants() throws IOException {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"));
ResultScanner scanner = table.getScanner(scan);
Map<String, Integer> countMap = new HashMap<>();
for (Result result : scanner) {
String courses = Bytes.toString(result.getValue(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses")));
for (String course : courses.split(", ")) {
countMap.put(course, countMap.getOrDefault(course, 0) + 1);
}
}
System.out.println("培训课程统计: " + countMap);
table.close();
}
// 生成 RowKey避免 Region 热点
private static String generateRowKey(String empno) throws NoSuchAlgorithmException {
String prefix = empno.substring(0, 3);
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(prefix.getBytes());
StringBuilder hexString = new StringBuilder();
for (byte b : digest) {
hexString.append(String.format("%02x", b));
}
return hexString.substring(0, 4) + empno;
}
}