Compare commits

..

2 Commits

Author SHA1 Message Date
75609c7c34 Reapply "feat: 添加 TableExist 类用于检查 HBase 表是否存在"
This reverts commit ac8bc7357d.
2025-03-26 11:19:42 +08:00
bc64458cd3 Revert "refactor(hbase): 重构 HBaseEmpManager 类"
This reverts commit 15b008dc35.
2025-03-26 11:19:26 +08:00
6 changed files with 250 additions and 151 deletions

View File

@ -1,40 +0,0 @@
// 新建员工数据访问类
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.*;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
public class EmployeeDAO {
private static final String[] CF_ARRAY = {"empnum", "info", "salary", "performance", "training"};
public void insertData(String filePath) throws IOException, NoSuchAlgorithmException {
try(Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf("emp1520"))) {
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while ((line = br.readLine()) != null) {
String[] emp = line.split(",");
if (emp.length < 12) continue;
String rowKey = generateRowKey(emp[0]);
Put put = new Put(Bytes.toBytes(rowKey));
table.put(put);
}
br.close();
}
System.out.println("Data inserted successfully!");
}
private String generateRowKey(String empno) throws NoSuchAlgorithmException {
}
public void queryById() throws IOException {
}
public void queryByPerformance() throws IOException {
}
public void queryRecentPromotion() throws IOException {
}
}

View File

@ -1,41 +0,0 @@
// 新建连接管理类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Admin;
/**
 * Holds the process-wide HBase {@code Connection} and {@code Admin} handles.
 *
 * <p>Call {@link #init()} before using the getters and {@link #close()} when done. If
 * {@code init()} fails, the getters return {@code null}.
 */
public class HBaseConnectionManager {
    private static Configuration configuration;
    private static Connection connection;
    private static Admin admin;

