import sys

import happybase
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, avg


def convert_date(date_string):
    # "20061217" -> "2006-12-17"
    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"


def convert_time(time_string):
    # "172400" -> "17:24:00"
    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"


def get_region_code(datetime_string):
    # Salt prefix (0-5) derived from the record's year, matching the scheme
    # used when the row keys were written.
    year = datetime_string[:4]
    region_num = (int(year) - 2006) % 6
    return str(region_num)


def get_double_rowkey(input_date):
    # Build start/stop row keys ("<region>_<yyyyMMddHHmmss>") covering one day.
    # HBase's row_stop is exclusive, so stop at 235959 to keep the 23:59:00 record.
    split = "_"
    start_datetime = input_date + "000000"
    stop_datetime = input_date + "235959"
    start_region_code = get_region_code(start_datetime)
    stop_region_code = get_region_code(stop_datetime)
    return (start_region_code + split + start_datetime,
            stop_region_code + split + stop_datetime)


spark = SparkSession.builder \
    .appName("HBaseSparkIntegration") \
    .getOrCreate()

hbase_host = "100.64.0.3"
hbase_table = "elec:eleclog"
connection = happybase.Connection(hbase_host)
table = connection.table(hbase_table)
column_family = "info"

input_date = input("input date (e.g. 20061217): ")
# input_date = "20061230"
start_row, stop_row = get_double_rowkey(input_date)


def fetch_hbase_data(table):
    # Scan the day's rows, skipping records whose active power is the
    # missing-value marker "?". Returns the valid rows and the skipped count.
    none_num = 0
    rows = []
    for key, data in table.scan(row_start=start_row.encode("utf-8"),
                                row_stop=stop_row.encode("utf-8")):
        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
        if value == "?":
            none_num += 1
            continue
        global_active_power = float(value)
        global_reactive_power = float(data[f'{column_family}:globalReactivePower'.encode('utf-8')].decode('utf-8'))
        voltage = float(data[f'{column_family}:voltage'.encode('utf-8')].decode('utf-8'))
        global_intensity = float(data[f'{column_family}:globalIntensity'.encode('utf-8')].decode('utf-8'))
        datetime_str = f"{convert_date(date)} {convert_time(time)}"
        rows.append((datetime_str, global_active_power, global_reactive_power,
                     voltage, global_intensity))
    return rows, none_num


hbase_data, none_num = fetch_hbase_data(table)

if len(hbase_data) == 0:
    print("No data found, please check your input date")
    sys.exit(0)
# Abort if the skipped (missing) rows amount to 20% or more of the valid rows.
if none_num / len(hbase_data) >= 0.20:
    print("This batch of data has too many missing values to be of analytical use")
    sys.exit(0)

columns = ["datetime", "global_active_power", "global_reactive_power",
           "voltage", "global_intensity"]
df = spark.createDataFrame(hbase_data, columns)

# Extract the hour from the "yyyy-MM-dd HH:mm:ss" string and average each
# feature per hour of the day.
df = df.withColumn("hour", hour(col("datetime")))
hourly_avg_features = df.groupBy("hour").agg(
    avg("global_active_power").alias("avg_global_active_power"),
    avg("global_reactive_power").alias("avg_global_reactive_power"),
    avg("voltage").alias("avg_voltage"),
    avg("global_intensity").alias("avg_global_intensity")
)
hourly_avg_features_pd = hourly_avg_features.orderBy("hour").toPandas()

# Plot the four hourly-averaged features on a single chart.
plt.figure(figsize=(12, 8))
plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_global_active_power'],
         marker='o', label='Avg Global Active Power')
plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_global_reactive_power'],
         marker='o', label='Avg Global Reactive Power')
plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_voltage'],
         marker='o', label='Avg Voltage')
plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_global_intensity'],
         marker='o', label='Avg Global Intensity')
plt.title(f'Hourly Average Feature Values for {input_date}')
plt.xlabel('Hour of the Day')
plt.ylabel('Average Value')
plt.legend()
plt.grid(True)
plt.xticks(range(24))
plt.tight_layout()
plt.show()

spark.stop()
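
# Usage sketch (the file name, install commands, and port below are assumptions,
# not part of the original script). happybase talks to HBase through the Thrift
# gateway, so the Thrift server must be running on the HBase host
# (e.g. `hbase thrift start`, default port 9090) before launching:
#
#   pip install pyspark happybase matplotlib
#   spark-submit hourly_power_analysis.py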