# project-elec-py/equiepment_averge_power.py

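"""Plot average hourly power consumption per sub-meter for a single day.

Reads minute-level records from HBase via happybase, aggregates them by
hour of day with Spark, and plots the result with matplotlib. Schema
assumption: the columns follow the UCI "Individual household electric
power consumption" dataset (global active power in kW, sub-meterings in
watt-hours, '?' marking missing values).
"""
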
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, avg, expr
import happybase
import matplotlib.pyplot as plt
def convert_date(date_string):
    """Convert 'YYYYMMDD' to 'YYYY-MM-DD'."""
    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"

def convert_time(time_string):
    """Convert 'HHMMSS' to 'HH:MM:SS'."""
    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"
def get_region_code(dateline):
    """Derive the rowkey salting prefix from the year: (year - 2006) % 6."""
    year = dateline[:4]
    region_num = (int(year) - 2006) % 6
    return str(region_num)
def get_double_rowkey(input_date):
    """Build the scan range covering one full day of rowkeys ('<region>_<yyyymmddHHMMSS>')."""
    split = "_"
    start_datetime = input_date + "000000"
    # row_stop is exclusive in an HBase scan, so end at 23:59:59 to keep
    # the record stamped 23:59:00.
    stop_datetime = input_date + "235959"
    start_region_code = get_region_code(start_datetime)
    stop_region_code = get_region_code(stop_datetime)
    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime
spark = SparkSession.builder \
    .appName("HBaseSparkIntegration") \
    .getOrCreate()
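# HBase Thrift endpoint and the table holding the minute-level records.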
hbase_host = "100.64.0.3"
hbase_table = "elec:eleclog"
connection = happybase.Connection(hbase_host)
table = connection.table(hbase_table)
column_family = "info"
input_date = input("Input date (e.g. 20061217): ")
# input_date = "20061230"
start_row, stop_row = get_double_rowkey(input_date)
def fetch_hbase_data(table):
    """Scan one day of records; skip rows whose power reading is missing ('?')."""
    none_num = 0
    rows = []
    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
        if value == "?":
            # Missing measurement: count it and skip the row.
            none_num += 1
            continue
        global_active_power = float(value)
        sub_metering_1 = float(data[f'{column_family}:subMetering1'.encode('utf-8')].decode('utf-8'))
        sub_metering_2 = float(data[f'{column_family}:subMetering2'.encode('utf-8')].decode('utf-8'))
        sub_metering_3 = float(data[f'{column_family}:subMetering3'.encode('utf-8')].decode('utf-8'))
        datetime = f"{convert_date(date)} {convert_time(time)}"
        rows.append((datetime, global_active_power, sub_metering_1, sub_metering_2, sub_metering_3))
    return rows, none_num
hbase_data, none_num = fetch_hbase_data(table)
if len(hbase_data) == 0:
    print("No data found; please check the input date")
    sys.exit(0)
# Null rows were skipped during the scan, so compute the null ratio
# against all scanned rows (kept + skipped).
if none_num / (len(hbase_data) + none_num) >= 0.20:
    print("This batch of data has too many nulls to be of analytical value")
    sys.exit(0)
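# Build a Spark DataFrame from the fetched rows for hourly aggregation.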
columns = ["datetime", "global_active_power", "sub_metering_1", "sub_metering_2", "sub_metering_3"]
df = spark.createDataFrame(hbase_data, columns)
df = df.withColumn("hour", hour(col("datetime")))
df = df.withColumn("other_consumption", expr("global_active_power * 1000 / 60 - sub_metering_1 - sub_metering_2 - sub_metering_3"))
hourly_consumption = df.groupBy("hour").agg(
    avg("sub_metering_1").alias("avg_sub_metering_1"),
    avg("sub_metering_2").alias("avg_sub_metering_2"),
    avg("sub_metering_3").alias("avg_sub_metering_3"),
    avg("other_consumption").alias("avg_other_consumption")
)
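# The aggregate has at most 24 rows, so collecting to pandas for plotting is cheap.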
hourly_consumption_pd = hourly_consumption.orderBy("hour").toPandas()
plt.figure(figsize=(12, 8))
plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_sub_metering_1'], marker='o', label='Sub Metering 1')
plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_sub_metering_2'], marker='o', label='Sub Metering 2')
plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_sub_metering_3'], marker='o', label='Sub Metering 3')
plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_other_consumption'], marker='o', label='Other Consumption')
plt.title(f'Average Hourly Consumption for {input_date}')
plt.xlabel('Hour of the Day')
plt.ylabel('Average Consumption (watt-hours per minute)')
plt.legend()
plt.grid(True)
plt.xticks(range(24))
plt.tight_layout()
plt.show()
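# Release the HBase connection before shutting down Spark.
connection.close()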
spark.stop()