    /**
     * Creates the configuration, opens the connection and obtains the admin handle.
     * An I/O failure is reported via its stack trace and leaves the handles unset.
     */
    public static void init() {
        configuration = HBaseConfiguration.create();
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            System.out.println("Connected to HBase Successfully!");
        } catch (java.io.IOException e) { // fully qualified: the file does not import IOException
            e.printStackTrace();
        }
    }

    /**
     * Closes the admin handle and then the connection. The connection is closed even when
     * closing the admin handle throws, so a failure there cannot leak the connection.
     */
    public static void close() {
        try {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
            System.out.println("HBase Connection Closed.");
        } catch (java.io.IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the admin handle, or {@code null} if {@link #init()} has not succeeded. */
    public static Admin getAdmin() {
        return admin;
    }

    /** Returns the connection, or {@code null} if {@link #init()} has not succeeded. */
    public static Connection getConnection() {
        return connection;
    }
}

View File

@ -8,24 +8,206 @@ import java.security.NoSuchAlgorithmException;
import java.util.*;
public class HBaseEmpManager {
public static void main(String[] args) {
// HBase client configuration; assigned in init().
private static Configuration configuration;
// Shared connection used by every table operation in this class.
private static Connection connection;
// Admin handle used for the table existence check, disable, delete and create calls.
private static Admin admin;
// Name of the table that all methods in this class operate on.
private static final String TABLE_NAME = "emp1520";
// Column family names registered by createTable().
private static final String CF_EMPNUM = "empnum";
private static final String CF_INFO = "info";
private static final String CF_SALARY = "salary";
private static final String CF_PERFORMANCE = "performance";
private static final String CF_TRAINING = "training";
/**
 * Runs the whole workflow: connect, recreate the table, load {@code emp.txt}, run the
 * queries and the training statistics, then disconnect.
 *
 * @param args unused
 * @throws IOException if a table operation or reading the input file fails
 * @throws NoSuchAlgorithmException propagated from row-key generation
 */
public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
    init(); // open the HBase connection
    try {
        createTable(); // create the table
        insertData("emp.txt"); // read emp.txt and insert its rows
        queryById(); // employees with ID > 7500
        queryByPerformance(); // rating > 4 and hired before 2022
        queryRecentPromotion(); // most recently promoted employee
        countTrainingParticipants(); // participants per training course
    } finally {
        // Release the connection even when one of the steps above throws.
        close();
    }
}
// 初始化 HBase 连接
// Opens the HBase connection: builds the configuration, then creates the connection and
// obtains the Admin handle.
// NOTE(review): this span appears to interleave lines from two revisions of the method
// (a delegating version that calls HBaseConnectionManager / EmployeeDAO / TrainingDAO, and
// an inline version that creates the connection itself). The diff markers were lost, so it
// does not read as one coherent method — confirm against the actual file before editing.
private static void init() {
configuration = HBaseConfiguration.create();
try {
HBaseConnectionManager.init();
HBaseTableManager.createTable(HBaseConnectionManager.getAdmin());
EmployeeDAO employeeDAO = new EmployeeDAO();
employeeDAO.insertData("emp.txt");
employeeDAO.queryById();
employeeDAO.queryByPerformance();
employeeDAO.queryRecentPromotion();
TrainingDAO trainingDAO = new TrainingDAO();
trainingDAO.countTrainingParticipants();
} catch (Exception e) {
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
System.out.println("Connected to HBase Successfully!");
} catch (IOException e) {
e.printStackTrace();
} finally {
HBaseConnectionManager.close();
}
}
// 关闭 HBase 连接
/**
 * Closes the admin handle and then the connection. The connection is closed even when
 * closing the admin handle throws; any I/O failure is reported via its stack trace.
 */
private static void close() {
    try {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            // Runs regardless of the outcome above so the connection is never leaked.
            if (connection != null) {
                connection.close();
            }
        }
        System.out.println("HBase Connection Closed.");
    } catch (IOException e) {
        e.printStackTrace();
    }
}
// Create the HBase table (an existing table of the same name is dropped first)
/**
 * Recreates the table from scratch: an existing table is disabled (if it is still enabled)
 * and deleted, then a new one is created with the five column families.
 *
 * @throws IOException if any admin operation fails
 */
private static void createTable() throws IOException {
    TableName tableName = TableName.valueOf(TABLE_NAME);

    if (admin.tableExists(tableName)) {
        System.out.println("" + TABLE_NAME + " 已经存在,准备禁用并删除...");
        // A table left disabled by an earlier interrupted run must not be disabled again:
        // disabling an already-disabled table is rejected by HBase.
        if (admin.isTableEnabled(tableName)) {
            admin.disableTable(tableName);
        }
        System.out.println("" + TABLE_NAME + " 已禁用。");
        admin.deleteTable(tableName);
        System.out.println("" + TABLE_NAME + " 已删除。");
    }

    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
    String[] families = {CF_EMPNUM, CF_INFO, CF_SALARY, CF_PERFORMANCE, CF_TRAINING};
    for (String family : families) {
        tableDescriptor.addFamily(new HColumnDescriptor(family));
    }
    admin.createTable(tableDescriptor);
    System.out.println("Table emp1520 created successfully.");
}
// 读取并插入数据
/**
 * Loads a comma-separated employee file into the table, one row per line.
 * Lines with fewer than 12 fields are skipped. Fields are mapped by position to the
 * column families and qualifiers listed in the body.
 *
 * @param filePath path of the text file to read
 * @throws IOException if the file cannot be read or a put fails
 * @throws NoSuchAlgorithmException propagated from {@code generateRowKey}
 */
private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException {
    // Declared as resources so both are closed even if a read or put fails mid-file.
    try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
         BufferedReader br = new BufferedReader(new FileReader(filePath))) {
        String line;
        while ((line = br.readLine()) != null) {
            String[] emp = line.split(",");
            if (emp.length < 12) {
                continue; // incomplete record
            }
            // Row key = hash-derived prefix + employee number.
            Put put = new Put(Bytes.toBytes(generateRowKey(emp[0])));
            put.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"), Bytes.toBytes(emp[0]));
            put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("ename"), Bytes.toBytes(emp[1]));
            put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("job"), Bytes.toBytes(emp[2]));
            put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("mgr"), Bytes.toBytes(emp[3]));
            put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"), Bytes.toBytes(emp[4]));
            put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("sal"), Bytes.toBytes(emp[5]));
            put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("credit"), Bytes.toBytes(emp[6]));
            put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("deptno"), Bytes.toBytes(emp[7]));
            put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("work_hours"), Bytes.toBytes(emp[8]));
            put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"), Bytes.toBytes(emp[9]));
            put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"), Bytes.toBytes(emp[10]));
            put.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"), Bytes.toBytes(emp[11]));
            table.put(put);
        }
    }
    System.out.println("Data inserted successfully!");
}
// Generate the RowKey with a hash-derived prefix to avoid Region hotspotting
/**
 * Builds a row key by prepending a 4-hex-character prefix to the employee number.
 * The prefix is the first two bytes of the MD5 digest of the employee number's first
 * three characters (or of the whole number when it is shorter than three characters).
 *
 * @param empno employee number; must not be {@code null}
 * @return the 4-character hex prefix followed by {@code empno}
 * @throws NoSuchAlgorithmException if the MD5 algorithm is unavailable
 */
