Merge branch 'bugfix/ci_ble_wifi_example_test_v4.1' into 'release/v4.1'

Fix for ble and wifi example test (v4.1)

See merge request espressif/esp-idf!13804
This commit is contained in:
Mahavir Jain 2021-07-28 07:33:43 +00:00
commit 92d703e1f3
10 changed files with 1095 additions and 1115 deletions

View File

@ -15,63 +15,39 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
import os
import re
import uuid
import subprocess
from tiny_test_fw import Utility import os
import subprocess
import threading
import traceback
try:
import Queue
except ImportError:
import queue as Queue
import ttfw_idf import ttfw_idf
from ble import lib_ble_client from ble import lib_ble_client
from tiny_test_fw import Utility
# When running on local machine execute the following before running this script # When running on local machine execute the following before running this script
# > make app bootloader # > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config # > make print_flash_cmd | tail -n 1 > build/download.config
@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") def blecent_client_task(dut):
def test_example_app_ble_central(env, extra_data):
"""
Steps:
1. Discover Bluetooth Adapter and Power On
"""
interface = 'hci0' interface = 'hci0'
adv_host_name = "BleCentTestApp" adv_host_name = 'BleCentTestApp'
adv_iface_index = 0
adv_type = 'peripheral' adv_type = 'peripheral'
adv_uuid = '1811' adv_uuid = '1811'
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig','hci0','reset'])
# Acquire DUT
dut = env.get_dut("blecent", "examples/bluetooth/nimble/blecent", dut_class=ttfw_idf.ESP32DUT)
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "blecent.bin")
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("blecent_bin_size", "{}KB".format(bin_size // 1024))
# Upload binary and start testing
Utility.console_log("Starting blecent example test app")
dut.start_app()
dut.reset()
device_addr = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
# Get BLE client module # Get BLE client module
ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface) ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)
if not ble_client_obj:
raise RuntimeError("Get DBus-Bluez object failed !!")
# Discover Bluetooth Adapter and power on # Discover Bluetooth Adapter and power on
is_adapter_set = ble_client_obj.set_adapter() is_adapter_set = ble_client_obj.set_adapter()
if not is_adapter_set: if not is_adapter_set:
raise RuntimeError("Adapter Power On failed !!") return
# Write device address to dut
dut.expect("BLE Host Task Started", timeout=60)
dut.write(device_addr + "\n")
''' '''
Blecent application run: Blecent application run:
@ -81,28 +57,83 @@ def test_example_app_ble_central(env, extra_data):
Register advertisement Register advertisement
Start advertising Start advertising
''' '''
ble_client_obj.start_advertising(adv_host_name, adv_iface_index, adv_type, adv_uuid) # Create Gatt Application
# Register GATT Application
ble_client_obj.register_gatt_app()
# Call disconnect to perform cleanup operations before exiting application # Register Advertisement
ble_client_obj.disconnect() # Check read/write/subscribe is received from device
ble_client_obj.register_adv(adv_host_name, adv_type, adv_uuid)
# Check dut responses # Check dut responses
dut.expect("Connection established", timeout=60) dut.expect('Connection established', timeout=30)
dut.expect('Service discovery complete; status=0', timeout=30)
dut.expect('GATT procedure initiated: read;', timeout=30)
dut.expect('Read complete; status=0', timeout=30)
dut.expect('GATT procedure initiated: write;', timeout=30)
dut.expect('Write complete; status=0', timeout=30)
dut.expect('GATT procedure initiated: write;', timeout=30)
dut.expect('Subscribe complete; status=0', timeout=30)
dut.expect('received notification;', timeout=30)
dut.expect("Service discovery complete; status=0", timeout=60)
print("Service discovery passed\n\tService Discovery Status: 0")
dut.expect("GATT procedure initiated: read;", timeout=60) class BleCentThread(threading.Thread):
dut.expect("Read complete; status=0", timeout=60) def __init__(self, dut, exceptions_queue):
print("Read passed\n\tSupportedNewAlertCategoryCharacteristic\n\tRead Status: 0") threading.Thread.__init__(self)
self.dut = dut
self.exceptions_queue = exceptions_queue
dut.expect("GATT procedure initiated: write;", timeout=60) def run(self):
dut.expect("Write complete; status=0", timeout=60) try:
print("Write passed\n\tAlertNotificationControlPointCharacteristic\n\tWrite Status: 0") blecent_client_task(self.dut)
except RuntimeError:
self.exceptions_queue.put(traceback.format_exc(), block=False)
dut.expect("GATT procedure initiated: write;", timeout=60)
dut.expect("Subscribe complete; status=0", timeout=60) @ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
print("Subscribe passed\n\tClientCharacteristicConfigurationDescriptor\n\tSubscribe Status: 0") def test_example_app_ble_central(env, extra_data):
"""
Steps:
1. Discover Bluetooth Adapter and Power On
2. Connect BLE Device
3. Start Notifications
4. Updated value is retrieved
5. Stop Notifications
"""
# Remove cached bluetooth devices of any previous connections
subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig', 'hci0', 'reset'])
# Acquire DUT
dut = env.get_dut('blecent', 'examples/bluetooth/nimble/blecent', dut_class=ttfw_idf.ESP32DUT)
# Get binary file
binary_file = os.path.join(dut.app.binary_path, 'blecent.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('blecent_bin_size', '{}KB'.format(bin_size // 1024))
# Upload binary and start testing
Utility.console_log('Starting blecent example test app')
dut.start_app()
dut.reset()
exceptions_queue = Queue.Queue()
# Starting a py-client in a separate thread
blehr_thread_obj = BleCentThread(dut, exceptions_queue)
blehr_thread_obj.start()
blehr_thread_obj.join()
exception_msg = None
while True:
try:
exception_msg = exceptions_queue.get(block=False)
except Queue.Empty:
break
else:
Utility.console_log('\n' + exception_msg)
if exception_msg:
raise Exception('Blecent thread did not run successfully')
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -15,57 +15,56 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
import os import os
import re import re
import subprocess
import threading import threading
import traceback import traceback
import subprocess
from tiny_test_fw import Utility
import ttfw_idf
from ble import lib_ble_client
try: try:
import Queue import Queue
except ImportError: except ImportError:
import queue as Queue import queue as Queue
import ttfw_idf
from ble import lib_ble_client
from tiny_test_fw import Utility
# When running on local machine execute the following before running this script # When running on local machine execute the following before running this script
# > make app bootloader # > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config # > make print_flash_cmd | tail -n 1 > build/download.config
def blehr_client_task(hr_obj, dut_addr): def blehr_client_task(hr_obj, dut, dut_addr):
interface = 'hci0' interface = 'hci0'
ble_devname = 'blehr_sensor_1.0' ble_devname = 'blehr_sensor_1.0'
hr_srv_uuid = '180d' hr_srv_uuid = '180d'
hr_char_uuid = '2a37' hr_char_uuid = '2a37'
# Get BLE client module # Get BLE client module
ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr) ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)
if not ble_client_obj:
raise RuntimeError("Failed to get DBus-Bluez object")
# Discover Bluetooth Adapter and power on # Discover Bluetooth Adapter and power on
is_adapter_set = ble_client_obj.set_adapter() is_adapter_set = ble_client_obj.set_adapter()
if not is_adapter_set: if not is_adapter_set:
raise RuntimeError("Adapter Power On failed !!") return
# Connect BLE Device # Connect BLE Device
is_connected = ble_client_obj.connect() is_connected = ble_client_obj.connect(
devname=ble_devname,
devaddr=dut_addr)
if not is_connected: if not is_connected:
# Call disconnect to perform cleanup operations before exiting application return
ble_client_obj.disconnect()
raise RuntimeError("Connection to device " + str(ble_devname) + " failed !!")
# Read Services # Get services of the connected device
services_ret = ble_client_obj.get_services() services = ble_client_obj.get_services()
if services_ret: if not services:
Utility.console_log("\nServices\n")
Utility.console_log(str(services_ret))
else:
ble_client_obj.disconnect() ble_client_obj.disconnect()
raise RuntimeError("Failure: Read Services failed") return
# Get characteristics of the connected device
ble_client_obj.get_chars()
''' '''
Blehr application run: Blehr application run:
@ -73,30 +72,52 @@ def blehr_client_task(hr_obj, dut_addr):
Retrieve updated value Retrieve updated value
Stop Notifications Stop Notifications
''' '''
blehr_ret = ble_client_obj.hr_update_simulation(hr_srv_uuid, hr_char_uuid) # Get service if exists
if blehr_ret: service = ble_client_obj.get_service_if_exists(hr_srv_uuid)
Utility.console_log("Success: blehr example test passed") if service:
# Get characteristic if exists
char = ble_client_obj.get_char_if_exists(hr_char_uuid)
if not char:
ble_client_obj.disconnect()
return
else: else:
raise RuntimeError("Failure: blehr example test failed") ble_client_obj.disconnect()
return
# Start Notify
# Read updated value
# Stop Notify
notify = ble_client_obj.start_notify(char)
if not notify:
ble_client_obj.disconnect()
return
# Check dut responses
dut.expect('subscribe event; cur_notify=1', timeout=30)
dut.expect('subscribe event; cur_notify=0', timeout=30)
# Call disconnect to perform cleanup operations before exiting application # Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect() ble_client_obj.disconnect()
# Check dut responses
dut.expect('disconnect;', timeout=30)
class BleHRThread(threading.Thread): class BleHRThread(threading.Thread):
def __init__(self, dut_addr, exceptions_queue): def __init__(self, dut, dut_addr, exceptions_queue):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.dut = dut
self.dut_addr = dut_addr self.dut_addr = dut_addr
self.exceptions_queue = exceptions_queue self.exceptions_queue = exceptions_queue
def run(self): def run(self):
try: try:
blehr_client_task(self, self.dut_addr) blehr_client_task(self, self.dut, self.dut_addr)
except Exception: except RuntimeError:
self.exceptions_queue.put(traceback.format_exc(), block=False) self.exceptions_queue.put(traceback.format_exc(), block=False)
@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") @ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
def test_example_app_ble_hr(env, extra_data): def test_example_app_ble_hr(env, extra_data):
""" """
Steps: Steps:
@ -106,28 +127,28 @@ def test_example_app_ble_hr(env, extra_data):
4. Updated value is retrieved 4. Updated value is retrieved
5. Stop Notifications 5. Stop Notifications
""" """
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) # Remove cached bluetooth devices of any previous connections
subprocess.check_output(['hciconfig','hci0','reset']) subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig', 'hci0', 'reset'])
# Acquire DUT # Acquire DUT
dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr", dut_class=ttfw_idf.ESP32DUT) dut = env.get_dut('blehr', 'examples/bluetooth/nimble/blehr', dut_class=ttfw_idf.ESP32DUT)
# Get binary file # Get binary file
binary_file = os.path.join(dut.app.binary_path, "blehr.bin") binary_file = os.path.join(dut.app.binary_path, 'blehr.bin')
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024)) ttfw_idf.log_performance('blehr_bin_size', '{}KB'.format(bin_size // 1024))
ttfw_idf.check_performance("blehr_bin_size", bin_size // 1024)
# Upload binary and start testing # Upload binary and start testing
Utility.console_log("Starting blehr simple example test app") Utility.console_log('Starting blehr simple example test app')
dut.start_app() dut.start_app()
dut.reset() dut.reset()
# Get device address from dut # Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0] dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0]
exceptions_queue = Queue.Queue() exceptions_queue = Queue.Queue()
# Starting a py-client in a separate thread # Starting a py-client in a separate thread
blehr_thread_obj = BleHRThread(dut_addr, exceptions_queue) blehr_thread_obj = BleHRThread(dut, dut_addr, exceptions_queue)
blehr_thread_obj.start() blehr_thread_obj.start()
blehr_thread_obj.join() blehr_thread_obj.join()
@ -138,15 +159,10 @@ def test_example_app_ble_hr(env, extra_data):
except Queue.Empty: except Queue.Empty:
break break
else: else:
Utility.console_log("\n" + exception_msg) Utility.console_log('\n' + exception_msg)
if exception_msg: if exception_msg:
raise Exception("Thread did not run successfully") raise Exception('Blehr thread did not run successfully')
# Check dut responses
dut.expect("subscribe event; cur_notify=1", timeout=30)
dut.expect("subscribe event; cur_notify=0", timeout=30)
dut.expect("disconnect;", timeout=30)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -15,20 +15,21 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
import os import os
import re import re
import traceback
import threading
import subprocess import subprocess
import threading
import traceback
try: try:
import Queue import Queue
except ImportError: except ImportError:
import queue as Queue import queue as Queue
from tiny_test_fw import Utility
import ttfw_idf import ttfw_idf
from ble import lib_ble_client from ble import lib_ble_client
from tiny_test_fw import Utility
# When running on local machine execute the following before running this script # When running on local machine execute the following before running this script
# > make app bootloader # > make app bootloader
@ -36,73 +37,57 @@ from ble import lib_ble_client
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw # > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
def bleprph_client_task(prph_obj, dut, dut_addr): def bleprph_client_task(dut, dut_addr):
interface = 'hci0' interface = 'hci0'
ble_devname = 'nimble-bleprph' ble_devname = 'nimble-bleprph'
srv_uuid = '2f12' srv_uuid = '2f12'
write_value = b'A'
# Get BLE client module # Get BLE client module
ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr) ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)
if not ble_client_obj:
raise RuntimeError("Failed to get DBus-Bluez object")
# Discover Bluetooth Adapter and power on # Discover Bluetooth Adapter and power on
is_adapter_set = ble_client_obj.set_adapter() is_adapter_set = ble_client_obj.set_adapter()
if not is_adapter_set: if not is_adapter_set:
raise RuntimeError("Adapter Power On failed !!") return
# Connect BLE Device # Connect BLE Device
is_connected = ble_client_obj.connect() is_connected = ble_client_obj.connect(
devname=ble_devname,
devaddr=dut_addr)
if not is_connected: if not is_connected:
# Call disconnect to perform cleanup operations before exiting application return
# Get services of the connected device
services = ble_client_obj.get_services()
if not services:
ble_client_obj.disconnect() ble_client_obj.disconnect()
raise RuntimeError("Connection to device " + ble_devname + " failed !!") return
# Verify service uuid exists
service_exists = ble_client_obj.get_service_if_exists(srv_uuid)
if not service_exists:
ble_client_obj.disconnect()
return
# Get characteristics of the connected device
ble_client_obj.get_chars()
# Read properties of characteristics (uuid, value and permissions)
ble_client_obj.read_chars()
# Write new value to characteristic having read and write permission
# and display updated value
write_char = ble_client_obj.write_chars(write_value)
if not write_char:
ble_client_obj.disconnect()
return
# Disconnect device
ble_client_obj.disconnect()
# Check dut responses # Check dut responses
dut.expect("GAP procedure initiated: advertise;", timeout=30) dut.expect('connection established; status=0', timeout=30)
dut.expect('disconnect;', timeout=30)
# Read Services
services_ret = ble_client_obj.get_services(srv_uuid)
if services_ret:
Utility.console_log("\nServices\n")
Utility.console_log(str(services_ret))
else:
ble_client_obj.disconnect()
raise RuntimeError("Failure: Read Services failed")
# Read Characteristics
chars_ret = {}
chars_ret = ble_client_obj.read_chars()
if chars_ret:
Utility.console_log("\nCharacteristics retrieved")
for path, props in chars_ret.items():
Utility.console_log("\n\tCharacteristic: " + str(path))
Utility.console_log("\tCharacteristic UUID: " + str(props[2]))
Utility.console_log("\tValue: " + str(props[0]))
Utility.console_log("\tProperties: : " + str(props[1]))
else:
ble_client_obj.disconnect()
raise RuntimeError("Failure: Read Characteristics failed")
'''
Write Characteristics
- write 'A' to characteristic with write permission
'''
chars_ret_on_write = {}
chars_ret_on_write = ble_client_obj.write_chars(b'A')
if chars_ret_on_write:
Utility.console_log("\nCharacteristics after write operation")
for path, props in chars_ret_on_write.items():
Utility.console_log("\n\tCharacteristic:" + str(path))
Utility.console_log("\tCharacteristic UUID: " + str(props[2]))
Utility.console_log("\tValue:" + str(props[0]))
Utility.console_log("\tProperties: : " + str(props[1]))
else:
ble_client_obj.disconnect()
raise RuntimeError("Failure: Write Characteristics failed")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
class BlePrphThread(threading.Thread): class BlePrphThread(threading.Thread):
@ -114,12 +99,12 @@ class BlePrphThread(threading.Thread):
def run(self): def run(self):
try: try:
bleprph_client_task(self, self.dut, self.dut_addr) bleprph_client_task(self.dut, self.dut_addr)
except Exception: except RuntimeError:
self.exceptions_queue.put(traceback.format_exc(), block=False) self.exceptions_queue.put(traceback.format_exc(), block=False)
@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") @ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
def test_example_app_ble_peripheral(env, extra_data): def test_example_app_ble_peripheral(env, extra_data):
""" """
Steps: Steps:
@ -129,28 +114,31 @@ def test_example_app_ble_peripheral(env, extra_data):
4. Read Characteristics 4. Read Characteristics
5. Write Characteristics 5. Write Characteristics
""" """
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) # Remove cached bluetooth devices of any previous connections
subprocess.check_output(['hciconfig','hci0','reset']) subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig', 'hci0', 'reset'])
# Acquire DUT # Acquire DUT
dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph", dut_class=ttfw_idf.ESP32DUT) dut = env.get_dut('bleprph', 'examples/bluetooth/nimble/bleprph', dut_class=ttfw_idf.ESP32DUT)
# Get binary file # Get binary file
binary_file = os.path.join(dut.app.binary_path, "bleprph.bin") binary_file = os.path.join(dut.app.binary_path, 'bleprph.bin')
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024)) ttfw_idf.log_performance('bleprph_bin_size', '{}KB'.format(bin_size // 1024))
ttfw_idf.check_performance("bleprph_bin_size", bin_size // 1024)
# Upload binary and start testing # Upload binary and start testing
Utility.console_log("Starting bleprph simple example test app") Utility.console_log('Starting bleprph simple example test app')
dut.start_app() dut.start_app()
dut.reset() dut.reset()
# Get device address from dut # Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0] dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0]
# Check dut responses
dut.expect('GAP procedure initiated: advertise;', timeout=30)
exceptions_queue = Queue.Queue()
# Starting a py-client in a separate thread # Starting a py-client in a separate thread
exceptions_queue = Queue.Queue()
bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue) bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue)
bleprph_thread_obj.start() bleprph_thread_obj.start()
bleprph_thread_obj.join() bleprph_thread_obj.join()
@ -162,14 +150,10 @@ def test_example_app_ble_peripheral(env, extra_data):
except Queue.Empty: except Queue.Empty:
break break
else: else:
Utility.console_log("\n" + exception_msg) Utility.console_log('\n' + exception_msg)
if exception_msg: if exception_msg:
raise Exception("Thread did not run successfully") raise Exception('BlePrph thread did not run successfully')
# Check dut responses
dut.expect("connection established; status=0", timeout=30)
dut.expect("disconnect;", timeout=30)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -15,89 +15,102 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
import re
import os
import ttfw_idf import os
import re
import esp_prov import esp_prov
import tiny_test_fw
import ttfw_idf
import wifi_tools import wifi_tools
from tiny_test_fw import Utility
# Have esp_prov throw exception # Have esp_prov throw exception
esp_prov.config_throw_except = True esp_prov.config_throw_except = True
@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") @ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
def test_examples_provisioning_softap(env, extra_data): def test_examples_provisioning_softap(env, extra_data):
# Acquire DUT # Acquire DUT
dut1 = env.get_dut("softap_prov", "examples/provisioning/softap_prov", dut_class=ttfw_idf.ESP32DUT) dut1 = env.get_dut('softap_prov', 'examples/provisioning/softap_prov', dut_class=ttfw_idf.ESP32DUT)
# Get binary file # Get binary file
binary_file = os.path.join(dut1.app.binary_path, "softap_prov.bin") binary_file = os.path.join(dut1.app.binary_path, 'softap_prov.bin')
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("softap_prov_bin_size", "{}KB".format(bin_size // 1024)) ttfw_idf.log_performance('softap_prov_bin_size', '{}KB'.format(bin_size // 1024))
ttfw_idf.check_performance("softap_prov_bin_size", bin_size // 1024)
# Upload binary and start testing # Upload binary and start testing
dut1.start_app() dut1.start_app()
# Parse IP address of STA # Parse IP address of STA
dut1.expect("Starting WiFi SoftAP provisioning", timeout=60) dut1.expect('Starting WiFi SoftAP provisioning', timeout=60)
[ssid, password] = dut1.expect(re.compile(r"SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"), timeout=30) [ssid, password] = dut1.expect(re.compile(r"SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"), timeout=30)
iface = wifi_tools.get_wiface_name() iface = wifi_tools.get_wiface_name()
if iface is None: if iface is None:
raise RuntimeError("Failed to get Wi-Fi interface on host") raise RuntimeError('Failed to get Wi-Fi interface on host')
print("Interface name : " + iface) print('Interface name : ' + iface)
print("SoftAP SSID : " + ssid) print('SoftAP SSID : ' + ssid)
print("SoftAP Password : " + password) print('SoftAP Password : ' + password)
try: try:
ctrl = wifi_tools.wpa_cli(iface, reset_on_exit=True) ctrl = wifi_tools.wpa_cli(iface, reset_on_exit=True)
print("Connecting to DUT SoftAP...") print('Connecting to DUT SoftAP...')
ip = ctrl.connect(ssid, password) try:
got_ip = dut1.expect(re.compile(r"DHCP server assigned IP to a station, IP is: (\d+.\d+.\d+.\d+)"), timeout=60)[0] ip = ctrl.connect(ssid, password)
if ip != got_ip: except RuntimeError as err:
raise RuntimeError("SoftAP connected to another host! " + ip + "!=" + got_ip) Utility.console_log('error: {}'.format(err))
print("Connected to DUT SoftAP") try:
got_ip = dut1.expect(re.compile(r'DHCP server assigned IP to a station, IP is: (\d+.\d+.\d+.\d+)'), timeout=60)
Utility.console_log('got_ip: {}'.format(got_ip))
got_ip = got_ip[0]
if ip != got_ip:
raise RuntimeError('SoftAP connected to another host! {} != {}'.format(ip, got_ip))
except tiny_test_fw.DUT.ExpectTimeout:
# print what is happening on dut side
Utility.console_log('in exception tiny_test_fw.DUT.ExpectTimeout')
Utility.console_log(dut1.read())
raise
print('Connected to DUT SoftAP')
print("Starting Provisioning") print('Starting Provisioning')
verbose = False verbose = False
protover = "V0.1" protover = 'V0.1'
secver = 1 secver = 1
pop = "abcd1234" pop = 'abcd1234'
provmode = "softap" provmode = 'softap'
ap_ssid = "myssid" ap_ssid = 'myssid'
ap_password = "mypassword" ap_password = 'mypassword'
softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1] + "." + ip.split('.')[2] + ".1:80" softap_endpoint = '{}.{}.{}.1:80'.format(ip.split('.')[0], ip.split('.')[1], ip.split('.')[2])
print("Getting security") print('Getting security')
security = esp_prov.get_security(secver, pop, verbose) security = esp_prov.get_security(secver, pop, verbose)
if security is None: if security is None:
raise RuntimeError("Failed to get security") raise RuntimeError('Failed to get security')
print("Getting transport") print('Getting transport')
transport = esp_prov.get_transport(provmode, softap_endpoint) transport = esp_prov.get_transport(provmode, softap_endpoint)
if transport is None: if transport is None:
raise RuntimeError("Failed to get transport") raise RuntimeError('Failed to get transport')
print("Verifying protocol version") print('Verifying protocol version')
if not esp_prov.version_match(transport, protover): if not esp_prov.version_match(transport, protover):
raise RuntimeError("Mismatch in protocol version") raise RuntimeError('Mismatch in protocol version')
print("Starting Session") print('Starting Session')
if not esp_prov.establish_session(transport, security): if not esp_prov.establish_session(transport, security):
raise RuntimeError("Failed to start session") raise RuntimeError('Failed to start session')
print("Sending Wifi credential to DUT") print('Sending Wifi credential to DUT')
if not esp_prov.send_wifi_config(transport, security, ap_ssid, ap_password): if not esp_prov.send_wifi_config(transport, security, ap_ssid, ap_password):
raise RuntimeError("Failed to send Wi-Fi config") raise RuntimeError('Failed to send Wi-Fi config')
print("Applying config") print('Applying config')
if not esp_prov.apply_wifi_config(transport, security): if not esp_prov.apply_wifi_config(transport, security):
raise RuntimeError("Failed to send apply config") raise RuntimeError('Failed to send apply config')
if not esp_prov.wait_wifi_connected(transport, security): if not esp_prov.wait_wifi_connected(transport, security):
raise RuntimeError("Provisioning failed") raise RuntimeError('Provisioning failed')
finally: finally:
ctrl.reset() ctrl.reset()

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@
# Register Advertisement # Register Advertisement
from __future__ import print_function from __future__ import print_function
import sys import sys
try: try:
@ -27,11 +28,10 @@ except ImportError as e:
if 'linux' not in sys.platform: if 'linux' not in sys.platform:
raise e raise e
print(e) print(e)
print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
raise raise
ADV_OBJ = False
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
@ -74,10 +74,8 @@ class Advertisement(dbus.service.Object):
in_signature='s', in_signature='s',
out_signature='a{sv}') out_signature='a{sv}')
def GetAll(self, interface): def GetAll(self, interface):
global ADV_OBJ
if interface != LE_ADVERTISEMENT_IFACE: if interface != LE_ADVERTISEMENT_IFACE:
raise InvalidArgsException() raise InvalidArgsException()
ADV_OBJ = True
return self.get_properties()[LE_ADVERTISEMENT_IFACE] return self.get_properties()[LE_ADVERTISEMENT_IFACE]
@dbus.service.method(LE_ADVERTISEMENT_IFACE, @dbus.service.method(LE_ADVERTISEMENT_IFACE,

View File

@ -18,6 +18,7 @@
# Creating GATT Application which then becomes available to remote devices. # Creating GATT Application which then becomes available to remote devices.
from __future__ import print_function from __future__ import print_function
import sys import sys
try: try:
@ -27,17 +28,10 @@ except ImportError as e:
if 'linux' not in sys.platform: if 'linux' not in sys.platform:
raise e raise e
print(e) print(e)
print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
raise raise
alert_status_char_obj = None
GATT_APP_OBJ = False
CHAR_READ = False
CHAR_WRITE = False
CHAR_SUBSCRIBE = False
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1' GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
@ -46,6 +40,21 @@ GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
SERVICE_UUIDS = {
'ALERT_NOTIF_SVC_UUID': '00001811-0000-1000-8000-00805f9b34fb'
}
CHAR_UUIDS = {
'SUPPORT_NEW_ALERT_UUID': '00002A47-0000-1000-8000-00805f9b34fb',
'ALERT_NOTIF_UUID': '00002A44-0000-1000-8000-00805f9b34fb',
'UNREAD_ALERT_STATUS_UUID': '00002A45-0000-1000-8000-00805f9b34fb'
}
DESCR_UUIDS = {
'CCCD_UUID': '00002902-0000-1000-8000-00805f9b34fb'
}
class InvalidArgsException(dbus.exceptions.DBusException): class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
@ -62,22 +71,16 @@ class Application(dbus.service.Object):
def __init__(self, bus, path): def __init__(self, bus, path):
self.path = path self.path = path
self.services = [] self.services = []
srv_obj = AlertNotificationService(bus, '0001')
self.add_service(srv_obj)
dbus.service.Object.__init__(self, bus, self.path) dbus.service.Object.__init__(self, bus, self.path)
def __del__(self): def __del__(self):
pass pass
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self, service): def add_service(self, service):
self.services.append(service) self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self): def GetManagedObjects(self):
global GATT_APP_OBJ
response = {} response = {}
for service in self.services: for service in self.services:
@ -89,13 +92,25 @@ class Application(dbus.service.Object):
for desc in descs: for desc in descs:
response[desc.get_path()] = desc.get_properties() response[desc.get_path()] = desc.get_properties()
GATT_APP_OBJ = True
return response return response
def get_path(self):
return dbus.ObjectPath(self.path)
def Release(self): def Release(self):
pass pass
class AlertNotificationApp(Application):
'''
Alert Notification Application
'''
def __init__(self, bus, path):
Application.__init__(self, bus, path)
self.service = AlertNotificationService(bus, '0001')
self.add_service(self.service)
class Service(dbus.service.Object): class Service(dbus.service.Object):
""" """
org.bluez.GattService1 interface implementation org.bluez.GattService1 interface implementation
@ -190,7 +205,6 @@ class Characteristic(dbus.service.Object):
def GetAll(self, interface): def GetAll(self, interface):
if interface != GATT_CHRC_IFACE: if interface != GATT_CHRC_IFACE:
raise InvalidArgsException() raise InvalidArgsException()
return self.get_properties()[GATT_CHRC_IFACE] return self.get_properties()[GATT_CHRC_IFACE]
@dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay') @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
@ -216,7 +230,8 @@ class Characteristic(dbus.service.Object):
@dbus.service.signal(DBUS_PROP_IFACE, @dbus.service.signal(DBUS_PROP_IFACE,
signature='sa{sv}as') signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated): def PropertiesChanged(self, interface, changed, invalidated):
print("\nProperties Changed") pass
# print('\nProperties Changed')
class Descriptor(dbus.service.Object): class Descriptor(dbus.service.Object):
@ -249,7 +264,6 @@ class Descriptor(dbus.service.Object):
def GetAll(self, interface): def GetAll(self, interface):
if interface != GATT_DESC_IFACE: if interface != GATT_DESC_IFACE:
raise InvalidArgsException() raise InvalidArgsException()
return self.get_properties()[GATT_DESC_IFACE] return self.get_properties()[GATT_DESC_IFACE]
@dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay') @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
@ -262,151 +276,155 @@ class Descriptor(dbus.service.Object):
print('Default WriteValue called, returning error') print('Default WriteValue called, returning error')
raise NotSupportedException() raise NotSupportedException()
@dbus.service.signal(DBUS_PROP_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated):
pass
# print('\nProperties Changed')
class AlertNotificationService(Service): class AlertNotificationService(Service):
TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index): def __init__(self, bus, index):
global alert_status_char_obj Service.__init__(self, bus, index, SERVICE_UUIDS['ALERT_NOTIF_SVC_UUID'], primary=True)
Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True)
self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self)) self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self))
self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self)) self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self))
alert_status_char_obj = UnreadAlertStatusCharacteristic(bus, '0003', self) self.add_characteristic(UnreadAlertStatusCharacteristic(bus, '0003', self))
self.add_characteristic(alert_status_char_obj)
def get_char_status(self, uuid, status):
for char in self.characteristics:
if char.uuid == uuid:
if status in char.status:
return True
return False
class SupportedNewAlertCategoryCharacteristic(Characteristic): class SupportedNewAlertCategoryCharacteristic(Characteristic):
SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, service): def __init__(self, bus, index, service):
Characteristic.__init__( Characteristic.__init__(
self, bus, index, self, bus, index,
self.SUPPORT_NEW_ALERT_UUID, CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'],
['read'], ['read'],
service) service)
self.value = [dbus.Byte(2)] self.value = [dbus.Byte(2)]
self.status = []
def ReadValue(self, options): def ReadValue(self, options):
global CHAR_READ
CHAR_READ = True
val_list = [] val_list = []
for val in self.value: for val in self.value:
val_list.append(dbus.Byte(val)) val_list.append(dbus.Byte(val))
print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic") print('Read Request received\n', '\tSupportedNewAlertCategoryCharacteristic')
print("\tValue:", "\t", val_list) print('\tValue:', '\t', val_list)
self.status.append('read')
return val_list return val_list
class AlertNotificationControlPointCharacteristic(Characteristic): class AlertNotificationControlPointCharacteristic(Characteristic):
ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, service): def __init__(self, bus, index, service):
Characteristic.__init__( Characteristic.__init__(
self, bus, index, self, bus, index,
self.ALERT_NOTIF_UUID, CHAR_UUIDS['ALERT_NOTIF_UUID'],
['read','write'], ['read', 'write'],
service) service)
self.value = [dbus.Byte(0)] self.value = [dbus.Byte(0)]
self.status = []
def ReadValue(self, options): def ReadValue(self, options):
val_list = [] val_list = []
for val in self.value: for val in self.value:
val_list.append(dbus.Byte(val)) val_list.append(dbus.Byte(val))
print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic") print('Read Request received\n', '\tAlertNotificationControlPointCharacteristic')
print("\tValue:", "\t", val_list) print('\tValue:', '\t', val_list)
self.status.append('read')
return val_list return val_list
def WriteValue(self, value, options): def WriteValue(self, value, options):
global CHAR_WRITE print('Write Request received\n', '\tAlertNotificationControlPointCharacteristic')
CHAR_WRITE = True print('\tCurrent value:', '\t', self.value)
print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic")
print("\tCurrent value:", "\t", self.value)
val_list = [] val_list = []
for val in value: for val in value:
val_list.append(val) val_list.append(val)
self.value = val_list self.value = val_list
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
# Check if new value is written # Check if new value is written
print("\tNew value:", "\t", self.value) print('\tNew value:', '\t', self.value)
if not self.value == value: if not self.value == value:
print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
self.status.append('write')
class UnreadAlertStatusCharacteristic(Characteristic): class UnreadAlertStatusCharacteristic(Characteristic):
UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, service): def __init__(self, bus, index, service):
Characteristic.__init__( Characteristic.__init__(
self, bus, index, self, bus, index,
self.UNREAD_ALERT_STATUS_UUID, CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'],
['read', 'write', 'notify'], ['read', 'write', 'notify'],
service) service)
self.value = [dbus.Byte(0)] self.value = [dbus.Byte(0)]
self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self) self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
self.add_descriptor(self.cccd_obj) self.add_descriptor(self.cccd_obj)
self.notifying = False self.notifying = False
self.status = []
def StartNotify(self): def StartNotify(self):
global CHAR_SUBSCRIBE try:
CHAR_SUBSCRIBE = True if self.notifying:
print('\nAlready notifying, nothing to do')
return
print('Notify Started')
self.notifying = True
self.ReadValue()
self.WriteValue([dbus.Byte(1), dbus.Byte(0)])
self.status.append('notify')
if self.notifying: except Exception as e:
print('\nAlready notifying, nothing to do') print(e)
return
self.notifying = True
print("\nNotify Started")
self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)])
self.cccd_obj.ReadValue()
def StopNotify(self): def StopNotify(self):
if not self.notifying: if not self.notifying:
print('\nNot notifying, nothing to do') print('\nNot notifying, nothing to do')
return return
self.notifying = False self.notifying = False
print("\nNotify Stopped") print('\nNotify Stopped')
def ReadValue(self, options): def ReadValue(self, options=None):
print("Read Request received\n", "\tUnreadAlertStatusCharacteristic")
val_list = [] val_list = []
for val in self.value: for val in self.value:
val_list.append(dbus.Byte(val)) val_list.append(dbus.Byte(val))
print("\tValue:", "\t", val_list) self.status.append('read')
print('\tValue:', '\t', val_list)
return val_list return val_list
def WriteValue(self, value, options): def WriteValue(self, value, options=None):
print("Write Request received\n", "\tUnreadAlertStatusCharacteristic")
val_list = [] val_list = []
for val in value: for val in value:
val_list.append(val) val_list.append(val)
self.value = val_list self.value = val_list
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
# Check if new value is written # Check if new value is written
print("\tNew value:", "\t", self.value)
if not self.value == value: if not self.value == value:
print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
print('New value:', '\t', self.value)
self.status.append('write')
class ClientCharacteristicConfigurationDescriptor(Descriptor): class ClientCharacteristicConfigurationDescriptor(Descriptor):
CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, characteristic): def __init__(self, bus, index, characteristic):
self.value = [dbus.Byte(0)] self.value = [dbus.Byte(1)]
Descriptor.__init__( Descriptor.__init__(
self, bus, index, self, bus, index,
self.CCCD_UUID, DESCR_UUIDS['CCCD_UUID'],
['read', 'write'], ['read', 'write'],
characteristic) characteristic)
def ReadValue(self): def ReadValue(self, options=None):
print("\tValue on read:", "\t", self.value)
return self.value return self.value
def WriteValue(self, value): def WriteValue(self, value, options=None):
val_list = [] val_list = []
for val in value: for val in value:
val_list.append(val) val_list.append(val)
self.value = val_list self.value = val_list
self.PropertiesChanged(GATT_DESC_IFACE, {'Value': self.value}, [])
# Check if new value is written # Check if new value is written
print("New value on write:", "\t", self.value)
if not self.value == value: if not self.value == value:
print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)

