From c9d6ddc44f33f3ec3325f7abc8c1ef612e9b9042 Mon Sep 17 00:00:00 2001
From: fly6516
Date: Sun, 11 May 2025 10:58:02 +0000
Subject: [PATCH] init all code

---
 daily_feature_power.py     | 107 +++++++++++++++++++++++++++++++++++
 daily_total_power.py       | 109 +++++++++++++++++++++++++++++++++++
 equiepment_averge_power.py | 110 ++++++++++++++++++++++++++++++++++++
 hourly_total_power.py      |  96 +++++++++++++++++++++++++++++++
 monthly_total_power.py     | 113 +++++++++++++++++++++++++++++++++++++
 5 files changed, 535 insertions(+)
 create mode 100644 daily_feature_power.py
 create mode 100644 daily_total_power.py
 create mode 100644 equiepment_averge_power.py
 create mode 100644 hourly_total_power.py
 create mode 100644 monthly_total_power.py

diff --git a/daily_feature_power.py b/daily_feature_power.py
new file mode 100644
index 0000000..6e72ae2
--- /dev/null
+++ b/daily_feature_power.py
@@ -0,0 +1,107 @@
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, hour, avg
+import happybase
+import matplotlib.pyplot as plt
+
+
+def convert_date(date_string):
+    # "20061217" -> "2006-12-17"
+    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"
+
+
+def convert_time(time_string):
+    # "172400" -> "17:24:00"
+    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"
+
+
+def get_region_code(datetime_str):
+    # Row keys are salted by year: region = (year - 2006) % 6
+    year = datetime_str[:4]
+    region_num = (int(year) - 2006) % 6
+    return str(region_num)
+
+
+def get_double_rowkey(input_date):
+    split = "_"
+    start_datetime = input_date + "000000"
+    # HBase scan stop rows are exclusive, so use 235959 to keep the 23:59:00 row in range
+    stop_datetime = input_date + "235959"
+    start_region_code = get_region_code(start_datetime)
+    stop_region_code = get_region_code(stop_datetime)
+    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime
+
+
+spark = SparkSession.builder \
+    .appName("HBaseSparkIntegration") \
+    .getOrCreate()
+
+hbase_host = "100.64.0.3"
+hbase_table = "elec:eleclog"
+
+connection = happybase.Connection(hbase_host)
+table = connection.table(hbase_table)
+
+column_family = "info"
+
+input_date = input("input date (20061217):")
+# input_date = "20061230"
+start_row, stop_row = get_double_rowkey(input_date)
+
+
+def fetch_hbase_data(table):
+    none_num = 0
+    rows = []
+    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
+        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
+        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
+        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
+        if value == "?":
+            # "?" marks a missing measurement in the source data
+            none_num += 1
+            continue
+        global_active_power = float(value)
+        global_reactive_power = float(data[f'{column_family}:globalReactivePower'.encode('utf-8')].decode('utf-8'))
+        voltage = float(data[f'{column_family}:voltage'.encode('utf-8')].decode('utf-8'))
+        global_intensity = float(data[f'{column_family}:globalIntensity'.encode('utf-8')].decode('utf-8'))
+        datetime_str = f"{convert_date(date)} {convert_time(time)}"
+
+        rows.append((datetime_str, global_active_power, global_reactive_power, voltage, global_intensity))
+    return rows, none_num
+
+
+hbase_data, none_num = fetch_hbase_data(table)
+
+if len(hbase_data) == 0:
+    print("No data found, please check your input")
+    sys.exit(0)
+
+# Missing-value ratio is computed over all scanned rows (kept + skipped)
+if none_num / (none_num + len(hbase_data)) >= 0.20:
+    print("This batch has too many missing values to be useful for analysis")
+    sys.exit(0)
+
+columns = ["datetime", "global_active_power", "global_reactive_power", "voltage", "global_intensity"]
+df = spark.createDataFrame(hbase_data, columns)
+
+df = df.withColumn("hour", hour(col("datetime")))
+
+hourly_avg_features = df.groupBy("hour").agg(
+    avg("global_active_power").alias("avg_global_active_power"),
+    avg("global_reactive_power").alias("avg_global_reactive_power"),
+    avg("voltage").alias("avg_voltage"),
+    avg("global_intensity").alias("avg_global_intensity")
+)
+
+hourly_avg_features_pd = hourly_avg_features.orderBy("hour").toPandas()
+
+plt.figure(figsize=(12, 8))
+plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_global_active_power'], marker='o', label='Avg Global Active Power')
+plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_global_reactive_power'], marker='o', label='Avg Global Reactive Power')
+plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_voltage'], marker='o', label='Avg Voltage')
+plt.plot(hourly_avg_features_pd['hour'], hourly_avg_features_pd['avg_global_intensity'], marker='o', label='Avg Global Intensity')
+plt.title(f'Hourly Average Feature Values for {input_date}')
+plt.xlabel('Hour of the Day')
+plt.ylabel('Average Value')
+plt.legend()
+plt.grid(True)
+plt.xticks(range(24))
+plt.tight_layout()
+plt.show()
+
+spark.stop()
\ No newline at end of file
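
All five scripts in this patch share the same row-key layout: <region>_<yyyyMMddHHmmss>, where the region prefix is a salt derived from the year. A minimal standalone sketch of what the helpers above produce (the function names here are illustrative mirrors, not part of the patch):

def region_code(datetime_str: str) -> str:
    # Year-based salt, matching get_region_code: 2006 -> "0", 2007 -> "1", ...
    return str((int(datetime_str[:4]) - 2006) % 6)

def day_scan_bounds(day: str) -> tuple:
    start = day + "000000"
    stop = day + "235959"  # scan stop is exclusive; 235959 keeps the 23:59:00 row
    return f"{region_code(start)}_{start}", f"{region_code(stop)}_{stop}"

print(day_scan_bounds("20061217"))  # ('0_20061217000000', '0_20061217235959')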
diff --git a/daily_total_power.py b/daily_total_power.py
new file mode 100644
index 0000000..b48b87c
--- /dev/null
+++ b/daily_total_power.py
@@ -0,0 +1,109 @@
+import sys
+from datetime import datetime, timedelta
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, sum as _sum, date_format
+import happybase
+import matplotlib.pyplot as plt
+
+
+def convert_date(date_string):
+    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"
+
+
+def convert_time(time_string):
+    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"
+
+
+def get_region_code(datetime_str):
+    # Row keys are salted by year: region = (year - 2006) % 6
+    year = datetime_str[:4]
+    region_num = (int(year) - 2006) % 6
+    return str(region_num)
+
+
+def get_week_dates(input_date):
+    date = datetime.strptime(input_date, "%Y%m%d")
+
+    # Monday (weekday 0) through Sunday of the week containing input_date
+    start_of_week = date - timedelta(days=date.weekday())
+    end_of_week = start_of_week + timedelta(days=6)
+
+    start_of_week_str = start_of_week.strftime("%Y%m%d")
+    end_of_week_str = end_of_week.strftime("%Y%m%d")
+
+    return start_of_week_str, end_of_week_str
+
+
+def get_double_rowkey(input_date):
+    split = "_"
+    start_date, stop_date = get_week_dates(input_date)
+    start_datetime = start_date + "000000"
+    # HBase scan stop rows are exclusive, so use 235959 to keep the 23:59:00 row in range
+    stop_datetime = stop_date + "235959"
+    start_region_code = get_region_code(start_datetime)
+    stop_region_code = get_region_code(stop_datetime)
+    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime
+
+
+spark = SparkSession.builder \
+    .appName("HBaseSparkIntegration") \
+    .getOrCreate()
+
+hbase_host = "100.64.0.3"
+hbase_table = "elec:eleclog"
+
+connection = happybase.Connection(hbase_host)
+table = connection.table(hbase_table)
+
+column_family = "info"
+
+input_date = input("input date (20061217):")
+# input_date = "20061230"
+start_row, stop_row = get_double_rowkey(input_date)
+
+print(start_row, stop_row)
+
+
+def fetch_hbase_data(table):
+    none_num = 0
+    rows = []
+    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
+        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
+        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
+        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
+        if value == "?":
+            # "?" marks a missing measurement in the source data
+            none_num += 1
+            continue
+        global_active_power = float(value)
+        # datetime_str avoids shadowing the imported datetime module
+        datetime_str = f"{convert_date(date)} {convert_time(time)}"
+
+        rows.append((datetime_str, global_active_power))
+    return rows, none_num
+
+
+hbase_data, none_num = fetch_hbase_data(table)
+
+if len(hbase_data) == 0:
+    print("No data found, please check your input")
+    sys.exit(0)
+
+# Missing-value ratio is computed over all scanned rows (kept + skipped)
+if none_num / (none_num + len(hbase_data)) >= 0.20:
+    print("This batch has too many missing values to be useful for analysis")
+    sys.exit(0)
+
+columns = ["datetime", "globalActivePower"]
+df = spark.createDataFrame(hbase_data, columns)
+
+df = df.withColumn("date", date_format(col("datetime"), "yyyy-MM-dd"))
+
+# Per-minute kW readings: summing over a day and dividing by 60 yields energy in kWh
+daily_total_power = df.groupBy("date").agg((_sum("globalActivePower") / 60).alias("daily_total_power"))
+
+daily_total_power_pd = daily_total_power.orderBy("date").toPandas()
+
+plt.figure(figsize=(10, 6))
+plt.plot(daily_total_power_pd['date'], daily_total_power_pd['daily_total_power'], marker='o')
+plt.title('Daily Power Consumption for One Week')
+plt.xlabel('Date')
+plt.ylabel('Total Energy Consumption (kWh)')
+plt.grid(True)
+plt.xticks(rotation=45)
+plt.tight_layout()
+plt.show()

+spark.stop()
\ No newline at end of file
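
get_week_dates anchors the scan window to the Monday-through-Sunday week containing the input date. A standalone sketch of the same logic (the helper name and sample date are illustrative):

from datetime import datetime, timedelta

def week_bounds(day: str) -> tuple:
    d = datetime.strptime(day, "%Y%m%d")
    start = d - timedelta(days=d.weekday())  # weekday() is 0 for Monday
    end = start + timedelta(days=6)
    return start.strftime("%Y%m%d"), end.strftime("%Y%m%d")

print(week_bounds("20061217"))  # ('20061211', '20061217'); 2006-12-17 was a Sunday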
diff --git a/equiepment_averge_power.py b/equiepment_averge_power.py
new file mode 100644
index 0000000..c04c6c4
--- /dev/null
+++ b/equiepment_averge_power.py
@@ -0,0 +1,110 @@
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, hour, avg, expr
+import happybase
+import matplotlib.pyplot as plt
+
+
+def convert_date(date_string):
+    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"
+
+
+def convert_time(time_string):
+    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"
+
+
+def get_region_code(datetime_str):
+    # Row keys are salted by year: region = (year - 2006) % 6
+    year = datetime_str[:4]
+    region_num = (int(year) - 2006) % 6
+    return str(region_num)
+
+
+def get_double_rowkey(input_date):
+    split = "_"
+    start_datetime = input_date + "000000"
+    # HBase scan stop rows are exclusive, so use 235959 to keep the 23:59:00 row in range
+    stop_datetime = input_date + "235959"
+    start_region_code = get_region_code(start_datetime)
+    stop_region_code = get_region_code(stop_datetime)
+    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime
+
+
+spark = SparkSession.builder \
+    .appName("HBaseSparkIntegration") \
+    .getOrCreate()
+
+hbase_host = "100.64.0.3"
+hbase_table = "elec:eleclog"
+
+connection = happybase.Connection(hbase_host)
+table = connection.table(hbase_table)
+
+column_family = "info"
+
+input_date = input("input date (20061217):")
+# input_date = "20061230"
+start_row, stop_row = get_double_rowkey(input_date)
+
+
+def fetch_hbase_data(table):
+    none_num = 0
+    rows = []
+    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
+        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
+        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
+        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
+        if value == "?":
+            # "?" marks a missing measurement in the source data
+            none_num += 1
+            continue
+        global_active_power = float(value)
+        sub_metering_1 = float(data[f'{column_family}:subMetering1'.encode('utf-8')].decode('utf-8'))
+        sub_metering_2 = float(data[f'{column_family}:subMetering2'.encode('utf-8')].decode('utf-8'))
+        sub_metering_3 = float(data[f'{column_family}:subMetering3'.encode('utf-8')].decode('utf-8'))
+
+        datetime_str = f"{convert_date(date)} {convert_time(time)}"
+
+        rows.append((datetime_str, global_active_power, sub_metering_1, sub_metering_2, sub_metering_3))
+    return rows, none_num
+
+
+hbase_data, none_num = fetch_hbase_data(table)
+
+if len(hbase_data) == 0:
+    print("No data found, please check your input")
+    sys.exit(0)
+
+# Missing-value ratio is computed over all scanned rows (kept + skipped)
+if none_num / (none_num + len(hbase_data)) >= 0.20:
+    print("This batch has too many missing values to be useful for analysis")
+    sys.exit(0)
+
+columns = ["datetime", "global_active_power", "sub_metering_1", "sub_metering_2", "sub_metering_3"]
+df = spark.createDataFrame(hbase_data, columns)
+
+df = df.withColumn("hour", hour(col("datetime")))
+
+# kW * 1000 / 60 converts the per-minute average power to watt-hours, the same
+# unit as the sub-metering columns; the remainder is load not covered by any sub-meter
+df = df.withColumn("other_consumption", expr("global_active_power * 1000 / 60 - sub_metering_1 - sub_metering_2 - sub_metering_3"))
+
+hourly_consumption = df.groupBy("hour").agg(
+    avg("sub_metering_1").alias("avg_sub_metering_1"),
+    avg("sub_metering_2").alias("avg_sub_metering_2"),
+    avg("sub_metering_3").alias("avg_sub_metering_3"),
+    avg("other_consumption").alias("avg_other_consumption")
+)
+
+hourly_consumption_pd = hourly_consumption.orderBy("hour").toPandas()
+
+plt.figure(figsize=(12, 8))
+plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_sub_metering_1'], marker='o', label='Sub Metering 1')
+plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_sub_metering_2'], marker='o', label='Sub Metering 2')
+plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_sub_metering_3'], marker='o', label='Sub Metering 3')
+plt.plot(hourly_consumption_pd['hour'], hourly_consumption_pd['avg_other_consumption'], marker='o', label='Other Consumption')
+plt.title(f'Hourly Average Consumption for {input_date}')
+plt.xlabel('Hour of the Day')
+plt.ylabel('Average Consumption (Wh per minute)')
+plt.legend()
+plt.grid(True)
+plt.xticks(range(24))
+plt.tight_layout()
+plt.show()
+
+spark.stop()
\ No newline at end of file
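
The other_consumption expression above rests on the dataset's units: global_active_power is a one-minute average in kilowatts, while the three sub-metering columns are already watt-hours per minute, so kW * 1000 / 60 puts the whole-house reading in the same unit before subtracting. A worked check with illustrative values:

# One minute at 4.216 kW, with the three sub-meters reading 0, 1 and 17 Wh:
gap_kw = 4.216
whole_house_wh = gap_kw * 1000 / 60            # ~70.27 Wh drawn in that minute
other_wh = whole_house_wh - 0.0 - 1.0 - 17.0   # ~52.27 Wh not covered by any sub-meter
print(round(other_wh, 2))                      # 52.27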
diff --git a/hourly_total_power.py b/hourly_total_power.py
new file mode 100644
index 0000000..c97d3bf
--- /dev/null
+++ b/hourly_total_power.py
@@ -0,0 +1,96 @@
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, sum as _sum, hour
+import happybase
+import matplotlib.pyplot as plt
+
+
+def convert_date(date_string):
+    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"
+
+
+def convert_time(time_string):
+    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"
+
+
+def get_region_code(datetime_str):
+    # Row keys are salted by year: region = (year - 2006) % 6
+    year = datetime_str[:4]
+    region_num = (int(year) - 2006) % 6
+    return str(region_num)
+
+
+def get_double_rowkey(input_date):
+    split = "_"
+    start_datetime = input_date + "000000"
+    # HBase scan stop rows are exclusive, so use 235959 to keep the 23:59:00 row in range
+    stop_datetime = input_date + "235959"
+    start_region_code = get_region_code(start_datetime)
+    stop_region_code = get_region_code(stop_datetime)
+    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime
+
+
+spark = SparkSession.builder \
+    .appName("HBaseSparkIntegration") \
+    .getOrCreate()
+
+hbase_host = "100.64.0.3"
+hbase_table = "elec:eleclog"
+
+connection = happybase.Connection(hbase_host)
+table = connection.table(hbase_table)
+
+column_family = "info"
+
+input_date = input("input date (20061217):")
+# input_date = "20061230"
+start_row, stop_row = get_double_rowkey(input_date)
+
+
+def fetch_hbase_data(table):
+    none_num = 0
+    rows = []
+    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
+        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
+        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
+        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
+        if value == "?":
+            # "?" marks a missing measurement in the source data
+            none_num += 1
+            continue
+        global_active_power = float(value)
+        datetime_str = f"{convert_date(date)} {convert_time(time)}"
+
+        rows.append((datetime_str, global_active_power))
+    return rows, none_num
+
+
+hbase_data, none_num = fetch_hbase_data(table)
+
+if len(hbase_data) == 0:
+    print("No data found, please check your input")
+    sys.exit(0)
+
+# Missing-value ratio is computed over all scanned rows (kept + skipped)
+if none_num / (none_num + len(hbase_data)) >= 0.20:
+    print("This batch has too many missing values to be useful for analysis")
+    sys.exit(0)
+
+columns = ["datetime", "globalActivePower"]
+df = spark.createDataFrame(hbase_data, columns)
+
+df = df.withColumn("hour", hour(col("datetime")))
+
+# Per-minute kW readings: summing over an hour and dividing by 60 yields energy in kWh
+hourly_total_power = df.groupBy("hour").agg((_sum("globalActivePower") / 60).alias("hourly_total_power"))
+
+print(hourly_total_power.head())
+
+hourly_total_power_pd = hourly_total_power.orderBy("hour").toPandas()
+
+plt.figure(figsize=(10, 6))
+plt.plot(hourly_total_power_pd['hour'], hourly_total_power_pd['hourly_total_power'], marker='o')
+plt.title(f'Hourly Power Consumption for {input_date}')
+plt.xlabel('Hour of the Day')
+plt.ylabel('Total Energy Consumption (kWh)')
+plt.grid(True)
+plt.xticks(range(24))
+plt.show()
+
+spark.stop()
\ No newline at end of file
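
Dividing the per-minute kilowatt sum by 60 integrates power into energy. A minimal sanity check of that conversion (values are illustrative):

# Sixty one-minute readings at a constant 1.5 kW should integrate to exactly 1.5 kWh.
readings_kw = [1.5] * 60
energy_kwh = sum(readings_kw) / 60
print(energy_kwh)  # 1.5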
diff --git a/monthly_total_power.py b/monthly_total_power.py
new file mode 100644
index 0000000..35bdcd2
--- /dev/null
+++ b/monthly_total_power.py
@@ -0,0 +1,113 @@
+import sys
+from datetime import datetime, timedelta
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, sum as _sum, date_format
+import happybase
+import matplotlib.pyplot as plt
+
+
+def convert_date(date_string):
+    return f"{date_string[:4]}-{date_string[4:6]}-{date_string[6:]}"
+
+
+def convert_time(time_string):
+    return f"{time_string[:2]}:{time_string[2:4]}:{time_string[4:]}"
+
+
+def get_region_code(datetime_str):
+    # Row keys are salted by year: region = (year - 2006) % 6
+    year = datetime_str[:4]
+    region_num = (int(year) - 2006) % 6
+    return str(region_num)
+
+
+def get_month_dates(input_date):
+    date = datetime.strptime(input_date, "%Y%m")
+
+    start_of_month = date.replace(day=1)
+
+    # First day of the following month; December rolls the year forward
+    next_month = start_of_month.replace(month=start_of_month.month % 12 + 1,
+                                        year=start_of_month.year + (start_of_month.month // 12))
+    end_of_month = next_month - timedelta(days=1)
+
+    start_of_month_str = start_of_month.strftime("%Y%m%d")
+    end_of_month_str = end_of_month.strftime("%Y%m%d")
+
+    return start_of_month_str, end_of_month_str
+
+
+def get_double_rowkey(input_date):
+    split = "_"
+    start_date, stop_date = get_month_dates(input_date)
+    start_datetime = start_date + "000000"
+    # HBase scan stop rows are exclusive, so use 235959 to keep the 23:59:00 row in range
+    stop_datetime = stop_date + "235959"
+    start_region_code = get_region_code(start_datetime)
+    stop_region_code = get_region_code(stop_datetime)
+    return start_region_code + split + start_datetime, stop_region_code + split + stop_datetime
+
+
+spark = SparkSession.builder \
+    .appName("HBaseSparkIntegration") \
+    .getOrCreate()
+
+hbase_host = "100.64.0.3"
+hbase_table = "elec:eleclog"
+
+connection = happybase.Connection(hbase_host)
+table = connection.table(hbase_table)
+
+column_family = "info"
+
+input_date = input("input date (200701):")
+# input_date = "200612"
+start_row, stop_row = get_double_rowkey(input_date)
+
+print(start_row, stop_row)
+
+
+def fetch_hbase_data(table):
+    none_num = 0
+    rows = []
+    for key, data in table.scan(row_start=start_row.encode("utf-8"), row_stop=stop_row.encode("utf-8")):
+        date = data[f'{column_family}:date'.encode('utf-8')].decode('utf-8')
+        time = data[f'{column_family}:time'.encode('utf-8')].decode('utf-8')
+        value = data[f'{column_family}:globalActivePower'.encode('utf-8')].decode('utf-8')
+        if value == "?":
+            # "?" marks a missing measurement in the source data
+            none_num += 1
+            continue
+        global_active_power = float(value)
+        # datetime_str avoids shadowing the imported datetime module
+        datetime_str = f"{convert_date(date)} {convert_time(time)}"
+
+        rows.append((datetime_str, global_active_power))
+    return rows, none_num
+
+
+hbase_data, none_num = fetch_hbase_data(table)
+
+if len(hbase_data) == 0:
+    print("No data found, please check your input")
+    sys.exit(0)
+
+# Missing-value ratio is computed over all scanned rows (kept + skipped)
+if none_num / (none_num + len(hbase_data)) >= 0.20:
+    print("This batch has too many missing values to be useful for analysis")
+    sys.exit(0)
+
+columns = ["datetime", "globalActivePower"]
+df = spark.createDataFrame(hbase_data, columns)
+
+df = df.withColumn("date", date_format(col("datetime"), "yyyy-MM-dd"))
+
+# Per-minute kW readings: summing over a day and dividing by 60 yields energy in kWh
+daily_total_power = df.groupBy("date").agg((_sum("globalActivePower") / 60).alias("daily_total_power"))
+
+daily_total_power_pd = daily_total_power.orderBy("date").toPandas()
+
+plt.figure(figsize=(10, 6))
+plt.plot(daily_total_power_pd['date'], daily_total_power_pd['daily_total_power'], marker='o')
+plt.title('Daily Power Consumption for One Month')
+plt.xlabel('Date')
+plt.ylabel('Total Energy Consumption (kWh)')
+plt.grid(True)
+plt.xticks(rotation=45)
+plt.tight_layout()
+plt.show()
+
+spark.stop()
\ No newline at end of file