From 15b008dc35075a21799654d68db3308ca184ce05 Mon Sep 17 00:00:00 2001 From: fly6516 Date: Wed, 26 Mar 2025 11:15:47 +0800 Subject: [PATCH] =?UTF-8?q?refactor(hbase):=20=E9=87=8D=E6=9E=84=20HBaseEm?= =?UTF-8?q?pManager=20=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 HBaseConnectionManager 类用于管理 HBase 连接 - 新增 HBaseTableManager 类用于管理 HBase 表操作 - 新增 EmployeeDAO 和 TrainingDAO 类用于数据访问和操作 - 优化代码结构,提高可维护性和可扩展性 --- src/main/java/EmployeeDAO.java | 40 ++++ src/main/java/HBaseConnectionManager.java | 41 +++++ src/main/java/HBaseEmpManager.java | 214 ++-------------------- src/main/java/HBaseTableManager.java | 26 +++ src/main/java/TrainingDAO.java | 28 +++ 5 files changed, 151 insertions(+), 198 deletions(-) create mode 100644 src/main/java/EmployeeDAO.java create mode 100644 src/main/java/HBaseConnectionManager.java create mode 100644 src/main/java/HBaseTableManager.java create mode 100644 src/main/java/TrainingDAO.java diff --git a/src/main/java/EmployeeDAO.java b/src/main/java/EmployeeDAO.java new file mode 100644 index 0000000..c15ce47 --- /dev/null +++ b/src/main/java/EmployeeDAO.java @@ -0,0 +1,40 @@ +// 新建员工数据访问类 +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import java.io.*; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +public class EmployeeDAO { + private static final String[] CF_ARRAY = {"empnum", "info", "salary", "performance", "training"}; + + public void insertData(String filePath) throws IOException, NoSuchAlgorithmException { + try(Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf("emp1520"))) { + BufferedReader br = new BufferedReader(new FileReader(filePath)); + String line; + while ((line = br.readLine()) != null) { + String[] emp = line.split(","); + if (emp.length < 12) continue; + + String rowKey = generateRowKey(emp[0]); + Put put = new Put(Bytes.toBytes(rowKey)); + table.put(put); + } + br.close(); + } + System.out.println("Data inserted successfully!"); + } + + private String generateRowKey(String empno) throws NoSuchAlgorithmException { + } + + public void queryById() throws IOException { + } + + public void queryByPerformance() throws IOException { + } + + public void queryRecentPromotion() throws IOException { + } +} \ No newline at end of file diff --git a/src/main/java/HBaseConnectionManager.java b/src/main/java/HBaseConnectionManager.java new file mode 100644 index 0000000..6eeba59 --- /dev/null +++ b/src/main/java/HBaseConnectionManager.java @@ -0,0 +1,41 @@ +// 新建连接管理类 +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Admin; + +public class HBaseConnectionManager { + private static Configuration configuration; + private static Connection connection; + private static Admin admin; + + public static void init() { + configuration = HBaseConfiguration.create(); + try { + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); + System.out.println("Connected to HBase Successfully!"); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static void close() { + try { + if (admin != null) admin.close(); + if (connection != null) connection.close(); + System.out.println("HBase Connection Closed."); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static Admin getAdmin() { + return admin; + } + + public static Connection getConnection() { + return connection; + } +} \ No newline at end of file diff --git a/src/main/java/HBaseEmpManager.java b/src/main/java/HBaseEmpManager.java index b3a2923..dd1a777 100644 --- a/src/main/java/HBaseEmpManager.java +++ b/src/main/java/HBaseEmpManager.java @@ -8,206 +8,24 @@ import java.security.NoSuchAlgorithmException; import java.util.*; public class HBaseEmpManager { - private static Configuration configuration; - private static Connection connection; - private static Admin admin; - - private static final String TABLE_NAME = "emp1520"; - private static final String CF_EMPNUM = "empnum"; - private static final String CF_INFO = "info"; - private static final String CF_SALARY = "salary"; - private static final String CF_PERFORMANCE = "performance"; - private static final String CF_TRAINING = "training"; - - public static void main(String[] args) throws IOException, NoSuchAlgorithmException { - init(); // 初始化 HBase 连接 - createTable(); // 创建表 - insertData("emp.txt"); // 读取 emp.txt 插入数据 - queryById(); // 查询 ID > 7500 的员工 - queryByPerformance(); // 查询绩效评分 > 4 且入职时间 < 2022 年的员工 - queryRecentPromotion(); // 查找最近晋升的员工 - countTrainingParticipants(); // 统计培训课程的参与人数 - close(); // 关闭连接 - } - - // 初始化 HBase 连接 - private static void init() { - configuration = HBaseConfiguration.create(); + public static void main(String[] args) { try { - connection = ConnectionFactory.createConnection(configuration); - admin = connection.getAdmin(); - System.out.println("Connected to HBase Successfully!"); - } catch (IOException e) { + HBaseConnectionManager.init(); + HBaseTableManager.createTable(HBaseConnectionManager.getAdmin()); + + EmployeeDAO employeeDAO = new EmployeeDAO(); + employeeDAO.insertData("emp.txt"); + employeeDAO.queryById(); + employeeDAO.queryByPerformance(); + employeeDAO.queryRecentPromotion(); + + TrainingDAO trainingDAO = new TrainingDAO(); + trainingDAO.countTrainingParticipants(); + + } catch (Exception e) { e.printStackTrace(); + } finally { + HBaseConnectionManager.close(); } } - - // 关闭 HBase 连接 - private static void close() { - try { - if (admin != null) { - admin.close(); - } - if (connection != null) { - connection.close(); - } - System.out.println("HBase Connection Closed."); - } catch (IOException e) { - e.printStackTrace(); - } - } - - // 创建 HBase 表 - private static void createTable() throws IOException { - TableName tableName = TableName.valueOf(TABLE_NAME); - // 检查表格是否存在 - if (admin.tableExists(tableName)) { - System.out.println("表 " + TABLE_NAME + " 已经存在,准备禁用并删除..."); - - // 禁用表格 - admin.disableTable(tableName); - System.out.println("表 " + TABLE_NAME + " 已禁用。"); - - // 删除表格 - admin.deleteTable(tableName); - System.out.println("表 " + TABLE_NAME + " 已删除。"); - } - - HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - tableDescriptor.addFamily(new HColumnDescriptor(CF_EMPNUM)); - tableDescriptor.addFamily(new HColumnDescriptor(CF_INFO)); - tableDescriptor.addFamily(new HColumnDescriptor(CF_SALARY)); - tableDescriptor.addFamily(new HColumnDescriptor(CF_PERFORMANCE)); - tableDescriptor.addFamily(new HColumnDescriptor(CF_TRAINING)); - - admin.createTable(tableDescriptor); - System.out.println("Table emp1520 created successfully."); - } - - // 读取并插入数据 - private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException { - Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); - BufferedReader br = new BufferedReader(new FileReader(filePath)); - String line; - - while ((line = br.readLine()) != null) { - String[] emp = line.split(","); - - if (emp.length < 12) continue; // 确保数据完整 - - String rowKey = generateRowKey(emp[0]); // 使用哈希前缀 - - Put put = new Put(Bytes.toBytes(rowKey)); - put.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"), Bytes.toBytes(emp[0])); - put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("ename"), Bytes.toBytes(emp[1])); - put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("job"), Bytes.toBytes(emp[2])); - put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("mgr"), Bytes.toBytes(emp[3])); - put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"), Bytes.toBytes(emp[4])); - put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("sal"), Bytes.toBytes(emp[5])); - put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("credit"), Bytes.toBytes(emp[6])); - put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("deptno"), Bytes.toBytes(emp[7])); - put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("work_hours"), Bytes.toBytes(emp[8])); - put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"), Bytes.toBytes(emp[9])); - put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"), Bytes.toBytes(emp[10])); - put.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"), Bytes.toBytes(emp[11])); - - table.put(put); - } - - br.close(); - table.close(); - System.out.println("Data inserted successfully!"); - } - - // 生成 RowKey,避免 Region 热点 - private static String generateRowKey(String empno) throws NoSuchAlgorithmException { - String prefix = empno.substring(0, 3); - MessageDigest md5 = MessageDigest.getInstance("MD5"); - byte[] digest = md5.digest(prefix.getBytes()); - StringBuilder hexString = new StringBuilder(); - for (byte b : digest) { - hexString.append(String.format("%02x", b)); - } - return hexString.substring(0, 4) + empno; - } - - // 查询 ID > 7500 的员工 - private static void queryById() throws IOException { - Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); - Scan scan = new Scan(); - scan.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno")); - - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - String empno = Bytes.toString(result.getValue(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"))); - if (empno != null && Integer.parseInt(empno) > 7500) { - System.out.println("ID > 7500 Employee: " + empno); - } - } - table.close(); - } - - // 查询绩效评分 > 4 且入职时间 < 2022 年 - private static void queryByPerformance() throws IOException { - Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); - Scan scan = new Scan(); - scan.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate")); - scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating")); - - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - String hiredate = Bytes.toString(result.getValue(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"))); - String rating = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"))); - - if (hiredate.compareTo("2022-01-01") < 0 && Integer.parseInt(rating) > 4) { - System.out.println("Qualified Employee: Hire Date " + hiredate + ", Rating " + rating); - } - } - table.close(); - } - - // 查找最近晋升的员工 - private static void queryRecentPromotion() throws IOException { - Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); - Scan scan = new Scan(); - scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date")); - - ResultScanner scanner = table.getScanner(scan); - String latestDate = ""; - String latestEmp = ""; - - for (Result result : scanner) { - String promotionDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"))); - if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) { - latestDate = promotionDate; - latestEmp = Bytes.toString(result.getRow()); - } - } - System.out.println("Most Recently Promoted Employee: " + latestEmp + ", Promotion Date: " + latestDate); - table.close(); - } - - // 统计培训课程的参与人数 - private static void countTrainingParticipants() throws IOException { - Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); - Scan scan = new Scan(); - scan.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses")); - - ResultScanner scanner = table.getScanner(scan); - Map countMap = new HashMap<>(); - - for (Result result : scanner) { - String courses = Bytes.toString(result.getValue(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"))); - for (String course : courses.split(", ")) { - countMap.put(course, countMap.getOrDefault(course, 0) + 1); - } - } - - System.out.println("培训课程统计:"); - for (Map.Entry entry : countMap.entrySet()) { - System.out.println(entry.getKey() + ": " + entry.getValue() + " 人"); - } - - table.close(); - } } diff --git a/src/main/java/HBaseTableManager.java b/src/main/java/HBaseTableManager.java new file mode 100644 index 0000000..02cb7c5 --- /dev/null +++ b/src/main/java/HBaseTableManager.java @@ -0,0 +1,26 @@ +// 新建表管理类 +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; + +public class HBaseTableManager { + private static final String TABLE_NAME = "emp1520"; + private static final String[] CF_ARRAY = {"empnum", "info", "salary", "performance", "training"}; + + public static void createTable(Admin admin) throws IOException { + TableName tableName = TableName.valueOf(TABLE_NAME); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + System.out.println("Existing table removed"); + } + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + for (String cf : CF_ARRAY) { + tableDescriptor.addFamily(new HColumnDescriptor(cf)); + } + admin.createTable(tableDescriptor); + System.out.println("Table created successfully"); + } +} \ No newline at end of file diff --git a/src/main/java/TrainingDAO.java b/src/main/java/TrainingDAO.java new file mode 100644 index 0000000..d0fa159 --- /dev/null +++ b/src/main/java/TrainingDAO.java @@ -0,0 +1,28 @@ +// 新建培训统计类 +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import java.util.*; + +public class TrainingDAO { + public void countTrainingParticipants() throws IOException { + try(Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf("emp1520"))) { + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses")); + + ResultScanner scanner = table.getScanner(scan); + Map countMap = new HashMap<>(); + + for (Result result : scanner) { + String courses = Bytes.toString(result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses"))); + for (String course : courses.split(", ")) { + countMap.put(course, countMap.getOrDefault(course, 0) + 1); + } + } + + System.out.println("培训课程统计:"); + for (Map.Entry entry : countMap.entrySet()) { + System.out.println(entry.getKey() + ": " + entry.getValue() + " 人"); + } + } + } +} \ No newline at end of file