From fd26fe33f31eccd92aab1620a82ab1491e5cf152 Mon Sep 17 00:00:00 2001 From: fly6516 Date: Wed, 26 Mar 2025 09:30:29 +0800 Subject: [PATCH] =?UTF-8?q?feat(EmpHBaseClient):=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E5=B9=B6=E6=B7=BB=E5=8A=A0=E6=96=B0=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit -重构类结构,添加成员变量并优化资源管理 - 新增生成RowKey前缀的方法,使用MD5哈希 - 添加插入员工数据的方法,支持单条目插入 - 实现从CSV文件加载员工数据的方法 - 新增查询ID 7500以上员工的方法 - 修改绩效评分和入职时间联合查询方法 - 优化最近晋升查询方法,输出最新记录 - 改进培训课程统计方法,优化输出格式 --- src/main/java/EmpHBaseClient.java | 319 ++++++++++++++++++------------ 1 file changed, 193 insertions(+), 126 deletions(-) diff --git a/src/main/java/EmpHBaseClient.java b/src/main/java/EmpHBaseClient.java index e8afd9a..b2b0ffc 100644 --- a/src/main/java/EmpHBaseClient.java +++ b/src/main/java/EmpHBaseClient.java @@ -1,141 +1,208 @@ -// 创建职工表结构 -public void createEmpTable() throws IOException { - HBaseAdmin admin = new HBaseAdmin(conf); - if (admin.tableExists("emp1520")) { - System.out.println("Table already exists"); - return; - } - HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("emp1520")); - tableDesc.addFamily(new HColumnDescriptor("empnum")); // 存储员工ID - tableDesc.addFamily(new HColumnDescriptor("info")); // 存储基本信息 - tableDesc.addFamily(new HColumnDescriptor("salary")); // 存储薪资 - tableDesc.addFamily(new HColumnDescriptor("performance")); // 存储绩效信息 - tableDesc.addFamily(new HColumnDescriptor("training")); // 存储培训课程 - admin.createTable(tableDesc); - admin.close(); -} +public class EmpHBaseClient { + private Configuration conf; + private Connection connection; + private Table table; // 添加表对象成员变量 -// 插入数据实现 -public void loadData() throws IOException { - Table table = connection.getTable(TableName.valueOf("emp1520")); - try (BufferedReader br = new BufferedReader(new FileReader("emp.txt"))) { - String line; - while ((line = br.readLine()) != null) { - String[] fields = line.split(","); - String empno = fields[0]; - String rowKey = generateRowKey(empno); // 生成MD5前缀+empno的RowKey - Put put = new Put(Bytes.toBytes(rowKey)); - // empnum列族 - put.addColumn(Bytes.toBytes("empnum"), Bytes.toBytes("empno"), Bytes.toBytes(empno)); - // info列族 - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(fields[1])); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(fields[2])); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(fields[3])); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(fields[4])); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("credit"), Bytes.toBytes(fields[5])); - // salary列族 - put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("sal"), Bytes.toBytes(fields[6])); - // performance列族 - put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("work_hours"), Bytes.toBytes(fields[7])); - put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("performance_rating"), Bytes.toBytes(fields[8])); - put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"), Bytes.toBytes(fields[9])); - // training列族 - put.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"), Bytes.toBytes(fields[10])); - table.put(put); + public EmpHBaseClient() { + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", "localhost"); + try { + connection = ConnectionFactory.createConnection(conf); + table = connection.getTable(TableName.valueOf("emp1520")); + } catch (IOException e) { + e.printStackTrace(); } } -} -// RowKey生成方法 -private String generateRowKey(String empno) { - String prefix = empno.substring(0,3); - String md5 = md5Hash(prefix); - return md5.substring(0,4) + empno; // 前4位MD5+原empno -} - -// MD5计算方法 -private static String md5Hash(String input) { - try { - MessageDigest md = MessageDigest.getInstance("MD5"); - byte[] messageDigest = md.digest(input.getBytes()); - BigInteger number = new BigInteger(1, messageDigest); - return number.toString(16); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); + // 创建职工表结构 + public void createEmpTable() throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + if (admin.tableExists("emp1520")) { + System.out.println("Table already exists"); + return; + } + HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("emp1520")); + tableDesc.addFamily(new HColumnDescriptor("empnum")); // 存储员工ID + tableDesc.addFamily(new HColumnDescriptor("info")); // 存储基本信息 + tableDesc.addFamily(new HColumnDescriptor("salary")); // 存储薪资 + tableDesc.addFamily(new HColumnDescriptor("performance")); // 存储绩效信息 + tableDesc.addFamily(new HColumnDescriptor("training")); // 存储培训课程 + admin.createTable(tableDesc); + admin.close(); } -} -// 查询ID>7500的员工 -public void queryEmpNoOver7500() throws IOException { - Scan scan = new Scan(); - Filter filter = new SingleColumnValueFilter( + // 新增方法:生成RowKey前缀(MD5处理) + private String generateRowKeyPrefix(String empno) { + String firstThreeDigits = empno.substring(0, 3); + // 使用MD5哈希并截取前4位(避免过长) + 分隔符 + String md5 = DigestUtils.md5Hex(firstThreeDigits).substring(0, 4); + return md5 + "-"; + } + + // 新增插入数据方法(核心实现) + public void insertEmployee(String empno, String ename, String job, String mgr, String hiredate, String sal, String credit, String deptno, String work_hours, String performance_rating, String promotion_date, String training_courses) throws IOException { + // 生成符合要求的RowKey:MD5前缀 + 原始empno + String rowKey = generateRowKeyPrefix(empno) + empno; + Put put = new Put(Bytes.toBytes(rowKey)); + + // 列族分配逻辑(按实验要求字段划分) + // empnum列族:存储员工ID(空间换时间设计) + put.addColumn(Bytes.toBytes("empnum"), Bytes.toBytes("empno"), Bytes.toBytes(empno)); + // info列族:基本信息存储 + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(ename)); + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(job)); + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(mgr == null ? "" : mgr)); + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(hiredate)); + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(deptno)); // 新增部门号字段存储 + // salary列族:薪资相关字段存储 + put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("sal"), Bytes.toBytes(sal)); + put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("credit"), Bytes.toBytes(credit)); + // performance列族:绩效相关字段存储 + put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("performance_rating"), Bytes.toBytes(performance_rating)); + put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"), Bytes.toBytes(promotion_date)); + put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("work_hours"), Bytes.toBytes(work_hours)); // 新增工作时长字段存储 + // training列族:培训课程存储(逗号分隔的字符串) + put.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"), Bytes.toBytes(training_courses)); + + // 写入表数据 + table.put(put); + } + + // 新增CSV文件加载方法(读取emp.txt并插入数据) + public void loadEmployeesFromCSV(String filePath) throws IOException { + try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { + String line; + while ((line = br.readLine()) != null) { + // 处理CSV行(注意处理引号包裹的字段) + String[] parts = line.replaceAll("\"", "").split(",", -1); + if (parts.length >= 12) { + insertEmployee( + parts[0], parts[1], parts[2], parts[3], parts[4], + parts[5], parts[6], parts[7], parts[8], + parts[9], parts[10], parts[11] + ); + } + } + } + } + + // 修改构造函数:关闭资源时确保连接释放(新增close方法) + public void close() throws IOException { + if (table != null) table.close(); + if (connection != null) connection.close(); + } + + // 新增查询ID 7500以上员工方法 + public void findEmployeesWithEmpnoAbove7500() throws IOException { + Scan scan = new Scan(); + SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("empnum"), Bytes.toBytes("empno"), CompareFilter.CompareOp.GREATER_OR_EQUAL, new SubstringComparator("7500") - ); - scan.setFilter(filter); - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - // 输出结果处理 - } -} - -// 绩效>4且入职早于2022年 -public void queryPerformanceAndHiredate() throws IOException { - FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL); - list.addFilter(new SingleColumnValueFilter( - Bytes.toBytes("performance"), - Bytes.toBytes("performance_rating"), - CompareFilter.CompareOp.GREATER_OR_EQUAL, - new SubstringComparator("5") - )); - list.addFilter(new SingleColumnValueFilter( - Bytes.toBytes("info"), - Bytes.toBytes("hiredate"), - CompareFilter.CompareOp.LESS, - new SubstringComparator("2022-01-01") - )); - Scan scan = new Scan(); - scan.setFilter(list); - ResultScanner scanner = table.getScanner(scan); - // 结果处理逻辑 -} - -// 最近晋升员工查询 -public void findLatestPromotion() throws IOException { - Scan scan = new Scan(); - ResultScanner scanner = table.getScanner(scan); - List records = new ArrayList<>(); - for (Result result : scanner) { - String promotionDateStr = Bytes.toString( - result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date")) ); - if (promotionDateStr != null) { - records.add(new PromotionRecord( - Bytes.toString(result.getRow()), - LocalDate.parse(promotionDateStr) - )); - } - } - records.sort(Comparator.comparing(PromotionRecord::getDate).reversed()); - // 输出最新记录 -} + filter.setFilterIfMissing(true); + scan.setFilter(filter); -// 培训课程统计 -public void countTrainingCourses() throws IOException { - Map courseMap = new HashMap<>(); - Scan scan = new Scan(); - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - String coursesStr = Bytes.toString( - result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses")) - ); - if (coursesStr != null) { - for (String course : coursesStr.split(",")) { - courseMap.put(course, courseMap.getOrDefault(course, 0)+1); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + System.out.println("RowKey: " + Bytes.toString(result.getRow())); + // 可根据需要扩展字段输出 } } } - // 输出统计结果 -} + + // 新增绩效评分和入职时间联合查询方法 + public void findQualifiedEmployees() throws IOException { + Scan scan = new Scan(); + + // 绩效评分>4 + SingleColumnValueFilter filter1 = new SingleColumnValueFilter( + Bytes.toBytes("performance"), + Bytes.toBytes("performance_rating"), + CompareFilter.CompareOp.GREATER_THAN, + new BinaryComparator(Bytes.toBytes("4")) + ); + // 入职时间早于2022年 + SingleColumnValueFilter filter2 = new SingleColumnValueFilter( + Bytes.toBytes("info"), + Bytes.toBytes("hiredate"), + CompareFilter.CompareOp.LESS, + new BinaryComparator(Bytes.toBytes("2022-01-01")) + ); + + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + filterList.addFilter(filter1); + filterList.addFilter(filter2); + scan.setFilter(filterList); + + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + System.out.println("RowKey: " + Bytes.toString(result.getRow())); + } + } + } + + // 修改最近晋升查询方法 + public void findLatestPromotion() throws IOException { + Scan scan = new Scan(); + List records = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + String promotionDateStr = Bytes.toString( + result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date")) + ); + if (promotionDateStr != null) { + records.add(new PromotionRecord( + Bytes.toString(result.getRow()), + LocalDate.parse(promotionDateStr) + )); + } + } + } + + records.sort(Comparator.comparing(PromotionRecord::getDate).reversed()); + if (!records.isEmpty()) { + PromotionRecord latest = records.get(0); + System.out.println("Latest promotion: RowKey=" + latest.rowKey + ", Date=" + latest.date); + } + } + + // 修改培训课程统计方法 + public void countTrainingCourses() throws IOException { + Map courseMap = new HashMap<>(); + Scan scan = new Scan(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + String coursesStr = Bytes.toString( + result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses")) + ); + if (coursesStr != null) { + for (String course : coursesStr.split(",")) { + courseMap.put(course, courseMap.getOrDefault(course, 0)+1); + } + } + } + } + + // 输出统计结果 + for (Map.Entry entry : courseMap.entrySet()) { + System.out.printf("Course: %-20s Count: %d%n", entry.getKey(), entry.getValue()); + } + } + + // 新增内部类处理晋升记录 + private static class PromotionRecord { + private final String rowKey; + private final LocalDate date; + + public PromotionRecord(String rowKey, LocalDate date) { + this.rowKey = rowKey; + this.date = date; + } + + public LocalDate getDate() { + return date; + } + } +} \ No newline at end of file