mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
Merge branch 'bugfix/prov_example_tests' into 'master'
Fix provisioning example tests See merge request idf/esp-idf!3667
This commit is contained in:
commit
480fb72b46
7
.flake8
7
.flake8
@ -78,7 +78,7 @@ select =
|
||||
E275, # missing whitespace after keyword
|
||||
E301, # expected 1 blank line, found 0
|
||||
E302, # expected 2 blank lines, found 0
|
||||
E303, # too many blank lines
|
||||
E303, # too many blank lines
|
||||
E304, # blank lines found after function decorator
|
||||
E305, # expected 2 blank lines after end of function or class
|
||||
E306, # expected 1 blank line before a nested definition
|
||||
@ -166,9 +166,6 @@ exclude =
|
||||
components/ulp/esp32ulp_mapgen.py,
|
||||
components/wifi_provisioning/python/wifi_config_pb2.py,
|
||||
components/wifi_provisioning/python/wifi_constants_pb2.py,
|
||||
examples/provisioning/ble_prov/ble_prov_test.py,
|
||||
examples/provisioning/softap_prov/softap_prov_test.py,
|
||||
examples/provisioning/softap_prov/utils/wifi_tools.py,
|
||||
tools/ci/apply_bot_filter.py,
|
||||
tools/cmake/convert_to_cmake.py,
|
||||
tools/esp_app_trace/apptrace_proc.py,
|
||||
@ -180,7 +177,6 @@ exclude =
|
||||
tools/esp_app_trace/pylibelf/types/__init__.py,
|
||||
tools/esp_app_trace/pylibelf/util/__init__.py,
|
||||
tools/esp_app_trace/pylibelf/util/syms/__init__.py,
|
||||
tools/esp_prov/esp_prov.py,
|
||||
tools/esp_prov/proto/__init__.py,
|
||||
tools/esp_prov/prov/__init__.py,
|
||||
tools/esp_prov/prov/custom_prov.py,
|
||||
@ -190,7 +186,6 @@ exclude =
|
||||
tools/esp_prov/security/security0.py,
|
||||
tools/esp_prov/security/security1.py,
|
||||
tools/esp_prov/transport/__init__.py,
|
||||
tools/esp_prov/transport/ble_cli.py,
|
||||
tools/esp_prov/transport/transport.py,
|
||||
tools/esp_prov/transport/transport_ble.py,
|
||||
tools/esp_prov/transport/transport_console.py,
|
||||
|
@ -15,28 +15,30 @@
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
import imp
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
# This environment variable is expected on the host machine
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path and test_fw_path not in sys.path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
try:
|
||||
import IDF
|
||||
except ImportError:
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path and test_fw_path not in sys.path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
import IDF
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > export TEST_FW_PATH='~/esp/esp-idf/tools/tiny-test-fw'
|
||||
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||||
# > make app bootloader
|
||||
try:
|
||||
import esp_prov
|
||||
except ImportError:
|
||||
esp_prov_path = os.getenv("IDF_PATH") + "/tools/esp_prov"
|
||||
if esp_prov_path and esp_prov_path not in sys.path:
|
||||
sys.path.insert(0, esp_prov_path)
|
||||
import esp_prov
|
||||
|
||||
import TinyFW
|
||||
import IDF
|
||||
# Have esp_prov throw exception
|
||||
esp_prov.config_throw_except = True
|
||||
|
||||
# Import esp_prov tool
|
||||
idf_path = os.environ['IDF_PATH']
|
||||
esp_prov = imp.load_source("esp_prov", idf_path + "/tools/esp_prov/esp_prov.py")
|
||||
|
||||
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
|
||||
def test_examples_provisioning_ble(env, extra_data):
|
||||
@ -46,18 +48,18 @@ def test_examples_provisioning_ble(env, extra_data):
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, "ble_prov.bin")
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
IDF.log_performance("ble_prov_bin_size", "{}KB".format(bin_size//1024))
|
||||
IDF.check_performance("ble_prov_bin_size", bin_size//1024)
|
||||
IDF.log_performance("ble_prov_bin_size", "{}KB".format(bin_size // 1024))
|
||||
IDF.check_performance("ble_prov_bin_size", bin_size // 1024)
|
||||
|
||||
# Upload binary and start testing
|
||||
dut1.start_app()
|
||||
|
||||
# Parse BLE devname
|
||||
devname = dut1.expect(re.compile(r"(?:[\s\S]*) Provisioning started with BLE devname : (PROV_\S\S\S\S\S\S)"))[0]
|
||||
devname = dut1.expect(re.compile(r"Provisioning started with BLE devname : '(PROV_\S\S\S\S\S\S)'"), timeout=60)[0]
|
||||
print("BLE Device Alias for DUT :", devname)
|
||||
|
||||
# Match additional headers sent in the request
|
||||
dut1.expect("BLE Provisioning started")
|
||||
dut1.expect("BLE Provisioning started", timeout=30)
|
||||
|
||||
print("Starting Provisioning")
|
||||
verbose = False
|
||||
@ -70,12 +72,12 @@ def test_examples_provisioning_ble(env, extra_data):
|
||||
|
||||
print("Getting security")
|
||||
security = esp_prov.get_security(secver, pop, verbose)
|
||||
if security == None:
|
||||
if security is None:
|
||||
raise RuntimeError("Failed to get security")
|
||||
|
||||
print("Getting transport")
|
||||
transport = esp_prov.get_transport(provmode, None, devname)
|
||||
if transport == None:
|
||||
if transport is None:
|
||||
raise RuntimeError("Failed to get transport")
|
||||
|
||||
print("Verifying protocol version")
|
||||
@ -109,5 +111,6 @@ def test_examples_provisioning_ble(env, extra_data):
|
||||
if not success:
|
||||
raise RuntimeError("Provisioning failed")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_provisioning_ble()
|
||||
|
@ -114,7 +114,7 @@ static esp_err_t app_prov_start_service(void)
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Provisioning started with BLE devname : %s", config.device_name);
|
||||
ESP_LOGI(TAG, "Provisioning started with BLE devname : '%s'", config.device_name);
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
|
@ -15,29 +15,38 @@
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
import imp
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
# This environment variable is expected on the host machine
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path and test_fw_path not in sys.path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
try:
|
||||
import IDF
|
||||
except ImportError:
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path and test_fw_path not in sys.path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
import IDF
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > export TEST_FW_PATH='~/esp/esp-idf/tools/tiny-test-fw'
|
||||
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||||
# > make app bootloader
|
||||
try:
|
||||
import esp_prov
|
||||
except ImportError:
|
||||
esp_prov_path = os.getenv("IDF_PATH") + "/tools/esp_prov"
|
||||
if esp_prov_path and esp_prov_path not in sys.path:
|
||||
sys.path.insert(0, esp_prov_path)
|
||||
import esp_prov
|
||||
|
||||
import TinyFW
|
||||
import IDF
|
||||
try:
|
||||
import wifi_tools
|
||||
except ImportError:
|
||||
wifi_tools_path = os.getenv("IDF_PATH") + "/examples/provisioning/softap_prov/utils"
|
||||
if wifi_tools_path and wifi_tools_path not in sys.path:
|
||||
sys.path.insert(0, wifi_tools_path)
|
||||
import wifi_tools
|
||||
|
||||
# Have esp_prov throw exception
|
||||
esp_prov.config_throw_except = True
|
||||
|
||||
# Import esp_prov tool
|
||||
idf_path = os.environ['IDF_PATH']
|
||||
esp_prov = imp.load_source("esp_prov", idf_path + "/tools/esp_prov/esp_prov.py")
|
||||
wifi_tools = imp.load_source("wifi_tools", idf_path + "/examples/provisioning/softap_prov/utils/wifi_tools.py")
|
||||
|
||||
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
|
||||
def test_examples_provisioning_softap(env, extra_data):
|
||||
@ -47,27 +56,30 @@ def test_examples_provisioning_softap(env, extra_data):
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, "softap_prov.bin")
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
IDF.log_performance("softap_prov_bin_size", "{}KB".format(bin_size//1024))
|
||||
IDF.check_performance("softap_prov_bin_size", bin_size//1024)
|
||||
IDF.log_performance("softap_prov_bin_size", "{}KB".format(bin_size // 1024))
|
||||
IDF.check_performance("softap_prov_bin_size", bin_size // 1024)
|
||||
|
||||
# Upload binary and start testing
|
||||
dut1.start_app()
|
||||
|
||||
# Parse IP address of STA
|
||||
dut1.expect("Starting WiFi SoftAP provisioning")
|
||||
dut1.expect("SoftAP started")
|
||||
[ssid, password] = dut1.expect(re.compile(r"(?:[\s\S]*)SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"))
|
||||
dut1.expect("Starting WiFi SoftAP provisioning", timeout=60)
|
||||
dut1.expect("SoftAP started", timeout=30)
|
||||
[ssid, password] = dut1.expect(re.compile(r"SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"), timeout=30)
|
||||
|
||||
iface = wifi_tools.get_wiface_name()
|
||||
if iface == None:
|
||||
if iface is None:
|
||||
raise RuntimeError("Failed to get Wi-Fi interface on host")
|
||||
print("Interface name : " + iface)
|
||||
print("SoftAP SSID : " + ssid)
|
||||
print("SoftAP Password : " + password)
|
||||
|
||||
ctrl = wifi_tools.wpa_cli(iface, reset_on_exit = True)
|
||||
ctrl = wifi_tools.wpa_cli(iface, reset_on_exit=True)
|
||||
print("Connecting to DUT SoftAP...")
|
||||
ip = ctrl.connect(ssid, password)
|
||||
got_ip = dut1.expect(re.compile(r"softAP assign IP to station,IP is: (\d+.\d+.\d+.\d+)"), timeout=30)[0]
|
||||
if ip != got_ip:
|
||||
raise RuntimeError("SoftAP connected to another host! " + ip + "!=" + got_ip)
|
||||
print("Connected to DUT SoftAP")
|
||||
|
||||
print("Starting Provisioning")
|
||||
@ -78,16 +90,16 @@ def test_examples_provisioning_softap(env, extra_data):
|
||||
provmode = "softap"
|
||||
ap_ssid = "myssid"
|
||||
ap_password = "mypassword"
|
||||
softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1]+ "." + ip.split('.')[2] + ".1:80"
|
||||
softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1] + "." + ip.split('.')[2] + ".1:80"
|
||||
|
||||
print("Getting security")
|
||||
security = esp_prov.get_security(secver, pop, verbose)
|
||||
if security == None:
|
||||
if security is None:
|
||||
raise RuntimeError("Failed to get security")
|
||||
|
||||
print("Getting transport")
|
||||
transport = esp_prov.get_transport(provmode, softap_endpoint, None)
|
||||
if transport == None:
|
||||
if transport is None:
|
||||
raise RuntimeError("Failed to get transport")
|
||||
|
||||
print("Verifying protocol version")
|
||||
@ -121,5 +133,6 @@ def test_examples_provisioning_softap(env, extra_data):
|
||||
if not success:
|
||||
raise RuntimeError("Provisioning failed")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_provisioning_softap()
|
||||
|
@ -18,12 +18,14 @@ import dbus.mainloop.glib
|
||||
import netifaces
|
||||
import time
|
||||
|
||||
|
||||
def get_wiface_name():
|
||||
for iface in netifaces.interfaces():
|
||||
if iface.startswith('w'):
|
||||
return iface
|
||||
return None
|
||||
|
||||
|
||||
def get_wiface_IPv4(iface):
|
||||
try:
|
||||
[info] = netifaces.ifaddresses(iface)[netifaces.AF_INET]
|
||||
@ -31,8 +33,9 @@ def get_wiface_IPv4(iface):
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
|
||||
class wpa_cli:
|
||||
def __init__(self, iface, reset_on_exit = False):
|
||||
def __init__(self, iface, reset_on_exit=False):
|
||||
self.iface_name = iface
|
||||
self.iface_obj = None
|
||||
self.iface_ifc = None
|
||||
@ -43,26 +46,27 @@ class wpa_cli:
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
bus = dbus.SystemBus()
|
||||
|
||||
service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"), "fi.w1.wpa_supplicant1")
|
||||
paths = service.Get("fi.w1.wpa_supplicant1", "Interfaces", dbus_interface='org.freedesktop.DBus.Properties')
|
||||
service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"),
|
||||
"fi.w1.wpa_supplicant1")
|
||||
iface_path = service.GetInterface(self.iface_name)
|
||||
self.iface_obj = bus.get_object("fi.w1.wpa_supplicant1", iface_path)
|
||||
self.iface_ifc = dbus.Interface(self.iface_obj, "fi.w1.wpa_supplicant1.Interface")
|
||||
if self.iface_ifc == None:
|
||||
if self.iface_ifc is None:
|
||||
raise RuntimeError('supplicant : Failed to fetch interface')
|
||||
|
||||
self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork", dbus_interface='org.freedesktop.DBus.Properties')
|
||||
self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork",
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
if self.old_network == '/':
|
||||
self.old_network = None
|
||||
else:
|
||||
self.connected = True
|
||||
|
||||
def connect(self, ssid, password):
|
||||
if self.connected == True:
|
||||
if self.connected is True:
|
||||
self.iface_ifc.Disconnect()
|
||||
self.connected = False
|
||||
|
||||
if self.new_network != None:
|
||||
if self.new_network is not None:
|
||||
self.iface_ifc.RemoveNetwork(self.new_network)
|
||||
|
||||
self.new_network = self.iface_ifc.AddNetwork({"ssid": ssid, "psk": password})
|
||||
@ -73,7 +77,7 @@ class wpa_cli:
|
||||
while retry > 0:
|
||||
time.sleep(5)
|
||||
ip = get_wiface_IPv4(self.iface_name)
|
||||
if ip != None:
|
||||
if ip is not None:
|
||||
self.connected = True
|
||||
return ip
|
||||
retry -= 1
|
||||
@ -82,17 +86,17 @@ class wpa_cli:
|
||||
raise RuntimeError('wpa_cli : Connection failed')
|
||||
|
||||
def reset(self):
|
||||
if self.iface_ifc != None:
|
||||
if self.connected == True:
|
||||
if self.iface_ifc is not None:
|
||||
if self.connected is True:
|
||||
self.iface_ifc.Disconnect()
|
||||
self.connected = False
|
||||
if self.new_network != None:
|
||||
if self.new_network is not None:
|
||||
self.iface_ifc.RemoveNetwork(self.new_network)
|
||||
self.new_network = None
|
||||
if self.old_network != None:
|
||||
if self.old_network is not None:
|
||||
self.iface_ifc.SelectNetwork(self.old_network)
|
||||
self.old_network = None
|
||||
|
||||
def __del__(self):
|
||||
if self.reset_on_exit == True:
|
||||
if self.reset_on_exit is True:
|
||||
self.reset()
|
||||
|
@ -21,13 +21,30 @@ import time
|
||||
import os
|
||||
import sys
|
||||
|
||||
idf_path = os.environ['IDF_PATH']
|
||||
sys.path.insert(0, idf_path + "/components/protocomm/python")
|
||||
sys.path.insert(1, idf_path + "/tools/esp_prov")
|
||||
try:
|
||||
import security
|
||||
import transport
|
||||
import prov
|
||||
|
||||
except ImportError:
|
||||
idf_path = os.environ['IDF_PATH']
|
||||
sys.path.insert(0, idf_path + "/components/protocomm/python")
|
||||
sys.path.insert(1, idf_path + "/tools/esp_prov")
|
||||
|
||||
import security
|
||||
import transport
|
||||
import prov
|
||||
|
||||
# Set this to true to allow exceptions to be thrown
|
||||
config_throw_except = False
|
||||
|
||||
|
||||
def on_except(err):
|
||||
if config_throw_except:
|
||||
raise RuntimeError(err)
|
||||
else:
|
||||
print(err)
|
||||
|
||||
import security
|
||||
import transport
|
||||
import prov
|
||||
|
||||
def get_security(secver, pop=None, verbose=False):
|
||||
if secver == 1:
|
||||
@ -36,26 +53,27 @@ def get_security(secver, pop=None, verbose=False):
|
||||
return security.Security0(verbose)
|
||||
return None
|
||||
|
||||
|
||||
def get_transport(sel_transport, softap_endpoint=None, ble_devname=None):
|
||||
try:
|
||||
tp = None
|
||||
if (sel_transport == 'softap'):
|
||||
tp = transport.Transport_Softap(softap_endpoint)
|
||||
elif (sel_transport == 'ble'):
|
||||
tp = transport.Transport_BLE(devname = ble_devname,
|
||||
service_uuid = '0000ffff-0000-1000-8000-00805f9b34fb',
|
||||
nu_lookup = {
|
||||
'prov-session': 'ff51',
|
||||
'prov-config' : 'ff52',
|
||||
'proto-ver' : 'ff53'
|
||||
})
|
||||
tp = transport.Transport_BLE(devname=ble_devname,
|
||||
service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
|
||||
nu_lookup={'prov-session': 'ff51',
|
||||
'prov-config': 'ff52',
|
||||
'proto-ver': 'ff53'
|
||||
})
|
||||
elif (sel_transport == 'console'):
|
||||
tp = transport.Transport_Console()
|
||||
return tp
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
def version_match(tp, protover):
|
||||
try:
|
||||
response = tp.send_data('proto-ver', protover)
|
||||
@ -63,134 +81,139 @@ def version_match(tp, protover):
|
||||
return False
|
||||
return True
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
def establish_session(tp, sec):
|
||||
try:
|
||||
response = None
|
||||
while True:
|
||||
request = sec.security_session(response)
|
||||
if request == None:
|
||||
if request is None:
|
||||
break
|
||||
response = tp.send_data('prov-session', request)
|
||||
if (response == None):
|
||||
if (response is None):
|
||||
return False
|
||||
return True
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
def custom_config(tp, sec, custom_info, custom_ver):
|
||||
try:
|
||||
message = prov.custom_config_request(sec, custom_info, custom_ver)
|
||||
response = tp.send_data('custom-config', message)
|
||||
return (prov.custom_config_response(sec, response) == 0)
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
def send_wifi_config(tp, sec, ssid, passphrase):
|
||||
try:
|
||||
message = prov.config_set_config_request(sec, ssid, passphrase)
|
||||
response = tp.send_data('prov-config', message)
|
||||
return (prov.config_set_config_response(sec, response) == 0)
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
def apply_wifi_config(tp, sec):
|
||||
try:
|
||||
message = prov.config_apply_config_request(sec)
|
||||
response = tp.send_data('prov-config', message)
|
||||
return (prov.config_set_config_response(sec, response) == 0)
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
def get_wifi_config(tp, sec):
|
||||
try:
|
||||
message = prov.config_get_status_request(sec)
|
||||
response = tp.send_data('prov-config', message)
|
||||
return prov.config_get_status_response(sec, response)
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
on_except(e)
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description="Generate ESP prov payload")
|
||||
|
||||
parser.add_argument("--ssid", dest = 'ssid', type = str,
|
||||
help = "SSID of Wi-Fi Network", required = True)
|
||||
parser.add_argument("--passphrase", dest = 'passphrase', type = str,
|
||||
help = "Passphrase of Wi-Fi network", default = '')
|
||||
parser.add_argument("--ssid", dest='ssid', type=str,
|
||||
help="SSID of Wi-Fi Network", required=True)
|
||||
parser.add_argument("--passphrase", dest='passphrase', type=str,
|
||||
help="Passphrase of Wi-Fi network", default='')
|
||||
|
||||
parser.add_argument("--sec_ver", dest = 'secver', type = int,
|
||||
help = "Security scheme version", default = 1)
|
||||
parser.add_argument("--proto_ver", dest = 'protover', type = str,
|
||||
help = "Protocol version", default = 'V0.1')
|
||||
parser.add_argument("--pop", dest = 'pop', type = str,
|
||||
help = "Proof of possession", default = '')
|
||||
parser.add_argument("--sec_ver", dest='secver', type=int,
|
||||
help="Security scheme version", default=1)
|
||||
parser.add_argument("--proto_ver", dest='protover', type=str,
|
||||
help="Protocol version", default='V0.1')
|
||||
parser.add_argument("--pop", dest='pop', type=str,
|
||||
help="Proof of possession", default='')
|
||||
|
||||
parser.add_argument("--softap_endpoint", dest = 'softap_endpoint', type = str,
|
||||
help = "<softap_ip:port>, http(s):// shouldn't be included", default = '192.168.4.1:80')
|
||||
parser.add_argument("--softap_endpoint", dest='softap_endpoint', type=str,
|
||||
help="<softap_ip:port>, http(s):// shouldn't be included", default='192.168.4.1:80')
|
||||
|
||||
parser.add_argument("--ble_devname", dest = 'ble_devname', type = str,
|
||||
help = "BLE Device Name", default = '')
|
||||
parser.add_argument("--ble_devname", dest='ble_devname', type=str,
|
||||
help="BLE Device Name", default='')
|
||||
|
||||
parser.add_argument("--transport", dest = 'provmode', type = str,
|
||||
help = "provisioning mode i.e console or softap or ble", default = 'softap')
|
||||
parser.add_argument("--transport", dest='provmode', type=str,
|
||||
help="provisioning mode i.e console or softap or ble", default='softap')
|
||||
|
||||
parser.add_argument("--custom_config", help="Provision Custom Configuration",
|
||||
action = "store_true")
|
||||
parser.add_argument("--custom_info", dest = 'custom_info', type = str,
|
||||
help = "Custom Config Info String", default = '<some custom info string>')
|
||||
parser.add_argument("--custom_ver", dest = 'custom_ver', type = int,
|
||||
help = "Custom Config Version Number", default = 2)
|
||||
action="store_true")
|
||||
parser.add_argument("--custom_info", dest='custom_info', type=str,
|
||||
help="Custom Config Info String", default='<some custom info string>')
|
||||
parser.add_argument("--custom_ver", dest='custom_ver', type=int,
|
||||
help="Custom Config Version Number", default=2)
|
||||
|
||||
parser.add_argument("-v","--verbose", help = "increase output verbosity",
|
||||
action = "store_true")
|
||||
parser.add_argument("-v","--verbose", help="increase output verbosity", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("==== Esp_Prov Version: " + args.protover + " ====")
|
||||
|
||||
security = get_security(args.secver, args.pop, args.verbose)
|
||||
if security == None:
|
||||
obj_security = get_security(args.secver, args.pop, args.verbose)
|
||||
if obj_security is None:
|
||||
print("---- Invalid Security Version ----")
|
||||
exit(1)
|
||||
|
||||
transport = get_transport(args.provmode, args.softap_endpoint, args.ble_devname)
|
||||
if transport == None:
|
||||
obj_transport = get_transport(args.provmode, args.softap_endpoint, args.ble_devname)
|
||||
if obj_transport is None:
|
||||
print("---- Invalid provisioning mode ----")
|
||||
exit(2)
|
||||
|
||||
print("\n==== Verifying protocol version ====")
|
||||
if not version_match(transport, args.protover):
|
||||
if not version_match(obj_transport, args.protover):
|
||||
print("---- Error in protocol version matching ----")
|
||||
exit(3)
|
||||
print("==== Verified protocol version successfully ====")
|
||||
|
||||
print("\n==== Starting Session ====")
|
||||
if not establish_session(transport, security):
|
||||
if not establish_session(obj_transport, obj_security):
|
||||
print("---- Error in establishing session ----")
|
||||
exit(4)
|
||||
print("==== Session Established ====")
|
||||
|
||||
if args.custom_config:
|
||||
print("\n==== Sending Custom config to esp32 ====")
|
||||
if not custom_config(transport, security, args.custom_info, args.custom_ver):
|
||||
if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver):
|
||||
print("---- Error in custom config ----")
|
||||
exit(5)
|
||||
print("==== Custom config sent successfully ====")
|
||||
|
||||
print("\n==== Sending Wi-Fi credential to esp32 ====")
|
||||
if not send_wifi_config(transport, security, args.ssid, args.passphrase):
|
||||
if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
|
||||
print("---- Error in send Wi-Fi config ----")
|
||||
exit(6)
|
||||
print("==== Wi-Fi Credentials sent successfully ====")
|
||||
|
||||
print("\n==== Applying config to esp32 ====")
|
||||
if not apply_wifi_config(transport, security):
|
||||
if not apply_wifi_config(obj_transport, obj_security):
|
||||
print("---- Error in apply Wi-Fi config ----")
|
||||
exit(7)
|
||||
print("==== Apply config sent successfully ====")
|
||||
@ -198,7 +221,7 @@ if __name__ == '__main__':
|
||||
while True:
|
||||
time.sleep(5)
|
||||
print("\n==== Wi-Fi connection state ====")
|
||||
ret = get_wifi_config(transport, security)
|
||||
ret = get_wifi_config(obj_transport, obj_security)
|
||||
if (ret == 1):
|
||||
continue
|
||||
elif (ret == 0):
|
||||
|
@ -22,6 +22,7 @@ import utils
|
||||
|
||||
fallback = True
|
||||
|
||||
|
||||
# Check if platform is Linux and required packages are installed
|
||||
# else fallback to console mode
|
||||
if platform.system() == 'Linux':
|
||||
@ -30,10 +31,12 @@ if platform.system() == 'Linux':
|
||||
import dbus.mainloop.glib
|
||||
import time
|
||||
fallback = False
|
||||
except:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
#--------------------------------------------------------------------
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
|
||||
# BLE client (Linux Only) using Bluez and DBus
|
||||
class BLE_Bluez_Client:
|
||||
@ -52,13 +55,13 @@ class BLE_Bluez_Client:
|
||||
|
||||
for path, interfaces in objects.items():
|
||||
adapter = interfaces.get("org.bluez.Adapter1")
|
||||
if adapter != None:
|
||||
if adapter is not None:
|
||||
if path.endswith(iface):
|
||||
self.adapter = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Adapter1")
|
||||
self.adapter_props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties")
|
||||
break
|
||||
|
||||
if self.adapter == None:
|
||||
if self.adapter is None:
|
||||
raise RuntimeError("Bluetooth adapter not found")
|
||||
|
||||
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
|
||||
@ -67,7 +70,7 @@ class BLE_Bluez_Client:
|
||||
retry = 10
|
||||
while (retry > 0):
|
||||
try:
|
||||
if self.device == None:
|
||||
if self.device is None:
|
||||
print("Connecting...")
|
||||
# Wait for device to be discovered
|
||||
time.sleep(5)
|
||||
@ -98,7 +101,7 @@ class BLE_Bluez_Client:
|
||||
dev_path = path
|
||||
break
|
||||
|
||||
if dev_path == None:
|
||||
if dev_path is None:
|
||||
raise RuntimeError("BLE device not found")
|
||||
|
||||
try:
|
||||
@ -120,12 +123,14 @@ class BLE_Bluez_Client:
|
||||
if path.startswith(self.device.object_path):
|
||||
service = bus.get_object("org.bluez", path)
|
||||
uuid = service.Get('org.bluez.GattService1', 'UUID',
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
if uuid == self.srv_uuid:
|
||||
srv_path = path
|
||||
break
|
||||
|
||||
if srv_path == None:
|
||||
if srv_path is None:
|
||||
self.device.Disconnect(dbus_interface='org.bluez.Device1')
|
||||
self.device = None
|
||||
raise RuntimeError("Provisioning service not found")
|
||||
|
||||
self.characteristics = dict()
|
||||
@ -135,7 +140,7 @@ class BLE_Bluez_Client:
|
||||
if path.startswith(srv_path):
|
||||
chrc = bus.get_object("org.bluez", path)
|
||||
uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
self.characteristics[uuid] = chrc
|
||||
|
||||
def has_characteristic(self, uuid):
|
||||
@ -148,6 +153,7 @@ class BLE_Bluez_Client:
|
||||
self.device.Disconnect(dbus_interface='org.bluez.Device1')
|
||||
if self.adapter:
|
||||
self.adapter.RemoveDevice(self.device)
|
||||
self.device = None
|
||||
if self.adapter_props:
|
||||
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0))
|
||||
|
||||
@ -159,7 +165,9 @@ class BLE_Bluez_Client:
|
||||
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
|
||||
return ''.join(chr(b) for b in path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1'))
|
||||
|
||||
#--------------------------------------------------------------------
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
|
||||
# Console based BLE client for Cross Platform support
|
||||
class BLE_Console_Client:
|
||||
@ -193,7 +201,9 @@ class BLE_Console_Client:
|
||||
resp = input("\t<< ")
|
||||
return utils.hexstr_to_str(resp)
|
||||
|
||||
#--------------------------------------------------------------------
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
|
||||
# Function to get client instance depending upon platform
|
||||
def get_client():
|
||||
|
Loading…
x
Reference in New Issue
Block a user