"""Query one day of electricity-usage records from HBase, aggregate them by
hour with Spark, and plot the hourly consumption curve with matplotlib."""
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, hour
import happybase
import matplotlib.pyplot as plt

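# Helpers to expand the compact date/time strings stored in HBase,
# e.g. "20061217" -> "2006-12-17" and "173400" -> "17:34:00".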
def convert_date(date_string):
    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"


def convert_time(time_string):
    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"

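# Rowkeys are salted with a one-digit region code derived from the year,
# (year - 2006) mod 6, so rows spread across six buckets while all records
# of one year keep the same prefix.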
def get_region_code(dateline):
    year = dateline[:4]
    region_num = (int(year) - 2006) % 6
    return str(region_num)

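# Bounds for one day's scan as "<region>_<yyyyMMddHHmmss>" rowkeys, e.g.
# get_double_rowkey("20061217") -> ("0_20061217000000", "0_20061217235959").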
def get_double_rowkey(input_date):
    split = "_"
    start_datetime = input_date + "000000"
    # HBase treats row_stop as exclusive, so stop at 235959 rather than
    # 235900 to keep the day's final minute (23:59:00) in the scan.
    stop_datetime = input_date + "235959"
    start_region_code = get_region_code(start_datetime)
    stop_region_code = get_region_code(stop_datetime)
    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime

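# Spark session used for the hourly aggregation.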
spark = SparkSession.builder \
    .appName("HBaseSparkIntegration") \
    .getOrCreate()

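# happybase talks to HBase through its Thrift gateway; the table lives in
# the "elec" namespace.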
hbase_host = "100.64.0.3"
hbase_table = "elec:eleclog"

connection = happybase.Connection(hbase_host)
table = connection.table(hbase_table)

column_family = "info"

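# Ask for the day to analyse and derive the scan boundaries from it.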
input_date = input("Input date (e.g. 20061217): ")
# input_date = "20061230"
start_row, stop_row = get_double_rowkey(input_date)

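# Scan the day's rowkey range and collect (datetime string, power) pairs,
# counting records whose reading is the missing-value marker "?".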
def fetch_hbase_data(table):
    none_num = 0
    rows = []
    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
        if value == "?":
            none_num += 1
            continue
        global_active_power = float(value)
        datetime = f"{convert_date(date)} {convert_time(time)}"
        rows.append((datetime, global_active_power))
    return rows, none_num


hbase_data, none_num = fetch_hbase_data(table)

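# Sanity checks before spending time on the Spark aggregation.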
if len(hbase_data) == 0:
    print("No data found, please check your input date")
    sys.exit(0)

# Reject the batch when 20% or more of the scanned records were missing.
if none_num / (none_num + len(hbase_data)) >= 0.20:
    print("This batch has too many missing values to be of analytical value")
    sys.exit(0)

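# Load the surviving rows into a Spark DataFrame for the hourly roll-up.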
columns = ["datetime", "globalActivePower"]
df = spark.createDataFrame(hbase_data, columns)

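# hour() implicitly casts the "yyyy-MM-dd HH:mm:ss" strings to timestamps.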
df = df.withColumn("hour", hour(col("datetime")))

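# Assuming one reading per minute (which the division by 60 implies), the
# sum of an hour's kW readings divided by 60 is that hour's mean power in
# kW, numerically equal to its energy use in kWh.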
hourly_total_power = df.groupBy("hour").agg((_sum("globalActivePower") / 60).alias("hourly_total_power"))

print(hourly_total_power.head())

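# The hourly result is at most 24 rows, so collecting it into pandas for
# plotting is cheap.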
hourly_total_power_pd = hourly_total_power.orderBy("hour").toPandas()

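# Plot the day's consumption curve, one point per hour.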
plt.figure(figsize=(10, 6))
plt.plot(hourly_total_power_pd['hour'], hourly_total_power_pd['hourly_total_power'], marker='o')
plt.title(f'Hourly Power Consumption for {input_date}')
plt.xlabel('Hour of the Day')
plt.ylabel('Total Power Consumption (kW)')
plt.grid(True)
plt.xticks(range(24))
plt.show()

connection.close()
spark.stop()