diff --git a/tools/host_tests/__init__.py b/tools/host_tests/__init__.py deleted file mode 100644 index 491f1317ade..00000000000 --- a/tools/host_tests/__init__.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -from .host_registry import HostRegistry - -# Host test supervisors -from .echo import EchoTest -from .rtc_auto import RTCTest -from .stdio_auto import StdioTest -from .hello_auto import HelloTest -from .detect_auto import DetectPlatformTest -from .default_auto import DefaultAuto -from .dev_null_auto import DevNullTest -from .wait_us_auto import WaitusTest -from .tcpecho_server_auto import TCPEchoServerTest -from .udpecho_server_auto import UDPEchoServerTest -from .tcpecho_client_auto import TCPEchoClientTest -from .udpecho_client_auto import UDPEchoClientTest -from .wfi_auto import WFITest -from .serial_nc_rx_auto import SerialNCRXTest -from .serial_nc_tx_auto import SerialNCTXTest -from .serial_complete_auto import SerialCompleteTest - -# Populate registry with supervising objects -HOSTREGISTRY = HostRegistry() -HOSTREGISTRY.register_host_test("echo", EchoTest()) -HOSTREGISTRY.register_host_test("default", DefaultAuto()) -HOSTREGISTRY.register_host_test("rtc_auto", RTCTest()) -HOSTREGISTRY.register_host_test("hello_auto", HelloTest()) -HOSTREGISTRY.register_host_test("stdio_auto", StdioTest()) -HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest()) -HOSTREGISTRY.register_host_test("default_auto", DefaultAuto()) -HOSTREGISTRY.register_host_test("wait_us_auto", WaitusTest()) -HOSTREGISTRY.register_host_test("dev_null_auto", DevNullTest()) -HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest()) -HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest()) -HOSTREGISTRY.register_host_test("tcpecho_client_auto", TCPEchoClientTest()) -HOSTREGISTRY.register_host_test("udpecho_client_auto", UDPEchoClientTest()) -HOSTREGISTRY.register_host_test("wfi_auto", WFITest()) -HOSTREGISTRY.register_host_test("serial_nc_rx_auto", SerialNCRXTest()) -HOSTREGISTRY.register_host_test("serial_nc_tx_auto", SerialNCTXTest()) -HOSTREGISTRY.register_host_test("serial_complete_auto", SerialCompleteTest()) - -############################################################################### -# Functional interface for test supervisor registry -############################################################################### - - -def get_host_test(ht_name): - return HOSTREGISTRY.get_host_test(ht_name) - -def is_host_test(ht_name): - return HOSTREGISTRY.is_host_test(ht_name) diff --git a/tools/host_tests/default_auto.py b/tools/host_tests/default_auto.py deleted file mode 100644 index 13cc066d253..00000000000 --- a/tools/host_tests/default_auto.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from sys import stdout - -class DefaultAuto(object): - """ Simple, basic host test's test runner waiting for serial port - output from MUT, no supervision over test running in MUT is executed. - """ - def test(self, selftest): - result = selftest.RESULT_SUCCESS - try: - while True: - c = selftest.mbed.serial_read(512) - if c is None: - return selftest.RESULT_IO_SERIAL - stdout.write(c) - stdout.flush() - except KeyboardInterrupt: - selftest.notify("\r\n[CTRL+C] exit") - result = selftest.RESULT_ERROR - return result diff --git a/tools/host_tests/detect_auto.py b/tools/host_tests/detect_auto.py deleted file mode 100644 index 3da1bb70878..00000000000 --- a/tools/host_tests/detect_auto.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import re - -class DetectPlatformTest(object): - PATTERN_MICRO_NAME = "Target '(\w+)'" - re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) - - def test(self, selftest): - result = True - - c = selftest.mbed.serial_readline() # {{start}} preamble - if c is None: - return selftest.RESULT_IO_SERIAL - - selftest.notify(c.strip()) - selftest.notify("HOST: Detecting target name...") - - c = selftest.mbed.serial_readline() - if c is None: - return selftest.RESULT_IO_SERIAL - selftest.notify(c.strip()) - - # Check for target name - m = self.re_detect_micro_name.search(c) - if m and len(m.groups()): - micro_name = m.groups()[0] - micro_cmp = selftest.mbed.options.micro == micro_name - result = result and micro_cmp - selftest.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, - selftest.mbed.options.micro, - "OK" if micro_cmp else "FAIL")) - - for i in range(0, 2): - c = selftest.mbed.serial_readline() - if c is None: - return selftest.RESULT_IO_SERIAL - selftest.notify(c.strip()) - - return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/dev_null_auto.py b/tools/host_tests/dev_null_auto.py deleted file mode 100644 index c7aa3406988..00000000000 --- a/tools/host_tests/dev_null_auto.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -class DevNullTest(object): - - def check_readline(self, selftest, text): - """ Reads line from serial port and checks if text was part of read string - """ - result = False - c = selftest.mbed.serial_readline() - if c and text in c: - result = True - return result - - def test(self, selftest): - result = True - # Test should print some text and later stop printing - # 'MBED: re-routing stdout to /null' - res = self.check_readline(selftest, "re-routing stdout to /null") - if not res: - # We haven't read preamble line - result = False - else: - # Check if there are printed characters - str = '' - for i in range(3): - c = selftest.mbed.serial_read(32) - if c is None: - return selftest.RESULT_IO_SERIAL - else: - str += c - if len(str) > 0: - result = False - break - selftest.notify("Received %d bytes: %s"% (len(str), str)) - return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/echo.py b/tools/host_tests/echo.py deleted file mode 100644 index 032c6835fd0..00000000000 --- a/tools/host_tests/echo.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import sys -import uuid -from sys import stdout - -class EchoTest(object): - - # Test parameters - TEST_SERIAL_BAUDRATE = 115200 - TEST_LOOP_COUNT = 50 - - def test(self, selftest): - """ This host test will use mbed serial port with - baudrate 115200 to perform echo test on that port. - """ - # Custom initialization for echo test - selftest.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE) - selftest.mbed.init_serial() - - # Test function, return True or False to get standard test notification on stdout - selftest.mbed.flush() - selftest.notify("HOST: Starting the ECHO test") - result = True - - """ This ensures that there are no parasites left in the serial buffer. - """ - for i in range(0, 2): - selftest.mbed.serial_write("\n") - c = selftest.mbed.serial_readline() - - for i in range(0, self.TEST_LOOP_COUNT): - TEST_STRING = str(uuid.uuid4()) + "\n" - selftest.mbed.serial_write(TEST_STRING) - c = selftest.mbed.serial_readline() - if c is None: - return selftest.RESULT_IO_SERIAL - if c.strip() != TEST_STRING.strip(): - selftest.notify('HOST: "%s" != "%s"'% (c, TEST_STRING)) - result = False - else: - sys.stdout.write('.') - stdout.flush() - return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/echo_flow_control.py b/tools/host_tests/echo_flow_control.py deleted file mode 100644 index f370d3b6b20..00000000000 --- a/tools/host_tests/echo_flow_control.py +++ /dev/null @@ -1,49 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from .host_test import Test - - -class EchoTest(Test): - def __init__(self): - Test.__init__(self) - self.mbed.init_serial() - self.mbed.extra_serial.rtscts = True - self.mbed.reset() - - def test(self): - self.mbed.flush() - self.notify("Starting the ECHO test") - TEST="longer serial test" - check = True - for i in range(1, 100): - self.mbed.extra_serial.write(TEST + "\n") - l = self.mbed.extra_serial.readline().strip() - if not l: continue - - if l != TEST: - check = False - self.notify('"%s" != "%s"' % (l, TEST)) - else: - if (i % 10) == 0: - self.notify('.') - - return check - - -if __name__ == '__main__': - EchoTest().run() diff --git a/tools/host_tests/example/BroadcastReceive.py b/tools/host_tests/example/BroadcastReceive.py deleted file mode 100644 index eafb7bc20bf..00000000000 --- a/tools/host_tests/example/BroadcastReceive.py +++ /dev/null @@ -1,26 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket - -BROADCAST_PORT = 58083 - -s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -s.bind(('0.0.0.0', BROADCAST_PORT)) - -while True: - print s.recvfrom(256) diff --git a/tools/host_tests/example/BroadcastSend.py b/tools/host_tests/example/BroadcastSend.py deleted file mode 100644 index 14b453298a9..00000000000 --- a/tools/host_tests/example/BroadcastSend.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket -from time import sleep, time - -BROADCAST_PORT = 58083 - -s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -s.bind(('', 0)) -s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - -while True: - print "Broadcasting..." - data = 'Hello World: ' + repr(time()) + '\n' - s.sendto(data, ('', BROADCAST_PORT)) - sleep(1) diff --git a/tools/host_tests/example/MulticastReceive.py b/tools/host_tests/example/MulticastReceive.py deleted file mode 100644 index 9087d8d7ce7..00000000000 --- a/tools/host_tests/example/MulticastReceive.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket -import struct - -MCAST_GRP = '224.1.1.1' -MCAST_PORT = 5007 - -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) -sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -sock.bind(('', MCAST_PORT)) -mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) - -sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - -while True: - print sock.recv(10240) diff --git a/tools/host_tests/example/MulticastSend.py b/tools/host_tests/example/MulticastSend.py deleted file mode 100644 index b004783b3d1..00000000000 --- a/tools/host_tests/example/MulticastSend.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket -from time import sleep, time - -MCAST_GRP = '224.1.1.1' -MCAST_PORT = 5007 - -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) -sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) - -while True: - print "Multicast to group: %s\n" % MCAST_GRP - data = 'Hello World: ' + repr(time()) + '\n' - sock.sendto(data, (MCAST_GRP, MCAST_PORT)) - sleep(1) diff --git a/tools/host_tests/example/TCPEchoClient.py b/tools/host_tests/example/TCPEchoClient.py deleted file mode 100644 index b3c8322737e..00000000000 --- a/tools/host_tests/example/TCPEchoClient.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket - -ECHO_SERVER_ADDRESS = "10.2.202.45" -ECHO_PORT = 7 - -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -s.connect((ECHO_SERVER_ADDRESS, ECHO_PORT)) - -s.sendall('Hello, world') -data = s.recv(1024) -s.close() -print 'Received', repr(data) diff --git a/tools/host_tests/example/TCPEchoServer.py b/tools/host_tests/example/TCPEchoServer.py deleted file mode 100644 index 8168c2323a4..00000000000 --- a/tools/host_tests/example/TCPEchoServer.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket - -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -s.bind(('', 7)) -s.listen(1) - -while True: - conn, addr = s.accept() - print 'Connected by', addr - while True: - data = conn.recv(1024) - if not data: break - conn.sendall(data) - conn.close() diff --git a/tools/host_tests/example/UDPEchoClient.py b/tools/host_tests/example/UDPEchoClient.py deleted file mode 100644 index e699f9cb24d..00000000000 --- a/tools/host_tests/example/UDPEchoClient.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket - -ECHO_SERVER_ADDRESS = '10.2.202.45' -ECHO_PORT = 7 - -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - -sock.sendto("Hello World\n", (ECHO_SERVER_ADDRESS, ECHO_PORT)) -response = sock.recv(256) -sock.close() - -print response diff --git a/tools/host_tests/example/UDPEchoServer.py b/tools/host_tests/example/UDPEchoServer.py deleted file mode 100644 index 8bc297d72d3..00000000000 --- a/tools/host_tests/example/UDPEchoServer.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket - -ECHO_PORT = 7 - -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -sock.bind(('', ECHO_PORT)) - -while True: - data, address = sock.recvfrom(256) - print "datagram from", address - sock.sendto(data, address) diff --git a/tools/host_tests/example/__init__.py b/tools/host_tests/example/__init__.py deleted file mode 100644 index 019367e7f5e..00000000000 --- a/tools/host_tests/example/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" diff --git a/tools/host_tests/hello_auto.py b/tools/host_tests/hello_auto.py deleted file mode 100644 index 046f1b8d7b6..00000000000 --- a/tools/host_tests/hello_auto.py +++ /dev/null @@ -1,35 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -class HelloTest(object): - HELLO_WORLD = "Hello World" - - def test(self, selftest): - c = selftest.mbed.serial_readline() - if c is None: - return selftest.RESULT_IO_SERIAL - selftest.notify("Read %d bytes:"% len(c)) - selftest.notify(c.strip()) - - result = True - # Because we can have targetID here let's try to decode - if len(c) < len(self.HELLO_WORLD): - result = False - else: - result = self.HELLO_WORLD in c - return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/host_registry.py b/tools/host_tests/host_registry.py deleted file mode 100644 index 0ad0420b71d..00000000000 --- a/tools/host_tests/host_registry.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -class HostRegistry(object): - """ Class stores registry with host tests and objects representing them - """ - HOST_TESTS = {} # host_test_name -> host_test_ojbect - - def register_host_test(self, ht_name, ht_object): - if ht_name not in self.HOST_TESTS: - self.HOST_TESTS[ht_name] = ht_object - - def unregister_host_test(self): - if ht_name in HOST_TESTS: - self.HOST_TESTS[ht_name] = None - - def get_host_test(self, ht_name): - return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None - - def is_host_test(self, ht_name): - return ht_name in self.HOST_TESTS - diff --git a/tools/host_tests/host_test.py b/tools/host_tests/host_test.py deleted file mode 100644 index a487e9bc030..00000000000 --- a/tools/host_tests/host_test.py +++ /dev/null @@ -1,427 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -# Check if 'serial' module is installed -try: - from serial import Serial -except ImportError as e: - print("Error: Can't import 'serial' module: %s"% e) - exit(-1) - -import os -import re -import types -from sys import stdout -from time import sleep, time -from optparse import OptionParser - -from . import host_tests_plugins - -# This is a little tricky. We need to add upper directory to path so -# we can find packages we want from the same level as other files do -import sys -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) -from tools.test_api import get_autodetected_MUTS_list -from tools.test_api import get_module_avail - - -class Mbed(object): - """ Base class for a host driven test - """ - def __init__(self): - parser = OptionParser() - - parser.add_option("-m", "--micro", - dest="micro", - help="The target microcontroller", - metavar="MICRO") - - parser.add_option("-p", "--port", - dest="port", - help="The serial port of the target mbed", - metavar="PORT") - - parser.add_option("-d", "--disk", - dest="disk", - help="The target disk path", - metavar="DISK_PATH") - - parser.add_option("-f", "--image-path", - dest="image_path", - help="Path with target's image", - metavar="IMAGE_PATH") - - parser.add_option("-c", "--copy", - dest="copy_method", - help="Copy method selector", - metavar="COPY_METHOD") - - parser.add_option("-C", "--program_cycle_s", - dest="program_cycle_s", - help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target", - type="float", - metavar="COPY_METHOD") - - parser.add_option("-t", "--timeout", - dest="timeout", - help="Timeout", - metavar="TIMEOUT") - - parser.add_option("-r", "--reset", - dest="forced_reset_type", - help="Forces different type of reset") - - parser.add_option("-R", "--reset-timeout", - dest="forced_reset_timeout", - metavar="NUMBER", - type="int", - help="When forcing a reset using option -r you can set up after reset timeout in seconds") - - parser.add_option('', '--auto', - dest='auto_detect', - metavar=False, - action="store_true", - help='Use mbed-ls module to detect all connected mbed devices') - - (self.options, _) = parser.parse_args() - - self.DEFAULT_RESET_TOUT = 0 - self.DEFAULT_TOUT = 10 - - if self.options.port is None: - raise Exception("The serial port of the target mbed have to be provided as command line arguments") - - # Options related to copy / reset mbed device - self.port = self.options.port - self.disk = self.options.disk - self.image_path = self.options.image_path.strip('"') - self.copy_method = self.options.copy_method - self.program_cycle_s = float(self.options.program_cycle_s) - - self.serial = None - self.serial_baud = 9600 - self.serial_timeout = 1 - - self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout - print('MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk)) - - def init_serial_params(self, serial_baud=9600, serial_timeout=1): - """ Initialize port parameters. - This parameters will be used by self.init_serial() function to open serial port - """ - self.serial_baud = serial_baud - self.serial_timeout = serial_timeout - - def init_serial(self, serial_baud=None, serial_timeout=None): - """ Initialize serial port. - Function will return error is port can't be opened or initialized - """ - # Overload serial port configuration from default to parameters' values if they are specified - serial_baud = serial_baud if serial_baud is not None else self.serial_baud - serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout - - if get_module_avail('mbed_lstools') and self.options.auto_detect: - # Ensure serial port is up-to-date (try to find it 60 times) - found = False - - for i in range(0, 60): - print('Looking for %s with MBEDLS' % self.options.micro) - muts_list = get_autodetected_MUTS_list(platform_name_filter=[self.options.micro]) - - if 1 in muts_list: - mut = muts_list[1] - self.port = mut['port'] - found = True - break - else: - sleep(3) - - if not found: - return False - - # Clear serial port - if self.serial: - self.serial.close() - self.serial = None - - # We will pool for serial to be re-mounted if it was unmounted after device reset - result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking - - # Port can be opened - if result: - self.flush() - return result - - def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25): - """ Functions pools for serial port readiness - """ - result = True - last_error = None - # This loop is used to check for serial port availability due to - # some delays and remounting when devices are being flashed with new software. - for i in range(pooling_loops): - sleep(loop_delay if i else init_delay) - try: - self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout) - except Exception as e: - result = False - last_error = "MBED: %s"% str(e) - stdout.write('.') - stdout.flush() - else: - print("...port ready!") - result = True - break - if not result and last_error: - print(last_error) - return result - - def set_serial_timeout(self, timeout): - """ Wraps self.mbed.serial object timeout property - """ - result = None - if self.serial: - self.serial.timeout = timeout - result = True - return result - - def serial_read(self, count=1): - """ Wraps self.mbed.serial object read method - """ - result = None - if self.serial: - try: - result = self.serial.read(count) - except: - result = None - return result - - def serial_readline(self, timeout=5): - """ Wraps self.mbed.serial object read method to read one line from serial port - """ - result = '' - start = time() - while (time() - start) < timeout: - if self.serial: - try: - c = self.serial.read(1) - result += c - except Exception as e: - print("MBED: %s"% str(e)) - result = None - break - if c == '\n': - break - return result - - def serial_write(self, write_buffer): - """ Wraps self.mbed.serial object write method - """ - result = None - if self.serial: - try: - result = self.serial.write(write_buffer) - except: - result = None - return result - - def reset_timeout(self, timeout): - """ Timeout executed just after reset command is issued - """ - for n in range(0, timeout): - sleep(1) - - def reset(self): - """ Calls proper reset plugin to do the job. - Please refer to host_test_plugins functionality - """ - # Flush serials to get only input after reset - self.flush() - if self.options.forced_reset_type: - result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk) - else: - result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial) - # Give time to wait for the image loading - reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT - self.reset_timeout(reset_tout_s) - return result - - def copy_image(self, image_path=None, disk=None, copy_method=None): - """ Closure for copy_image_raw() method. - Method which is actually copying image to mbed - """ - # Set closure environment - image_path = image_path if image_path is not None else self.image_path - disk = disk if disk is not None else self.disk - copy_method = copy_method if copy_method is not None else self.copy_method - # Call proper copy method - result = self.copy_image_raw(image_path, disk, copy_method) - return result - - def copy_image_raw(self, image_path=None, disk=None, copy_method=None): - """ Copy file depending on method you want to use. Handles exception - and return code from shell copy commands. - """ - # image_path - Where is binary with target's firmware - if copy_method is not None: - # We override 'default' method with 'shell' method - if copy_method == 'default': - copy_method = 'shell' - else: - copy_method = 'shell' - - result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk, program_cycle_s=self.program_cycle_s, target_mcu=self.options.micro) - return result; - - def flush(self): - """ Flush serial ports - """ - result = False - if self.serial: - self.serial.flushInput() - self.serial.flushOutput() - result = True - return result - - -class HostTestResults(object): - """ Test results set by host tests - """ - def __init__(self): - self.RESULT_SUCCESS = 'success' - self.RESULT_FAILURE = 'failure' - self.RESULT_ERROR = 'error' - self.RESULT_IO_SERIAL = 'ioerr_serial' - self.RESULT_NO_IMAGE = 'no_image' - self.RESULT_IOERR_COPY = "ioerr_copy" - self.RESULT_PASSIVE = "passive" - self.RESULT_NOT_DETECTED = "not_detected" - self.RESULT_MBED_ASSERT = "mbed_assert" - - -import tools.host_tests as host_tests - - -class Test(HostTestResults): - """ Base class for host test's test runner - """ - # Select default host_test supervision (replaced after autodetection) - test_supervisor = host_tests.get_host_test("default") - - def __init__(self): - self.mbed = Mbed() - - def detect_test_config(self, verbose=False): - """ Detects test case configuration - """ - result = {} - while True: - line = self.mbed.serial_readline() - if "{start}" in line: - self.notify("HOST: Start test...") - break - else: - # Detect if this is property from TEST_ENV print - m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1]) - if m and len(m.groups()) == 2: - # This is most likely auto-detection property - result[m.group(1)] = m.group(2) - if verbose: - self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2))) - else: - # We can check if this is TArget Id in mbed specific format - m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1]) - if m2 and len(m2.groups()) == 2: - if verbose: - target_id = m2.group(1) + m2.group(2) - self.notify("HOST: TargetID '%s'"% target_id) - self.notify(line[len(target_id):-1]) - else: - self.notify("HOST: Unknown property: %s"% line.strip()) - return result - - def run(self): - """ Test runner for host test. This function will start executing - test and forward test result via serial port to test suite - """ - # Copy image to device - self.notify("HOST: Copy image onto target...") - result = self.mbed.copy_image() - if not result: - self.print_result(self.RESULT_IOERR_COPY) - - # Initialize and open target's serial port (console) - self.notify("HOST: Initialize serial port...") - result = self.mbed.init_serial() - if not result: - self.print_result(self.RESULT_IO_SERIAL) - - # Reset device - self.notify("HOST: Reset target...") - result = self.mbed.reset() - if not result: - self.print_result(self.RESULT_IO_SERIAL) - - # Run test - try: - CONFIG = self.detect_test_config(verbose=True) # print CONFIG - - if "host_test_name" in CONFIG: - if host_tests.is_host_test(CONFIG["host_test_name"]): - self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"]) - result = self.test_supervisor.test(self) #result = self.test() - - if result is not None: - self.print_result(result) - else: - self.notify("HOST: Passive mode...") - except Exception as e: - print(str(e)) - self.print_result(self.RESULT_ERROR) - - def setup(self): - """ Setup and check if configuration for test is - correct. E.g. if serial port can be opened. - """ - result = True - if not self.mbed.serial: - result = False - self.print_result(self.RESULT_IO_SERIAL) - return result - - def notify(self, message): - """ On screen notification function - """ - print(message) - stdout.flush() - - def print_result(self, result): - """ Test result unified printing function - """ - self.notify("\r\n{{%s}}\r\n{{end}}" % result) - - -class DefaultTestSelector(Test): - """ Test class with serial port initialization - """ - def __init__(self): - HostTestResults.__init__(self) - Test.__init__(self) - -if __name__ == '__main__': - DefaultTestSelector().run() diff --git a/tools/host_tests/host_tests_plugins/__init__.py b/tools/host_tests/host_tests_plugins/__init__.py deleted file mode 100644 index 7ee7963b43b..00000000000 --- a/tools/host_tests/host_tests_plugins/__init__.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from . import host_test_registry - -# This plugins provide 'flashing' methods to host test scripts -from . import module_copy_mbed -from . import module_copy_shell -from . import module_copy_silabs - -try: - from . import module_copy_smart -except: - pass - -#import module_copy_firefox -from . import module_copy_mps2 - -# Plugins used to reset certain platform -from . import module_reset_mbed -from . import module_reset_silabs -from . import module_reset_mps2 - - -# Plugin registry instance -HOST_TEST_PLUGIN_REGISTRY = host_test_registry.HostTestRegistry() - -# Static plugin registration -# Some plugins are commented out if they are not stable or not commonly used -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mbed.load_plugin()) -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_shell.load_plugin()) - -try: - HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_smart.load_plugin()) -except: - pass - -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mbed.load_plugin()) -#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_firefox.load_plugin()) - -# Extra platforms support -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mps2.load_plugin()) -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mps2.load_plugin()) -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_silabs.load_plugin()) -HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_silabs.load_plugin()) - -# TODO: extend plugin loading to files with name module_*.py loaded ad-hoc - -############################################################################### -# Functional interface for host test plugin registry -############################################################################### -def call_plugin(type, capability, *args, **kwargs): - """ Interface to call plugin registry functional way - """ - return HOST_TEST_PLUGIN_REGISTRY.call_plugin(type, capability, *args, **kwargs) - -def get_plugin_caps(type): - """ Returns list of all capabilities for plugin family with the same type. - If there are no capabilities empty list is returned - """ - return HOST_TEST_PLUGIN_REGISTRY.get_plugin_caps(type) - -def print_plugin_info(): - """ Prints plugins' information in user friendly way - """ - print(HOST_TEST_PLUGIN_REGISTRY) diff --git a/tools/host_tests/host_tests_plugins/host_test_plugins.py b/tools/host_tests/host_tests_plugins/host_test_plugins.py deleted file mode 100644 index a27a9cd1b46..00000000000 --- a/tools/host_tests/host_tests_plugins/host_test_plugins.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from os import access, F_OK -from sys import stdout -from time import sleep -from subprocess import call - - -class HostTestPluginBase: - """ Base class for all plug-ins used with host tests. - """ - ########################################################################### - # Interface: - ########################################################################### - - ########################################################################### - # Interface attributes defining plugin name, type etc. - ########################################################################### - name = "HostTestPluginBase" # Plugin name, can be plugin class name - type = "BasePlugin" # Plugin type: ResetMethod, Copymethod etc. - capabilities = [] # Capabilities names: what plugin can achieve - # (e.g. reset using some external command line tool) - stable = False # Determine if plugin is stable and can be used - - ########################################################################### - # Interface methods - ########################################################################### - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - return False - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability e.g. may directly just call some command line - program or execute building pythonic function - """ - return False - - ########################################################################### - # Interface helper methods - overload only if you need to have custom behaviour - ########################################################################### - def print_plugin_error(self, text): - """ Function prints error in console and exits always with False - """ - print("Plugin error: %s::%s: %s" % (self.name, self.type, text)) - return False - - def print_plugin_info(self, text, NL=True): - """ Function prints notification in console and exits always with True - """ - print("Plugin info: %s::%s: %s"% (self.name, self.type, text)) - return True - - def print_plugin_char(self, char): - """ Function prints char on stdout - """ - stdout.write(char) - stdout.flush() - return True - - def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25): - """ Checks if destination_disk is ready and can be accessed by e.g. copy commands - @init_delay - Initial delay time before first access check - @loop_delay - pooling delay for access check - """ - if not access(destination_disk, F_OK): - self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False) - sleep(init_delay) - while not access(destination_disk, F_OK): - sleep(loop_delay) - self.print_plugin_char('.') - - def check_parameters(self, capabilitity, *args, **kwargs): - """ This function should be ran each time we call execute() - to check if none of the required parameters is missing. - """ - missing_parameters = [] - for parameter in self.required_parameters: - if parameter not in kwargs: - missing_parameters.append(parameter) - if len(missing_parameters) > 0: - self.print_plugin_error("execute parameter(s) '%s' missing!"% (', '.join(parameter))) - return False - return True - - def run_command(self, cmd, shell=True): - """ Runs command from command line. - """ - result = True - ret = 0 - try: - ret = call(cmd, shell=shell) - if ret: - self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd)) - return False - except Exception as e: - result = False - self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd)) - self.print_plugin_error(str(e)) - return result diff --git a/tools/host_tests/host_tests_plugins/host_test_registry.py b/tools/host_tests/host_tests_plugins/host_test_registry.py deleted file mode 100644 index 0cc20f489fb..00000000000 --- a/tools/host_tests/host_tests_plugins/host_test_registry.py +++ /dev/null @@ -1,91 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -class HostTestRegistry: - """ Simple class used to register and store - host test plugins for further usage - """ - # Here we actually store all the plugins - PLUGINS = {} # 'Plugin Name' : Plugin Object - - def print_error(self, text): - print("Plugin load failed. Reason: %s" % text) - - def register_plugin(self, plugin): - """ Registers and stores plugin inside registry for further use. - Method also calls plugin's setup() function to configure plugin if needed. - - Note: Different groups of plugins may demand different extra parameter. Plugins - should be at least for one type of plugin configured with the same parameters - because we do not know which of them will actually use particular parameter. - """ - # TODO: - # - check for unique caps for specified type - if plugin.name not in self.PLUGINS: - if plugin.setup(): # Setup plugin can be completed without errors - self.PLUGINS[plugin.name] = plugin - return True - else: - self.print_error("%s setup failed"% plugin.name) - else: - self.print_error("%s already loaded"% plugin.name) - return False - - def call_plugin(self, type, capability, *args, **kwargs): - """ Execute plugin functionality respectively to its purpose - """ - for plugin_name in self.PLUGINS: - plugin = self.PLUGINS[plugin_name] - if plugin.type == type and capability in plugin.capabilities: - return plugin.execute(capability, *args, **kwargs) - return False - - def get_plugin_caps(self, type): - """ Returns list of all capabilities for plugin family with the same type. - If there are no capabilities empty list is returned - """ - result = [] - for plugin_name in self.PLUGINS: - plugin = self.PLUGINS[plugin_name] - if plugin.type == type: - result.extend(plugin.capabilities) - return sorted(result) - - def load_plugin(self, name): - """ Used to load module from - """ - mod = __import__("module_%s"% name) - return mod - - def __str__(self): - """ User friendly printing method to show hooked plugins - """ - from prettytable import PrettyTable - column_names = ['name', 'type', 'capabilities', 'stable'] - pt = PrettyTable(column_names) - for column in column_names: - pt.align[column] = 'l' - for plugin_name in sorted(self.PLUGINS.keys()): - name = self.PLUGINS[plugin_name].name - type = self.PLUGINS[plugin_name].type - stable = self.PLUGINS[plugin_name].stable - capabilities = ', '.join(self.PLUGINS[plugin_name].capabilities) - row = [name, type, capabilities, stable] - pt.add_row(row) - return pt.get_string() diff --git a/tools/host_tests/host_tests_plugins/module_copy_firefox.py b/tools/host_tests/host_tests_plugins/module_copy_firefox.py deleted file mode 100644 index ba2e21dff76..00000000000 --- a/tools/host_tests/host_tests_plugins/module_copy_firefox.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -from os.path import join, basename -from host_test_plugins import HostTestPluginBase - - -class HostTestPluginCopyMethod_Firefox(HostTestPluginBase): - - def file_store_firefox(self, file_path, dest_disk): - try: - from selenium import webdriver - profile = webdriver.FirefoxProfile() - profile.set_preference('browser.download.folderList', 2) # custom location - profile.set_preference('browser.download.manager.showWhenStarting', False) - profile.set_preference('browser.download.dir', dest_disk) - profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'application/octet-stream') - # Launch browser with profile and get file - browser = webdriver.Firefox(profile) - browser.get(file_path) - browser.close() - except: - return False - return True - - # Plugin interface - name = 'HostTestPluginCopyMethod_Firefox' - type = 'CopyMethod' - capabilities = ['firefox'] - required_parameters = ['image_path', 'destination_disk'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - try: - from selenium import webdriver - except ImportError, e: - self.print_plugin_error("Error: firefox copy method requires selenium library. %s"% e) - return False - return True - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capabilitity, *args, **kwargs) is True: - image_path = kwargs['image_path'] - destination_disk = kwargs['destination_disk'] - # Prepare correct command line parameter values - image_base_name = basename(image_path) - destination_path = join(destination_disk, image_base_name) - if capabilitity == 'firefox': - self.file_store_firefox(image_path, destination_path) - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginCopyMethod_Firefox() diff --git a/tools/host_tests/host_tests_plugins/module_copy_mbed.py b/tools/host_tests/host_tests_plugins/module_copy_mbed.py deleted file mode 100644 index 3721a997a38..00000000000 --- a/tools/host_tests/host_tests_plugins/module_copy_mbed.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from shutil import copy -from .host_test_plugins import HostTestPluginBase -from time import sleep - - -class HostTestPluginCopyMethod_Mbed(HostTestPluginBase): - - def generic_mbed_copy(self, image_path, destination_disk): - """ Generic mbed copy method for "mbed enabled" devices. - It uses standard python shuitl function to copy - image_file (target specific binary) to device's disk. - """ - result = True - if not destination_disk.endswith('/') and not destination_disk.endswith('\\'): - destination_disk += '/' - try: - copy(image_path, destination_disk) - except Exception as e: - self.print_plugin_error("shutil.copy('%s', '%s')"% (image_path, destination_disk)) - self.print_plugin_error("Error: %s"% str(e)) - result = False - return result - - # Plugin interface - name = 'HostTestPluginCopyMethod_Mbed' - type = 'CopyMethod' - stable = True - capabilities = ['shutil', 'default'] - required_parameters = ['image_path', 'destination_disk', 'program_cycle_s'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - return True - - def execute(self, capability, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capability, *args, **kwargs) is True: - # Capability 'default' is a dummy capability - if capability == 'shutil': - image_path = kwargs['image_path'] - destination_disk = kwargs['destination_disk'] - program_cycle_s = kwargs['program_cycle_s'] - # Wait for mount point to be ready - self.check_mount_point_ready(destination_disk) # Blocking - result = self.generic_mbed_copy(image_path, destination_disk) - - # Allow mbed to cycle - sleep(program_cycle_s) - - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginCopyMethod_Mbed() diff --git a/tools/host_tests/host_tests_plugins/module_copy_mps2.py b/tools/host_tests/host_tests_plugins/module_copy_mps2.py deleted file mode 100644 index 44cbef8d265..00000000000 --- a/tools/host_tests/host_tests_plugins/module_copy_mps2.py +++ /dev/null @@ -1,152 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import re -import os, shutil -from os.path import join -from time import sleep -from .host_test_plugins import HostTestPluginBase - - -class HostTestPluginCopyMethod_MPS2(HostTestPluginBase): - - # MPS2 specific flashing / binary setup funcitons - def mps2_set_board_image_file(self, disk, images_cfg_path, image0file_path, image_name='images.txt'): - """ This function will alter image cfg file. - Main goal of this function is to change number of images to 1, comment all - existing image entries and append at the end of file new entry with test path. - @return True when all steps succeed. - """ - MBED_SDK_TEST_STAMP = 'test suite entry' - image_path = join(disk, images_cfg_path, image_name) - new_file_lines = [] # New configuration file lines (entries) - - # Check each line of the image configuration file - try: - with open(image_path, 'r') as file: - for line in file: - if re.search('^TOTALIMAGES', line): - # Check number of total images, should be 1 - new_file_lines.append(re.sub('^TOTALIMAGES:[\t ]*[\d]+', 'TOTALIMAGES: 1', line)) - elif re.search('; - %s[\n\r]*$'% MBED_SDK_TEST_STAMP, line): - # Look for test suite entries and remove them - pass # Omit all test suite entries - elif re.search('^IMAGE[\d]+FILE', line): - # Check all image entries and mark the ';' - new_file_lines.append(';' + line) # Comment non test suite lines - else: - # Append line to new file - new_file_lines.append(line) - except IOError as e: - return False - - # Add new image entry with proper commented stamp - new_file_lines.append('IMAGE0FILE: %s ; - %s\r\n'% (image0file_path, MBED_SDK_TEST_STAMP)) - - # Write all lines to file - try: - with open(image_path, 'w') as file: - for line in new_file_lines: - file.write(line), - except IOError: - return False - - return True - - def mps2_select_core(self, disk, mobo_config_name=""): - """ Function selects actual core - """ - # TODO: implement core selection - pass - - def mps2_switch_usb_auto_mounting_after_restart(self, disk, usb_config_name=""): - """ Function alters configuration to allow USB MSD to be mounted after restarts - """ - # TODO: implement USB MSD restart detection - pass - - def copy_file(self, file, disk): - if not file: - return - - _, ext = os.path.splitext(file) - ext = ext.lower() - dfile = disk + "/SOFTWARE/mbed" + ext - - if os.path.isfile(dfile): - print('Remove old binary %s' % dfile) - os.remove(dfile) - - shutil.copy(file, dfile) - return True - - def touch_file(self, file): - """ Touch file and set timestamp to items - """ - tfile = file+'.tmp' - fhandle = open(tfile, 'a') - try: - fhandle.close() - finally: - os.rename(tfile, file) - return True - - # Plugin interface - name = 'HostTestPluginCopyMethod_MPS2' - type = 'CopyMethod' - capabilities = ['mps2-copy'] - required_parameters = ['image_path', 'destination_disk'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - return True - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capabilitity, *args, **kwargs) is True: - file = kwargs['image_path'] - disk = kwargs['destination_disk'] - - """ Add a delay in case there a test just finished - Prevents interface firmware hiccups - """ - sleep(20) - if capabilitity == 'mps2-copy' and self.copy_file(file, disk): - sleep(3) - if self.touch_file(disk + 'reboot.txt'): - """ Add a delay after the board was rebooted. - The actual reboot time is 20 seconds, but using 15 seconds - allows us to open the COM port and save a board reset. - This also prevents interface firmware hiccups. - """ - sleep(7) - result = True - - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginCopyMethod_MPS2() diff --git a/tools/host_tests/host_tests_plugins/module_copy_shell.py b/tools/host_tests/host_tests_plugins/module_copy_shell.py deleted file mode 100644 index f18076a95db..00000000000 --- a/tools/host_tests/host_tests_plugins/module_copy_shell.py +++ /dev/null @@ -1,76 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import os -from os.path import join, basename -from time import sleep - -from .host_test_plugins import HostTestPluginBase - -class HostTestPluginCopyMethod_Shell(HostTestPluginBase): - - # Plugin interface - name = 'HostTestPluginCopyMethod_Shell' - type = 'CopyMethod' - stable = True - capabilities = ['shell', 'cp', 'copy', 'xcopy'] - required_parameters = ['image_path', 'destination_disk', 'program_cycle_s'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - return True - - def execute(self, capability, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capability, *args, **kwargs) is True: - image_path = kwargs['image_path'] - destination_disk = kwargs['destination_disk'] - program_cycle_s = kwargs['program_cycle_s'] - # Wait for mount point to be ready - self.check_mount_point_ready(destination_disk) # Blocking - # Prepare correct command line parameter values - image_base_name = basename(image_path) - destination_path = join(destination_disk, image_base_name) - if capability == 'shell': - if os.name == 'nt': capability = 'copy' - elif os.name == 'posix': capability = 'cp' - if capability == 'cp' or capability == 'copy' or capability == 'copy': - copy_method = capability - cmd = [copy_method, image_path, destination_path] - if os.name == 'posix': - result = self.run_command(cmd, shell=False) - result = self.run_command(["sync"]) - else: - result = self.run_command(cmd) - - # Allow mbed to cycle - sleep(program_cycle_s) - - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginCopyMethod_Shell() diff --git a/tools/host_tests/host_tests_plugins/module_copy_silabs.py b/tools/host_tests/host_tests_plugins/module_copy_silabs.py deleted file mode 100644 index 4a66821bdf8..00000000000 --- a/tools/host_tests/host_tests_plugins/module_copy_silabs.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from time import sleep -from .host_test_plugins import HostTestPluginBase - - -class HostTestPluginCopyMethod_Silabs(HostTestPluginBase): - - # Plugin interface - name = 'HostTestPluginCopyMethod_Silabs' - type = 'CopyMethod' - capabilities = ['eACommander', 'eACommander-usb'] - required_parameters = ['image_path', 'destination_disk', 'program_cycle_s'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - self.EACOMMANDER_CMD = 'eACommander.exe' - return True - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capabilitity, *args, **kwargs) is True: - image_path = kwargs['image_path'] - destination_disk = kwargs['destination_disk'] - program_cycle_s = kwargs['program_cycle_s'] - if capabilitity == 'eACommander': - cmd = [self.EACOMMANDER_CMD, - '--serialno', destination_disk, - '--flash', image_path, - '--resettype', '2', '--reset'] - result = self.run_command(cmd) - elif capabilitity == 'eACommander-usb': - cmd = [self.EACOMMANDER_CMD, - '--usb', destination_disk, - '--flash', image_path] - result = self.run_command(cmd) - - # Allow mbed to cycle - sleep(program_cycle_s) - - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginCopyMethod_Silabs() diff --git a/tools/host_tests/host_tests_plugins/module_copy_smart.py b/tools/host_tests/host_tests_plugins/module_copy_smart.py deleted file mode 100644 index 76b71f12353..00000000000 --- a/tools/host_tests/host_tests_plugins/module_copy_smart.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import os -import sys -from os.path import join, basename, exists, abspath, dirname -from time import sleep -from host_test_plugins import HostTestPluginBase - -sys.path.append(abspath(join(dirname(__file__), "../../../"))) -import tools.test_api - -class HostTestPluginCopyMethod_Smart(HostTestPluginBase): - - # Plugin interface - name = 'HostTestPluginCopyMethod_Smart' - type = 'CopyMethod' - stable = True - capabilities = ['smart'] - required_parameters = ['image_path', 'destination_disk', 'target_mcu'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - return True - - def execute(self, capability, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capability, *args, **kwargs) is True: - image_path = kwargs['image_path'] - destination_disk = kwargs['destination_disk'] - target_mcu = kwargs['target_mcu'] - # Wait for mount point to be ready - self.check_mount_point_ready(destination_disk) # Blocking - # Prepare correct command line parameter values - image_base_name = basename(image_path) - destination_path = join(destination_disk, image_base_name) - if capability == 'smart': - if os.name == 'posix': - cmd = ['cp', image_path, destination_path] - result = self.run_command(cmd, shell=False) - - cmd = ['sync'] - result = self.run_command(cmd, shell=False) - elif os.name == 'nt': - cmd = ['copy', image_path, destination_path] - result = self.run_command(cmd, shell=True) - - # Give the OS and filesystem time to settle down - sleep(3) - - platform_name_filter = [target_mcu] - muts_list = {} - - remount_complete = False - - for i in range(0, 60): - print('Looking for %s with MBEDLS' % target_mcu) - muts_list = tools.test_api.get_autodetected_MUTS_list(platform_name_filter=platform_name_filter) - - if 1 in muts_list: - mut = muts_list[1] - destination_disk = mut['disk'] - destination_path = join(destination_disk, image_base_name) - - if mut['mcu'] == 'LPC1768' or mut['mcu'] == 'LPC11U24': - if exists(destination_disk) and exists(destination_path): - remount_complete = True - break; - else: - if exists(destination_disk) and not exists(destination_path): - remount_complete = True - break; - - sleep(1) - - if remount_complete: - print('Remount complete') - else: - print('Remount FAILED') - - if exists(destination_disk): - print('Disk exists') - else: - print('Disk does not exist') - - if exists(destination_path): - print('Image exists') - else: - print('Image does not exist') - - result = None - - - return result - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginCopyMethod_Smart() diff --git a/tools/host_tests/host_tests_plugins/module_reset_mbed.py b/tools/host_tests/host_tests_plugins/module_reset_mbed.py deleted file mode 100644 index b70bca93b40..00000000000 --- a/tools/host_tests/host_tests_plugins/module_reset_mbed.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from .host_test_plugins import HostTestPluginBase - - -class HostTestPluginResetMethod_Mbed(HostTestPluginBase): - - def safe_sendBreak(self, serial): - """ Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux - Traceback (most recent call last): - File "make.py", line 189, in - serial.sendBreak() - File "/usr/lib/python2.7/dist-packages/serial/serialposix.py", line 511, in sendBreak - termios.tcsendbreak(self.fd, int(duration/0.25)) - error: (32, 'Broken pipe') - """ - result = True - try: - serial.sendBreak() - except: - # In linux a termios.error is raised in sendBreak and in setBreak. - # The following setBreak() is needed to release the reset signal on the target mcu. - try: - serial.setBreak(False) - except: - result = False - return result - - # Plugin interface - name = 'HostTestPluginResetMethod_Mbed' - type = 'ResetMethod' - stable = True - capabilities = ['default'] - required_parameters = ['serial'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - return True - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capabilitity, *args, **kwargs) is True: - if capabilitity == 'default': - serial = kwargs['serial'] - result = self.safe_sendBreak(serial) - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginResetMethod_Mbed() diff --git a/tools/host_tests/host_tests_plugins/module_reset_mps2.py b/tools/host_tests/host_tests_plugins/module_reset_mps2.py deleted file mode 100644 index 7775d0c1a11..00000000000 --- a/tools/host_tests/host_tests_plugins/module_reset_mps2.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import os -from time import sleep -from .host_test_plugins import HostTestPluginBase - -# Note: This plugin is not fully functional, needs improvements - -class HostTestPluginResetMethod_MPS2(HostTestPluginBase): - """ Plugin used to reset ARM_MPS2 platform - Supports: - reboot.txt - startup from standby state, reboots when in run mode. - shutdown.txt - shutdown from run mode. - reset.txt - reset FPGA during run mode. - """ - def touch_file(self, file): - """ Touch file and set timestamp to items - """ - tfile = file+'.tmp' - fhandle = open(tfile, 'a') - try: - fhandle.close() - finally: - os.rename(tfile, file) - return True - - # Plugin interface - name = 'HostTestPluginResetMethod_MPS2' - type = 'ResetMethod' - capabilities = ['mps2-reboot', 'mps2-reset'] - required_parameters = ['disk'] - - def setup(self, *args, **kwargs): - """ Prepare / configure plugin to work. - This method can receive plugin specific parameters by kwargs and - ignore other parameters which may affect other plugins. - """ - return True - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - return True - result = False - if self.check_parameters(capabilitity, *args, **kwargs) is True: - disk = kwargs['disk'] - - if capabilitity == 'mps2-reboot' and self.touch_file(disk + 'reboot.txt'): - sleep(20) - result = True - - elif capabilitity == 'mps2-reset' and self.touch_file(disk + 'reboot.txt'): - sleep(20) - result = True - - return result - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginResetMethod_MPS2() diff --git a/tools/host_tests/host_tests_plugins/module_reset_silabs.py b/tools/host_tests/host_tests_plugins/module_reset_silabs.py deleted file mode 100644 index a24222973d6..00000000000 --- a/tools/host_tests/host_tests_plugins/module_reset_silabs.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -from .host_test_plugins import HostTestPluginBase - - -class HostTestPluginResetMethod_SiLabs(HostTestPluginBase): - - # Plugin interface - name = 'HostTestPluginResetMethod_SiLabs' - type = 'ResetMethod' - stable = True - capabilities = ['eACommander', 'eACommander-usb'] - required_parameters = ['disk'] - - def setup(self, *args, **kwargs): - """ Configure plugin, this function should be called before plugin execute() method is used. - """ - # Note you need to have eACommander.exe on your system path! - self.EACOMMANDER_CMD = 'eACommander.exe' - return True - - def execute(self, capabilitity, *args, **kwargs): - """ Executes capability by name. - Each capability may directly just call some command line - program or execute building pythonic function - """ - result = False - if self.check_parameters(capabilitity, *args, **kwargs) is True: - disk = kwargs['disk'].rstrip('/\\') - - if capabilitity == 'eACommander': - # For this copy method 'disk' will be 'serialno' for eACommander command line parameters - # Note: Commands are executed in the order they are specified on the command line - cmd = [self.EACOMMANDER_CMD, - '--serialno', disk, - '--resettype', '2', '--reset',] - result = self.run_command(cmd) - elif capabilitity == 'eACommander-usb': - # For this copy method 'disk' will be 'usb address' for eACommander command line parameters - # Note: Commands are executed in the order they are specified on the command line - cmd = [self.EACOMMANDER_CMD, - '--usb', disk, - '--resettype', '2', '--reset',] - result = self.run_command(cmd) - return result - - -def load_plugin(): - """ Returns plugin available in this module - """ - return HostTestPluginResetMethod_SiLabs() diff --git a/tools/host_tests/mbedrpc.py b/tools/host_tests/mbedrpc.py deleted file mode 100644 index efafce39cec..00000000000 --- a/tools/host_tests/mbedrpc.py +++ /dev/null @@ -1,227 +0,0 @@ -# mbedRPC.py - mbed RPC interface for Python -# -##Copyright (c) 2010 ARM Ltd -## -##Permission is hereby granted, free of charge, to any person obtaining a copy -##of this software and associated documentation files (the "Software"), to deal -##in the Software without restriction, including without limitation the rights -##to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -##copies of the Software, and to permit persons to whom the Software is -##furnished to do so, subject to the following conditions: -## -##The above copyright notice and this permission notice shall be included in -##all copies or substantial portions of the Software. -## -##THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -##IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -##FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -##AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -##LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -##OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -##THE SOFTWARE. -# -# Example: -# >from mbedRPC import* -# >mbed = SerialRPC("COM5",9600) -# >myled = DigitalOut(mbed,"myled") <--- Where the text in quotations matches your RPC pin definition's second parameter, in this case it could be RpcDigitalOut myled(LED1,"myled"); -# >myled.write(1) -# > - -from future import standard_library -standard_library.install_aliases() -import serial, urllib.request, time - -# mbed super class -class mbed(object): - def __init__(self): - print("This will work as a demo but no transport mechanism has been selected") - - def rpc(self, name, method, args): - print("Superclass method not overridden") - - -# Transport mechanisms, derived from mbed -class SerialRPC(mbed): - def __init__(self, port, baud): - self.ser = serial.Serial(port) - self.ser.setBaudrate(baud) - - def rpc(self, name, method, args): - # creates the command to be sent serially - /name/method arg1 arg2 arg3 ... argN - str = "/" + name + "/" + method + " " + " ".join(args) + "\n" - # prints the command being executed - print(str) - # writes the command to serial - self.ser.write(str) - # strips trailing characters from the line just written - ret_val = self.ser.readline().strip() - return ret_val - - -class HTTPRPC(mbed): - def __init__(self, ip): - self.host = "http://" + ip - - def rpc(self, name, method, args): - response = urllib.request.urlopen(self.host + "/rpc/" + name + "/" + method + "%20" + "%20".join(args)) - return response.read().strip() - - -# generic mbed interface super class -class mbed_interface(object): - # initialize an mbed interface with a transport mechanism and pin name - def __init__(self, this_mbed, mpin): - self.mbed = this_mbed - if isinstance(mpin, str): - self.name = mpin - - def __del__(self): - r = self.mbed.rpc(self.name, "delete", []) - - def new(self, class_name, name, pin1, pin2 = "", pin3 = ""): - args = [arg for arg in [pin1,pin2,pin3,name] if arg != ""] - r = self.mbed.rpc(class_name, "new", args) - - # generic read - def read(self): - r = self.mbed.rpc(self.name, "read", []) - return int(r) - - -# for classes that need write functionality - inherits from the generic reading interface -class mbed_interface_write(mbed_interface): - def __init__(self, this_mbed, mpin): - mbed_interface.__init__(self, this_mbed, mpin) - - # generic write - def write(self, value): - r = self.mbed.rpc(self.name, "write", [str(value)]) - - -# mbed interfaces -class DigitalOut(mbed_interface_write): - def __init__(self, this_mbed, mpin): - mbed_interface_write.__init__(self, this_mbed, mpin) - - -class AnalogIn(mbed_interface): - def __init__(self, this_mbed, mpin): - mbed_interface.__init__(self, this_mbed, mpin) - - def read_u16(self): - r = self.mbed.rpc(self.name, "read_u16", []) - return int(r) - - -class AnalogOut(mbed_interface_write): - def __init__(self, this_mbed, mpin): - mbed_interface_write.__init__(self, this_mbed, mpin) - - def write_u16(self, value): - self.mbed.rpc(self.name, "write_u16", [str(value)]) - - def read(self): - r = self.mbed.rpc(self.name, "read", []) - return float(r) - - -class DigitalIn(mbed_interface): - def __init__(self, this_mbed, mpin): - mbed_interface.__init__(self, this_mbed, mpin) - - -class PwmOut(mbed_interface_write): - def __init__(self, this_mbed, mpin): - mbed_interface_write.__init__(self, this_mbed, mpin) - - def read(self): - r = self.mbed.rpc(self.name, "read", []) - return r - - def period(self, value): - self.mbed.rpc(self.name, "period", [str(value)]) - - def period_ms(self, value): - self.mbed.rpc(self.name, "period_ms", [str(value)]) - - def period_us(self, value): - self.mbed.rpc(self.name, "period_us", [str(value)]) - - def pulsewidth(self, value): - self.mbed.rpc(self.name, "pulsewidth", [str(value)]) - - def pulsewidth_ms(self, value): - self.mbed.rpc(self.name, "pulsewidth_ms", [str(value)]) - - def pulsewidth_us(self, value): - self.mbed.rpc(self.name, "pulsewidth_us", [str(value)]) - - -class RPCFunction(mbed_interface): - def __init__(self, this_mbed, name): - mbed_interface.__init__(self, this_mbed, name) - - def run(self, input): - r = self.mbed.rpc(self.name, "run", [input]) - return r - - -class RPCVariable(mbed_interface_write): - def __init__(self, this_mbed, name): - mbed_interface_write.__init__(self, this_mbed, name) - - def read(self): - r = self.mbed.rpc(self.name, "read", []) - return r - -class Timer(mbed_interface): - def __init__(self, this_mbed, name): - mbed_interface.__init__(self, this_mbed, name) - - def start(self): - r = self.mbed.rpc(self.name, "start", []) - - def stop(self): - r = self.mbed.rpc(self.name, "stop", []) - - def reset(self): - r = self.mbed.rpc(self.name, "reset", []) - - def read(self): - r = self.mbed.rpc(self.name, "read", []) - return float(re.search('\d+\.*\d*', r).group(0)) - - def read_ms(self): - r = self.mbed.rpc(self.name, "read_ms", []) - return float(re.search('\d+\.*\d*', r).group(0)) - - def read_us(self): - r = self.mbed.rpc(self.name, "read_us", []) - return float(re.search('\d+\.*\d*', r).group(0)) - -# Serial -class Serial(object): - def __init__(self, this_mbed, tx, rx=""): - self.mbed = this_mbed - if isinstance(tx, str): - self.name = tx - - def __del__(self): - r = self.mbed.rpc(self.name, "delete", []) - - def baud(self, value): - r = self.mbed.rpc(self.name, "baud", [str(value)]) - - def putc(self, value): - r = self.mbed.rpc(self.name, "putc", [str(value)]) - - def puts(self, value): - r = self.mbed.rpc(self.name, "puts", ["\"" + str(value) + "\""]) - - def getc(self): - r = self.mbed.rpc(self.name, "getc", []) - return int(r) - - -def wait(s): - time.sleep(s) diff --git a/tools/host_tests/midi.py b/tools/host_tests/midi.py deleted file mode 100644 index 3df89070b25..00000000000 --- a/tools/host_tests/midi.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Copyright (c) 2016 ARM Limited. All rights reserved. - -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -from __future__ import print_function -import sys -import re -import time -import mido -from mido import Message - - -def test_midi_in(port): - expected_messages_count=0 - while expected_messages_count < 7: - for message in port.iter_pending(): - if message.type in ('note_on', 'note_off', 'program_change', 'sysex'): - yield message - expected_messages_count+=1 - time.sleep(0.1) - -def test_midi_loopback(input_port): - expected_messages_count=0 - while expected_messages_count < 1: - for message in input_port.iter_pending(): - print('Test MIDI OUT loopback received {}'.format(message.hex())) - expected_messages_count+=1 - -def test_midi_out_loopback(output_port,input_port): - print("Test MIDI OUT loopback") - output_port.send(Message('program_change', program=1)) - test_midi_loopback(input_port) - - output_port.send(Message('note_on', note=21)) - test_midi_loopback(input_port) - - output_port.send(Message('note_off', note=21)) - test_midi_loopback(input_port) - - output_port.send(Message('sysex', data=[0x7E,0x7F,0x09,0x01])) - test_midi_loopback(input_port) - - output_port.send(Message('sysex', data=[0x7F,0x7F,0x04,0x01,0x7F,0x7F])) - test_midi_loopback(input_port) - - output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x7F,0x00,0x41])) - test_midi_loopback(input_port) - - output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x04,0x7F,0x3D])) - test_midi_loopback(input_port) - -portname="" - -while portname=="": - print("Wait for MIDI IN plug ...") - for name in mido.get_input_names(): - matchObj = re.match( r'Mbed', name) - - if matchObj: - portname=name - time.sleep( 1 ) - -try: - input_port = mido.open_input(portname) - output_port = mido.open_output(portname) - - print('Using {}'.format(input_port)) - - print("Test MIDI IN") - - for message in test_midi_in(input_port): - print('Test MIDI IN received {}'.format(message.hex())) - - test_midi_out_loopback(output_port,input_port) -except KeyboardInterrupt: - pass diff --git a/tools/host_tests/net_test.py b/tools/host_tests/net_test.py deleted file mode 100644 index 325b7ef9564..00000000000 --- a/tools/host_tests/net_test.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from .host_test import Test, Simple -from sys import stdout - -class NETTest(Simple): - def __init__(self): - Test.__init__(self) - self.mbed.init_serial(115200) - self.mbed.reset() - -if __name__ == '__main__': - NETTest().run() diff --git a/tools/host_tests/rpc.py b/tools/host_tests/rpc.py deleted file mode 100644 index baaf453e5c0..00000000000 --- a/tools/host_tests/rpc.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from .host_test import Test -from .mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin - - -class RpcTest(Test): - def test(self): - self.notify("RPC Test") - s = SerialRPC(self.mbed.port, debug=True) - - self.notify("Init remote objects") - - p_out = pin("p10") - p_in = pin("p11") - - if hasattr(self.mbed.options, 'micro'): - if self.mbed.options.micro == 'M0+': - print("Freedom Board: PTA12 <-> PTC4") - p_out = pin("PTA12") - p_in = pin("PTC4") - - self.output = DigitalOut(s, p_out); - self.input = DigitalIn(s, p_in); - - self.check = True - self.write_read_test(1) - self.write_read_test(0) - return self.check - - def write_read_test(self, v): - self.notify("Check %d" % v) - self.output.write(v) - if self.input.read() != v: - self.notify("ERROR") - self.check = False - else: - self.notify("OK") - - -if __name__ == '__main__': - RpcTest().run() diff --git a/tools/host_tests/rtc_auto.py b/tools/host_tests/rtc_auto.py deleted file mode 100644 index 3f9ded4bae6..00000000000 --- a/tools/host_tests/rtc_auto.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import re -from time import time, strftime, gmtime - -class RTCTest(object): - PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" - re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE) - - def test(self, selftest): - test_result = True - start = time() - sec_prev = 0 - for i in range(0, 5): - # Timeout changed from default: we need to wait longer for some boards to start-up - c = selftest.mbed.serial_readline(timeout=10) - if c is None: - return selftest.RESULT_IO_SERIAL - selftest.notify(c.strip()) - delta = time() - start - m = self.re_detect_rtc_value.search(c) - if m and len(m.groups()): - sec = int(m.groups()[0]) - time_str = m.groups()[1] - correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec))) - single_result = time_str == correct_time_str and sec > 0 and sec > sec_prev - test_result = test_result and single_result - result_msg = "OK" if single_result else "FAIL" - selftest.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg)) - sec_prev = sec - else: - test_result = False - break - start = time() - return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/serial_complete_auto.py b/tools/host_tests/serial_complete_auto.py deleted file mode 100644 index 9c6b3297f4e..00000000000 --- a/tools/host_tests/serial_complete_auto.py +++ /dev/null @@ -1,44 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import sys -import uuid -import time -import string -from sys import stdout - -class SerialCompleteTest(object): - - def test(self, selftest): - strip_chars = string.whitespace + "\0" - out_str = selftest.mbed.serial_readline() - selftest.notify("HOST: " + out_str) - - if not out_str: - selftest.notify("HOST: No output detected") - return selftest.RESULT_IO_SERIAL - - out_str_stripped = out_str.strip(strip_chars) - - if out_str_stripped != "123456789": - selftest.notify("HOST: Unexpected output. '123456789' Expected. but received '%s'" % out_str_stripped) - return selftest.RESULT_FAILURE - - else: - return selftest.RESULT_SUCCESS - diff --git a/tools/host_tests/serial_nc_rx_auto.py b/tools/host_tests/serial_nc_rx_auto.py deleted file mode 100644 index 2976ea7a3a2..00000000000 --- a/tools/host_tests/serial_nc_rx_auto.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import sys -import uuid -import time -import string -from sys import stdout - -class SerialNCRXTest(object): - - def test(self, selftest): - selftest.mbed.flush(); - # Wait 0.5 seconds to ensure mbed is listening - time.sleep(0.5) - - #handshake with target to sync test start - selftest.mbed.serial_write("S"); - - strip_chars = string.whitespace + "\0" - - out_str = selftest.mbed.serial_readline() - - if not out_str: - selftest.notify("HOST: No output detected") - return selftest.RESULT_IO_SERIAL - - out_str_stripped = out_str.strip(strip_chars) - - if out_str_stripped != "RX OK - Start NC test": - selftest.notify("HOST: Unexpected output. Expected 'RX OK - Expected' but received '%s'" % out_str_stripped) - return selftest.RESULT_FAILURE - - # Wait 0.5 seconds to ensure mbed is listening - time.sleep(0.5) - - selftest.mbed.serial_write("E"); - - strip_chars = string.whitespace + "\0" - - out_str = selftest.mbed.serial_readline() - - if not out_str: - selftest.notify("HOST: No output detected") - return selftest.RESULT_IO_SERIAL - - out_str_stripped = out_str.strip(strip_chars) - - if out_str_stripped != "RX OK - Expected": - selftest.notify("HOST: Unexpected output. Expected 'RX OK - Expected' but received '%s'" % out_str_stripped) - return selftest.RESULT_FAILURE - - # Wait 0.5 seconds to ensure mbed is listening - time.sleep(0.5) - - # Send character, mbed shouldn't receive - selftest.mbed.serial_write("U"); - - out_str = selftest.mbed.serial_readline() - - # If no characters received, pass the test - if not out_str: - selftest.notify("HOST: No further output detected") - return selftest.RESULT_SUCCESS - else: - out_str_stripped = out_str.strip(strip_chars) - - if out_str_stripped == "RX OK - Unexpected": - selftest.notify("HOST: Unexpected output returned indicating RX still functioning") - else: - selftest.notify("HOST: Extraneous output '%s' detected indicating unknown error" % out_str_stripped) - - return selftest.RESULT_FAILURE diff --git a/tools/host_tests/serial_nc_tx_auto.py b/tools/host_tests/serial_nc_tx_auto.py deleted file mode 100644 index 29a9a6b795a..00000000000 --- a/tools/host_tests/serial_nc_tx_auto.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import sys -import uuid -import time -import string -from sys import stdout - -class SerialNCTXTest(object): - - def test(self, selftest): - selftest.mbed.flush(); - # Wait 0.5 seconds to ensure mbed is listening - time.sleep(0.5) - - selftest.mbed.serial_write("S"); - - strip_chars = string.whitespace + "\0" - - out_str = selftest.mbed.serial_readline() - selftest.notify("HOST: " + out_str) - - if not out_str: - selftest.notify("HOST: No output detected") - return selftest.RESULT_IO_SERIAL - - out_str_stripped = out_str.strip(strip_chars) - - if out_str_stripped != "TX OK - Expected": - selftest.notify("HOST: Unexpected output. Expected 'TX OK - Expected' but received '%s'" % out_str_stripped) - return selftest.RESULT_FAILURE - - out_str = selftest.mbed.serial_readline() - - # If no characters received, pass the test - if not out_str: - selftest.notify("HOST: No further output detected") - return selftest.RESULT_SUCCESS - else: - out_str_stripped = out_str.strip(strip_chars) - - if out_str_stripped == "TX OK - Unexpected": - selftest.notify("HOST: Unexpected output returned indicating TX still functioning") - else: - selftest.notify("HOST: Extraneous output '%s' detected indicating unknown error" % out_str_stripped) - - return selftest.RESULT_FAILURE diff --git a/tools/host_tests/stdio_auto.py b/tools/host_tests/stdio_auto.py deleted file mode 100644 index fd0560f6614..00000000000 --- a/tools/host_tests/stdio_auto.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import re -import random -from time import time - -class StdioTest(object): - PATTERN_INT_VALUE = "Your value was: (-?\d+)" - re_detect_int_value = re.compile(PATTERN_INT_VALUE) - - def test(self, selftest): - test_result = True - - c = selftest.mbed.serial_readline() # {{start}} preamble - if c is None: - return selftest.RESULT_IO_SERIAL - selftest.notify(c) - - for i in range(0, 10): - random_integer = random.randint(-99999, 99999) - selftest.notify("HOST: Generated number: " + str(random_integer)) - start = time() - selftest.mbed.serial_write(str(random_integer) + "\n") - - serial_stdio_msg = selftest.mbed.serial_readline() - if serial_stdio_msg is None: - return selftest.RESULT_IO_SERIAL - delay_time = time() - start - selftest.notify(serial_stdio_msg.strip()) - - # Searching for reply with scanned values - m = self.re_detect_int_value.search(serial_stdio_msg) - if m and len(m.groups()): - int_value = m.groups()[0] - int_value_cmp = random_integer == int(int_value) - test_result = test_result and int_value_cmp - selftest.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL")) - else: - test_result = False - break - return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/tcpecho_client.py b/tools/host_tests/tcpecho_client.py deleted file mode 100644 index dd94076701e..00000000000 --- a/tools/host_tests/tcpecho_client.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import socket -import string, random -from time import time - -from mbed_settings import SERVER_ADDRESS - -ECHO_PORT = 7 - -LEN_PACKET = 127 -N_PACKETS = 5000 -TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2 -MEGA = float(1024 * 1024) -UPDATE_STEP = (N_PACKETS // 10) - -class TCP_EchoClient(object): - def __init__(self, host): - self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.s.connect((host, ECHO_PORT)) - self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) - - def __packet(self): - # Comment out the checks when measuring the throughput - # self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) - self.s.send(self.packet) - data = self.s.recv(LEN_PACKET) - # assert self.packet == data, "packet error:\n%s\n%s\n" % (self.packet, data) - - def test(self): - start = time() - for i in range(N_PACKETS): - if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.)) - self.__packet() - t = time() - start - print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA)) - - def __del__(self): - self.s.close() - -while True: - e = TCP_EchoClient(SERVER_ADDRESS) - e.test() diff --git a/tools/host_tests/tcpecho_client_auto.py b/tools/host_tests/tcpecho_client_auto.py deleted file mode 100644 index 072970b438c..00000000000 --- a/tools/host_tests/tcpecho_client_auto.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import sys -import socket -from sys import stdout -try: - from SocketServer import BaseRequestHandler, TCPServer -except ImportError: - from socketserver import BaseRequestHandler, TCPServer - -class TCPEchoClient_Handler(BaseRequestHandler): - def handle(self): - """ One handle per connection - """ - print("HOST: Connection received...") - count = 1; - while True: - data = self.request.recv(1024) - if not data: break - self.request.sendall(data) - if '{{end}}' in str(data): - print() - print(str(data)) - else: - if not count % 10: - sys.stdout.write('.') - count += 1 - stdout.flush() - -class TCPEchoClientTest(object): - def send_server_ip_port(self, selftest, ip_address, port_no): - """ Set up network host. Reset target and and send server IP via serial to Mbed - """ - c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...' - if c is None: - self.print_result(selftest.RESULT_IO_SERIAL) - return - - selftest.notify(c.strip()) - selftest.notify("HOST: Sending server IP Address to target...") - - connection_str = ip_address + ":" + str(port_no) + "\n" - selftest.mbed.serial_write(connection_str) - selftest.notify(connection_str) - - # Two more strings about connection should be sent by MBED - for i in range(0, 2): - c = selftest.mbed.serial_readline() - if c is None: - selftest.print_result(self.RESULT_IO_SERIAL) - return - selftest.notify(c.strip()) - - def test(self, selftest): - # We need to discover SERVEP_IP and set up SERVER_PORT - # Note: Port 7 is Echo Protocol: - # - # Port number rationale: - # - # The Echo Protocol is a service in the Internet Protocol Suite defined - # in RFC 862. It was originally proposed for testing and measurement - # of round-trip times[citation needed] in IP networks. - # - # A host may connect to a server that supports the Echo Protocol using - # the Transmission Control Protocol (TCP) or the User Datagram Protocol - # (UDP) on the well-known port number 7. The server sends back an - # identical copy of the data it received. - SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) - SERVER_PORT = 7 - - # Returning none will suppress host test from printing success code - server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler) - print("HOST: Listening for TCP connections: %s:%s" % - (SERVER_IP, str(SERVER_PORT))) - self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT) - server.serve_forever() diff --git a/tools/host_tests/tcpecho_server.py b/tools/host_tests/tcpecho_server.py deleted file mode 100644 index 65516bfb611..00000000000 --- a/tools/host_tests/tcpecho_server.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -try: - from SocketServer import BaseRequestHandler, TCPServer -except ImportError: - from socketserver import BaseRequestHandler, TCPServer -from time import time - -from mbed_settings import LOCALHOST - -MAX_INDEX = 126 -MEGA = float(1024 * 1024) - -class TCP_EchoHandler(BaseRequestHandler): - def handle(self): - print("\nconnection received") - start = time() - bytes = 0 - index = 0 - while True: - data = self.request.recv(1024) - if not data: break - - bytes += len(data) - for n in map(ord, data): - if n != index: - print("data error %d != %d" % (n , index)) - index += 1 - if index > MAX_INDEX: - index = 0 - - self.request.sendall(data) - t = time() - start - b = float(bytes * 8) * 2 - print("Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA)) - -server = TCPServer((LOCALHOST, 7), TCP_EchoHandler) -print("listening for connections") -server.serve_forever() diff --git a/tools/host_tests/tcpecho_server_auto.py b/tools/host_tests/tcpecho_server_auto.py deleted file mode 100644 index 035e1cc4a70..00000000000 --- a/tools/host_tests/tcpecho_server_auto.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import re -import sys -import uuid -import socket -from sys import stdout - -class TCPEchoServerTest(object): - ECHO_SERVER_ADDRESS = "" - ECHO_PORT = 0 - ECHO_LOOPs = 100 - s = None # Socket - - PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" - re_detect_server_ip = re.compile(PATTERN_SERVER_IP) - - def test(self, selftest): - result = False - c = selftest.mbed.serial_readline() - if c is None: - return selftest.RESULT_IO_SERIAL - selftest.notify(c) - - m = self.re_detect_server_ip.search(c) - if m and len(m.groups()): - self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) - self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method - selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) - - # We assume this test fails so can't send 'error' message to server - try: - self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) - except Exception as e: - self.s = None - selftest.notify("HOST: Socket error: %s"% e) - return selftest.RESULT_ERROR - - print('HOST: Sending %d echo strings...'% self.ECHO_LOOPs,) - for i in range(0, self.ECHO_LOOPs): - TEST_STRING = str(uuid.uuid4()) - try: - self.s.sendall(TEST_STRING) - data = self.s.recv(128) - except Exception as e: - self.s = None - selftest.notify("HOST: Socket error: %s"% e) - return selftest.RESULT_ERROR - - received_str = repr(data)[1:-1] - if TEST_STRING == received_str: # We need to cut not needed single quotes from the string - sys.stdout.write('.') - stdout.flush() - result = True - else: - print("Expected: ") - print("'%s'"% TEST_STRING) - print("received: ") - print("'%s'"% received_str) - result = False - break - - if self.s is not None: - self.s.close() - else: - selftest.notify("HOST: TCP Server not found") - result = False - return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/tcpecho_server_loop.py b/tools/host_tests/tcpecho_server_loop.py deleted file mode 100644 index a388b33eb2f..00000000000 --- a/tools/host_tests/tcpecho_server_loop.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function -# Be sure that the tools directory is in the search path -import sys -from os.path import join, abspath, dirname -ROOT = abspath(join(dirname(__file__), "..", "..")) -sys.path.insert(0, ROOT) - -from mbed_settings import LOCALHOST -try: - from SocketServer import BaseRequestHandler, TCPServer -except ImportError: - from socketserver import BaseRequestHandler, TCPServer - - -class TCP_EchoHandler(BaseRequestHandler): - def handle(self): - print("\nHandle connection from:", self.client_address) - while True: - data = self.request.recv(1024) - if not data: break - self.request.sendall(data) - self.request.close() - print("socket closed") - -if __name__ == '__main__': - server = TCPServer((LOCALHOST, 7), TCP_EchoHandler) - print("listening for connections on:", (LOCALHOST, 7)) - server.serve_forever() diff --git a/tools/host_tests/udp_link_layer_auto.py b/tools/host_tests/udp_link_layer_auto.py deleted file mode 100644 index 52b58cc4bcc..00000000000 --- a/tools/host_tests/udp_link_layer_auto.py +++ /dev/null @@ -1,155 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -""" -How to use: -make.py -m LPC1768 -t ARM -d E:\ -n NET_14 -udp_link_layer_auto.py -p COM20 -d E:\ -t 10 -""" - -import re -import uuid -import socket -try: - # Python 3 - import _thread as thread -except ImportError: - # Python 2 - import thread -from sys import stdout -from time import time, sleep -from .host_test import DefaultTest - -try: - from SocketServer import BaseRequestHandler, UDPServer -except ImportError: - from socketserver import BaseRequestHandler, UDPServer - - -# Received datagrams (with time) -dict_udp_recv_datagrams = dict() - -# Sent datagrams (with time) -dict_udp_sent_datagrams = dict() - - -class UDPEchoClient_Handler(BaseRequestHandler): - def handle(self): - """ One handle per connection - """ - _data, _socket = self.request - # Process received datagram - data_str = repr(_data)[1:-1] - dict_udp_recv_datagrams[data_str] = time() - - -def udp_packet_recv(threadName, server_ip, server_port): - """ This function will receive packet stream from mbed device - """ - server = UDPServer((server_ip, server_port), UDPEchoClient_Handler) - print("[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port)) - server.serve_forever() - - -class UDPEchoServerTest(DefaultTest): - ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts - ECHO_PORT = 0 # UDP port for datagram bursts - CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters - s = None # Socket - - TEST_PACKET_COUNT = 1000 # how many packets should be send - TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms - PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in % - - PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" - re_detect_server_ip = re.compile(PATTERN_SERVER_IP) - - def get_control_data(self, command="stat\n"): - BUFFER_SIZE = 256 - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT)) - except Exception as e: - data = None - s.send(command) - data = s.recv(BUFFER_SIZE) - s.close() - return data - - def test(self): - serial_ip_msg = self.mbed.serial_readline() - if serial_ip_msg is None: - return self.RESULT_IO_SERIAL - stdout.write(serial_ip_msg) - stdout.flush() - # Searching for IP address and port prompted by server - m = self.re_detect_server_ip.search(serial_ip_msg) - if m and len(m.groups()): - self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) - self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method - self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) - - # Open client socket to burst datagrams to UDP server in mbed - try: - self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - except Exception as e: - self.s = None - self.notify("HOST: Error: %s"% e) - return self.RESULT_ERROR - - # UDP replied receiver works in background to get echoed datagrams - SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) - SERVER_PORT = self.ECHO_PORT + 1 - thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT)) - sleep(0.5) - - # Burst part - for no in range(self.TEST_PACKET_COUNT): - TEST_STRING = str(uuid.uuid4()) - payload = str(no) + "__" + TEST_STRING - self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) - dict_udp_sent_datagrams[payload] = time() - sleep(self.TEST_STRESS_FACTOR) - - if self.s is not None: - self.s.close() - - # Wait 5 seconds for packets to come - result = True - self.notify("HOST: Test Summary:") - for d in range(5): - sleep(1.0) - summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0 - self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d, - summary_datagram_success, - len(dict_udp_recv_datagrams), - self.TEST_PACKET_COUNT, - self.TEST_STRESS_FACTOR)) - result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO) - stdout.flush() - - # Getting control data from test - self.notify("...") - self.notify("HOST: Mbed Summary:") - mbed_stats = self.get_control_data() - self.notify(mbed_stats) - return self.RESULT_SUCCESS if result else self.RESULT_FAILURE - - -if __name__ == '__main__': - UDPEchoServerTest().run() diff --git a/tools/host_tests/udpecho_client.py b/tools/host_tests/udpecho_client.py deleted file mode 100644 index 6e4fa729718..00000000000 --- a/tools/host_tests/udpecho_client.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from socket import socket, AF_INET, SOCK_DGRAM -import string, random -from time import time - -from mbed_settings import CLIENT_ADDRESS - -ECHO_PORT = 7 - -LEN_PACKET = 127 -N_PACKETS = 5000 -TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2 -MEGA = float(1024 * 1024) -UPDATE_STEP = (N_PACKETS // 10) - -class UDP_EchoClient(object): - s = socket(AF_INET, SOCK_DGRAM) - - def __init__(self, host): - self.host = host - self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) - - def __packet(self): - # Comment out the checks when measuring the throughput - # packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) - UDP_EchoClient.s.sendto(packet, (self.host, ECHO_PORT)) - data = UDP_EchoClient.s.recv(LEN_PACKET) - # assert packet == data, "packet error:\n%s\n%s\n" % (packet, data) - - def test(self): - start = time() - for i in range(N_PACKETS): - if (i % UPDATE_STEP) == 0: print('%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.)) - self.__packet() - t = time() - start - print('Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA)) - -while True: - e = UDP_EchoClient(CLIENT_ADDRESS) - e.test() diff --git a/tools/host_tests/udpecho_client_auto.py b/tools/host_tests/udpecho_client_auto.py deleted file mode 100644 index 3eeb37a2a69..00000000000 --- a/tools/host_tests/udpecho_client_auto.py +++ /dev/null @@ -1,81 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import sys -import socket -from sys import stdout -try: - from SocketServer import BaseRequestHandler, UDPServer -except ImportError: - from socketserver import BaseRequestHandler, UDPServer - -class UDPEchoClient_Handler(BaseRequestHandler): - def handle(self): - """ One handle per connection - """ - data, socket = self.request - socket.sendto(data, self.client_address) - if '{{end}}' in data: - print("\n%s" % data) - else: - sys.stdout.write('.') - stdout.flush() - -class UDPEchoClientTest(object): - - def send_server_ip_port(self, selftest, ip_address, port_no): - c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' - if c is None: - selftest.print_result(selftest.RESULT_IO_SERIAL) - return - selftest.notify(c.strip()) - - selftest.notify("HOST: Sending server IP Address to target...") - connection_str = ip_address + ":" + str(port_no) + "\n" - selftest.mbed.serial_write(connection_str) - - c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' - if c is None: - self.print_result(selftest.RESULT_IO_SERIAL) - return - selftest.notify(c.strip()) - return selftest.RESULT_PASSIVE - - def test(self, selftest): - # We need to discover SERVEP_IP and set up SERVER_PORT - # Note: Port 7 is Echo Protocol: - # - # Port number rationale: - # - # The Echo Protocol is a service in the Internet Protocol Suite defined - # in RFC 862. It was originally proposed for testing and measurement - # of round-trip times[citation needed] in IP networks. - # - # A host may connect to a server that supports the Echo Protocol using - # the Transmission Control Protocol (TCP) or the User Datagram Protocol - # (UDP) on the well-known port number 7. The server sends back an - # identical copy of the data it received. - SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) - SERVER_PORT = 7 - - # Returning none will suppress host test from printing success code - server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler) - print("HOST: Listening for UDP connections...") - self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT) - server.serve_forever() diff --git a/tools/host_tests/udpecho_server.py b/tools/host_tests/udpecho_server.py deleted file mode 100644 index eb00de13452..00000000000 --- a/tools/host_tests/udpecho_server.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function -try: - from SocketServer import BaseRequestHandler, UDPServer -except ImportError: - from socketserver import BaseRequestHandler, UDPServer -from mbed_settings import SERVER_ADDRESS - -class UDP_EchoHandler(BaseRequestHandler): - def handle(self): - data, socket = self.request - print("client:", self.client_address) - print("data:", data) - socket.sendto(data, self.client_address) - -server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler) -print("listening for connections") -server.serve_forever() diff --git a/tools/host_tests/udpecho_server_auto.py b/tools/host_tests/udpecho_server_auto.py deleted file mode 100644 index 10bbc88dde0..00000000000 --- a/tools/host_tests/udpecho_server_auto.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from __future__ import print_function - -import re -import sys -import uuid -from sys import stdout -from socket import socket, AF_INET, SOCK_DGRAM - -class UDPEchoServerTest(object): - ECHO_SERVER_ADDRESS = "" - ECHO_PORT = 0 - s = None # Socket - - PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" - re_detect_server_ip = re.compile(PATTERN_SERVER_IP) - - def test(self, selftest): - result = True - serial_ip_msg = selftest.mbed.serial_readline() - if serial_ip_msg is None: - return selftest.RESULT_IO_SERIAL - selftest.notify(serial_ip_msg) - # Searching for IP address and port prompted by server - m = self.re_detect_server_ip.search(serial_ip_msg) - if m and len(m.groups()): - self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) - self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method - selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) - - # We assume this test fails so can't send 'error' message to server - try: - self.s = socket(AF_INET, SOCK_DGRAM) - except Exception as e: - self.s = None - selftest.notify("HOST: Socket error: %s"% e) - return selftest.RESULT_ERROR - - for i in range(0, 100): - TEST_STRING = str(uuid.uuid4()) - self.s.sendto(TEST_STRING, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) - data = self.s.recv(len(TEST_STRING)) - received_str = repr(data)[1:-1] - if TEST_STRING != received_str: - result = False - break - sys.stdout.write('.') - stdout.flush() - else: - result = False - - if self.s is not None: - self.s.close() - return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/wait_us_auto.py b/tools/host_tests/wait_us_auto.py deleted file mode 100644 index c3f3794ff26..00000000000 --- a/tools/host_tests/wait_us_auto.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -from time import time - -class WaitusTest(object): - """ This test is reading single characters from stdio - and measures time between their occurrences. - """ - TICK_LOOP_COUNTER = 13 - TICK_LOOP_SUCCESSFUL_COUNTS = 10 - DEVIATION = 0.10 # +/-10% - - def test(self, selftest): - test_result = True - # First character to start test (to know after reset when test starts) - if selftest.mbed.set_serial_timeout(None) is None: - return selftest.RESULT_IO_SERIAL - c = selftest.mbed.serial_read(1) - if c is None: - return selftest.RESULT_IO_SERIAL - if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6 - #Read additional 39 bytes of TargetID - if selftest.mbed.serial_read(39) is None: - return selftest.RESULT_IO_SERIAL - c = selftest.mbed.serial_read(1) # Re-read first 'tick' - if c is None: - return selftest.RESULT_IO_SERIAL - start_serial_pool = time() - start = time() - - success_counter = 0 - - for i in range(0, self.TICK_LOOP_COUNTER): - c = selftest.mbed.serial_read(1) - if c is None: - return selftest.RESULT_IO_SERIAL - delta = time() - start - deviation = abs(delta - 1) - # Round values - delta = round(delta, 2) - deviation = round(deviation, 2) - # Check if time measurements are in given range - deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False - success_counter = success_counter+1 if deviation_ok else 0 - msg = "OK" if deviation_ok else "FAIL" - selftest.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg)) - start = time() - if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS: - break - measurement_time = time() - start_serial_pool - selftest.notify("Consecutive OK timer reads: %d"% success_counter) - selftest.notify("Completed in %.2f sec" % (measurement_time)) - test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False - return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE diff --git a/tools/host_tests/wfi_auto.py b/tools/host_tests/wfi_auto.py deleted file mode 100644 index 7defd8cf990..00000000000 --- a/tools/host_tests/wfi_auto.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -mbed SDK -Copyright (c) 2011-2013 ARM Limited -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import sys -import uuid -import time -from sys import stdout - -class WFITest(object): - - def test(self, selftest): - c = selftest.mbed.serial_readline() - - if c == None: - selftest.notify("HOST: No output detected") - return selftest.RESULT_IO_SERIAL - - if c.strip() != "0": - selftest.notify("HOST: Unexpected output. Expected '0' but received '%s'" % c.strip()) - return selftest.RESULT_FAILURE - - # Wait 10 seconds to allow serial prints (indicating failure) - selftest.mbed.set_serial_timeout(10) - - # If no characters received, pass the test - if not selftest.mbed.serial_readline(): - selftest.notify("HOST: No further output detected") - return selftest.RESULT_SUCCESS - else: - selftest.notify("HOST: Extra output detected") - return selftest.RESULT_FAILURE diff --git a/tools/test_api.py b/tools/test_api.py index 30f1a3ed710..0e5d6e85fb0 100644 --- a/tools/test_api.py +++ b/tools/test_api.py @@ -71,8 +71,7 @@ from tools.utils import argparse_many from tools.notifier.mock import MockNotifier from tools.notifier.term import TerminalNotifier - -import tools.host_tests.host_tests_plugins as host_tests_plugins +from mbed_host_tests import host_tests_plugins try: import mbed_lstools @@ -2168,7 +2167,7 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name, 'toolchain_paths': TOOLCHAIN_PATHS, 'stats_depth': stats_depth, 'notify': MockNotifier(), - 'coverage_patterns': coverage_patterns, + 'coverage_patterns': coverage_patterns, 'resource_filter': resource_filter }