diff --git a/src/main/java/EmployeeDAO.java b/src/main/java/EmployeeDAO.java deleted file mode 100644 index c15ce47..0000000 --- a/src/main/java/EmployeeDAO.java +++ /dev/null @@ -1,40 +0,0 @@ -// 新建员工数据访问类 -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import java.io.*; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.*; - -public class EmployeeDAO { - private static final String[] CF_ARRAY = {"empnum", "info", "salary", "performance", "training"}; - - public void insertData(String filePath) throws IOException, NoSuchAlgorithmException { - try(Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf("emp1520"))) { - BufferedReader br = new BufferedReader(new FileReader(filePath)); - String line; - while ((line = br.readLine()) != null) { - String[] emp = line.split(","); - if (emp.length < 12) continue; - - String rowKey = generateRowKey(emp[0]); - Put put = new Put(Bytes.toBytes(rowKey)); - table.put(put); - } - br.close(); - } - System.out.println("Data inserted successfully!"); - } - - private String generateRowKey(String empno) throws NoSuchAlgorithmException { - } - - public void queryById() throws IOException { - } - - public void queryByPerformance() throws IOException { - } - - public void queryRecentPromotion() throws IOException { - } -} \ No newline at end of file diff --git a/src/main/java/HBaseConnectionManager.java b/src/main/java/HBaseConnectionManager.java deleted file mode 100644 index 6eeba59..0000000 --- a/src/main/java/HBaseConnectionManager.java +++ /dev/null @@ -1,41 +0,0 @@ -// 新建连接管理类 -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Admin; - -public class HBaseConnectionManager { - private static Configuration configuration; - private static Connection connection; - private static Admin admin; - - public static void init() { - configuration = HBaseConfiguration.create(); - try { - connection = ConnectionFactory.createConnection(configuration); - admin = connection.getAdmin(); - System.out.println("Connected to HBase Successfully!"); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static void close() { - try { - if (admin != null) admin.close(); - if (connection != null) connection.close(); - System.out.println("HBase Connection Closed."); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static Admin getAdmin() { - return admin; - } - - public static Connection getConnection() { - return connection; - } -} \ No newline at end of file diff --git a/src/main/java/HBaseEmpManager.java b/src/main/java/HBaseEmpManager.java index dd1a777..b3a2923 100644 --- a/src/main/java/HBaseEmpManager.java +++ b/src/main/java/HBaseEmpManager.java @@ -8,24 +8,206 @@ import java.security.NoSuchAlgorithmException; import java.util.*; public class HBaseEmpManager { - public static void main(String[] args) { + private static Configuration configuration; + private static Connection connection; + private static Admin admin; + + private static final String TABLE_NAME = "emp1520"; + private static final String CF_EMPNUM = "empnum"; + private static final String CF_INFO = "info"; + private static final String CF_SALARY = "salary"; + private static final String CF_PERFORMANCE = "performance"; + private static final String CF_TRAINING = "training"; + + public static void main(String[] args) throws IOException, NoSuchAlgorithmException { + init(); // 初始化 HBase 连接 + createTable(); // 创建表 + insertData("emp.txt"); // 读取 emp.txt 插入数据 + queryById(); // 查询 ID > 7500 的员工 + queryByPerformance(); // 查询绩效评分 > 4 且入职时间 < 2022 年的员工 + queryRecentPromotion(); // 查找最近晋升的员工 + countTrainingParticipants(); // 统计培训课程的参与人数 + close(); // 关闭连接 + } + + // 初始化 HBase 连接 + private static void init() { + configuration = HBaseConfiguration.create(); try { - HBaseConnectionManager.init(); - HBaseTableManager.createTable(HBaseConnectionManager.getAdmin()); - - EmployeeDAO employeeDAO = new EmployeeDAO(); - employeeDAO.insertData("emp.txt"); - employeeDAO.queryById(); - employeeDAO.queryByPerformance(); - employeeDAO.queryRecentPromotion(); - - TrainingDAO trainingDAO = new TrainingDAO(); - trainingDAO.countTrainingParticipants(); - - } catch (Exception e) { + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); + System.out.println("Connected to HBase Successfully!"); + } catch (IOException e) { e.printStackTrace(); - } finally { - HBaseConnectionManager.close(); } } + + // 关闭 HBase 连接 + private static void close() { + try { + if (admin != null) { + admin.close(); + } + if (connection != null) { + connection.close(); + } + System.out.println("HBase Connection Closed."); + } catch (IOException e) { + e.printStackTrace(); + } + } + + // 创建 HBase 表 + private static void createTable() throws IOException { + TableName tableName = TableName.valueOf(TABLE_NAME); + // 检查表格是否存在 + if (admin.tableExists(tableName)) { + System.out.println("表 " + TABLE_NAME + " 已经存在,准备禁用并删除..."); + + // 禁用表格 + admin.disableTable(tableName); + System.out.println("表 " + TABLE_NAME + " 已禁用。"); + + // 删除表格 + admin.deleteTable(tableName); + System.out.println("表 " + TABLE_NAME + " 已删除。"); + } + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + tableDescriptor.addFamily(new HColumnDescriptor(CF_EMPNUM)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_INFO)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_SALARY)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_PERFORMANCE)); + tableDescriptor.addFamily(new HColumnDescriptor(CF_TRAINING)); + + admin.createTable(tableDescriptor); + System.out.println("Table emp1520 created successfully."); + } + + // 读取并插入数据 + private static void insertData(String filePath) throws IOException, NoSuchAlgorithmException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + BufferedReader br = new BufferedReader(new FileReader(filePath)); + String line; + + while ((line = br.readLine()) != null) { + String[] emp = line.split(","); + + if (emp.length < 12) continue; // 确保数据完整 + + String rowKey = generateRowKey(emp[0]); // 使用哈希前缀 + + Put put = new Put(Bytes.toBytes(rowKey)); + put.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"), Bytes.toBytes(emp[0])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("ename"), Bytes.toBytes(emp[1])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("job"), Bytes.toBytes(emp[2])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("mgr"), Bytes.toBytes(emp[3])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"), Bytes.toBytes(emp[4])); + put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("sal"), Bytes.toBytes(emp[5])); + put.addColumn(Bytes.toBytes(CF_SALARY), Bytes.toBytes("credit"), Bytes.toBytes(emp[6])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("deptno"), Bytes.toBytes(emp[7])); + put.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("work_hours"), Bytes.toBytes(emp[8])); + put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"), Bytes.toBytes(emp[9])); + put.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"), Bytes.toBytes(emp[10])); + put.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"), Bytes.toBytes(emp[11])); + + table.put(put); + } + + br.close(); + table.close(); + System.out.println("Data inserted successfully!"); + } + + // 生成 RowKey,避免 Region 热点 + private static String generateRowKey(String empno) throws NoSuchAlgorithmException { + String prefix = empno.substring(0, 3); + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] digest = md5.digest(prefix.getBytes()); + StringBuilder hexString = new StringBuilder(); + for (byte b : digest) { + hexString.append(String.format("%02x", b)); + } + return hexString.substring(0, 4) + empno; + } + + // 查询 ID > 7500 的员工 + private static void queryById() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno")); + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + String empno = Bytes.toString(result.getValue(Bytes.toBytes(CF_EMPNUM), Bytes.toBytes("empno"))); + if (empno != null && Integer.parseInt(empno) > 7500) { + System.out.println("ID > 7500 Employee: " + empno); + } + } + table.close(); + } + + // 查询绩效评分 > 4 且入职时间 < 2022 年 + private static void queryByPerformance() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate")); + scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating")); + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + String hiredate = Bytes.toString(result.getValue(Bytes.toBytes(CF_INFO), Bytes.toBytes("hiredate"))); + String rating = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("performance_rating"))); + + if (hiredate.compareTo("2022-01-01") < 0 && Integer.parseInt(rating) > 4) { + System.out.println("Qualified Employee: Hire Date " + hiredate + ", Rating " + rating); + } + } + table.close(); + } + + // 查找最近晋升的员工 + private static void queryRecentPromotion() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date")); + + ResultScanner scanner = table.getScanner(scan); + String latestDate = ""; + String latestEmp = ""; + + for (Result result : scanner) { + String promotionDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_PERFORMANCE), Bytes.toBytes("promotion_date"))); + if (promotionDate != null && promotionDate.compareTo(latestDate) > 0) { + latestDate = promotionDate; + latestEmp = Bytes.toString(result.getRow()); + } + } + System.out.println("Most Recently Promoted Employee: " + latestEmp + ", Promotion Date: " + latestDate); + table.close(); + } + + // 统计培训课程的参与人数 + private static void countTrainingParticipants() throws IOException { + Table table = connection.getTable(TableName.valueOf(TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses")); + + ResultScanner scanner = table.getScanner(scan); + Map countMap = new HashMap<>(); + + for (Result result : scanner) { + String courses = Bytes.toString(result.getValue(Bytes.toBytes(CF_TRAINING), Bytes.toBytes("training_courses"))); + for (String course : courses.split(", ")) { + countMap.put(course, countMap.getOrDefault(course, 0) + 1); + } + } + + System.out.println("培训课程统计:"); + for (Map.Entry entry : countMap.entrySet()) { + System.out.println(entry.getKey() + ": " + entry.getValue() + " 人"); + } + + table.close(); + } } diff --git a/src/main/java/HBaseTableManager.java b/src/main/java/HBaseTableManager.java deleted file mode 100644 index 02cb7c5..0000000 --- a/src/main/java/HBaseTableManager.java +++ /dev/null @@ -1,26 +0,0 @@ -// 新建表管理类 -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; - -public class HBaseTableManager { - private static final String TABLE_NAME = "emp1520"; - private static final String[] CF_ARRAY = {"empnum", "info", "salary", "performance", "training"}; - - public static void createTable(Admin admin) throws IOException { - TableName tableName = TableName.valueOf(TABLE_NAME); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - System.out.println("Existing table removed"); - } - - HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - for (String cf : CF_ARRAY) { - tableDescriptor.addFamily(new HColumnDescriptor(cf)); - } - admin.createTable(tableDescriptor); - System.out.println("Table created successfully"); - } -} \ No newline at end of file diff --git a/src/main/java/TrainingDAO.java b/src/main/java/TrainingDAO.java deleted file mode 100644 index d0fa159..0000000 --- a/src/main/java/TrainingDAO.java +++ /dev/null @@ -1,28 +0,0 @@ -// 新建培训统计类 -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import java.util.*; - -public class TrainingDAO { - public void countTrainingParticipants() throws IOException { - try(Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf("emp1520"))) { - Scan scan = new Scan(); - scan.addColumn(Bytes.toBytes("training"), Bytes.toBytes("training_courses")); - - ResultScanner scanner = table.getScanner(scan); - Map countMap = new HashMap<>(); - - for (Result result : scanner) { - String courses = Bytes.toString(result.getValue(Bytes.toBytes("training"), Bytes.toBytes("training_courses"))); - for (String course : courses.split(", ")) { - countMap.put(course, countMap.getOrDefault(course, 0) + 1); - } - } - - System.out.println("培训课程统计:"); - for (Map.Entry entry : countMap.entrySet()) { - System.out.println(entry.getKey() + ": " + entry.getValue() + " 人"); - } - } - } -} \ No newline at end of file