Merge branch 'contrib/github_pr_9117' into 'master'

esp_prov: switch from bluez/dbus to bleak (GitHub PR)

Closes IDFGH-7556

See merge request espressif/esp-idf!18618
This commit is contained in:
Mahavir Jain 2022-06-24 11:39:09 +08:00
commit 3f77c65e56
25 changed files with 505 additions and 814 deletions

View File

@ -44,11 +44,9 @@ For iOS, a provisioning application along with source code is available on GitHu
#### Platform : Linux / Windows / macOS #### Platform : Linux / Windows / macOS
To provision the device running this example, the `esp_prov.py` script needs to be run (found under `$IDF_PATH/tools/esp_prov`). Make sure to satisfy all the dependencies prior to running the script. To provision the device running this example, the `esp_prov.py` script needs to be run (found under `$IDF_PATH/tools/esp_prov`). Make sure to satisfy all the dependencies prior to running the script (check out the `requirements.txt` file in `esp_prov` directory).
Presently, `esp_prov` supports BLE transport only for Linux platform. For Windows/macOS it falls back to console mode and requires another application (for BLE) through which the communication can take place. `esp_prov` supports BLE and SoftAP transport for Linux, MacOS and Windows platforms. For BLE, however, if dependencies are not met, the script falls back to console mode and requires another application through which the communication can take place. The `esp_prov` console will guide you through the provisioning process of locating the correct BLE GATT services and characteristics, the values to write, and input read values.
There are various applications, specific to Windows and macOS platform which can be used. The `esp_prov` console will guide you through the provisioning process of locating the correct BLE GATT services and characteristics, the values to write, and input read values.
### Configure the project ### Configure the project
@ -79,10 +77,15 @@ I (1045) wifi_prov_mgr: Provisioning started with service name : PROV_261FCC
Make sure to note down the BLE device name (starting with `PROV_`) displayed in the serial monitor log (eg. PROV_261FCC). This will depend on the MAC ID and will be unique for every device. Make sure to note down the BLE device name (starting with `PROV_`) displayed in the serial monitor log (eg. PROV_261FCC). This will depend on the MAC ID and will be unique for every device.
In a separate terminal run the `esp_prov.py` script under `$IDP_PATH/tools/esp_prov` directory (make sure to replace `myssid` and `mypassword` with the credentials of the AP to which the device is supposed to connect to after provisioning). Assuming default example configuration, which uses the protocomm security scheme 2 (based on Secure Remote Password protocol (SRP6a)) : In a separate terminal run the `esp_prov.py` script under `$IDP_PATH/tools/esp_prov` directory (make sure to replace `myssid` and `mypassword` with the credentials of the AP to which the device is supposed to connect to after provisioning). Assuming default example configuration, which uses the protocomm security scheme 1 with PoP-based (proof-of-possession) authentication :
``` ```
python esp_prov.py --verbose --transport ble --service_name PROV_4C33E8 --sec_ver 2 --sec2_username testuser --sec2_pwd testpassword --ssid myssid --passphrase mypassword python esp_prov.py --transport ble --service_name PROV_261FCC --sec_ver 1 --pop abcd1234 --ssid myssid --passphrase mypassword
```
For security version 2, the following command can be used:
```
python esp_prov.py --transport ble --service_name PROV_261FCC --sec_ver 2 --sec2_username testuser --sec2_pwd testpassword --ssid myssid --passphrase mypassword
``` ```
Above command will perform the provisioning steps, and the monitor log should display something like this : Above command will perform the provisioning steps, and the monitor log should display something like this :
@ -116,10 +119,10 @@ I (55355) app: Hello World!
**Note:** For generating the credentials for security version 2 (`SRP6a` salt and verifier) for the device-side, the following example command can be used. The output can then directly be used in this example. **Note:** For generating the credentials for security version 2 (`SRP6a` salt and verifier) for the device-side, the following example command can be used. The output can then directly be used in this example.
The config option `CONFIG_EXAMPLE_PROV_SEC2_USERNAME` should be set to the same username used in the salt-verifier generation. The config option `CONFIG_EXAMPLE_PROV_SEC2_DEV_MODE` should be enabled for the example and in `main/app_main.c`, the macro `EXAMPLE_PROV_SEC2_USERNAME` should be set to the same username used in the salt-verifier generation.
```log ```log
$ python esp_prov.py --verbose --transport softap --sec_ver 2 --sec2_gen_cred --sec2_username testuser --sec2_pwd testpassword $ python esp_prov.py --transport softap --sec_ver 2 --sec2_gen_cred --sec2_username testuser --sec2_pwd testpassword
==== Salt-verifier for security scheme 2 (SRP6a) ==== ==== Salt-verifier for security scheme 2 (SRP6a) ====
static const char sec2_salt[] = { static const char sec2_salt[] = {
0x14, 0xdf, 0x42, 0x50, 0x3d, 0xec, 0x54, 0xc3, 0xe5, 0x0e, 0x0c, 0x9d, 0xb4, 0x84, 0xd7, 0xe4 0x14, 0xdf, 0x42, 0x50, 0x3d, 0xec, 0x54, 0xc3, 0xe5, 0x0e, 0x0c, 0x9d, 0xb4, 0x84, 0xd7, 0xe4
@ -170,6 +173,7 @@ I (1702) app: If QR code is not visible, copy paste the below URL in a browser.
https://espressif.github.io/esp-jumpstart/qrcode.html?data={"ver":"v1","name":"PROV_EA69FC","pop":"abcd1234","transport":"ble"} https://espressif.github.io/esp-jumpstart/qrcode.html?data={"ver":"v1","name":"PROV_EA69FC","pop":"abcd1234","transport":"ble"}
``` ```
### Wi-Fi Scanning ### Wi-Fi Scanning
Provisioning manager also supports providing real-time Wi-Fi scan results (performed on the device) during provisioning. This allows the client side applications to choose the AP for which the device Wi-Fi station is to be configured. Various information about the visible APs is available, like signal strength (RSSI) and security type, etc. Also, the manager now provides capabilities information which can be used by client applications to determine the security type and availability of specific features (like `wifi_scan`). Provisioning manager also supports providing real-time Wi-Fi scan results (performed on the device) during provisioning. This allows the client side applications to choose the AP for which the device Wi-Fi station is to be configured. Various information about the visible APs is available, like signal strength (RSSI) and security type, etc. Also, the manager now provides capabilities information which can be used by client applications to determine the security type and availability of specific features (like `wifi_scan`).
@ -207,14 +211,59 @@ S.N. SSID BSSID CHN RSSI AUTH
Select AP by number (0 to rescan) : 1 Select AP by number (0 to rescan) : 1
Enter passphrase for MyHomeWiFiAP : Enter passphrase for MyHomeWiFiAP :
==== Sending Wi-Fi credential to esp32 ==== ==== Sending Wi-Fi Credentials to Target ====
==== Wi-Fi Credentials sent successfully ==== ==== Wi-Fi Credentials sent successfully ====
==== Applying config to esp32 ==== ==== Applying Wi-Fi Config to Target ====
==== Apply config sent successfully ==== ==== Apply config sent successfully ====
==== Wi-Fi connection state ==== ==== Wi-Fi connection state ====
++++ WiFi state: connected ++++ ==== WiFi state: Connected ====
==== Provisioning was successful ====
```
### Interactive Provisioning
`esp_prov` supports interactive provisioning. You can trigger the script with a simplified command and input the necessary details
(`Proof-of-possession` for security scheme 1 and `SRP6a username`, `SRP6a password` for security scheme 2) as the provisioning process advances.
The command `python esp_prov.py --transport ble --sec_ver 1` gives out the following sample output:
```
Discovering...
==== BLE Discovery results ====
S.N. Name Address
[ 1] PROV_4C33E8 01:02:03:04:05:06
[ 1] BT_DEVICE_SBC 0A:0B:0C:0D:0E:0F
Select device by number (0 to rescan) : 1
Connecting...
Getting Services...
Proof of Possession required:
==== Starting Session ====
==== Session Established ====
==== Scanning Wi-Fi APs ====
++++ Scan process executed in 3.8695244789123535 sec
++++ Scan results : 2
++++ Scan finished in 4.4132080078125 sec
==== Wi-Fi Scan results ====
S.N. SSID BSSID CHN RSSI AUTH
[ 1] MyHomeWiFiAP 788a20841996 1 -45 WPA2_PSK
[ 2] MobileHotspot 7a8a20841996 11 -46 WPA2_PSK
Select AP by number (0 to rescan) : 1
Enter passphrase for myssid :
==== Sending Wi-Fi Credentials to Target ====
==== Wi-Fi Credentials sent successfully ====
==== Applying Wi-Fi Config to Target ====
==== Apply config sent successfully ====
==== Wi-Fi connection state ====
==== WiFi state: Connected ====
==== Provisioning was successful ==== ==== Provisioning was successful ====
``` ```

View File

@ -2192,19 +2192,6 @@ tools/ci/python_packages/ttfw_idf/unity_test_parser.py
tools/ci/python_packages/wifi_tools.py tools/ci/python_packages/wifi_tools.py
tools/ci/test_autocomplete.py tools/ci/test_autocomplete.py
tools/esp_app_trace/test/sysview/blink.c tools/esp_app_trace/test/sysview/blink.c
tools/esp_prov/__init__.py
tools/esp_prov/prov/__init__.py
tools/esp_prov/prov/wifi_prov.py
tools/esp_prov/security/security.py
tools/esp_prov/security/security0.py
tools/esp_prov/security/security1.py
tools/esp_prov/transport/__init__.py
tools/esp_prov/transport/ble_cli.py
tools/esp_prov/transport/transport.py
tools/esp_prov/transport/transport_ble.py
tools/esp_prov/transport/transport_console.py
tools/esp_prov/transport/transport_http.py
tools/esp_prov/utils/__init__.py
tools/find_apps.py tools/find_apps.py
tools/find_build_apps/__init__.py tools/find_build_apps/__init__.py
tools/find_build_apps/cmake.py tools/find_build_apps/cmake.py

View File

@ -206,7 +206,6 @@ tools/esp_prov/transport/transport.py
tools/esp_prov/transport/transport_ble.py tools/esp_prov/transport/transport_ble.py
tools/esp_prov/transport/transport_console.py tools/esp_prov/transport/transport_console.py
tools/esp_prov/transport/transport_http.py tools/esp_prov/transport/transport_http.py
tools/esp_prov/utils/convenience.py
tools/find_apps.py tools/find_apps.py
tools/find_build_apps/common.py tools/find_build_apps/common.py
tools/gen_esp_err_to_name.py tools/gen_esp_err_to_name.py

View File

@ -36,14 +36,14 @@ Usage of `esp-prov` assumes that the provisioning app has specific protocomm end
* Requires the device to be running in Wi-Fi SoftAP mode and hosting an HTTP server supporting specific endpoint URIs * Requires the device to be running in Wi-Fi SoftAP mode and hosting an HTTP server supporting specific endpoint URIs
* The client needs to be connected to the device softAP network before running the `esp_prov` tool. * The client needs to be connected to the device softAP network before running the `esp_prov` tool.
* `ble` - for BLE based provisioning * `ble` - for BLE based provisioning
* Supports Linux only; on Windows/macOS, it is redirected to console * Supports Linux, Windows and macOS; redirected to console if dependencies are not met
* Assumes that the provisioning endpoints are active on the device with specific BLE service UUIDs * Assumes that the provisioning endpoints are active on the device with specific BLE service UUIDs
* `console` - for debugging via console-based provisioning * `console` - for debugging via console-based provisioning
* The client->device commands are printed to STDOUT and device->client messages are accepted via STDIN. * The client->device commands are printed to STDOUT and device->client messages are accepted via STDIN.
* This is to be used when the device is accepting provisioning commands on UART console. * This is to be used when the device is accepting provisioning commands on UART console.
* `--service_name <name>` (Optional) * `--service_name <name>` (Optional)
- When transport mode is `ble`, this specifies the BLE device name to which connection is to be established for provisioned. - When transport mode is `ble`, this specifies the BLE device name to which connection is to be established for provisioned. If not provided, BLE scanning is initiated and a list of nearby devices, as seen by the host, is displayed, of which the target device can be chosen.
- When transport mode is `softap`, this specifies the HTTP server hostname / IP which is running the provisioning service, on the SoftAP network of the device which is to be provisioned. This defaults to `192.168.4.1:80` if not specified - When transport mode is `softap`, this specifies the HTTP server hostname / IP which is running the provisioning service, on the SoftAP network of the device which is to be provisioned. This defaults to `192.168.4.1:80` if not specified
* `--ssid <AP SSID>` (Optional) * `--ssid <AP SSID>` (Optional)
@ -86,15 +86,12 @@ Usage of `esp-prov` assumes that the provisioning app has specific protocomm end
# AVAILABILITY # AVAILABILITY
`esp_prov` is intended as a cross-platform tool, but currently BLE communication functionality is only available on Linux (via BlueZ and DBus)
For Android, a provisioning tool along with source code is available [here](https://github.com/espressif/esp-idf-provisioning-android) For Android, a provisioning tool along with source code is available [here](https://github.com/espressif/esp-idf-provisioning-android)
On macOS and Windows, running with `--transport ble` option falls back to console mode, ie. write data and target UUID are printed to STDOUT and read data is input through STDIN. Users are free to use their app of choice to connect to the BLE device, send the write data to the target characteristic and read from it.
## Dependencies ## Dependencies
This requires the following python libraries to run (included in requirements.txt): This requires the following python libraries to run (included in requirements.txt):
* `bleak`
* `future` * `future`
* `protobuf` * `protobuf`
* `cryptography` * `cryptography`
@ -103,14 +100,6 @@ Run `pip install -r $IDF_PATH/tools/esp_prov/requirements.txt`
Note : Note :
* The packages listed in requirements.txt are limited only to the ones needed AFTER fully satisfying the requirements of ESP-IDF * The packages listed in requirements.txt are limited only to the ones needed AFTER fully satisfying the requirements of ESP-IDF
* BLE communication is only supported on Linux (via Bluez and DBus), therefore, the dependencies for this have been made optional
## Optional Dependencies (Linux Only)
These dependencies are for enabling communication with BLE devices using Bluez and DBus on Linux:
* `dbus-python`
Run `pip install -r $IDF_PATH/tools/esp_prov/requirements_linux_extra.txt`
# EXAMPLE USAGE # EXAMPLE USAGE

View File

@ -1 +1,5 @@
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
from .esp_prov import * # noqa: export esp_prov module to users from .esp_prov import * # noqa: export esp_prov module to users

View File

@ -4,15 +4,13 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# #
from __future__ import print_function
import argparse import argparse
import asyncio
import json import json
import os import os
import sys import sys
import textwrap import textwrap
import time import time
from builtins import input as binput
from getpass import getpass from getpass import getpass
try: try:
@ -50,7 +48,7 @@ def get_security(secver, username, password, pop='', verbose=False):
return None return None
def get_transport(sel_transport, service_name): async def get_transport(sel_transport, service_name):
try: try:
tp = None tp = None
if (sel_transport == 'softap'): if (sel_transport == 'softap'):
@ -58,8 +56,6 @@ def get_transport(sel_transport, service_name):
service_name = '192.168.4.1:80' service_name = '192.168.4.1:80'
tp = transport.Transport_HTTP(service_name) tp = transport.Transport_HTTP(service_name)
elif (sel_transport == 'ble'): elif (sel_transport == 'ble'):
if service_name is None:
raise RuntimeError('"--service_name" must be specified for ble transport')
# BLE client is now capable of automatically figuring out # BLE client is now capable of automatically figuring out
# the primary service from the advertisement data and the # the primary service from the advertisement data and the
# characteristics corresponding to each endpoint. # characteristics corresponding to each endpoint.
@ -68,9 +64,9 @@ def get_transport(sel_transport, service_name):
# in which case, the automated discovery will fail and the client # in which case, the automated discovery will fail and the client
# will fallback to using the provided UUIDs instead # will fallback to using the provided UUIDs instead
nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'} nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
tp = transport.Transport_BLE(devname=service_name, tp = transport.Transport_BLE(service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
nu_lookup=nu_lookup) nu_lookup=nu_lookup)
await tp.connect(devname=service_name)
elif (sel_transport == 'console'): elif (sel_transport == 'console'):
tp = transport.Transport_Console() tp = transport.Transport_Console()
return tp return tp
@ -79,9 +75,9 @@ def get_transport(sel_transport, service_name):
return None return None
def version_match(tp, protover, verbose=False): async def version_match(tp, protover, verbose=False):
try: try:
response = tp.send_data('proto-ver', protover) response = await tp.send_data('proto-ver', protover)
if verbose: if verbose:
print('proto-ver response : ', response) print('proto-ver response : ', response)
@ -108,11 +104,11 @@ def version_match(tp, protover, verbose=False):
return None return None
def has_capability(tp, capability='none', verbose=False): async def has_capability(tp, capability='none', verbose=False):
# Note : default value of `capability` argument cannot be empty string # Note : default value of `capability` argument cannot be empty string
# because protocomm_httpd expects non zero content lengths # because protocomm_httpd expects non zero content lengths
try: try:
response = tp.send_data('proto-ver', capability) response = await tp.send_data('proto-ver', capability)
if verbose: if verbose:
print('proto-ver response : ', response) print('proto-ver response : ', response)
@ -142,24 +138,24 @@ def has_capability(tp, capability='none', verbose=False):
return False return False
def get_version(tp): async def get_version(tp):
response = None response = None
try: try:
response = tp.send_data('proto-ver', '---') response = await tp.send_data('proto-ver', '---')
except RuntimeError as e: except RuntimeError as e:
on_except(e) on_except(e)
response = '' response = ''
return response return response
def establish_session(tp, sec): async def establish_session(tp, sec):
try: try:
response = None response = None
while True: while True:
request = sec.security_session(response) request = sec.security_session(response)
if request is None: if request is None:
break break
response = tp.send_data('prov-session', request) response = await tp.send_data('prov-session', request)
if (response is None): if (response is None):
return False return False
return True return True
@ -168,27 +164,27 @@ def establish_session(tp, sec):
return None return None
def custom_config(tp, sec, custom_info, custom_ver): async def custom_config(tp, sec, custom_info, custom_ver):
try: try:
message = prov.custom_config_request(sec, custom_info, custom_ver) message = prov.custom_config_request(sec, custom_info, custom_ver)
response = tp.send_data('custom-config', message) response = await tp.send_data('custom-config', message)
return (prov.custom_config_response(sec, response) == 0) return (prov.custom_config_response(sec, response) == 0)
except RuntimeError as e: except RuntimeError as e:
on_except(e) on_except(e)
return None return None
def custom_data(tp, sec, custom_data): async def custom_data(tp, sec, custom_data):
try: try:
message = prov.custom_data_request(sec, custom_data) message = prov.custom_data_request(sec, custom_data)
response = tp.send_data('custom-data', message) response = await tp.send_data('custom-data', message)
return (prov.custom_data_response(sec, response) == 0) return (prov.custom_data_response(sec, response) == 0)
except RuntimeError as e: except RuntimeError as e:
on_except(e) on_except(e)
return None return None
def scan_wifi_APs(sel_transport, tp, sec): async def scan_wifi_APs(sel_transport, tp, sec):
APs = [] APs = []
group_channels = 0 group_channels = 0
readlen = 100 readlen = 100
@ -211,13 +207,13 @@ def scan_wifi_APs(sel_transport, tp, sec):
try: try:
message = prov.scan_start_request(sec, blocking=True, group_channels=group_channels) message = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
start_time = time.time() start_time = time.time()
response = tp.send_data('prov-scan', message) response = await tp.send_data('prov-scan', message)
stop_time = time.time() stop_time = time.time()
print('++++ Scan process executed in ' + str(stop_time - start_time) + ' sec') print('++++ Scan process executed in ' + str(stop_time - start_time) + ' sec')
prov.scan_start_response(sec, response) prov.scan_start_response(sec, response)
message = prov.scan_status_request(sec) message = prov.scan_status_request(sec)
response = tp.send_data('prov-scan', message) response = await tp.send_data('prov-scan', message)
result = prov.scan_status_response(sec, response) result = prov.scan_status_response(sec, response)
print('++++ Scan results : ' + str(result['count'])) print('++++ Scan results : ' + str(result['count']))
if result['count'] != 0: if result['count'] != 0:
@ -226,7 +222,7 @@ def scan_wifi_APs(sel_transport, tp, sec):
while remaining: while remaining:
count = [remaining, readlen][remaining > readlen] count = [remaining, readlen][remaining > readlen]
message = prov.scan_result_request(sec, index, count) message = prov.scan_result_request(sec, index, count)
response = tp.send_data('prov-scan', message) response = await tp.send_data('prov-scan', message)
APs += prov.scan_result_response(sec, response) APs += prov.scan_result_response(sec, response)
remaining -= count remaining -= count
index += count index += count
@ -238,37 +234,37 @@ def scan_wifi_APs(sel_transport, tp, sec):
return APs return APs
def send_wifi_config(tp, sec, ssid, passphrase): async def send_wifi_config(tp, sec, ssid, passphrase):
try: try:
message = prov.config_set_config_request(sec, ssid, passphrase) message = prov.config_set_config_request(sec, ssid, passphrase)
response = tp.send_data('prov-config', message) response = await tp.send_data('prov-config', message)
return (prov.config_set_config_response(sec, response) == 0) return (prov.config_set_config_response(sec, response) == 0)
except RuntimeError as e: except RuntimeError as e:
on_except(e) on_except(e)
return None return None
def apply_wifi_config(tp, sec): async def apply_wifi_config(tp, sec):
try: try:
message = prov.config_apply_config_request(sec) message = prov.config_apply_config_request(sec)
response = tp.send_data('prov-config', message) response = await tp.send_data('prov-config', message)
return (prov.config_apply_config_response(sec, response) == 0) return (prov.config_apply_config_response(sec, response) == 0)
except RuntimeError as e: except RuntimeError as e:
on_except(e) on_except(e)
return None return None
def get_wifi_config(tp, sec): async def get_wifi_config(tp, sec):
try: try:
message = prov.config_get_status_request(sec) message = prov.config_get_status_request(sec)
response = tp.send_data('prov-config', message) response = await tp.send_data('prov-config', message)
return prov.config_get_status_response(sec, response) return prov.config_get_status_response(sec, response)
except RuntimeError as e: except RuntimeError as e:
on_except(e) on_except(e)
return None return None
def wait_wifi_connected(tp, sec): async def wait_wifi_connected(tp, sec):
""" """
Wait for provisioning to report Wi-Fi is connected Wait for provisioning to report Wi-Fi is connected
@ -280,7 +276,7 @@ def wait_wifi_connected(tp, sec):
while True: while True:
time.sleep(TIME_PER_POLL) time.sleep(TIME_PER_POLL)
print('\n==== Wi-Fi connection state ====') print('\n==== Wi-Fi connection state ====')
ret = get_wifi_config(tp, sec) ret = await get_wifi_config(tp, sec)
if ret == 'connecting': if ret == 'connecting':
continue continue
elif ret == 'connected': elif ret == 'connected':
@ -290,7 +286,7 @@ def wait_wifi_connected(tp, sec):
retry -= 1 retry -= 1
print('Waiting to poll status again (status %s, %d tries left)...' % (ret, retry)) print('Waiting to poll status again (status %s, %d tries left)...' % (ret, retry))
else: else:
print('---- Provisioning failed ----') print('---- Provisioning failed! ----')
return False return False
@ -301,7 +297,7 @@ def desc_format(*args):
return desc return desc
if __name__ == '__main__': async def main():
parser = argparse.ArgumentParser(description=desc_format( parser = argparse.ArgumentParser(description=desc_format(
'ESP Provisioning tool for configuring devices ' 'ESP Provisioning tool for configuring devices '
'running protocomm based provisioning service.', 'running protocomm based provisioning service.',
@ -364,7 +360,7 @@ if __name__ == '__main__':
'If Wi-Fi scanning is supported by the provisioning service, this need not ' 'If Wi-Fi scanning is supported by the provisioning service, this need not '
'be specified')) 'be specified'))
parser.add_argument('--passphrase', dest='passphrase', type=str, default='', parser.add_argument('--passphrase', dest='passphrase', type=str,
help=desc_format( help=desc_format(
'This configures the device to use Passphrase for the Wi-Fi network to which ' 'This configures the device to use Passphrase for the Wi-Fi network to which '
'we would like it to connect to permanently, once provisioning is complete. ' 'we would like it to connect to permanently, once provisioning is complete. '
@ -382,116 +378,121 @@ if __name__ == '__main__':
if args.secver == 2 and args.sec2_gen_cred: if args.secver == 2 and args.sec2_gen_cred:
if not args.sec2_usr or not args.sec2_pwd: if not args.sec2_usr or not args.sec2_pwd:
print('---- Username/password cannot be empty for security scheme 2 (SRP6a) ----') raise ValueError('Username/password cannot be empty for security scheme 2 (SRP6a)')
exit(1)
print('==== Salt-verifier for security scheme 2 (SRP6a) ====') print('==== Salt-verifier for security scheme 2 (SRP6a) ====')
security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len) security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len)
exit(0) sys.exit()
obj_transport = get_transport(args.mode.lower(), args.name) obj_transport = await get_transport(args.mode.lower(), args.name)
if obj_transport is None: if obj_transport is None:
print('---- Failed to establish connection ----') raise RuntimeError('Failed to establish connection')
exit(1)
# If security version not specified check in capabilities try:
if args.secver is None: # If security version not specified check in capabilities
# First check if capabilities are supported or not if args.secver is None:
if not has_capability(obj_transport): # First check if capabilities are supported or not
print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly') if not await has_capability(obj_transport):
print('---- Invalid Security Version ----') print('Security capabilities could not be determined, please specify "--sec_ver" explicitly')
exit(2) raise ValueError('Invalid Security Version')
# When no_sec is present, use security 0, else security 1 # When no_sec is present, use security 0, else security 1
args.secver = int(not has_capability(obj_transport, 'no_sec')) args.secver = int(not await has_capability(obj_transport, 'no_sec'))
print('Security scheme determined to be :', args.secver) print(f'==== Security Scheme: {args.secver} ====')
if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'): if (args.secver == 1):
if len(args.sec1_pop) == 0: if not await has_capability(obj_transport, 'no_pop'):
print('---- Proof of Possession argument not provided ----') if len(args.sec1_pop) == 0:
exit(2) prompt_str = 'Proof of Possession required: '
elif len(args.sec1_pop) != 0: args.sec1_pop = getpass(prompt_str)
print('---- Proof of Possession will be ignored ----') elif len(args.sec1_pop) != 0:
args.sec1_pop = '' print('Proof of Possession will be ignored')
args.sec1_pop = ''
obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose) if (args.secver == 2):
if obj_security is None: if len(args.sec2_usr) == 0:
print('---- Invalid Security Version ----') args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
exit(2) if len(args.sec2_pwd) == 0:
prompt_str = 'Security Scheme 2 - SRP6a Password required: '
args.sec2_pwd = getpass(prompt_str)
if args.version != '': obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose)
print('\n==== Verifying protocol version ====') if obj_security is None:
if not version_match(obj_transport, args.version, args.verbose): raise ValueError('Invalid Security Version')
print('---- Error in protocol version matching ----')
exit(3)
print('==== Verified protocol version successfully ====')
print('\n==== Starting Session ====') if args.version != '':
if not establish_session(obj_transport, obj_security): print('\n==== Verifying protocol version ====')
print('Failed to establish session. Ensure that security scheme and proof of possession are correct') if not await version_match(obj_transport, args.version, args.verbose):
print('---- Error in establishing session ----') raise RuntimeError('Error in protocol version matching')
exit(4) print('==== Verified protocol version successfully ====')
print('==== Session Established ====')
if args.custom_data != '': print('\n==== Starting Session ====')
print('\n==== Sending Custom data to esp32 ====') if not await establish_session(obj_transport, obj_security):
if not custom_data(obj_transport, obj_security, args.custom_data): print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
print('---- Error in custom data ----') raise RuntimeError('Error in establishing session')
exit(5) print('==== Session Established ====')
print('==== Custom data sent successfully ====')
if args.ssid == '': if args.custom_data != '':
if not has_capability(obj_transport, 'wifi_scan'): print('\n==== Sending Custom data to Target ====')
print('---- Wi-Fi Scan List is not supported by provisioning service ----') if not await custom_data(obj_transport, obj_security, args.custom_data):
print('---- Rerun esp_prov with SSID and Passphrase as argument ----') raise RuntimeError('Error in custom data')
exit(3) print('==== Custom data sent successfully ====')
while True: if args.ssid == '':
print('\n==== Scanning Wi-Fi APs ====') if not await has_capability(obj_transport, 'wifi_scan'):
start_time = time.time() raise RuntimeError('Wi-Fi Scan List is not supported by provisioning service')
APs = scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
end_time = time.time()
print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
if APs is None:
print('---- Error in scanning Wi-Fi APs ----')
exit(8)
if len(APs) == 0:
print('No APs found!')
exit(9)
print('==== Wi-Fi Scan results ====')
print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
for i in range(len(APs)):
print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
i + 1, APs[i]['ssid'], APs[i]['bssid'], APs[i]['channel'], APs[i]['rssi'], APs[i]['auth']))
while True: while True:
try: print('\n==== Scanning Wi-Fi APs ====')
select = int(binput('Select AP by number (0 to rescan) : ')) start_time = time.time()
if select < 0 or select > len(APs): APs = await scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
raise ValueError end_time = time.time()
print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
if APs is None:
raise RuntimeError('Error in scanning Wi-Fi APs')
if len(APs) == 0:
print('No APs found!')
sys.exit()
print('==== Wi-Fi Scan results ====')
print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
for i in range(len(APs)):
print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
i + 1, APs[i]['ssid'], APs[i]['bssid'], APs[i]['channel'], APs[i]['rssi'], APs[i]['auth']))
while True:
try:
select = int(input('Select AP by number (0 to rescan) : '))
if select < 0 or select > len(APs):
raise ValueError
break
except ValueError:
print('Invalid input! Retry')
if select != 0:
break break
except ValueError:
print('Invalid input! Retry')
if select != 0: args.ssid = APs[select - 1]['ssid']
break
args.ssid = APs[select - 1]['ssid'] if args.passphrase is None:
prompt_str = 'Enter passphrase for {0} : '.format(args.ssid) prompt_str = 'Enter passphrase for {0} : '.format(args.ssid)
args.passphrase = getpass(prompt_str) args.passphrase = getpass(prompt_str)
print('\n==== Sending Wi-Fi credential to esp32 ====') print('\n==== Sending Wi-Fi Credentials to Target ====')
if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase): if not await send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
print('---- Error in send Wi-Fi config ----') raise RuntimeError('Error in send Wi-Fi config')
exit(6) print('==== Wi-Fi Credentials sent successfully ====')
print('==== Wi-Fi Credentials sent successfully ====')
print('\n==== Applying config to esp32 ====') print('\n==== Applying Wi-Fi Config to Target ====')
if not apply_wifi_config(obj_transport, obj_security): if not await apply_wifi_config(obj_transport, obj_security):
print('---- Error in apply Wi-Fi config ----') raise RuntimeError('Error in apply Wi-Fi config')
exit(7) print('==== Apply config sent successfully ====')
print('==== Apply config sent successfully ====')
wait_wifi_connected(obj_transport, obj_security) await wait_wifi_connected(obj_transport, obj_security)
finally:
await obj_transport.disconnect()
if __name__ == '__main__':
asyncio.run(main())

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
from .custom_prov import * # noqa F403 from .custom_prov import * # noqa F403

