diff --git a/src/main/java/EmpHBaseClient.java b/src/main/java/EmpHBaseClient.java deleted file mode 100644 index 50906bd..0000000 --- a/src/main/java/EmpHBaseClient.java +++ /dev/null @@ -1,298 +0,0 @@ -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.time.LocalDate; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -//import org.apache.hadoop.hbase.client.HTableDescriptor; -//import org.apache.hadoop.hbase.client.HColumnDescriptor; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -//import org.apache.hadoop.hbase.client.Filter; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.commons.codec.digest.DigestUtils; - -public class EmpHBaseClient { - private static Configuration conf; - private static Connection connection; - private static Admin admin; - private Table table; // 添加表对象成员变量 - - // 新增静态初始化方法 - public static void init() { - conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", "localhost"); - conf.set("hbase.zookeeper.property.clientPort", "2181"); - conf.set("hbase.client.retries.number", "5"); - conf.set("hbase.client.pause", "1000"); - conf.set("hbase.rpc.timeout", "120000"); - conf.set("hbase.zookeeper.connection.timeout", "30000"); - - try { - connection = ConnectionFactory.createConnection(conf); - admin = connection.getAdmin(); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException("HBase连接初始化失败", e); - } - } - - // 修改构造函数依赖静态连接 - public EmpHBaseClient() { - if (connection == null) { - init(); - } - try { - table = connection.getTable(TableName.valueOf("emp1520")); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException("获取表失败", e); - } - } - - // 创建职工表结构 - public void createEmpTable() throws IOException { - if (admin == null) { - throw new IOException("Admin未初始化"); - } - if (admin.tableExists(TableName.valueOf("emp1520"))) { - System.out.println("Table already exists"); - return; - } - HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("emp1520")); - tableDesc.addFamily(new HColumnDescriptor("empnum")); // 存储员工ID - tableDesc.addFamily(new HColumnDescriptor("info")); // 存储基本信息 - tableDesc.addFamily(new HColumnDescriptor("salary")); // 存储薪资 - tableDesc.addFamily(new HColumnDescriptor("performance")); // 存储绩效信息 - tableDesc.addFamily(new HColumnDescriptor("training")); // 存储培训课程 - admin.createTable(tableDesc); - } - - // 新增方法:生成RowKey前缀(MD5处理) - private String generateRowKeyPrefix(String empno) { - String firstThreeDigits = empno.substring(0, 3); - // 使用MD5哈希并截取前4位(避免过长) + 分隔符 - String md5 = DigestUtils.md5Hex(firstThreeDigits).substring(0, 4); - return md5 + "-"; - } - - // 新增插入数据方法(核心实现) - public void insertEmployee(String empno, String ename, String job, String mgr, String hiredate, String sal, String credit, String deptno, String work_hours, String performance_rating, String promotion_date, String training_courses) throws IOException { - // 生成符合要求的RowKey:MD5前缀 + 原始empno - String rowKey = generateRowKeyPrefix(empno) + empno; - Put put = new Put(Bytes.toBytes(rowKey)); - - // 列族分配逻辑(按实验要求字段划分) - // empnum列族:存储员工ID(空间换时间设计) - put.addColumn(Bytes.toBytes("empnum"), Bytes.toBytes("empno"), Bytes.toBytes(empno)); - // info列族:基本信息存储 - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ename"), Bytes.toBytes(ename)); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("job"), Bytes.toBytes(job)); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mgr"), Bytes.toBytes(mgr)); // 删除原条件判断 - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("hiredate"), Bytes.toBytes(hiredate)); - put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("deptno"), Bytes.toBytes(deptno)); // 新增部门号字段存储 - // salary列族:薪资相关字段存储 - put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("sal"), Bytes.toBytes(sal)); - put.addColumn(Bytes.toBytes("salary"), Bytes.toBytes("credit"), Bytes.toBytes(credit)); - // performance列族:绩效相关字段存储 - put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("performance_rating"), Bytes.toBytes(performance_rating)); - put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date"), Bytes.toBytes(promotion_date)); - put.addColumn(Bytes.toBytes("performance"), Bytes.toBytes("work_hours"), Bytes.toBytes(work_hours)); // 新增工作时长字段存储 - // training列族:培训课程存储(逗号分隔的字符串) - put.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses"), Bytes.toBytes(training_courses)); - - // 写入表数据 - table.put(put); - } - - // 新增CSV文件加载方法(读取emp.txt并插入数据) - public void loadEmployeesFromCSV(String filePath) throws IOException { - try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { - String line; - while ((line = br.readLine()) != null) { - // 处理CSV行(注意处理引号包裹的字段) - String[] parts = line.replaceAll("\"", "").split(",", -1); - if (parts.length >= 12) { - insertEmployee( - parts[0], parts[1], parts[2], parts[3], parts[4], - parts[5], parts[6], parts[7], parts[8], - parts[9], parts[10], parts[11] - ); - } - } - } - } - - // 修改构造函数:关闭资源时确保连接释放(新增close方法) - public static void close() { - try { - if (admin != null) admin.close(); - if (connection != null) connection.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - // 新增查询ID 7500以上员工方法 - public void findEmployeesWithEmpnoAbove7500() throws IOException { - Scan scan = new Scan(); - SingleColumnValueFilter filter = new SingleColumnValueFilter( - Bytes.toBytes("empnum"), - Bytes.toBytes("empno"), - CompareFilter.CompareOp.GREATER_OR_EQUAL, - new BinaryComparator(Bytes.toBytes("7500")) - ); - filter.setFilterIfMissing(true); - scan.setFilter(filter); - - try (ResultScanner scanner = table.getScanner(scan)) { - for (Result result : scanner) { - System.out.println("RowKey: " + Bytes.toString(result.getRow())); - // 可根据需要扩展字段输出 - } - } - } - - // 新增绩效评分和入职时间联合查询方法 - public void findQualifiedEmployees() throws IOException { - Scan scan = new Scan(); - - // 绩效评分>4 - SingleColumnValueFilter filter1 = new SingleColumnValueFilter( - Bytes.toBytes("performance"), - Bytes.toBytes("performance_rating"), - CompareFilter.CompareOp.GREATER, // 修改点:将GREATER_THAN改为GREATER(HBase 1.1.5版本不支持GREATER_THAN) - new BinaryComparator(Bytes.toBytes("4")) - ); - // 入职时间早于2022年 - SingleColumnValueFilter filter2 = new SingleColumnValueFilter( - Bytes.toBytes("info"), - Bytes.toBytes("hiredate"), - CompareFilter.CompareOp.LESS, - new BinaryComparator(Bytes.toBytes("2022-01-01")) - ); - - FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); - filterList.addFilter(filter1); - filterList.addFilter(filter2); - scan.setFilter(filterList); - - try (ResultScanner scanner = table.getScanner(scan)) { - for (Result result : scanner) { - System.out.println("RowKey: " + Bytes.toString(result.getRow())); - } - } - } - - // 修改最近晋升查询方法 - public void findLatestPromotion() throws IOException { - Scan scan = new Scan(); - List records = new ArrayList<>(); - try (ResultScanner scanner = table.getScanner(scan)) { - for (Result result : scanner) { - String promotionDateStr = Bytes.toString( - result.getValue(Bytes.toBytes("performance"), Bytes.toBytes("promotion_date")) - ); - if (promotionDateStr != null && !promotionDateStr.isEmpty()) { // 新增空值检查 - records.add(new PromotionRecord( - Bytes.toString(result.getRow()), - LocalDate.parse(promotionDateStr) - )); - } - } - } - - records.sort(Comparator.comparing(PromotionRecord::getDate).reversed()); - if (!records.isEmpty()) { - PromotionRecord latest = records.get(0); - System.out.println("Latest promotion: RowKey=" + latest.rowKey + ", Date=" + latest.date); - } - } - - // 修改培训课程统计方法 - public void countTrainingCourses() throws IOException { - Map courseMap = new HashMap<>(); - Scan scan = new Scan(); - try (ResultScanner scanner = table.getScanner(scan)) { - for (Result result : scanner) { - String coursesStr = Bytes.toString( - result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses")) - ); - if (coursesStr != null) { - for (String course : coursesStr.split(",")) { - courseMap.put(course, courseMap.getOrDefault(course, 0)+1); - } - } - } - } - - // 输出统计结果 - for (Map.Entry entry : courseMap.entrySet()) { - System.out.printf("Course: %-20s Count: %d%n", entry.getKey(), entry.getValue()); - } - } - - // 新增内部类处理晋升记录 - private static class PromotionRecord { - private final String rowKey; - private final LocalDate date; - - public PromotionRecord(String rowKey, LocalDate date) { - this.rowKey = rowKey; - this.date = date; - } - - public LocalDate getDate() { - return date; - } - } - - // 新增main方法作为入口 - public static void main(String[] args) { - try { - EmpHBaseClient.init(); // 新增静态初始化调用 - EmpHBaseClient client = new EmpHBaseClient(); - System.out.println("1. 创建表结构..."); - client.createEmpTable(); - - System.out.println("2. 加载CSV数据..."); - client.loadEmployeesFromCSV("emp.txt"); - - System.out.println("\n3. 查询ID≥7500的员工:"); - client.findEmployeesWithEmpnoAbove7500(); - - System.out.println("\n4. 查询绩效>4且入职早于2022的员工:"); - client.findQualifiedEmployees(); - - System.out.println("\n5. 最近晋升记录:"); - client.findLatestPromotion(); - - System.out.println("\n6. 培训课程统计:"); - client.countTrainingCourses(); - - } catch (IOException e) { - e.printStackTrace(); - } finally { - EmpHBaseClient.close(); // 新增静态关闭调用 - } - } -} \ No newline at end of file diff --git a/src/main/java/HBaseEmpManager.java b/src/main/java/HBaseEmpManager.java new file mode 100644 index 0000000..9e5281c --- /dev/null +++ b/src/main/java/HBaseEmpManager.java @@ -0,0 +1,177 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import java.io.*; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +public class HBaseEmpManager { + private static final String TABLE_NAME = "emp1520"; + private static final String CF_EMPNUM = "empnum"; + private static final String CF_INFO = "info"; + private static final String CF_SALARY = "salary"; + private static final String CF_PERFORMANCE = "performance"; + private static final String CF_TRAINING = "training"; + + private static Connection connection; + private static Admin admin; + + public static void main(String[] args) throws IOException, NoSuchAlgorithmException { + Configuration config = HBaseConfiguration.create(); + connection = ConnectionFactory.createConnection(config); + admin = connection.getAdmin(); + + createTable(); + insertData("emp.txt"); + queryById(); + queryByPerformance(); + queryRecentPromotion(); + countTrainingParticipants(); + + admin.close(); + connection.close(); + } + + // 创建 HBase 表 + private static void createTable() throws IOException { + TableName tableName = TableName.valueOf(TABLE_NAME); + if (admin.tableExists(tableName)) { + System.out.println("表 " + TABLE_NAME + " 已存在!"); + return; + } + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + tableDescriptor.addFamily(new HColumnDescriptor(CF_EMPNUM)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_INFO)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_SALARY)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_PERFORMANCE)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_TRAINING)); + + admin.createTable(tableDescriptor); + System.out.println("表 " + TABLE_NAME + " 创建成功!"); + } + + // 读取并插入数据 + private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + BufferedReader br = new BufferedReader(new FileReader(filePath)); + String line; + + while ((line = br.readLine()) != null) { + String[] emp = line.split(","); + + if (emp.length < 12) continue; // 确保数据完整 + + String rowKey = generateRowKey(emp[0]); // 使用哈希前缀 + + Put put = new Put(Bytes.toBytes(rowKey)); + put.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"), Bytes.toBytes(emp[0])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("ename"), Bytes.toBytes(emp[1])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("job"), Bytes.toBytes(emp[2])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("mgr"), Bytes.toBytes(emp[3])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"), Bytes.toBytes(emp[4])); + put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("sal"), Bytes.toBytes(emp[5])); + put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("credit"), Bytes.toBytes(emp[6])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("deptno"), Bytes.toBytes(emp[7])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("work_hours"), Bytes.toBytes(emp[8])); + put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"), Bytes.toBytes(emp[9])); + put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"), Bytes.toBytes(emp[10])); + put.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"), Bytes.toBytes(emp[11])); + + table.put(put); + } + + br.close(); + table.close(); + System.out.println("数据插入完成!"); + } + + // 查询 ID > 7500 的员工 + private static void queryById() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno")); + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + String empno = Bytes.toString(result.getValue(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"))); + if (empno != null && Integer.parseInt(empno) > 7500) { + System.out.println("ID > 7500 员工: " + empno); + } + } + table.close(); + } + + // 查询绩效评分 > 4 且入职时间 < 2022 年 + private static void queryByPerformance() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate")); + scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating")); + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + String hiredate = Bytes.toString(result.getValue(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"))); + String rating = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"))); + + if (hiredate.compareTo("2022-01-01") < 0 && Integer.parseInt(rating) > 4) { + System.out.println("符合条件的员工: 入职时间 " + hiredate + ",绩效评分 " + rating); + } + } + table.close(); + } + + // 查找最近晋升的员工 + private static void queryRecentPromotion() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date")); + + ResultScanner scanner = table.getScanner(scan); + String latestDate = ""; + String latestEmp = ""; + + for (Result result : scanner) { + String promotionDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"))); + if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) { + latestDate = promotionDate; + latestEmp = Bytes.toString(result.getRow()); + } + } + System.out.println("最近晋升的员工: " + latestEmp + ",晋升日期: " + latestDate); + table.close(); + } + + // 统计每个培训课程的参与人数 + private static void countTrainingParticipants() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses")); + + ResultScanner scanner = table.getScanner(scan); + Map countMap = new HashMap<>(); + + for (Result result : scanner) { + String courses = Bytes.toString(result.getValue(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"))); + for (String course : courses.split(", ")) { + countMap.put(course, countMap.getOrDefault(course, 0) + 1); + } + } + System.out.println("培训课程统计: " + countMap); + table.close(); + } + + // 生成 RowKey,避免 Region 热点 + private static String generateRowKey(String empno) throws NoSuchAlgorithmException { + String prefix = empno.substring(0, 3); + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] digest = md5.digest(prefix.getBytes()); + StringBuilder hexString = new StringBuilder(); + for (byte b : digest) { + hexString.append(String.format("%02x", b)); + } + return hexString.substring(0, 4) + empno; + } +}