View File

@ -13,10 +13,11 @@
# limitations under the License. # limitations under the License.
# #
import time
import dbus import dbus
import dbus.mainloop.glib import dbus.mainloop.glib
import netifaces import netifaces
import time
def get_wiface_name(): def get_wiface_name():
@ -43,25 +44,30 @@ class wpa_cli:
self.new_network = None self.new_network = None
self.connected = False self.connected = False
self.reset_on_exit = reset_on_exit self.reset_on_exit = reset_on_exit
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"), try:
"fi.w1.wpa_supplicant1") dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
iface_path = service.GetInterface(self.iface_name) bus = dbus.SystemBus()
self.iface_obj = bus.get_object("fi.w1.wpa_supplicant1", iface_path)
self.iface_ifc = dbus.Interface(self.iface_obj, "fi.w1.wpa_supplicant1.Interface")
self.iface_props = dbus.Interface(self.iface_obj, 'org.freedesktop.DBus.Properties')
if self.iface_ifc is None:
raise RuntimeError('supplicant : Failed to fetch interface')
self.old_network = self._get_iface_property("CurrentNetwork") service = dbus.Interface(bus.get_object('fi.w1.wpa_supplicant1', '/fi/w1/wpa_supplicant1'),
print("Old network is %s" % self.old_network) 'fi.w1.wpa_supplicant1')
iface_path = service.GetInterface(self.iface_name)
self.iface_obj = bus.get_object('fi.w1.wpa_supplicant1', iface_path)
self.iface_ifc = dbus.Interface(self.iface_obj, 'fi.w1.wpa_supplicant1.Interface')
self.iface_props = dbus.Interface(self.iface_obj, 'org.freedesktop.DBus.Properties')
if self.iface_ifc is None:
raise RuntimeError('supplicant : Failed to fetch interface')
if self.old_network == '/': self.old_network = self._get_iface_property('CurrentNetwork')
self.old_network = None print('Old network is %s' % self.old_network)
else:
self.connected = True if self.old_network == '/':
self.old_network = None
else:
self.connected = True
except Exception as err:
raise Exception('Failure in wpa_cli init: {}'.format(err))
def _get_iface_property(self, name): def _get_iface_property(self, name):
""" Read the property with 'name' from the wi-fi interface object """ Read the property with 'name' from the wi-fi interface object
@ -69,40 +75,44 @@ class wpa_cli:
Note: The result is a dbus wrapped type, so should usually convert it to the corresponding native Note: The result is a dbus wrapped type, so should usually convert it to the corresponding native
Python type Python type
""" """
return self.iface_props.Get("fi.w1.wpa_supplicant1.Interface", name) return self.iface_props.Get('fi.w1.wpa_supplicant1.Interface', name)
def connect(self, ssid, password): def connect(self, ssid, password):
if self.connected is True: try:
self.iface_ifc.Disconnect() if self.connected is True:
self.connected = False self.iface_ifc.Disconnect()
self.connected = False
if self.new_network is not None: if self.new_network is not None:
self.iface_ifc.RemoveNetwork(self.new_network) self.iface_ifc.RemoveNetwork(self.new_network)
print("Pre-connect state is %s, IP is %s" % (self._get_iface_property("State"), get_wiface_IPv4(self.iface_name))) print('Pre-connect state is %s, IP is %s' % (self._get_iface_property('State'), get_wiface_IPv4(self.iface_name)))
self.new_network = self.iface_ifc.AddNetwork({"ssid": ssid, "psk": password}) self.new_network = self.iface_ifc.AddNetwork({'ssid': ssid, 'psk': password})
self.iface_ifc.SelectNetwork(self.new_network) self.iface_ifc.SelectNetwork(self.new_network)
time.sleep(10)
ip = None ip = None
retry = 10 retry = 10
while retry > 0: while retry > 0:
time.sleep(5) time.sleep(5)
state = str(self._get_iface_property('State'))
print('wpa iface state %s (scanning %s)' % (state, bool(self._get_iface_property('Scanning'))))
if state in ['disconnected', 'inactive']:
self.iface_ifc.Reconnect()
ip = get_wiface_IPv4(self.iface_name)
print('wpa iface %s IP %s' % (self.iface_name, ip))
if ip is not None:
self.connected = True
return ip
retry -= 1
time.sleep(3)
state = str(self._get_iface_property("State")) self.reset()
print("wpa iface state %s (scanning %s)" % (state, bool(self._get_iface_property("Scanning")))) print('wpa_cli : Connection failed')
if state in ["disconnected", "inactive"]:
self.iface_ifc.Reconnect()
ip = get_wiface_IPv4(self.iface_name) except Exception as err:
print("wpa iface %s IP %s" % (self.iface_name, ip)) raise Exception('Failure in wpa_cli init: {}'.format(err))
if ip is not None:
self.connected = True
return ip
retry -= 1
self.reset()
raise RuntimeError('wpa_cli : Connection failed')
def reset(self): def reset(self):
if self.iface_ifc is not None: if self.iface_ifc is not None:

