feat(EmpHBaseClient):重构并添加新功能

-重构类结构,添加成员变量并优化资源管理
- 新增生成RowKey前缀的方法,使用MD5哈希
- 添加插入员工数据的方法,支持单条目插入
- 实现从CSV文件加载员工数据的方法
- 新增查询ID 7500以上员工的方法
- 修改绩效评分和入职时间联合查询方法
- 优化最近晋升查询方法,输出最新记录
- 改进培训课程统计方法,优化输出格式
This commit is contained in:
fly6516 2025-03-26 09:30:29 +08:00
parent 1bd83368c5
commit fd26fe33f3

View File

@ -1,3 +1,19 @@
public class EmpHBaseClient {
private Configuration conf;
private Connection connection;
private Table table; // 添加表对象成员变量
public EmpHBaseClient() {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
try {
connection = ConnectionFactory.createConnection(conf);
table = connection.getTable(TableName.valueOf("emp1520"));
} catch (IOException e) {
e.printStackTrace();
}
}
// 创建职工表结构
public void createEmpTable() throws IOException {
HBaseAdmin admin = new HBaseAdmin(conf);
@ -15,98 +31,123 @@ public void createEmpTable() throws IOException {
admin.close();
}
// 插入数据实现
public void loadData() throws IOException {
Table table = connection.getTable(TableName.valueOf("emp1520"));
try (BufferedReader br = new BufferedReader(new FileReader("emp.txt"))) {
String line;
while ((line = br.readLine()) != null) {
String[] fields = line.split(",");
String empno = fields[0];
String rowKey = generateRowKey(empno); // 生成MD5前缀+empno的RowKey
// 新增方法生成RowKey前缀MD5处理
private String generateRowKeyPrefix(String empno) {
String firstThreeDigits = empno.substring(0, 3);
// 使用MD5哈希并截取前4位避免过长 + 分隔符
String md5 = DigestUtils.md5Hex(firstThreeDigits).substring(0, 4);
return md5 + "-";
}
// 新增插入数据方法核心实现
public void insertEmployee(String empno, String ename, String job, String mgr, String hiredate, String sal, String credit, String deptno, String work_hours, String performance_rating, String promotion_date, String training_courses) throws IOException {
// 生成符合要求的RowKeyMD5前缀 + 原始empno
String rowKey = generateRowKeyPrefix(empno) + empno;
Put put = new Put(Bytes.toBytes(rowKey));
// empnum列族
// 列族分配逻辑按实验要求字段划分
// empnum列族存储员工ID空间换时间设计
put.addColumn(Bytes.toBytes("empnum"), Bytes.toBytes("empno"), Bytes.toBytes(empno));
// info列族
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(fields[1]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(fields[2]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(fields[3]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(fields[4]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("credit"), Bytes.toBytes(fields[5]));
// salary列族
put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("sal"), Bytes.toBytes(fields[6]));
// performance列族
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("work_hours"), Bytes.toBytes(fields[7]));
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("performance_rating"), Bytes.toBytes(fields[8]));
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"), Bytes.toBytes(fields[9]));
// training列族
put.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"), Bytes.toBytes(fields[10]));
// info列族基本信息存储
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(ename));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(job));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(mgr == null ? "" : mgr));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(hiredate));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(deptno)); // 新增部门号字段存储
// salary列族薪资相关字段存储
put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("sal"), Bytes.toBytes(sal));
put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("credit"), Bytes.toBytes(credit));
// performance列族绩效相关字段存储
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("performance_rating"), Bytes.toBytes(performance_rating));
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"), Bytes.toBytes(promotion_date));
put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("work_hours"), Bytes.toBytes(work_hours)); // 新增工作时长字段存储
// training列族培训课程存储逗号分隔的字符串
put.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"), Bytes.toBytes(training_courses));
// 写入表数据
table.put(put);
}
// 新增CSV文件加载方法读取emp.txt并插入数据
public void loadEmployeesFromCSV(String filePath) throws IOException {
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
String line;
while ((line = br.readLine()) != null) {
// 处理CSV行注意处理引号包裹的字段
String[] parts = line.replaceAll("\"", "").split(",", -1);
if (parts.length >= 12) {
insertEmployee(
parts[0], parts[1], parts[2], parts[3], parts[4],
parts[5], parts[6], parts[7], parts[8],
parts[9], parts[10], parts[11]
);
}
}
}
}
// RowKey生成方法
private String generateRowKey(String empno) {
String prefix = empno.substring(0,3);
String md5 = md5Hash(prefix);
return md5.substring(0,4) + empno; // 前4位MD5+原empno
// 修改构造函数关闭资源时确保连接释放新增close方法
public void close() throws IOException {
if (table != null) table.close();
if (connection != null) connection.close();
}
// MD5计算方法
private static String md5Hash(String input) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] messageDigest = md.digest(input.getBytes());
BigInteger number = new BigInteger(1, messageDigest);
return number.toString(16);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
// 查询ID>7500的员工
public void queryEmpNoOver7500() throws IOException {
// 新增查询ID 7500以上员工方法
public void findEmployeesWithEmpnoAbove7500() throws IOException {
Scan scan = new Scan();
Filter filter = new SingleColumnValueFilter(
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("empnum"),
Bytes.toBytes("empno"),
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new SubstringComparator("7500")
);
filter.setFilterIfMissing(true);
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
// 输出结果处理
System.out.println("RowKey: " + Bytes.toString(result.getRow()));
// 可根据需要扩展字段输出
}
}
}
// 绩效>4且入职早于2022年
public void queryPerformanceAndHiredate() throws IOException {
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
list.addFilter(new SingleColumnValueFilter(
// 新增绩效评分和入职时间联合查询方法
public void findQualifiedEmployees() throws IOException {
Scan scan = new Scan();
// 绩效评分>4
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
Bytes.toBytes("performance"),
Bytes.toBytes("performance_rating"),
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new SubstringComparator("5")
));
list.addFilter(new SingleColumnValueFilter(
CompareFilter.CompareOp.GREATER_THAN,
new BinaryComparator(Bytes.toBytes("4"))
);
// 入职时间早于2022年
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(
Bytes.toBytes("info"),
Bytes.toBytes("hiredate"),
CompareFilter.CompareOp.LESS,
new SubstringComparator("2022-01-01")
));
Scan scan = new Scan();
scan.setFilter(list);
ResultScanner scanner = table.getScanner(scan);
// 结果处理逻辑
new BinaryComparator(Bytes.toBytes("2022-01-01"))
);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(filter1);
filterList.addFilter(filter2);
scan.setFilter(filterList);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
System.out.println("RowKey: " + Bytes.toString(result.getRow()));
}
}
}
// 最近晋升员工查询
// 修改最近晋升查询方法
public void findLatestPromotion() throws IOException {
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
List<PromotionRecord> records = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
String promotionDateStr = Bytes.toString(
result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"))
@ -118,15 +159,20 @@ public void findLatestPromotion() throws IOException {
));
}
}
records.sort(Comparator.comparing(PromotionRecord::getDate).reversed());
// 输出最新记录
}
// 培训课程统计
records.sort(Comparator.comparing(PromotionRecord::getDate).reversed());
if (!records.isEmpty()) {
PromotionRecord latest = records.get(0);
System.out.println("Latest promotion: RowKey=" + latest.rowKey + ", Date=" + latest.date);
}
}
// 修改培训课程统计方法
public void countTrainingCourses() throws IOException {
Map<String, Integer> courseMap = new HashMap<>();
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
String coursesStr = Bytes.toString(
result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses"))
@ -137,5 +183,26 @@ public void countTrainingCourses() throws IOException {
}
}
}
// 输出统计结果
}
// 输出统计结果
for (Map.Entry<String, Integer> entry : courseMap.entrySet()) {
System.out.printf("Course: %-20s Count: %d%n", entry.getKey(), entry.getValue());
}
}
// 新增内部类处理晋升记录
private static class PromotionRecord {
private final String rowKey;
private final LocalDate date;
public PromotionRecord(String rowKey, LocalDate date) {
this.rowKey = rowKey;
this.date = date;
}
public LocalDate getDate() {
return date;
}
}
}