View File

@ -4,26 +4,23 @@
# APIs for interpreting and creating protobuf packets for `custom-config` protocomm endpoint # APIs for interpreting and creating protobuf packets for `custom-config` protocomm endpoint
from __future__ import print_function from utils import str_to_bytes
import utils
from future.utils import tobytes
def print_verbose(security_ctx, data): def print_verbose(security_ctx, data):
if (security_ctx.verbose): if (security_ctx.verbose):
print('++++ ' + data + ' ++++') print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
def custom_data_request(security_ctx, data): def custom_data_request(security_ctx, data):
# Encrypt the custom data # Encrypt the custom data
enc_cmd = security_ctx.encrypt_data(tobytes(data)) enc_cmd = security_ctx.encrypt_data(str_to_bytes(data))
print_verbose(security_ctx, 'Client -> Device (CustomData cmd) ' + utils.str_to_hexstr(enc_cmd)) print_verbose(security_ctx, f'Client -> Device (CustomData cmd): 0x{enc_cmd.hex()}')
return enc_cmd return enc_cmd.decode('latin-1')
def custom_data_response(security_ctx, response_data): def custom_data_response(security_ctx, response_data):
# Decrypt response packet # Decrypt response packet
decrypt = security_ctx.decrypt_data(tobytes(response_data)) decrypt = security_ctx.decrypt_data(str_to_bytes(response_data))
print('CustomData response: ' + str(decrypt)) print(f'++++ CustomData response: {str(decrypt)}++++')
return 0 return 0