View File

@ -14,12 +14,12 @@
# #
from __future__ import print_function from __future__ import print_function
from builtins import input
from future.utils import iteritems
import platform import platform
from builtins import input
import utils import utils
from future.utils import iteritems
fallback = True fallback = True
@ -28,9 +28,10 @@ fallback = True
# else fallback to console mode # else fallback to console mode
if platform.system() == 'Linux': if platform.system() == 'Linux':
try: try:
import time
import dbus import dbus
import dbus.mainloop.glib import dbus.mainloop.glib
import time
fallback = False fallback = False
except ImportError: except ImportError:
pass pass
@ -41,13 +42,15 @@ if platform.system() == 'Linux':
# BLE client (Linux Only) using Bluez and DBus # BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client: class BLE_Bluez_Client:
def __init__(self):
self.adapter_props = None
def connect(self, devname, iface, chrc_names, fallback_srv_uuid): def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
self.devname = devname self.devname = devname
self.srv_uuid_fallback = fallback_srv_uuid self.srv_uuid_fallback = fallback_srv_uuid
self.chrc_names = [name.lower() for name in chrc_names] self.chrc_names = [name.lower() for name in chrc_names]
self.device = None self.device = None
self.adapter = None self.adapter = None
self.adapter_props = None
self.services = None self.services = None
self.nu_lookup = None self.nu_lookup = None
self.characteristics = dict() self.characteristics = dict()
@ -55,33 +58,69 @@ class BLE_Bluez_Client:
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus() bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects() objects = manager.GetManagedObjects()
adapter_path = None
for path, interfaces in iteritems(objects): for path, interfaces in iteritems(objects):
adapter = interfaces.get("org.bluez.Adapter1") adapter = interfaces.get('org.bluez.Adapter1')
if adapter is not None: if adapter is not None:
if path.endswith(iface): if path.endswith(iface):
self.adapter = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Adapter1") self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
self.adapter_props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
adapter_path = path
break break
if self.adapter is None: if self.adapter is None:
raise RuntimeError("Bluetooth adapter not found") raise RuntimeError('Bluetooth adapter not found')
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) # Power on bluetooth adapter
self.adapter.StartDiscovery() self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
print('checking if adapter is powered on')
for cnt in range(10, 0, -1):
time.sleep(5)
powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
if powered_on == 1:
# Set adapter props again with powered on value
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties')
print('bluetooth adapter powered on')
break
print('number of retries left({})'.format(cnt - 1))
if powered_on == 0:
raise RuntimeError('Failed to starte bluetooth adapter')
# Start discovery if not already discovering
started_discovery = 0
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 0:
print('starting discovery')
self.adapter.StartDiscovery()
# Set as start discovery is called
started_discovery = 1
for cnt in range(10, 0, -1):
time.sleep(5)
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 1:
print('start discovery successful')
break
print('number of retries left ({})'.format(cnt - 1))
if discovery_val == 0:
print('start discovery failed')
raise RuntimeError('Failed to start discovery')
retry = 10 retry = 10
while (retry > 0): while (retry > 0):
try: try:
if self.device is None: if self.device is None:
print("Connecting...") print('Connecting...')
# Wait for device to be discovered # Wait for device to be discovered
time.sleep(5) time.sleep(5)
self._connect_() connected = self._connect_()
print("Connected") if connected:
print("Getting Services...") print('Connected')
else:
return False
print('Getting Services...')
# Wait for services to be discovered # Wait for services to be discovered
time.sleep(5) time.sleep(5)
self._get_services_() self._get_services_()
@ -89,28 +128,42 @@ class BLE_Bluez_Client:
except Exception as e: except Exception as e:
print(e) print(e)
retry -= 1 retry -= 1
print("Retries left", retry) print('Retries left', retry)
continue continue
self.adapter.StopDiscovery()
# Call StopDiscovery() for corresponding StartDiscovery() session
if started_discovery == 1:
print('stopping discovery')
self.adapter.StopDiscovery()
for cnt in range(10, 0, -1):
time.sleep(5)
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 0:
print('stop discovery successful')
break
print('number of retries left ({})'.format(cnt - 1))
if discovery_val == 1:
print('stop discovery failed')
return False return False
def _connect_(self): def _connect_(self):
bus = dbus.SystemBus() bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects() objects = manager.GetManagedObjects()
dev_path = None dev_path = None
for path, interfaces in iteritems(objects): for path, interfaces in iteritems(objects):
if "org.bluez.Device1" not in interfaces: if 'org.bluez.Device1' not in interfaces:
continue continue
if interfaces["org.bluez.Device1"].get("Name") == self.devname: if interfaces['org.bluez.Device1'].get('Name') == self.devname:
dev_path = path dev_path = path
break break
if dev_path is None: if dev_path is None:
raise RuntimeError("BLE device not found") raise RuntimeError('BLE device not found')
try: try:
self.device = bus.get_object("org.bluez", dev_path) self.device = bus.get_object('org.bluez', dev_path)
try: try:
uuids = self.device.Get('org.bluez.Device1', 'UUIDs', uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
dbus_interface='org.freedesktop.DBus.Properties') dbus_interface='org.freedesktop.DBus.Properties')
@ -122,25 +175,42 @@ class BLE_Bluez_Client:
if len(uuids) == 1: if len(uuids) == 1:
self.srv_uuid_adv = uuids[0] self.srv_uuid_adv = uuids[0]
except dbus.exceptions.DBusException as e: except dbus.exceptions.DBusException as e:
print(e) raise RuntimeError(e)
self.device.Connect(dbus_interface='org.bluez.Device1') self.device.Connect(dbus_interface='org.bluez.Device1')
# Check device is connected successfully
for cnt in range(10, 0, -1):
time.sleep(5)
device_conn = self.device.Get(
'org.bluez.Device1',
'Connected',
dbus_interface='org.freedesktop.DBus.Properties')
if device_conn == 1:
print('device is connected')
break
print('number of retries left ({})'.format(cnt - 1))
if device_conn == 0:
print('failed to connect device')
return False
return True
except Exception as e: except Exception as e:
print(e) print(e)
self.device = None self.device = None
raise RuntimeError("BLE device could not connect") raise RuntimeError('BLE device could not connect')
def _get_services_(self): def _get_services_(self):
bus = dbus.SystemBus() bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects() objects = manager.GetManagedObjects()
service_found = False service_found = False
for srv_path, srv_interfaces in iteritems(objects): for srv_path, srv_interfaces in iteritems(objects):
if "org.bluez.GattService1" not in srv_interfaces: if 'org.bluez.GattService1' not in srv_interfaces:
continue continue
if not srv_path.startswith(self.device.object_path): if not srv_path.startswith(self.device.object_path):
continue continue
service = bus.get_object("org.bluez", srv_path) service = bus.get_object('org.bluez', srv_path)
srv_uuid = service.Get('org.bluez.GattService1', 'UUID', srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties') dbus_interface='org.freedesktop.DBus.Properties')
@ -152,28 +222,28 @@ class BLE_Bluez_Client:
nu_lookup = dict() nu_lookup = dict()
characteristics = dict() characteristics = dict()
for chrc_path, chrc_interfaces in iteritems(objects): for chrc_path, chrc_interfaces in iteritems(objects):
if "org.bluez.GattCharacteristic1" not in chrc_interfaces: if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
continue continue
if not chrc_path.startswith(service.object_path): if not chrc_path.startswith(service.object_path):
continue continue
chrc = bus.get_object("org.bluez", chrc_path) chrc = bus.get_object('org.bluez', chrc_path)
uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties') dbus_interface='org.freedesktop.DBus.Properties')
characteristics[uuid] = chrc characteristics[uuid] = chrc
for desc_path, desc_interfaces in iteritems(objects): for desc_path, desc_interfaces in iteritems(objects):
if "org.bluez.GattDescriptor1" not in desc_interfaces: if 'org.bluez.GattDescriptor1' not in desc_interfaces:
continue continue
if not desc_path.startswith(chrc.object_path): if not desc_path.startswith(chrc.object_path):
continue continue
desc = bus.get_object("org.bluez", desc_path) desc = bus.get_object('org.bluez', desc_path)
desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID', desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties') dbus_interface='org.freedesktop.DBus.Properties')
if desc_uuid[4:8] != '2901': if desc_uuid[4:8] != '2901':
continue continue
try: try:
readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1') readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
except dbus.exceptions.DBusException: except dbus.exceptions.DBusException as err:
break raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err))
found_name = ''.join(chr(b) for b in readval).lower() found_name = ''.join(chr(b) for b in readval).lower()
nu_lookup[found_name] = uuid nu_lookup[found_name] = uuid
break break
@ -200,12 +270,14 @@ class BLE_Bluez_Client:
if not service_found: if not service_found:
self.device.Disconnect(dbus_interface='org.bluez.Device1') self.device.Disconnect(dbus_interface='org.bluez.Device1')
# Check if device is disconnected successfully
self._check_device_disconnected()
if self.adapter: if self.adapter:
self.adapter.RemoveDevice(self.device) self.adapter.RemoveDevice(self.device)
self.device = None self.device = None
self.nu_lookup = None self.nu_lookup = None
self.characteristics = dict() self.characteristics = dict()
raise RuntimeError("Provisioning service not found") raise RuntimeError('Provisioning service not found')
def get_nu_lookup(self): def get_nu_lookup(self):
return self.nu_lookup return self.nu_lookup
@ -218,31 +290,47 @@ class BLE_Bluez_Client:
def disconnect(self): def disconnect(self):
if self.device: if self.device:
self.device.Disconnect(dbus_interface='org.bluez.Device1') self.device.Disconnect(dbus_interface='org.bluez.Device1')
# Check if device is disconnected successfully
self._check_device_disconnected()
if self.adapter: if self.adapter:
self.adapter.RemoveDevice(self.device) self.adapter.RemoveDevice(self.device)
self.device = None self.device = None
self.nu_lookup = None self.nu_lookup = None
self.characteristics = dict() self.characteristics = dict()
if self.adapter_props: if self.adapter_props:
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0)) self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
def _check_device_disconnected(self):
for cnt in range(10, 0, -1):
time.sleep(5)
device_conn = self.device.Get(
'org.bluez.Device1',
'Connected',
dbus_interface='org.freedesktop.DBus.Properties')
if device_conn == 0:
print('device disconnected')
break
print('number of retries left ({})'.format(cnt - 1))
if device_conn == 1:
print('failed to disconnect device')
def send_data(self, characteristic_uuid, data): def send_data(self, characteristic_uuid, data):
try: try:
path = self.characteristics[characteristic_uuid] path = self.characteristics[characteristic_uuid]
except KeyError: except KeyError:
raise RuntimeError("Invalid characteristic : " + characteristic_uuid) raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)
try: try:
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1') path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
except TypeError: # python3 compatible except TypeError: # python3 compatible
path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1') path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e: except dbus.exceptions.DBusException as e:
raise RuntimeError("Failed to write value to characteristic " + characteristic_uuid + ": " + str(e)) raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))
try: try:
readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1') readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e: except dbus.exceptions.DBusException as e:
raise RuntimeError("Failed to read value from characteristic " + characteristic_uuid + ": " + str(e)) raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
return ''.join(chr(b) for b in readval) return ''.join(chr(b) for b in readval)
@ -252,14 +340,14 @@ class BLE_Bluez_Client:
# Console based BLE client for Cross Platform support # Console based BLE client for Cross Platform support
class BLE_Console_Client: class BLE_Console_Client:
def connect(self, devname, iface, chrc_names, fallback_srv_uuid): def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
print("BLE client is running in console mode") print('BLE client is running in console mode')
print("\tThis could be due to your platform not being supported or dependencies not being met") print('\tThis could be due to your platform not being supported or dependencies not being met')
print("\tPlease ensure all pre-requisites are met to run the full fledged client") print('\tPlease ensure all pre-requisites are met to run the full fledged client')
print("BLECLI >> Please connect to BLE device `" + devname + "` manually using your tool of choice") print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice')
resp = input("BLECLI >> Was the device connected successfully? [y/n] ") resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
if resp != 'Y' and resp != 'y': if resp != 'Y' and resp != 'y':
return False return False
print("BLECLI >> List available attributes of the connected device") print('BLECLI >> List available attributes of the connected device')
resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ") resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
if resp != 'Y' and resp != 'y': if resp != 'Y' and resp != 'y':
return False return False
@ -279,9 +367,9 @@ class BLE_Console_Client:
def send_data(self, characteristic_uuid, data): def send_data(self, characteristic_uuid, data):
print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :") print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
print("\t>> " + utils.str_to_hexstr(data)) print('\t>> ' + utils.str_to_hexstr(data))
print("BLECLI >> Enter data read from characteristic (in hex) :") print('BLECLI >> Enter data read from characteristic (in hex) :')
resp = input("\t<< ") resp = input('\t<< ')
return utils.hexstr_to_str(resp) return utils.hexstr_to_str(resp)

