import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.*; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; public class HBaseEmpManager { private static Configuration configuration; private static Connection connection; private static Admin admin; private static final String TABLE_NAME = "emp1520"; private static final String CF_EMPNUM = "empnum"; private static final String CF_INFO = "info"; private static final String CF_SALARY = "salary"; private static final String CF_PERFORMANCE = "performance"; private static final String CF_TRAINING = "training"; public static void main(String[] args) throws IOException, NoSuchAlgorithmException { init(); // 初始化 HBase 连接 createTable(); // 创建表 insertData("emp.txt"); // 读取 emp.txt 插入数据 queryById(); // 查询 ID > 7500 的员工 queryByPerformance(); // 查询绩效评分 > 4 且入职时间 < 2022 年的员工 queryRecentPromotion(); // 查找最近晋升的员工 countTrainingParticipants(); // 统计培训课程的参与人数 close(); // 关闭连接 } // 初始化 HBase 连接 private static void init() { configuration = HBaseConfiguration.create(); try { connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); System.out.println("Connected to HBase Successfully!"); } catch (IOException e) { e.printStackTrace(); } } // 关闭 HBase 连接 private static void close() { try { if (admin != null) { admin.close(); } if (connection != null) { connection.close(); } System.out.println("HBase Connection Closed."); } catch (IOException e) { e.printStackTrace(); } } // 创建 HBase 表 private static void createTable() throws IOException { TableName tableName = TableName.valueOf(TABLE_NAME); // 检查表格是否存在 if (admin.tableExists(tableName)) { System.out.println("表 " + TABLE_NAME + " 已经存在,准备禁用并删除..."); // 禁用表格 admin.disableTable(tableName); System.out.println("表 " + TABLE_NAME + " 已禁用。"); // 删除表格 admin.deleteTable(tableName); System.out.println("表 " + TABLE_NAME + " 已删除。"); } HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); tableDescriptor.addFamily(new HColumnDescriptor(CF_EMPNUM)); tableDescriptor.addFamily(new HColumnDescriptor(CF_INFO)); tableDescriptor.addFamily(new HColumnDescriptor(CF_SALARY)); tableDescriptor.addFamily(new HColumnDescriptor(CF_PERFORMANCE)); tableDescriptor.addFamily(new HColumnDescriptor(CF_TRAINING)); admin.createTable(tableDescriptor); System.out.println("Table emp1520 created successfully."); } // 读取并插入数据 private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException { Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); BufferedReader br = new BufferedReader(new FileReader(filePath)); String line; while ((line = br.readLine()) != null) { String[] emp = line.split(","); if (emp.length < 12) continue; // 确保数据完整 String rowKey = generateRowKey(emp[0]); // 使用哈希前缀 Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"), Bytes.toBytes(emp[0])); put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("ename"), Bytes.toBytes(emp[1])); put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("job"), Bytes.toBytes(emp[2])); put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("mgr"), Bytes.toBytes(emp[3])); put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"), Bytes.toBytes(emp[4])); put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("sal"), Bytes.toBytes(emp[5])); put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("credit"), Bytes.toBytes(emp[6])); put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("deptno"), Bytes.toBytes(emp[7])); put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("work_hours"), Bytes.toBytes(emp[8])); put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"), Bytes.toBytes(emp[9])); put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"), Bytes.toBytes(emp[10])); put.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"), Bytes.toBytes(emp[11])); table.put(put); } br.close(); table.close(); System.out.println("Data inserted successfully!"); } // 生成 RowKey,避免 Region 热点 private static String generateRowKey(String empno) throws NoSuchAlgorithmException { String prefix = empno.substring(0, 3); MessageDigest md5 = MessageDigest.getInstance("MD5"); byte[] digest = md5.digest(prefix.getBytes()); StringBuilder hexString = new StringBuilder(); for (byte b : digest) { hexString.append(String.format("%02x", b)); } return hexString.substring(0, 4) + empno; } // 查询 ID > 7500 的员工 private static void queryById() throws IOException { Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno")); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { String empno = Bytes.toString(result.getValue(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"))); if (empno != null && Integer.parseInt(empno) > 7500) { System.out.println("ID > 7500 Employee: " + empno); } } table.close(); } // 查询绩效评分 > 4 且入职时间 < 2022 年 private static void queryByPerformance() throws IOException { Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate")); scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating")); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { String hiredate = Bytes.toString(result.getValue(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"))); String rating = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"))); if (hiredate.compareTo("2022-01-01") < 0 && Integer.parseInt(rating) > 4) { System.out.println("Qualified Employee: Hire Date " + hiredate + ", Rating " + rating); } } table.close(); } // 查找最近晋升的员工 private static void queryRecentPromotion() throws IOException { Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date")); ResultScanner scanner = table.getScanner(scan); String latestDate = ""; String latestEmp = ""; for (Result result : scanner) { String promotionDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"))); if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) { latestDate = promotionDate; latestEmp = Bytes.toString(result.getRow()); } } System.out.println("Most Recently Promoted Employee: " + latestEmp + ", Promotion Date: " + latestDate); table.close(); } // 统计培训课程的参与人数 private static void countTrainingParticipants() throws IOException { Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses")); ResultScanner scanner = table.getScanner(scan); Map countMap = new HashMap<>(); for (Result result : scanner) { String courses = Bytes.toString(result.getValue(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"))); for (String course : courses.split(", ")) { countMap.put(course, countMap.getOrDefault(course, 0) + 1); } } System.out.println("培训课程统计:"); for (Map.Entry entry : countMap.entrySet()) { System.out.println(entry.getKey() + ": " + entry.getValue() + " 人"); } table.close(); } }