View File

@ -1,30 +1,16 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
# APIs for interpreting and creating protobuf packets for Wi-Fi provisioning # APIs for interpreting and creating protobuf packets for Wi-Fi provisioning
from __future__ import print_function
import proto import proto
import utils from utils import str_to_bytes
from future.utils import tobytes
def print_verbose(security_ctx, data): def print_verbose(security_ctx, data):
if (security_ctx.verbose): if (security_ctx.verbose):
print('++++ ' + data + ' ++++') print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
def config_get_status_request(security_ctx): def config_get_status_request(security_ctx):
@ -33,34 +19,34 @@ def config_get_status_request(security_ctx):
cfg1.msg = proto.wifi_config_pb2.TypeCmdGetStatus cfg1.msg = proto.wifi_config_pb2.TypeCmdGetStatus
cmd_get_status = proto.wifi_config_pb2.CmdGetStatus() cmd_get_status = proto.wifi_config_pb2.CmdGetStatus()
cfg1.cmd_get_status.MergeFrom(cmd_get_status) cfg1.cmd_get_status.MergeFrom(cmd_get_status)
encrypted_cfg = security_ctx.encrypt_data(cfg1.SerializeToString()).decode('latin-1') encrypted_cfg = security_ctx.encrypt_data(cfg1.SerializeToString())
print_verbose(security_ctx, 'Client -> Device (Encrypted CmdGetStatus) ' + utils.str_to_hexstr(encrypted_cfg)) print_verbose(security_ctx, f'Client -> Device (Encrypted CmdGetStatus): 0x{encrypted_cfg.hex()}')
return encrypted_cfg return encrypted_cfg.decode('latin-1')
def config_get_status_response(security_ctx, response_data): def config_get_status_response(security_ctx, response_data):
# Interpret protobuf response packet from GetStatus command # Interpret protobuf response packet from GetStatus command
decrypted_message = security_ctx.decrypt_data(tobytes(response_data)) decrypted_message = security_ctx.decrypt_data(str_to_bytes(response_data))
cmd_resp1 = proto.wifi_config_pb2.WiFiConfigPayload() cmd_resp1 = proto.wifi_config_pb2.WiFiConfigPayload()
cmd_resp1.ParseFromString(decrypted_message) cmd_resp1.ParseFromString(decrypted_message)
print_verbose(security_ctx, 'Response type ' + str(cmd_resp1.msg)) print_verbose(security_ctx, f'CmdGetStatus type: {str(cmd_resp1.msg)}')
print_verbose(security_ctx, 'Response status ' + str(cmd_resp1.resp_get_status.status)) print_verbose(security_ctx, f'CmdGetStatus status: {str(cmd_resp1.resp_get_status.status)}')
if cmd_resp1.resp_get_status.sta_state == 0: if cmd_resp1.resp_get_status.sta_state == 0:
print('++++ WiFi state: ' + 'connected ++++') print('==== WiFi state: Connected ====')
return 'connected' return 'connected'
elif cmd_resp1.resp_get_status.sta_state == 1: elif cmd_resp1.resp_get_status.sta_state == 1:
print('++++ WiFi state: ' + 'connecting... ++++') print('++++ WiFi state: Connecting... ++++')
return 'connecting' return 'connecting'
elif cmd_resp1.resp_get_status.sta_state == 2: elif cmd_resp1.resp_get_status.sta_state == 2:
print('++++ WiFi state: ' + 'disconnected ++++') print('---- WiFi state: Disconnected ----')
return 'disconnected' return 'disconnected'
elif cmd_resp1.resp_get_status.sta_state == 3: elif cmd_resp1.resp_get_status.sta_state == 3:
print('++++ WiFi state: ' + 'connection failed ++++') print('---- WiFi state: Connection Failed ----')
if cmd_resp1.resp_get_status.fail_reason == 0: if cmd_resp1.resp_get_status.fail_reason == 0:
print('++++ Failure reason: ' + 'Incorrect Password ++++') print('---- Failure reason: Incorrect Password ----')
elif cmd_resp1.resp_get_status.fail_reason == 1: elif cmd_resp1.resp_get_status.fail_reason == 1:
print('++++ Failure reason: ' + 'Incorrect SSID ++++') print('---- Failure reason: Incorrect SSID ----')
return 'failed' return 'failed'
return 'unknown' return 'unknown'
@ -69,19 +55,19 @@ def config_set_config_request(security_ctx, ssid, passphrase):
# Form protobuf request packet for SetConfig command # Form protobuf request packet for SetConfig command
cmd = proto.wifi_config_pb2.WiFiConfigPayload() cmd = proto.wifi_config_pb2.WiFiConfigPayload()
cmd.msg = proto.wifi_config_pb2.TypeCmdSetConfig cmd.msg = proto.wifi_config_pb2.TypeCmdSetConfig
cmd.cmd_set_config.ssid = tobytes(ssid) cmd.cmd_set_config.ssid = str_to_bytes(ssid)
cmd.cmd_set_config.passphrase = tobytes(passphrase) cmd.cmd_set_config.passphrase = str_to_bytes(passphrase)
enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1') enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString())
print_verbose(security_ctx, 'Client -> Device (SetConfig cmd) ' + utils.str_to_hexstr(enc_cmd)) print_verbose(security_ctx, f'Client -> Device (SetConfig cmd): 0x{enc_cmd.hex()}')
return enc_cmd return enc_cmd.decode('latin-1')
def config_set_config_response(security_ctx, response_data): def config_set_config_response(security_ctx, response_data):
# Interpret protobuf response packet from SetConfig command # Interpret protobuf response packet from SetConfig command
decrypt = security_ctx.decrypt_data(tobytes(response_data)) decrypt = security_ctx.decrypt_data(str_to_bytes(response_data))
cmd_resp4 = proto.wifi_config_pb2.WiFiConfigPayload() cmd_resp4 = proto.wifi_config_pb2.WiFiConfigPayload()
cmd_resp4.ParseFromString(decrypt) cmd_resp4.ParseFromString(decrypt)
print_verbose(security_ctx, 'SetConfig status ' + str(cmd_resp4.resp_set_config.status)) print_verbose(security_ctx, f'SetConfig status: 0x{str(cmd_resp4.resp_set_config.status)}')
return cmd_resp4.resp_set_config.status return cmd_resp4.resp_set_config.status
@ -89,15 +75,15 @@ def config_apply_config_request(security_ctx):
# Form protobuf request packet for ApplyConfig command # Form protobuf request packet for ApplyConfig command
cmd = proto.wifi_config_pb2.WiFiConfigPayload() cmd = proto.wifi_config_pb2.WiFiConfigPayload()
cmd.msg = proto.wifi_config_pb2.TypeCmdApplyConfig cmd.msg = proto.wifi_config_pb2.TypeCmdApplyConfig
enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1') enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString())
print_verbose(security_ctx, 'Client -> Device (ApplyConfig cmd) ' + utils.str_to_hexstr(enc_cmd)) print_verbose(security_ctx, f'Client -> Device (ApplyConfig cmd): 0x{enc_cmd.hex()}')
return enc_cmd return enc_cmd.decode('latin-1')
def config_apply_config_response(security_ctx, response_data): def config_apply_config_response(security_ctx, response_data):
# Interpret protobuf response packet from ApplyConfig command # Interpret protobuf response packet from ApplyConfig command
decrypt = security_ctx.decrypt_data(tobytes(response_data)) decrypt = security_ctx.decrypt_data(str_to_bytes(response_data))
cmd_resp5 = proto.wifi_config_pb2.WiFiConfigPayload() cmd_resp5 = proto.wifi_config_pb2.WiFiConfigPayload()
cmd_resp5.ParseFromString(decrypt) cmd_resp5.ParseFromString(decrypt)
print_verbose(security_ctx, 'ApplyConfig status ' + str(cmd_resp5.resp_apply_config.status)) print_verbose(security_ctx, f'ApplyConfig status: 0x{str(cmd_resp5.resp_apply_config.status)}')
return cmd_resp5.resp_apply_config.status return cmd_resp5.resp_apply_config.status