View File

@ -14,43 +14,47 @@
# #
from __future__ import print_function from __future__ import print_function
from future.utils import tobytes
import socket import socket
import http.client
import ssl from future.utils import tobytes
try:
from http.client import HTTPConnection, HTTPSConnection
except ImportError:
# Python 2 fallback
from httplib import HTTPConnection, HTTPSConnection
from .transport import Transport from .transport import Transport
class Transport_HTTP(Transport): class Transport_HTTP(Transport):
def __init__(self, hostname, certfile=None): def __init__(self, hostname, ssl_context=None):
try: try:
socket.gethostbyname(hostname.split(':')[0]) socket.gethostbyname(hostname.split(':')[0])
except socket.gaierror: except socket.gaierror:
raise RuntimeError("Unable to resolve hostname :" + hostname) raise RuntimeError('Unable to resolve hostname :' + hostname)
if certfile is None: if ssl_context is None:
self.conn = http.client.HTTPConnection(hostname, timeout=45) self.conn = HTTPConnection(hostname, timeout=60)
else: else:
ssl_ctx = ssl.create_default_context(cafile=certfile) self.conn = HTTPSConnection(hostname, context=ssl_context, timeout=60)
self.conn = http.client.HTTPSConnection(hostname, context=ssl_ctx, timeout=45)
try: try:
print("Connecting to " + hostname) print('Connecting to ' + hostname)
self.conn.connect() self.conn.connect()
except Exception as err: except Exception as err:
raise RuntimeError("Connection Failure : " + str(err)) raise RuntimeError('Connection Failure : ' + str(err))
self.headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "text/plain"} self.headers = {'Content-type': 'application/x-www-form-urlencoded','Accept': 'text/plain'}
def _send_post_request(self, path, data): def _send_post_request(self, path, data):
try: try:
self.conn.request("POST", path, tobytes(data), self.headers) self.conn.request('POST', path, tobytes(data), self.headers)
response = self.conn.getresponse() response = self.conn.getresponse()
if response.status == 200: if response.status == 200:
return response.read().decode('latin-1') return response.read().decode('latin-1')
except Exception as err: except Exception as err:
raise RuntimeError("Connection Failure : " + str(err)) raise RuntimeError('Connection Failure : ' + str(err))
raise RuntimeError("Server responded with error code " + str(response.status)) raise RuntimeError('Server responded with error code ' + str(response.status))
def send_data(self, ep_name, data): def send_data(self, ep_name, data):
return self._send_post_request('/' + ep_name, data) return self._send_post_request('/' + ep_name, data)