You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
195 lines
6.7 KiB
195 lines
6.7 KiB
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# https://raw.githubusercontent.com/sdelrio/hs110-prometheus-exporter/master/hs110-exporter.py
|
|
|
|
# Python 3 to Python 2 compatibility
|
|
from __future__ import print_function
|
|
from builtins import bytes
|
|
|
|
from prometheus_client import start_http_server, Gauge, Info
|
|
import struct
|
|
import time
|
|
import socket
|
|
import argparse
|
|
import json
|
|
|
|
# Exporter version. Kept as a string: the float 0.80 renders as "0.8" when
# passed through str(), which dropped the trailing zero from the CLI banner.
version = "0.80"

# Maps a generic measure name to the JSON key used by each hardware revision.
keyname = {
    "h1": {  # Hardware version 1.x
        "current": "current",
        "voltage": "voltage",
        "power": "power",
        "total": "total",
    },
    "h2": {  # Hardware version 2.x
        "current": "current_ma",
        "voltage": "voltage_mv",
        "power": "power_mw",
        "total": "total_wh",
    },
}

# Default HS110 hardware version
hardware = "h2"

# Encryption and Decryption of TP-Link Smart Home Protocol
# XOR Autokey Cipher with starting key = 171
hs110_key = 171
|
|
|
|
# Check if IP is valid
|
|
def validIP(ip):
    """Validate an IPv4 address given on the command line.

    Intended for use as an argparse ``type=`` callable.

    Args:
        ip: Candidate address in dotted-quad text form.

    Returns:
        The same string, unchanged, when it is a valid IPv4 address.

    Raises:
        argparse.ArgumentTypeError: If ``ip`` is not a valid IPv4 address.
            argparse turns this into a usage error and exits, so the
            function no longer depends on a module-level ``parser`` object
            that is only created further down the file.
    """
    try:
        socket.inet_pton(socket.AF_INET, ip)
    except socket.error:
        raise argparse.ArgumentTypeError("Invalid IP Address.")
    return ip
|
|
|
|
# Encryption and Decryption of TP-Link Smart Home Protocol
|
|
# XOR Autokey Cipher with starting key = 171
|
|
|
|
def encrypt(string, key=None):
    """Encrypt a command with the XOR autokey cipher and add a length header.

    Each output byte is the plaintext byte XORed with the previous output
    byte (the first one is XORed with the starting key).

    Args:
        string: Command text; it is encoded as latin-1 before encryption.
        key: Starting key for the cipher. Defaults to the module-level
            ``hs110_key`` when omitted.

    Returns:
        bytes: A 4-byte big-endian payload length followed by the
        encrypted payload.
    """
    if key is None:
        key = hs110_key
    payload = bytearray(string.encode('latin-1'))
    # Encrypt in place: avoids the quadratic cost of growing an immutable
    # bytes object one byte at a time.
    for pos, plain in enumerate(payload):
        key ^= plain
        payload[pos] = key
    # Pack the length as a real 32-bit value; a single trailing byte cannot
    # represent payloads longer than 255 bytes.
    return struct.pack(">I", len(payload)) + bytes(payload)
|
|
|
|
def decrypt(string, key=None):
    """Decrypt an XOR autokey payload (without the 4-byte length header).

    Each plaintext byte is the ciphertext byte XORed with the previous
    ciphertext byte (the first one is XORed with the starting key).

    Args:
        string: Encrypted payload as a bytes-like object.
        key: Starting key for the cipher. Defaults to the module-level
            ``hs110_key`` when omitted.

    Returns:
        str: The decrypted payload decoded as latin-1.
    """
    if key is None:
        key = hs110_key
    buf = bytearray(string)
    # Decrypt in place: avoids the quadratic cost of growing an immutable
    # bytes object one byte at a time.
    for pos, cipher in enumerate(buf):
        buf[pos] = key ^ cipher
        key = cipher
    return bytes(buf).decode('latin-1')
|
|
|
|
# Parse commandline arguments
|
|
# Command-line interface: the target plug address is mandatory; polling
# interval and HTTP listen port are optional.
parser = argparse.ArgumentParser(
    description="TP-Link Wi-Fi Smart Plug Prometheus exporter v" + str(version)
)
parser.add_argument(
    "-t", "--target",
    metavar="<ip>",
    required=True,
    help="Target IP Address",
    type=validIP,
)
parser.add_argument(
    "-f", "--frequency",
    metavar="<seconds>",
    required=False,
    help="Interval in seconds between checking measures",
    default=1,
    type=int,
)
parser.add_argument(
    "-p", "--port",
    metavar="<port>",
    required=False,
    help="Port for listening",  # was misspelled "listenin" in --help output
    default=8110,
    type=int,
)
# NOTE: arguments are parsed at import time, so importing this module
# without a valid command line exits the process.
args = parser.parse_args()
|
|
|
|
# Runtime settings taken from the command line
ip = args.target             # address of the plug to query
listen_port = args.port      # port the metrics HTTP server listens on
sleep_time = args.frequency  # seconds to wait between two polls

# TCP port the plug is contacted on, and the command sent on every poll
port = 9999
cmd = '{"emeter":{"get_realtime":{}}}'