View File

@ -3,17 +3,13 @@
# #
# APIs for interpreting and creating protobuf packets for Wi-Fi Scanning # APIs for interpreting and creating protobuf packets for Wi-Fi Scanning
from __future__ import print_function
import proto import proto
import utils from utils import str_to_bytes
from future.utils import tobytes
def print_verbose(security_ctx, data): def print_verbose(security_ctx, data):
if (security_ctx.verbose): if (security_ctx.verbose):
print('++++ ' + data + ' ++++') print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
def scan_start_request(security_ctx, blocking=True, passive=False, group_channels=5, period_ms=120): def scan_start_request(security_ctx, blocking=True, passive=False, group_channels=5, period_ms=120):
@ -24,17 +20,17 @@ def scan_start_request(security_ctx, blocking=True, passive=False, group_channel
cmd.cmd_scan_start.passive = passive cmd.cmd_scan_start.passive = passive
cmd.cmd_scan_start.group_channels = group_channels cmd.cmd_scan_start.group_channels = group_channels
cmd.cmd_scan_start.period_ms = period_ms cmd.cmd_scan_start.period_ms = period_ms
enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1') enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString())
print_verbose(security_ctx, 'Client -> Device (Encrypted CmdScanStart) ' + utils.str_to_hexstr(enc_cmd)) print_verbose(security_ctx, f'Client -> Device (Encrypted CmdScanStart): 0x{enc_cmd.hex()}')
return enc_cmd return enc_cmd.decode('latin-1')
def scan_start_response(security_ctx, response_data): def scan_start_response(security_ctx, response_data):
# Interpret protobuf response packet from ScanStart command # Interpret protobuf response packet from ScanStart command
dec_resp = security_ctx.decrypt_data(tobytes(response_data)) dec_resp = security_ctx.decrypt_data(str_to_bytes(response_data))
resp = proto.wifi_scan_pb2.WiFiScanPayload() resp = proto.wifi_scan_pb2.WiFiScanPayload()
resp.ParseFromString(dec_resp) resp.ParseFromString(dec_resp)
print_verbose(security_ctx, 'ScanStart status ' + str(resp.status)) print_verbose(security_ctx, f'ScanStart status: 0x{str(resp.status)}')
if resp.status != 0: if resp.status != 0:
raise RuntimeError raise RuntimeError
@ -43,17 +39,17 @@ def scan_status_request(security_ctx):
# Form protobuf request packet for ScanStatus command # Form protobuf request packet for ScanStatus command
cmd = proto.wifi_scan_pb2.WiFiScanPayload() cmd = proto.wifi_scan_pb2.WiFiScanPayload()
cmd.msg = proto.wifi_scan_pb2.TypeCmdScanStatus cmd.msg = proto.wifi_scan_pb2.TypeCmdScanStatus
enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1') enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString())
print_verbose(security_ctx, 'Client -> Device (Encrypted CmdScanStatus) ' + utils.str_to_hexstr(enc_cmd)) print_verbose(security_ctx, f'Client -> Device (Encrypted CmdScanStatus): 0x{enc_cmd.hex()}')
return enc_cmd return enc_cmd.decode('latin-1')
def scan_status_response(security_ctx, response_data): def scan_status_response(security_ctx, response_data):
# Interpret protobuf response packet from ScanStatus command # Interpret protobuf response packet from ScanStatus command
dec_resp = security_ctx.decrypt_data(tobytes(response_data)) dec_resp = security_ctx.decrypt_data(str_to_bytes(response_data))
resp = proto.wifi_scan_pb2.WiFiScanPayload() resp = proto.wifi_scan_pb2.WiFiScanPayload()
resp.ParseFromString(dec_resp) resp.ParseFromString(dec_resp)
print_verbose(security_ctx, 'ScanStatus status ' + str(resp.status)) print_verbose(security_ctx, f'ScanStatus status: 0x{str(resp.status)}')
if resp.status != 0: if resp.status != 0:
raise RuntimeError raise RuntimeError
return {'finished': resp.resp_scan_status.scan_finished, 'count': resp.resp_scan_status.result_count} return {'finished': resp.resp_scan_status.scan_finished, 'count': resp.resp_scan_status.result_count}
@ -65,17 +61,17 @@ def scan_result_request(security_ctx, index, count):
cmd.msg = proto.wifi_scan_pb2.TypeCmdScanResult cmd.msg = proto.wifi_scan_pb2.TypeCmdScanResult
cmd.cmd_scan_result.start_index = index cmd.cmd_scan_result.start_index = index
cmd.cmd_scan_result.count = count cmd.cmd_scan_result.count = count
enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1') enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString())
print_verbose(security_ctx, 'Client -> Device (Encrypted CmdScanResult) ' + utils.str_to_hexstr(enc_cmd)) print_verbose(security_ctx, f'Client -> Device (Encrypted CmdScanResult): 0x{enc_cmd.hex()}')
return enc_cmd return enc_cmd.decode('latin-1')
def scan_result_response(security_ctx, response_data): def scan_result_response(security_ctx, response_data):
# Interpret protobuf response packet from ScanResult command # Interpret protobuf response packet from ScanResult command
dec_resp = security_ctx.decrypt_data(tobytes(response_data)) dec_resp = security_ctx.decrypt_data(str_to_bytes(response_data))
resp = proto.wifi_scan_pb2.WiFiScanPayload() resp = proto.wifi_scan_pb2.WiFiScanPayload()
resp.ParseFromString(dec_resp) resp.ParseFromString(dec_resp)
print_verbose(security_ctx, 'ScanResult status ' + str(resp.status)) print_verbose(security_ctx, f'ScanResult status: 0x{str(resp.status)}')
if resp.status != 0: if resp.status != 0:
raise RuntimeError raise RuntimeError
authmode_str = ['Open', 'WEP', 'WPA_PSK', 'WPA2_PSK', 'WPA_WPA2_PSK', authmode_str = ['Open', 'WEP', 'WPA_PSK', 'WPA2_PSK', 'WPA_WPA2_PSK',
@ -83,13 +79,13 @@ def scan_result_response(security_ctx, response_data):
results = [] results = []
for entry in resp.resp_scan_result.entries: for entry in resp.resp_scan_result.entries:
results += [{'ssid': entry.ssid.decode('latin-1').rstrip('\x00'), results += [{'ssid': entry.ssid.decode('latin-1').rstrip('\x00'),
'bssid': utils.str_to_hexstr(entry.bssid.decode('latin-1')), 'bssid': entry.bssid.hex(),
'channel': entry.channel, 'channel': entry.channel,
'rssi': entry.rssi, 'rssi': entry.rssi,
'auth': authmode_str[entry.auth]}] 'auth': authmode_str[entry.auth]}]
print_verbose(security_ctx, 'ScanResult SSID : ' + str(results[-1]['ssid'])) print_verbose(security_ctx, f"ScanResult SSID : {str(results[-1]['ssid'])}")
print_verbose(security_ctx, 'ScanResult BSSID : ' + str(results[-1]['bssid'])) print_verbose(security_ctx, f"ScanResult BSSID : {str(results[-1]['bssid'])}")
print_verbose(security_ctx, 'ScanResult Channel : ' + str(results[-1]['channel'])) print_verbose(security_ctx, f"ScanResult Channel : {str(results[-1]['channel'])}")
print_verbose(security_ctx, 'ScanResult RSSI : ' + str(results[-1]['rssi'])) print_verbose(security_ctx, f"ScanResult RSSI : {str(results[-1]['rssi'])}")
print_verbose(security_ctx, 'ScanResult AUTH : ' + str(results[-1]['auth'])) print_verbose(security_ctx, f"ScanResult AUTH : {str(results[-1]['auth'])}")
return results return results

View File

@ -1,3 +1,4 @@
bleak
future future
cryptography cryptography
protobuf protobuf

View File

@ -1 +0,0 @@
dbus-python

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
# Base class for protocomm security # Base class for protocomm security

View File

@ -1,25 +1,12 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
# APIs for interpreting and creating protobuf packets for # APIs for interpreting and creating protobuf packets for
# protocomm endpoint with security type protocomm_security0 # protocomm endpoint with security type protocomm_security0
from __future__ import print_function
import proto import proto
from future.utils import tobytes from utils import str_to_bytes
from .security import Security from .security import Security
@ -52,10 +39,10 @@ class Security0(Security):
def setup0_response(self, response_data): def setup0_response(self, response_data):
# Interpret protocomm security0 response packet # Interpret protocomm security0 response packet
setup_resp = proto.session_pb2.SessionData() setup_resp = proto.session_pb2.SessionData()
setup_resp.ParseFromString(tobytes(response_data)) setup_resp.ParseFromString(str_to_bytes(response_data))
# Check if security scheme matches # Check if security scheme matches
if setup_resp.sec_ver != proto.session_pb2.SecScheme0: if setup_resp.sec_ver != proto.session_pb2.SecScheme0:
print('Incorrect sec scheme') raise RuntimeError('Incorrect security scheme')
def encrypt_data(self, data): def encrypt_data(self, data):
# Passive. No encryption when security0 used # Passive. No encryption when security0 used

View File

@ -1,35 +1,24 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
# APIs for interpreting and creating protobuf packets for # APIs for interpreting and creating protobuf packets for
# protocomm endpoint with security type protocomm_security1 # protocomm endpoint with security type protocomm_security1
from __future__ import print_function
import proto import proto
import session_pb2
import utils
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from future.utils import tobytes from utils import long_to_bytes, str_to_bytes
from .security import Security from .security import Security
def a_xor_b(a: bytes, b: bytes) -> bytes:
return b''.join(long_to_bytes(a[i] ^ b[i]) for i in range(0, len(b)))
# Enum for state of protocomm_security1 FSM # Enum for state of protocomm_security1 FSM
class security_state: class security_state:
REQUEST1 = 0 REQUEST1 = 0
@ -38,25 +27,11 @@ class security_state:
FINISHED = 3 FINISHED = 3
def xor(a, b):
# XOR two inputs of type `bytes`
ret = bytearray()
# Decode the input bytes to strings
a = a.decode('latin-1')
b = b.decode('latin-1')
for i in range(max(len(a), len(b))):
# Convert the characters to corresponding 8-bit ASCII codes
# then XOR them and store in bytearray
ret.append(([0, ord(a[i])][i < len(a)]) ^ ([0, ord(b[i])][i < len(b)]))
# Convert bytearray to bytes
return bytes(ret)
class Security1(Security): class Security1(Security):
def __init__(self, pop, verbose): def __init__(self, pop, verbose):
# Initialize state of the security1 FSM # Initialize state of the security1 FSM
self.session_state = security_state.REQUEST1 self.session_state = security_state.REQUEST1
self.pop = tobytes(pop) self.pop = str_to_bytes(pop)
self.verbose = verbose self.verbose = verbose
Security.__init__(self, self.security1_session) Security.__init__(self, self.security1_session)
@ -66,59 +41,55 @@ class Security1(Security):
if (self.session_state == security_state.REQUEST1): if (self.session_state == security_state.REQUEST1):
self.session_state = security_state.RESPONSE1_REQUEST2 self.session_state = security_state.RESPONSE1_REQUEST2
return self.setup0_request() return self.setup0_request()
if (self.session_state == security_state.RESPONSE1_REQUEST2): elif (self.session_state == security_state.RESPONSE1_REQUEST2):
self.session_state = security_state.RESPONSE2 self.session_state = security_state.RESPONSE2
self.setup0_response(response_data) self.setup0_response(response_data)
return self.setup1_request() return self.setup1_request()
if (self.session_state == security_state.RESPONSE2): elif (self.session_state == security_state.RESPONSE2):
self.session_state = security_state.FINISHED self.session_state = security_state.FINISHED
self.setup1_response(response_data) self.setup1_response(response_data)
return None return None
else:
print('Unexpected state') print('Unexpected state')
return None return None
def __generate_key(self): def __generate_key(self):
# Generate private and public key pair for client # Generate private and public key pair for client
self.client_private_key = X25519PrivateKey.generate() self.client_private_key = X25519PrivateKey.generate()
try: self.client_public_key = self.client_private_key.public_key().public_bytes(
self.client_public_key = self.client_private_key.public_key().public_bytes( encoding=serialization.Encoding.Raw,
encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)
format=serialization.PublicFormat.Raw)
except TypeError:
# backward compatible call for older cryptography library
self.client_public_key = self.client_private_key.public_key().public_bytes()
def _print_verbose(self, data): def _print_verbose(self, data):
if (self.verbose): if (self.verbose):
print('++++ ' + data + ' ++++') print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
def setup0_request(self): def setup0_request(self):
# Form SessionCmd0 request packet using client public key # Form SessionCmd0 request packet using client public key
setup_req = session_pb2.SessionData() setup_req = proto.session_pb2.SessionData()
setup_req.sec_ver = session_pb2.SecScheme1 setup_req.sec_ver = proto.session_pb2.SecScheme1
self.__generate_key() self.__generate_key()
setup_req.sec1.sc0.client_pubkey = self.client_public_key setup_req.sec1.sc0.client_pubkey = self.client_public_key
self._print_verbose('Client Public Key:\t' + utils.str_to_hexstr(self.client_public_key.decode('latin-1'))) self._print_verbose(f'Client Public Key:\t0x{self.client_public_key.hex()}')
return setup_req.SerializeToString().decode('latin-1') return setup_req.SerializeToString().decode('latin-1')
def setup0_response(self, response_data): def setup0_response(self, response_data):
# Interpret SessionResp0 response packet # Interpret SessionResp0 response packet
setup_resp = proto.session_pb2.SessionData() setup_resp = proto.session_pb2.SessionData()
setup_resp.ParseFromString(tobytes(response_data)) setup_resp.ParseFromString(str_to_bytes(response_data))
self._print_verbose('Security version:\t' + str(setup_resp.sec_ver)) self._print_verbose('Security version:\t' + str(setup_resp.sec_ver))
if setup_resp.sec_ver != session_pb2.SecScheme1: if setup_resp.sec_ver != proto.session_pb2.SecScheme1:
print('Incorrect sec scheme') raise RuntimeError('Incorrect security scheme')
exit(1)
self.device_public_key = setup_resp.sec1.sr0.device_pubkey self.device_public_key = setup_resp.sec1.sr0.device_pubkey
# Device random is the initialization vector # Device random is the initialization vector
device_random = setup_resp.sec1.sr0.device_random device_random = setup_resp.sec1.sr0.device_random
self._print_verbose('Device Public Key:\t' + utils.str_to_hexstr(self.device_public_key.decode('latin-1'))) self._print_verbose(f'Device Public Key:\t0x{self.device_public_key.hex()}')
self._print_verbose('Device Random:\t' + utils.str_to_hexstr(device_random.decode('latin-1'))) self._print_verbose(f'Device Random:\t0x{device_random.hex()}')
# Calculate Curve25519 shared key using Client private key and Device public key # Calculate Curve25519 shared key using Client private key and Device public key
sharedK = self.client_private_key.exchange(X25519PublicKey.from_public_bytes(self.device_public_key)) sharedK = self.client_private_key.exchange(X25519PublicKey.from_public_bytes(self.device_public_key))
self._print_verbose('Shared Key:\t' + utils.str_to_hexstr(sharedK.decode('latin-1'))) self._print_verbose(f'Shared Key:\t0x{sharedK.hex()}')
# If PoP is provided, XOR SHA256 of PoP with the previously # If PoP is provided, XOR SHA256 of PoP with the previously
# calculated Shared Key to form the actual Shared Key # calculated Shared Key to form the actual Shared Key
@ -128,8 +99,8 @@ class Security1(Security):
h.update(self.pop) h.update(self.pop)
digest = h.finalize() digest = h.finalize()
# XOR with and update Shared Key # XOR with and update Shared Key
sharedK = xor(sharedK, digest) sharedK = a_xor_b(sharedK, digest)
self._print_verbose('New Shared Key XORed with PoP:\t' + utils.str_to_hexstr(sharedK.decode('latin-1'))) self._print_verbose(f'Updated Shared Key (Shared key XORed with PoP):\t0x{sharedK.hex()}')
# Initialize the encryption engine with Shared Key and initialization vector # Initialize the encryption engine with Shared Key and initialization vector
cipher = Cipher(algorithms.AES(sharedK), modes.CTR(device_random), backend=default_backend()) cipher = Cipher(algorithms.AES(sharedK), modes.CTR(device_random), backend=default_backend())
self.cipher = cipher.encryptor() self.cipher = cipher.encryptor()
@ -137,36 +108,33 @@ class Security1(Security):
def setup1_request(self): def setup1_request(self):
# Form SessionCmd1 request packet using encrypted device public key # Form SessionCmd1 request packet using encrypted device public key
setup_req = proto.session_pb2.SessionData() setup_req = proto.session_pb2.SessionData()
setup_req.sec_ver = session_pb2.SecScheme1 setup_req.sec_ver = proto.session_pb2.SecScheme1
setup_req.sec1.msg = proto.sec1_pb2.Session_Command1 setup_req.sec1.msg = proto.sec1_pb2.Session_Command1
# Encrypt device public key and attach to the request packet # Encrypt device public key and attach to the request packet
client_verify = self.cipher.update(self.device_public_key) client_verify = self.cipher.update(self.device_public_key)
self._print_verbose('Client Verify:\t' + utils.str_to_hexstr(client_verify.decode('latin-1'))) self._print_verbose(f'Client Proof:\t0x{client_verify.hex()}')
setup_req.sec1.sc1.client_verify_data = client_verify setup_req.sec1.sc1.client_verify_data = client_verify
return setup_req.SerializeToString().decode('latin-1') return setup_req.SerializeToString().decode('latin-1')
def setup1_response(self, response_data): def setup1_response(self, response_data):
# Interpret SessionResp1 response packet # Interpret SessionResp1 response packet
setup_resp = proto.session_pb2.SessionData() setup_resp = proto.session_pb2.SessionData()
setup_resp.ParseFromString(tobytes(response_data)) setup_resp.ParseFromString(str_to_bytes(response_data))
# Ensure security scheme matches # Ensure security scheme matches
if setup_resp.sec_ver == session_pb2.SecScheme1: if setup_resp.sec_ver == proto.session_pb2.SecScheme1:
# Read encrypyed device verify string # Read encrypyed device verify string
device_verify = setup_resp.sec1.sr1.device_verify_data device_verify = setup_resp.sec1.sr1.device_verify_data
self._print_verbose('Device verify:\t' + utils.str_to_hexstr(device_verify.decode('latin-1'))) self._print_verbose(f'Device Proof:\t0x{device_verify.hex()}')
# Decrypt the device verify string # Decrypt the device verify string
enc_client_pubkey = self.cipher.update(setup_resp.sec1.sr1.device_verify_data) enc_client_pubkey = self.cipher.update(setup_resp.sec1.sr1.device_verify_data)
self._print_verbose('Enc client pubkey:\t ' + utils.str_to_hexstr(enc_client_pubkey.decode('latin-1')))
# Match decryped string with client public key # Match decryped string with client public key
if enc_client_pubkey != self.client_public_key: if enc_client_pubkey != self.client_public_key:
print('Mismatch in device verify') raise RuntimeError('Failed to verify device!')
return -2
else: else:
print('Unsupported security protocol') raise RuntimeError('Unsupported security protocol')
return -1
def encrypt_data(self, data): def encrypt_data(self, data):
return self.cipher.update(tobytes(data)) return self.cipher.update(data)
def decrypt_data(self, data): def decrypt_data(self, data):
return self.cipher.update(tobytes(data)) return self.cipher.update(data)

View File

@ -9,10 +9,10 @@ from typing import Any, Type
import proto import proto
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from future.utils import tobytes from utils import long_to_bytes, str_to_bytes
from .security import Security from .security import Security
from .srp6a import Srp6a, bytes_to_long, generate_salt_and_verifier, long_to_bytes from .srp6a import Srp6a, generate_salt_and_verifier
AES_KEY_LEN = 256 // 8 AES_KEY_LEN = 256 // 8
@ -70,7 +70,7 @@ class Security2(Security):
self.setup1_response(response_data) self.setup1_response(response_data)
return None return None
print('Unexpected state') print('---- Unexpected state! ----')
return None return None
def _print_verbose(self, data: str) -> None: def _print_verbose(self, data: str) -> None:
@ -83,34 +83,30 @@ class Security2(Security):
setup_req.sec_ver = proto.session_pb2.SecScheme2 setup_req.sec_ver = proto.session_pb2.SecScheme2
setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command0 setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command0
setup_req.sec2.sc0.client_username = tobytes(self.username) setup_req.sec2.sc0.client_username = str_to_bytes(self.username)
self.srp6a_ctx = Srp6a(self.username, self.password) self.srp6a_ctx = Srp6a(self.username, self.password)
if self.srp6a_ctx is None: if self.srp6a_ctx is None:
print('Failed to initialize SRP6a instance!') raise RuntimeError('Failed to initialize SRP6a instance!')
exit(1)
client_pubkey = long_to_bytes(self.srp6a_ctx.A) client_pubkey = long_to_bytes(self.srp6a_ctx.A)
setup_req.sec2.sc0.client_pubkey = client_pubkey setup_req.sec2.sc0.client_pubkey = client_pubkey
self._print_verbose('Client Public Key:\t' + hex(bytes_to_long(client_pubkey))) self._print_verbose(f'Client Public Key:\t0x{client_pubkey.hex()}')
return setup_req.SerializeToString().decode('latin-1') return setup_req.SerializeToString().decode('latin-1')
def setup0_response(self, response_data: bytes) -> None: def setup0_response(self, response_data: bytes) -> None:
# Interpret SessionResp0 response packet # Interpret SessionResp0 response packet
setup_resp = proto.session_pb2.SessionData() setup_resp = proto.session_pb2.SessionData()
setup_resp.ParseFromString(tobytes(response_data)) setup_resp.ParseFromString(str_to_bytes(response_data))
self._print_verbose('Security version:\t' + str(setup_resp.sec_ver)) self._print_verbose(f'Security version:\t{str(setup_resp.sec_ver)}')
if setup_resp.sec_ver != proto.session_pb2.SecScheme2: if setup_resp.sec_ver != proto.session_pb2.SecScheme2:
print('Incorrect sec scheme') raise RuntimeError('Incorrect security scheme')
exit(1)
# Device public key, random salt and password verifier # Device public key, random salt and password verifier
device_pubkey = setup_resp.sec2.sr0.device_pubkey device_pubkey = setup_resp.sec2.sr0.device_pubkey
device_salt = setup_resp.sec2.sr0.device_salt device_salt = setup_resp.sec2.sr0.device_salt
self._print_verbose('Device Public Key:\t' + hex(bytes_to_long(device_pubkey))) self._print_verbose(f'Device Public Key:\t0x{device_pubkey.hex()}')
self._print_verbose('Device Salt:\t' + hex(bytes_to_long(device_salt)))
self.client_pop_key = self.srp6a_ctx.process_challenge(device_salt, device_pubkey) self.client_pop_key = self.srp6a_ctx.process_challenge(device_salt, device_pubkey)
def setup1_request(self) -> Any: def setup1_request(self) -> Any:
@ -120,7 +116,10 @@ class Security2(Security):
setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command1 setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command1
# Encrypt device public key and attach to the request packet # Encrypt device public key and attach to the request packet
self._print_verbose('Client Proof:\t' + hex(bytes_to_long(self.client_pop_key))) if self.client_pop_key is None:
raise RuntimeError('Failed to generate client proof!')
self._print_verbose(f'Client Proof:\t0x{self.client_pop_key.hex()}')
setup_req.sec2.sc1.client_proof = self.client_pop_key setup_req.sec2.sc1.client_proof = self.client_pop_key
return setup_req.SerializeToString().decode('latin-1') return setup_req.SerializeToString().decode('latin-1')
@ -128,37 +127,36 @@ class Security2(Security):
def setup1_response(self, response_data: bytes) -> Any: def setup1_response(self, response_data: bytes) -> Any:
# Interpret SessionResp1 response packet # Interpret SessionResp1 response packet
setup_resp = proto.session_pb2.SessionData() setup_resp = proto.session_pb2.SessionData()
setup_resp.ParseFromString(tobytes(response_data)) setup_resp.ParseFromString(str_to_bytes(response_data))
# Ensure security scheme matches # Ensure security scheme matches
if setup_resp.sec_ver == proto.session_pb2.SecScheme2: if setup_resp.sec_ver == proto.session_pb2.SecScheme2:
# Read encrypyed device proof string # Read encrypyed device proof string
device_proof = setup_resp.sec2.sr1.device_proof device_proof = setup_resp.sec2.sr1.device_proof
self._print_verbose('Device Proof:\t' + hex(bytes_to_long(device_proof))) self._print_verbose(f'Device Proof:\t0x{device_proof.hex()}')
self.srp6a_ctx.verify_session(device_proof) self.srp6a_ctx.verify_session(device_proof)
if not self.srp6a_ctx.authenticated(): if not self.srp6a_ctx.authenticated():
print('Failed to verify device proof') raise RuntimeError('Failed to verify device proof')
exit(1)
else: else:
print('Unsupported security protocol') raise RuntimeError('Unsupported security protocol')
exit(1)
# Getting the shared secret # Getting the shared secret
shared_secret = self.srp6a_ctx.get_session_key() shared_secret = self.srp6a_ctx.get_session_key()
self._print_verbose('Shared Secret:\t' + hex(bytes_to_long(shared_secret))) self._print_verbose(f'Shared Secret:\t0x{shared_secret.hex()}')
# Using the first 256 bits of a 512 bit key # Using the first 256 bits of a 512 bit key
session_key = shared_secret[:AES_KEY_LEN] session_key = shared_secret[:AES_KEY_LEN]
self._print_verbose('Session Key:\t' + hex(bytes_to_long(session_key))) self._print_verbose(f'Session Key:\t0x{session_key.hex()}')
# 96-bit nonce # 96-bit nonce
self.nonce = setup_resp.sec2.sr1.device_nonce self.nonce = setup_resp.sec2.sr1.device_nonce
self._print_verbose('Nonce:\t' + hex(bytes_to_long(self.nonce))) if self.nonce is None:
raise RuntimeError('Received invalid nonce from device!')
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
# Initialize the encryption engine with Shared Key and initialization vector # Initialize the encryption engine with Shared Key and initialization vector
self.cipher = AESGCM(session_key) self.cipher = AESGCM(session_key)
if self.cipher is None: if self.cipher is None:
print('Failed to initialize AES-GCM cryptographic engine!') raise RuntimeError('Failed to initialize AES-GCM cryptographic engine!')
exit(1)
def encrypt_data(self, data: bytes) -> Any: def encrypt_data(self, data: bytes) -> Any:
return self.cipher.encrypt(self.nonce, data, None) return self.cipher.encrypt(self.nonce, data, None)

View File

@ -1,5 +1,6 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
#
# N A large safe prime (N = 2q+1, where q is prime) [All arithmetic is done modulo N] # N A large safe prime (N = 2q+1, where q is prime) [All arithmetic is done modulo N]
# g A generator modulo N # g A generator modulo N
@ -19,6 +20,8 @@ import hashlib
import os import os
from typing import Any, Callable, Optional, Tuple from typing import Any, Callable, Optional, Tuple
from utils import bytes_to_long, long_to_bytes
SHA1 = 0 SHA1 = 0
SHA224 = 1 SHA224 = 1
SHA256 = 2 SHA256 = 2
@ -143,21 +146,11 @@ def get_ng(ng_type: int) -> Tuple[int, int]:
return int(n_hex, 16), int(g_hex, 16) return int(n_hex, 16), int(g_hex, 16)
def bytes_to_long(s: bytes) -> int: def get_random(nbytes: int) -> Any:
return int.from_bytes(s, 'big')
def long_to_bytes(n: int) -> bytes:
if n == 0:
return b'\x00'
return n.to_bytes((n.bit_length() + 7) // 8, 'big')
def get_random(nbytes: int) -> int:
return bytes_to_long(os.urandom(nbytes)) return bytes_to_long(os.urandom(nbytes))
def get_random_of_length(nbytes: int) -> int: def get_random_of_length(nbytes: int) -> Any:
offset = (nbytes * 8) - 1 offset = (nbytes * 8) - 1
return get_random(nbytes) | (1 << offset) return get_random(nbytes) | (1 << offset)
@ -255,7 +248,7 @@ class Srp6a (object):
def get_username(self) -> str: def get_username(self) -> str:
return self.Iu return self.Iu
def get_ephemeral_secret(self) -> bytes: def get_ephemeral_secret(self) -> Any:
return long_to_bytes(self.a) return long_to_bytes(self.a)
def get_session_key(self) -> Any: def get_session_key(self) -> Any:

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
from .transport_ble import * # noqa: F403, F401 from .transport_ble import * # noqa: F403, F401

View File

@ -1,345 +1,173 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
import platform import platform
from builtins import input
import utils from utils import hex_str_to_bytes, str_to_bytes
from future.utils import iteritems
fallback = True fallback = True
# Check if platform is Linux and required packages are installed # Check if required packages are installed
# else fallback to console mode # else fallback to console mode
if platform.system() == 'Linux': try:
try: import bleak
import time fallback = False
except ImportError:
import dbus pass
import dbus.mainloop.glib
fallback = False
except ImportError:
pass
# -------------------------------------------------------------------- # --------------------------------------------------------------------
def device_sort(device):
return device.address
# BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client: class BLE_Bleak_Client:
def __init__(self): def __init__(self):
self.adapter = None
self.adapter_props = None self.adapter_props = None
self.characteristics = dict()
self.chrc_names = None
self.device = None
self.devname = None
self.iface = None
self.nu_lookup = None
self.services = None
self.srv_uuid_adv = None
self.srv_uuid_fallback = None
def connect(self, devname, iface, chrc_names, fallback_srv_uuid): async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
self.devname = devname self.devname = devname
self.srv_uuid_fallback = fallback_srv_uuid self.srv_uuid_fallback = fallback_srv_uuid
self.chrc_names = [name.lower() for name in chrc_names] self.chrc_names = [name.lower() for name in chrc_names]
self.device = None self.iface = iface
self.adapter = None
self.services = None
self.nu_lookup = None
self.characteristics = dict()
self.srv_uuid_adv = None
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()
adapter_path = None
for path, interfaces in iteritems(objects):
adapter = interfaces.get('org.bluez.Adapter1')
if adapter is not None:
if path.endswith(iface):
self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
adapter_path = path
break
if self.adapter is None:
raise RuntimeError('Bluetooth adapter not found')
# Power on bluetooth adapter
self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
print('checking if adapter is powered on')
for cnt in range(10, 0, -1):
time.sleep(5)
powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
if powered_on == 1:
# Set adapter props again with powered on value
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties')
print('bluetooth adapter powered on')
break
print('number of retries left({})'.format(cnt - 1))
if powered_on == 0:
raise RuntimeError('Failed to starte bluetooth adapter')
# Start discovery if not already discovering
started_discovery = 0
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 0:
print('starting discovery')
self.adapter.StartDiscovery()
# Set as start discovery is called
started_discovery = 1
for cnt in range(10, 0, -1):
time.sleep(5)
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 1:
print('start discovery successful')
break
print('number of retries left ({})'.format(cnt - 1))
if discovery_val == 0:
print('start discovery failed')
raise RuntimeError('Failed to start discovery')
retry = 10
while (retry > 0):
try:
if self.device is None:
print('Connecting...')
# Wait for device to be discovered
time.sleep(5)
connected = self._connect_()
if connected:
print('Connected')
else:
return False
print('Getting Services...')
# Wait for services to be discovered
time.sleep(5)
self._get_services_()
return True
except Exception as e:
print(e)
retry -= 1
print('Retries left', retry)
continue
# Call StopDiscovery() for corresponding StartDiscovery() session
if started_discovery == 1:
print('stopping discovery')
self.adapter.StopDiscovery()
for cnt in range(10, 0, -1):
time.sleep(5)
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 0:
print('stop discovery successful')
break
print('number of retries left ({})'.format(cnt - 1))
if discovery_val == 1:
print('stop discovery failed')
return False
def _connect_(self):
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()
dev_path = None
for path, interfaces in iteritems(objects):
if 'org.bluez.Device1' not in interfaces:
continue
if interfaces['org.bluez.Device1'].get('Name') == self.devname:
dev_path = path
break
if dev_path is None:
raise RuntimeError('BLE device not found')
print('Discovering...')
try: try:
self.device = bus.get_object('org.bluez', dev_path) devices = await bleak.discover()
try: except bleak.exc.BleakDBusError as e:
uuids = self.device.Get('org.bluez.Device1', 'UUIDs', if str(e) == '[org.bluez.Error.NotReady] Resource Not Ready':
dbus_interface='org.freedesktop.DBus.Properties') raise RuntimeError('Bluetooth is not ready. Maybe try `bluetoothctl power on`?')
# There should be 1 service UUID in advertising data raise
# If bluez had cached an old version of the advertisement data
# the list of uuids may be incorrect, in which case connection
# or service discovery may fail the first time. If that happens
# the cache will be refreshed before next retry
if len(uuids) == 1:
self.srv_uuid_adv = uuids[0]
except dbus.exceptions.DBusException as e:
raise RuntimeError(e)
self.device.Connect(dbus_interface='org.bluez.Device1') found_device = None
# Check device is connected successfully
for cnt in range(10, 0, -1):
time.sleep(5)
device_conn = self.device.Get(
'org.bluez.Device1',
'Connected',
dbus_interface='org.freedesktop.DBus.Properties')
if device_conn == 1:
print('device is connected')
break
print('number of retries left ({})'.format(cnt - 1))
if device_conn == 0:
print('failed to connect device')
return False
return True if self.devname is None:
if len(devices) == 0:
print('No devices found!')
exit(1)
except Exception as e: while True:
print(e) devices.sort(key=device_sort)
self.device = None print('==== BLE Discovery results ====')
raise RuntimeError('BLE device could not connect') print('{0: >4} {1: <33} {2: <12}'.format(
'S.N.', 'Name', 'Address'))
for i, _ in enumerate(devices):
print('[{0: >2}] {1: <33} {2: <12}'.format(i + 1, devices[i].name or 'Unknown', devices[i].address))
def _get_services_(self): while True:
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()
service_found = False
for srv_path, srv_interfaces in iteritems(objects):
if 'org.bluez.GattService1' not in srv_interfaces:
continue
if not srv_path.startswith(self.device.object_path):
continue
service = bus.get_object('org.bluez', srv_path)
srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
# If service UUID doesn't match the one found in advertisement data
# then also check if it matches the fallback UUID
if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
continue
nu_lookup = dict()
characteristics = dict()
for chrc_path, chrc_interfaces in iteritems(objects):
if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
continue
if not chrc_path.startswith(service.object_path):
continue
chrc = bus.get_object('org.bluez', chrc_path)
uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
characteristics[uuid] = chrc
for desc_path, desc_interfaces in iteritems(objects):
if 'org.bluez.GattDescriptor1' not in desc_interfaces:
continue
if not desc_path.startswith(chrc.object_path):
continue
desc = bus.get_object('org.bluez', desc_path)
desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
if desc_uuid[4:8] != '2901':
continue
try: try:
readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1') select = int(input('Select device by number (0 to rescan) : '))
except dbus.exceptions.DBusException as err: if select < 0 or select > len(devices):
raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err)) raise ValueError
found_name = ''.join(chr(b) for b in readval).lower() break
nu_lookup[found_name] = uuid except ValueError:
print('Invalid input! Retry')
if select != 0:
break break
match_found = True devices = await bleak.discover()
for name in self.chrc_names:
if name not in nu_lookup:
# Endpoint name not present
match_found = False
break
# Create lookup table only if all endpoint names found self.devname = devices[select - 1].name
self.nu_lookup = [None, nu_lookup][match_found] found_device = devices[select - 1]
self.characteristics = characteristics else:
service_found = True for d in devices:
if d.name == self.devname:
found_device = d
# If the service UUID matches that in the advertisement if not found_device:
# we can stop the search now. If it doesn't match, we raise RuntimeError('Device not found')
# have found the service corresponding to the fallback
# UUID, in which case don't break and keep searching uuids = found_device.metadata['uuids']
# for the advertised service # There should be 1 service UUID in advertising data
if srv_uuid == self.srv_uuid_adv: # If bluez had cached an old version of the advertisement data
# the list of uuids may be incorrect, in which case connection
# or service discovery may fail the first time. If that happens
# the cache will be refreshed before next retry
if len(uuids) == 1:
self.srv_uuid_adv = uuids[0]
print('Connecting...')
self.device = bleak.BleakClient(found_device.address)
await self.device.connect()
# must be paired on Windows to access characteristics;
# cannot be paired on Mac
if platform.system() == 'Windows':
await self.device.pair()
print('Getting Services...')
services = await self.device.get_services()
service = services[self.srv_uuid_adv] or services[self.srv_uuid_fallback]
if not service:
await self.device.disconnect()
self.device = None
raise RuntimeError('Provisioning service not found')
nu_lookup = dict()
for characteristic in service.characteristics:
for descriptor in characteristic.descriptors:
if descriptor.uuid[4:8] != '2901':
continue
readval = await self.device.read_gatt_descriptor(descriptor.handle)
found_name = ''.join(chr(b) for b in readval).lower()
nu_lookup[found_name] = characteristic.uuid
self.characteristics[characteristic.uuid] = characteristic
match_found = True
for name in self.chrc_names:
if name not in nu_lookup:
# Endpoint name not present
match_found = False
break break
if not service_found: # Create lookup table only if all endpoint names found
self.device.Disconnect(dbus_interface='org.bluez.Device1') self.nu_lookup = [None, nu_lookup][match_found]
# Check if device is disconnected successfully
self._check_device_disconnected() return True
if self.adapter:
self.adapter.RemoveDevice(self.device)
self.device = None
self.nu_lookup = None
self.characteristics = dict()
raise RuntimeError('Provisioning service not found')
def get_nu_lookup(self): def get_nu_lookup(self):
return self.nu_lookup return self.nu_lookup
def has_characteristic(self, uuid): def has_characteristic(self, uuid):
print('checking for characteristic ' + uuid)
if uuid in self.characteristics: if uuid in self.characteristics:
return True return True
return False return False
def disconnect(self): async def disconnect(self):
if self.device: if self.device:
self.device.Disconnect(dbus_interface='org.bluez.Device1') print('Disconnecting...')
# Check if device is disconnected successfully if platform.system() == 'Windows':
self._check_device_disconnected() await self.device.unpair()
if self.adapter: await self.device.disconnect()
self.adapter.RemoveDevice(self.device)
self.device = None self.device = None
self.nu_lookup = None self.nu_lookup = None
self.characteristics = dict() self.characteristics = dict()
if self.adapter_props:
self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
def _check_device_disconnected(self): async def send_data(self, characteristic_uuid, data):
for cnt in range(10, 0, -1): await self.device.write_gatt_char(characteristic_uuid, bytearray(data.encode('latin-1')), True)
time.sleep(5) readval = await self.device.read_gatt_char(characteristic_uuid)
device_conn = self.device.Get(
'org.bluez.Device1',
'Connected',
dbus_interface='org.freedesktop.DBus.Properties')
if device_conn == 0:
print('device disconnected')
break
print('number of retries left ({})'.format(cnt - 1))
if device_conn == 1:
print('failed to disconnect device')
def send_data(self, characteristic_uuid, data):
try:
path = self.characteristics[characteristic_uuid]
except KeyError:
raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)
try:
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
except TypeError: # python3 compatible
path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e:
raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))
try:
readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e:
raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
return ''.join(chr(b) for b in readval) return ''.join(chr(b) for b in readval)
# -------------------------------------------------------------------- # --------------------------------------------------------------------
# Console based BLE client for Cross Platform support # Console based BLE client for Cross Platform support
class BLE_Console_Client: class BLE_Console_Client:
def connect(self, devname, iface, chrc_names, fallback_srv_uuid): async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
print('BLE client is running in console mode') print('BLE client is running in console mode')
print('\tThis could be due to your platform not being supported or dependencies not being met') print('\tThis could be due to your platform not being supported or dependencies not being met')
print('\tPlease ensure all pre-requisites are met to run the full fledged client') print('\tPlease ensure all pre-requisites are met to run the full fledged client')
@ -362,15 +190,15 @@ class BLE_Console_Client:
return False return False
return True return True
def disconnect(self): async def disconnect(self):
pass pass
def send_data(self, characteristic_uuid, data): async def send_data(self, characteristic_uuid, data):
print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :") print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
print('\t>> ' + utils.str_to_hexstr(data)) print('\t>> ' + str_to_bytes(data).hex())
print('BLECLI >> Enter data read from characteristic (in hex) :') print('BLECLI >> Enter data read from characteristic (in hex) :')
resp = input('\t<< ') resp = input('\t<< ')
return utils.hexstr_to_str(resp) return hex_str_to_bytes(resp)
# -------------------------------------------------------------------- # --------------------------------------------------------------------
@ -380,4 +208,4 @@ class BLE_Console_Client:
def get_client(): def get_client():
if fallback: if fallback:
return BLE_Console_Client() return BLE_Console_Client()
return BLE_Bluez_Client() return BLE_Bleak_Client()

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
# Base class for protocomm transport # Base class for protocomm transport
@ -27,3 +16,6 @@ class Transport():
@abc.abstractmethod @abc.abstractmethod
def send_config_data(self, data): def send_config_data(self, data):
pass pass
async def disconnect(self):
pass

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
from __future__ import print_function from __future__ import print_function
@ -20,7 +9,10 @@ from .transport import Transport
class Transport_BLE(Transport): class Transport_BLE(Transport):
def __init__(self, devname, service_uuid, nu_lookup): def __init__(self, service_uuid, nu_lookup):
self.nu_lookup = nu_lookup
self.service_uuid = service_uuid
self.name_uuid_lookup = None
# Expect service UUID like '0000ffff-0000-1000-8000-00805f9b34fb' # Expect service UUID like '0000ffff-0000-1000-8000-00805f9b34fb'
for name in nu_lookup.keys(): for name in nu_lookup.keys():
# Calculate characteristic UUID for each endpoint # Calculate characteristic UUID for each endpoint
@ -30,10 +22,11 @@ class Transport_BLE(Transport):
# Get BLE client module # Get BLE client module
self.cli = ble_cli.get_client() self.cli = ble_cli.get_client()
async def connect(self, devname):
# Use client to connect to BLE device and bind to service # Use client to connect to BLE device and bind to service
if not self.cli.connect(devname=devname, iface='hci0', if not await self.cli.connect(devname=devname, iface='hci0',
chrc_names=nu_lookup.keys(), chrc_names=self.nu_lookup.keys(),
fallback_srv_uuid=service_uuid): fallback_srv_uuid=self.service_uuid):
raise RuntimeError('Failed to initialize transport') raise RuntimeError('Failed to initialize transport')
# Irrespective of provided parameters, let the client # Irrespective of provided parameters, let the client
@ -43,24 +36,17 @@ class Transport_BLE(Transport):
# If that doesn't work, use the lookup table provided as parameter # If that doesn't work, use the lookup table provided as parameter
if self.name_uuid_lookup is None: if self.name_uuid_lookup is None:
self.name_uuid_lookup = nu_lookup self.name_uuid_lookup = self.nu_lookup
# Check if expected characteristics are provided by the service # Check if expected characteristics are provided by the service
for name in self.name_uuid_lookup.keys(): for name in self.name_uuid_lookup.keys():
if not self.cli.has_characteristic(self.name_uuid_lookup[name]): if not self.cli.has_characteristic(self.name_uuid_lookup[name]):
raise RuntimeError("'" + name + "' endpoint not found") raise RuntimeError(f"'{name}' endpoint not found")
def __del__(self): async def disconnect(self):
# Make sure device is disconnected before application gets closed await self.cli.disconnect()
try:
self.disconnect()
except Exception:
pass
def disconnect(self): async def send_data(self, ep_name, data):
self.cli.disconnect()
def send_data(self, ep_name, data):
# Write (and read) data to characteristic corresponding to the endpoint # Write (and read) data to characteristic corresponding to the endpoint
if ep_name not in self.name_uuid_lookup.keys(): if ep_name not in self.name_uuid_lookup.keys():
raise RuntimeError('Invalid endpoint : ' + ep_name) raise RuntimeError(f'Invalid endpoint: {ep_name}')
return self.cli.send_data(self.name_uuid_lookup[ep_name], data) return await self.cli.send_data(self.name_uuid_lookup[ep_name], data)