private static String generateRowKey(String empno) throws NoSuchAlgorithmException {
    // Clamp the prefix length so employee numbers shorter than 3 characters do not throw.
    String prefix = empno.substring(0, Math.min(3, empno.length()));
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    // Explicit charset keeps the key independent of the platform default encoding.
    byte[] digest = md5.digest(prefix.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // Only the first two digest bytes (4 hex characters) are used.
    String salt = String.format("%02x%02x", digest[0], digest[1]);
    return salt + empno;
}
// 查询 ID > 7500 的员工
/**
 * Scans the {@code empno} column and prints every employee number greater than 7500.
 * The comparison is done on the client after parsing each value as an integer.
 *
 * @throws IOException if the scan fails
 */
private static void queryById() throws IOException {
    byte[] family = Bytes.toBytes(CF_EMPNUM);
    byte[] qualifier = Bytes.toBytes("empno");
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);

    // The scanner is a resource too; closing it releases the scan.
    try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            String empno = Bytes.toString(result.getValue(family, qualifier));
            if (empno != null && Integer.parseInt(empno) > 7500) {
                System.out.println("ID > 7500 Employee: " + empno);
            }
        }
    }
}
// 查询绩效评分 > 4 且入职时间 < 2022
/**
 * Scans hire date and performance rating and prints the rows whose hire date sorts before
 * {@code "2022-01-01"} and whose rating is greater than 4. Rows missing either cell are
 * skipped.
 *
 * @throws IOException if the scan fails
 */
private static void queryByPerformance() throws IOException {
    byte[] infoFamily = Bytes.toBytes(CF_INFO);
    byte[] hiredateQualifier = Bytes.toBytes("hiredate");
    byte[] perfFamily = Bytes.toBytes(CF_PERFORMANCE);
    byte[] ratingQualifier = Bytes.toBytes("performance_rating");

    Scan scan = new Scan();
    scan.addColumn(infoFamily, hiredateQualifier);
    scan.addColumn(perfFamily, ratingQualifier);

    try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            String hiredate = Bytes.toString(result.getValue(infoFamily, hiredateQualifier));
            String rating = Bytes.toString(result.getValue(perfFamily, ratingQualifier));
            // A row may carry only one of the two requested cells; skip it instead of failing.
            if (hiredate == null || rating == null) {
                continue;
            }
            // NOTE(review): plain string comparison — assumes dates are stored as yyyy-MM-dd; confirm.
            if (hiredate.compareTo("2022-01-01") < 0 && Integer.parseInt(rating) > 4) {
                System.out.println("Qualified Employee: Hire Date " + hiredate + ", Rating " + rating);
            }
        }
    }
}
// 查找最近晋升的员工
/**
 * Scans the promotion-date column and prints the row key and date of the row whose
 * promotion date compares greatest as a string. Prints empty values when no row has a date.
 *
 * @throws IOException if the scan fails
 */
private static void queryRecentPromotion() throws IOException {
    byte[] family = Bytes.toBytes(CF_PERFORMANCE);
    byte[] qualifier = Bytes.toBytes("promotion_date");
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);

    String latestDate = "";
    String latestEmp = "";
    try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            String promotionDate = Bytes.toString(result.getValue(family, qualifier));
            // NOTE(review): string ordering — assumes a sortable date format such as yyyy-MM-dd; confirm.
            if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) {
                latestDate = promotionDate;
                latestEmp = Bytes.toString(result.getRow());
            }
        }
        System.out.println("Most Recently Promoted Employee: " + latestEmp + ", Promotion Date: " + latestDate);
    }
}
// 统计培训课程的参与人数
/**
 * Scans the training-courses column, splits each value on {@code ", "} and prints how many
 * rows list each course. Rows without a courses cell are skipped.
 *
 * @throws IOException if the scan fails
 */
