refactor(hbase): 重构 HBase 客户端连接方式- 替换 HBaseAdmin为 Admin 接口,并使用 try-with-resources 自动管理资源
- 移除不必要的条件判断,简化代码 - 新增 MyConnect 类,实现 HBase 连接的初始化和关闭 - 优化数据过滤逻辑,使用 BinaryComparator 替代 SubstringComparator -增加空值检查,提高数据处理的健壮性
This commit is contained in:
parent
954c873b1b
commit
2f36bb075f
@ -16,19 +16,19 @@ public class EmpHBaseClient {
|
|||||||
|
|
||||||
// 创建职工表结构
|
// 创建职工表结构
|
||||||
public void createEmpTable() throws IOException {
|
public void createEmpTable() throws IOException {
|
||||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
try (Admin admin = connection.getAdmin()) {
|
||||||
if (admin.tableExists("emp1520")) {
|
if (admin.tableExists(TableName.valueOf("emp1520"))) {
|
||||||
System.out.println("Table already exists");
|
System.out.println("Table already exists");
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("emp1520"));
|
||||||
|
tableDesc.addFamily(new HColumnDescriptor("empnum")); // 存储员工ID
|
||||||
|
tableDesc.addFamily(new HColumnDescriptor("info")); // 存储基本信息
|
||||||
|
tableDesc.addFamily(new HColumnDescriptor("salary")); // 存储薪资
|
||||||
|
tableDesc.addFamily(new HColumnDescriptor("performance")); // 存储绩效信息
|
||||||
|
tableDesc.addFamily(new HColumnDescriptor("training")); // 存储培训课程
|
||||||
|
admin.createTable(tableDesc);
|
||||||
}
|
}
|
||||||
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("emp1520"));
|
|
||||||
tableDesc.addFamily(new HColumnDescriptor("empnum")); // 存储员工ID
|
|
||||||
tableDesc.addFamily(new HColumnDescriptor("info")); // 存储基本信息
|
|
||||||
tableDesc.addFamily(new HColumnDescriptor("salary")); // 存储薪资
|
|
||||||
tableDesc.addFamily(new HColumnDescriptor("performance")); // 存储绩效信息
|
|
||||||
tableDesc.addFamily(new HColumnDescriptor("training")); // 存储培训课程
|
|
||||||
admin.createTable(tableDesc);
|
|
||||||
admin.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 新增方法:生成RowKey前缀(MD5处理)
|
// 新增方法:生成RowKey前缀(MD5处理)
|
||||||
@ -51,7 +51,7 @@ public class EmpHBaseClient {
|
|||||||
// info列族:基本信息存储
|
// info列族:基本信息存储
|
||||||
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(ename));
|
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(ename));
|
||||||
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(job));
|
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(job));
|
||||||
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(mgr == null ? "" : mgr));
|
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(mgr)); // 删除原条件判断
|
||||||
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(hiredate));
|
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(hiredate));
|
||||||
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(deptno)); // 新增部门号字段存储
|
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(deptno)); // 新增部门号字段存储
|
||||||
// salary列族:薪资相关字段存储
|
// salary列族:薪资相关字段存储
|
||||||
@ -99,7 +99,7 @@ public class EmpHBaseClient {
|
|||||||
Bytes.toBytes("empnum"),
|
Bytes.toBytes("empnum"),
|
||||||
Bytes.toBytes("empno"),
|
Bytes.toBytes("empno"),
|
||||||
CompareFilter.CompareOp.GREATER_OR_EQUAL,
|
CompareFilter.CompareOp.GREATER_OR_EQUAL,
|
||||||
new SubstringComparator("7500")
|
new BinaryComparator(Bytes.toBytes("7500"))
|
||||||
);
|
);
|
||||||
filter.setFilterIfMissing(true);
|
filter.setFilterIfMissing(true);
|
||||||
scan.setFilter(filter);
|
scan.setFilter(filter);
|
||||||
@ -152,7 +152,7 @@ public class EmpHBaseClient {
|
|||||||
String promotionDateStr = Bytes.toString(
|
String promotionDateStr = Bytes.toString(
|
||||||
result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"))
|
result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"))
|
||||||
);
|
);
|
||||||
if (promotionDateStr != null) {
|
if (promotionDateStr != null && !promotionDateStr.isEmpty()) { // 新增空值检查
|
||||||
records.add(new PromotionRecord(
|
records.add(new PromotionRecord(
|
||||||
Bytes.toString(result.getRow()),
|
Bytes.toString(result.getRow()),
|
||||||
LocalDate.parse(promotionDateStr)
|
LocalDate.parse(promotionDateStr)
|
||||||
|
39
src/main/java/MyConnect.java
Normal file
39
src/main/java/MyConnect.java
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.client.*;
|
||||||
|
import java.io.IOException;
|
||||||
|
public class MyConnect{
|
||||||
|
public static Configuration configuration;
|
||||||
|
public static Connection connection;
|
||||||
|
public static Admin admin;
|
||||||
|
public static void main(String[] args)throws IOException{
|
||||||
|
init();
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
//建立连接
|
||||||
|
public static void init(){
|
||||||
|
//根据 hbase-site.xml文件初始化Configuration 对象
|
||||||
|
configuration = HBaseConfiguration.create();
|
||||||
|
try{
|
||||||
|
//根据 Configuration对象初始化Connection 对象
|
||||||
|
connection = ConnectionFactory.createConnection(configuration);
|
||||||
|
//获取Admin 对象实例
|
||||||
|
admin = connection.getAdmin();
|
||||||
|
}catch (IOException e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
System.out.println("Connect to HBase Successfully!");
|
||||||
|
}
|
||||||
|
//关闭连接
|
||||||
|
public static void close(){
|
||||||
|
try{
|
||||||
|
if(admin != null){
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
if(null != connection){
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}catch (IOException e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user