View File

@ -1,34 +1,19 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
from __future__ import print_function from utils import hex_str_to_bytes, str_to_bytes
from builtins import input
import utils
from .transport import Transport from .transport import Transport
class Transport_Console(Transport): class Transport_Console(Transport):
def send_data(self, path, data, session_id=0): async def send_data(self, path, data, session_id=0):
print('Client->Device msg :', path, session_id, utils.str_to_hexstr(data)) print('Client->Device msg :', path, session_id, str_to_bytes(data).hex())
try: try:
resp = input('Enter device->client msg : ') resp = input('Enter device->client msg : ')
except Exception as err: except Exception as err:
print('error:', err) print('error:', err)
return None return None
return utils.hexstr_to_str(resp) return hex_str_to_bytes(resp)

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
from __future__ import print_function from __future__ import print_function
@ -23,7 +12,7 @@ try:
from http.client import HTTPConnection, HTTPSConnection from http.client import HTTPConnection, HTTPSConnection
except ImportError: except ImportError:
# Python 2 fallback # Python 2 fallback
from httplib import HTTPConnection, HTTPSConnection from httplib import HTTPConnection, HTTPSConnection # type: ignore
from .transport import Transport from .transport import Transport
@ -33,14 +22,14 @@ class Transport_HTTP(Transport):
try: try:
socket.gethostbyname(hostname.split(':')[0]) socket.gethostbyname(hostname.split(':')[0])
except socket.gaierror: except socket.gaierror:
raise RuntimeError('Unable to resolve hostname :' + hostname) raise RuntimeError(f'Unable to resolve hostname: {hostname}')
if ssl_context is None: if ssl_context is None:
self.conn = HTTPConnection(hostname, timeout=60) self.conn = HTTPConnection(hostname, timeout=60)
else: else:
self.conn = HTTPSConnection(hostname, context=ssl_context, timeout=60) self.conn = HTTPSConnection(hostname, context=ssl_context, timeout=60)
try: try:
print('Connecting to ' + hostname) print(f'++++ Connecting to {hostname}++++')
self.conn.connect() self.conn.connect()
except Exception as err: except Exception as err:
raise RuntimeError('Connection Failure : ' + str(err)) raise RuntimeError('Connection Failure : ' + str(err))
@ -56,5 +45,5 @@ class Transport_HTTP(Transport):
raise RuntimeError('Connection Failure : ' + str(err)) raise RuntimeError('Connection Failure : ' + str(err))
raise RuntimeError('Server responded with error code ' + str(response.status)) raise RuntimeError('Server responded with error code ' + str(response.status))
def send_data(self, ep_name, data): async def send_data(self, ep_name, data):
return self._send_post_request('/' + ep_name, data) return self._send_post_request('/' + ep_name, data)

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# # SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #
from .convenience import * # noqa: F403, F401 from .convenience import * # noqa: F403, F401

View File

@ -3,21 +3,22 @@
# #
# Convenience functions for commonly used data type conversions # Convenience functions for commonly used data type conversions
import binascii
from future.utils import tobytes def bytes_to_long(s: bytes) -> int:
return int.from_bytes(s, 'big')
def str_to_hexstr(string): def long_to_bytes(n: int) -> bytes:
# Form hexstr by appending ASCII codes (in hex) corresponding to if n == 0:
# each character in the input string return b'\x00'
return binascii.hexlify(tobytes(string)).decode('latin-1') return n.to_bytes((n.bit_length() + 7) // 8, 'big')
def hexstr_to_str(hexstr): # 'deadbeef' -> b'deadbeef'
# Prepend 0 (if needed) to make the hexstr length an even number def str_to_bytes(s: str) -> bytes:
if len(hexstr) % 2 == 1: return bytes(s, encoding='latin-1')
hexstr = '0' + hexstr
# Interpret consecutive pairs of hex characters as 8 bit ASCII codes
# and append characters corresponding to each code to form the string # 'deadbeef' -> b'\xde\xad\xbe\xef'
return binascii.unhexlify(tobytes(hexstr)).decode('latin-1') def hex_str_to_bytes(s: str) -> bytes:
return bytes.fromhex(s)