# Last reading, pre-filled with zeros (keyed for the default hardware) so the
# gauges have something to report before the first successful poll.
received_data = {
    "emeter": {
        "get_realtime": {
            keyname[hardware]['current']: 0,
            keyname[hardware]['voltage']: 0,
            keyname[hardware]['power']: 0,
            keyname[hardware]['total']: 0,
            "err_code": 0,
        }
    }
}
|
|
|
|
# Send command and receive reply
|
|
|
|
# Metrics exposed to Prometheus.
# Gauge: a value that can go up and down, i.e. a snapshot of state.
# Every gauge is backed by a callback; the lambdas defer the name lookup to
# call time because the get_* functions are defined further down the file.

REQUEST_POWER = Gauge('hs110_power_watt', 'HS110 Watt measure')
REQUEST_POWER.set_function(lambda: get_power())

REQUEST_CURRENT = Gauge('hs110_current', 'HS110 Current measure')
REQUEST_CURRENT.set_function(lambda: get_current())

REQUEST_VOLTAGE = Gauge('hs110_voltage', 'HS110 Voltage measure')
REQUEST_VOLTAGE.set_function(lambda: get_voltage())

REQUEST_TOTAL = Gauge('hs110_total', 'HS110 Energy measure')
REQUEST_TOTAL.set_function(lambda: get_total())

# Static info metric carrying the hardware profile selected at start-up
i = Info('hardware_version', 'Hardware Version')
i.info({'hardware_version': hardware})
|
|
|
|
def _read_metric(name):
    """Return the latest reading for measure *name*, or 0 if unavailable.

    Looks up the hardware-specific key for *name* in ``keyname`` and reads it
    from the module-level ``received_data``. Readings from "h2" hardware
    (keys suffixed ``_ma``/``_mv``/``_mw``/``_wh``) are divided by 1000;
    "h1" readings are returned unchanged.

    Args:
        name: One of "current", "voltage", "power" or "total".

    Returns:
        The reading, or 0 when the hardware profile is unknown, the expected
        key is missing from the last reply, or the value is not numeric.
    """
    # Snapshot the profile once: the polling loop may switch it while a
    # scrape is being served.
    hw = hardware
    try:
        raw = received_data["emeter"]["get_realtime"][keyname[hw][name]]
        if hw == "h2":
            # Float divisor so the result is not truncated under Python 2.
            return raw / 1000.0
        return raw
    except (KeyError, TypeError):
        # A missing or malformed reading must not break the metrics scrape.
        return 0


def get_power():
    """ Get HS110 power """
    return _read_metric("power")


def get_current():
    """ Get HS110 current """
    return _read_metric("current")


def get_voltage():
    """ Get HS110 voltage """
    return _read_metric("voltage")


def get_total():
    """ Get HS110 total energy usage """
    return _read_metric("total")
|
|
|
|
def get_hardware():
    """ Get HS110 Hardware

    Returns the module-level hardware profile key ("h2" by default; the
    polling loop switches it to "h1" or "h2" based on the plug's reply).
    The former try/except around this read was dead code: no socket call
    is made here, so the handler and the trailing ``return 0`` could never
    run.
    """
    return hardware
|
|
|
|
# Main entry point
if __name__ == '__main__':

    def _zero_reading(hw):
        """Build an all-zero reading keyed for hardware profile *hw*."""
        names = keyname[hw]
        return {
            "emeter": {
                "get_realtime": {
                    names['voltage']: 0,
                    names['current']: 0,
                    names['power']: 0,
                    names['total']: 0,
                    "err_code": 0,
                }
            }
        }

    # Start up the server to expose the metrics.
    start_http_server(listen_port)

    # Main loop: poll the plug, publish the reading, sleep, repeat.
    while True:
        sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock_tcp.settimeout(2)

        try:
            try:
                sock_tcp.connect((ip, port))
                # sendall: send() may transmit only part of the buffer.
                sock_tcp.sendall(encrypt(cmd))
                data = sock_tcp.recv(2048)
            finally:
                # Always release the socket, including on connect/recv errors.
                sock_tcp.close()

            # Sample return value received:
            # HS110 Hardware 1: {"emeter":{"get_realtime":{"voltage":229865,"current":1110,"power":231866,"total":228,"err_code":0}}}
            # HS110 Hardware 2: {"emeter":{"get_realtime":{"voltage_mv":229865,"current_ma":1110,"power_mw":231866,"total_wh":228,"err_code":0}}}
            # The first 4 bytes are the length header and are not encrypted.
            reply = json.loads(decrypt(data[4:]))
            print(reply)
            realtime = reply['emeter']['get_realtime']

            # Detect the hardware revision from the key names in the reply.
            detected = hardware
            if "current" in realtime:
                detected = "h1"
            if "current_ma" in realtime:
                detected = "h2"

            # Validate the reply before publishing it, so a malformed answer
            # never replaces the last reading half-way.
            power = realtime[keyname[detected]['power']]

            if detected != hardware:
                hardware = detected
                # Keep the info metric in sync with the detected revision.
                i.info({'hardware_version': hardware})
            received_data = reply

            print("Hardware: " + hardware + " - IP: " + ip + ":" + str(port) + " Received power: " + str(power))
        except socket.error:
            # The previous reading is kept as-is when the plug is unreachable.
            print("Could not connect to the host "+ ip + ":" + str(port))
        except ValueError:
            received_data = _zero_reading(hardware)
            print("Could not decrypt data from hs110.")
        except (KeyError, TypeError):
            # Valid JSON but without the expected emeter readings (this used
            # to raise out of the loop and stop the exporter).
            received_data = _zero_reading(hardware)
            print("Unexpected reply from hs110: emeter readings missing.")

        time.sleep(sleep_time)
|