private static void countTrainingParticipants() throws IOException {
    byte[] family = Bytes.toBytes(CF_TRAINING);
    byte[] qualifier = Bytes.toBytes("training_courses");
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);

    Map<String, Integer> countMap = new HashMap<>();
    try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            String courses = Bytes.toString(result.getValue(family, qualifier));
            if (courses == null) {
                continue; // no courses cell on this row
            }
            for (String course : courses.split(", ")) {
                countMap.merge(course, 1, Integer::sum);
            }
        }
        System.out.println("培训课程统计:");
        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue() + "");
        }
    }
}
}

View File

@ -1,26 +0,0 @@
// 新建表管理类
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
/** Creates the {@code emp1520} table with its column families, replacing any existing table. */
public class HBaseTableManager {
    private static final String TABLE_NAME = "emp1520";
    private static final String[] CF_ARRAY = {"empnum", "info", "salary", "performance", "training"};

    /**
     * Drops the table if it exists and creates it again with every family in {@code CF_ARRAY}.
     *
     * @param admin admin handle used for all table operations
     * @throws java.io.IOException if any admin operation fails
     */
    // IOException is fully qualified because the file's import list does not include it.
    public static void createTable(Admin admin) throws java.io.IOException {
        TableName tableName = TableName.valueOf(TABLE_NAME);

        if (admin.tableExists(tableName)) {
            // Disabling an already-disabled table is rejected by HBase, so only disable
            // when the table is still enabled (e.g. after an interrupted earlier run).
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
            }
            admin.deleteTable(tableName);
            System.out.println("Existing table removed");
        }

        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        for (String cf : CF_ARRAY) {
            tableDescriptor.addFamily(new HColumnDescriptor(cf));
        }
        admin.createTable(tableDescriptor);
        System.out.println("Table created successfully");
    }
}

View File

@ -0,0 +1,52 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.Scanner;
/** Command-line tool that reads a table name from standard input and reports whether it exists. */
public class TableExist {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;

    /**
     * Prompts for a table name, connects to HBase, prints whether the table exists and
     * disconnects.
     *
     * @param args unused
     * @throws IOException if the existence check fails
     */
    public static void main(String[] args) throws IOException {
        Scanner sc = new Scanner(System.in);
        System.out.print("tableName:");
        String tableName = sc.next();
        init();
        try {
            if (admin == null) {
                // init() already printed the cause; avoid a NullPointerException below.
                System.err.println("Could not connect to HBase; cannot check table " + tableName);
                return;
            }
            TableName tn = TableName.valueOf(tableName);
            if (admin.tableExists(tn)) {
                System.out.println("table " + tableName + " does exist!");
            } else {
                System.out.println("table " + tableName + " don't exist!");
            }
        } finally {
            // Always release the connection, including when the check throws.
            close();
        }
    }

    /**
     * Opens the connection: creates the configuration, then the connection and the admin
     * handle. An I/O failure is reported via its stack trace and leaves the handles unset.
     */
    public static void init() {
        configuration = HBaseConfiguration.create();
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            // Reported only after both calls succeeded, never after a failure.
            System.out.println("Connect to HBase successfully!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the admin handle and then the connection; the connection is closed even when
     * closing the admin handle throws.
     */
    public static void close() {
        try {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

View File

@ -1,28 +0,0 @@
// 新建培训统计类
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.*;
public class TrainingDAO {
public void countTrainingParticipants() throws IOException {
try(Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf("emp1520"))) {
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"));
ResultScanner scanner = table.getScanner(scan);
Map<String, Integer> countMap = new HashMap<>();
for (Result result : scanner) {
String courses = Bytes.toString(result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses")));
for (String course : courses.split(", ")) {
countMap.put(course, countMap.getOrDefault(course, 0) + 1);
}
}
System.out.println("培训课程统计:");
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue() + "");
}
}
}
}