246 lines
10 KiB
Python
246 lines
10 KiB
Python
# Copyright 2015 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import dbus
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
|
|
from autotest_lib.client.cros import dbus_util
|
|
from autotest_lib.client.common_lib import error
|
|
from autotest_lib.client.common_lib import utils
|
|
from autotest_lib.client.common_lib.cros.fake_device_server import \
|
|
fake_gcd_helper
|
|
from autotest_lib.client.common_lib.cros.fake_device_server.client_lib import \
|
|
commands
|
|
from autotest_lib.client.common_lib.cros.fake_device_server.client_lib import \
|
|
devices
|
|
from autotest_lib.client.common_lib.cros.fake_device_server.client_lib import \
|
|
fail_control
|
|
from autotest_lib.client.common_lib.cros.fake_device_server.client_lib import \
|
|
oauth
|
|
from autotest_lib.client.common_lib.cros.fake_device_server.client_lib import \
|
|
registration
|
|
from autotest_lib.client.common_lib.cros.tendo import buffet_config
|
|
from autotest_lib.client.common_lib.cros.tendo import buffet_dbus_helper
|
|
|
|
|
|
TEST_NAME = 'test_name '
|
|
TEST_DESCRIPTION = 'test_description '
|
|
TEST_LOCATION = 'test_location '
|
|
|
|
TEST_COMMAND_CATEGORY = 'registration_test'
|
|
TEST_COMMAND_NAME = '_TestEcho'
|
|
TEST_COMMAND_PARAM = 'message'
|
|
TEST_COMMAND_DEFINITION = {
|
|
TEST_COMMAND_CATEGORY: {
|
|
TEST_COMMAND_NAME: {
|
|
'parameters': { TEST_COMMAND_PARAM: { 'type': 'string' } },
|
|
'results': {},
|
|
'name': 'Test Echo Command',
|
|
}
|
|
}
|
|
}
|
|
|
|
STATUS_UNCONFIGURED = 'unconfigured'
|
|
STATUS_CONNECTING = 'connecting'
|
|
STATUS_CONNECTED = 'connected'
|
|
STATUS_INVALID_CREDENTIALS = 'invalid_credentials'
|
|
|
|
def _assert_has(resource, key, value, resource_description):
|
|
if resource is None:
|
|
raise error.TestFail('Wanted %s[%s]=%r, but %s is None.' %
|
|
(resource_description, key, value))
|
|
if key not in resource:
|
|
raise error.TestFail('%s not in %s' % (key, resource_description))
|
|
|
|
if resource[key] != value:
|
|
raise error.TestFail('Wanted %s[%s]=%r, but got %r' %
|
|
(resource_description, key, value, resource[key]))
|
|
|
|
|
|
class BuffetTester(object):
|
|
"""Helper class for buffet tests."""
|
|
|
|
|
|
def __init__(self):
|
|
"""Initialization routine."""
|
|
# We're going to confirm buffet is polling by issuing commands to
|
|
# the mock GCD server, then checking that buffet gets them. The
|
|
# commands are test.TestEcho commands with a single parameter
|
|
# |message|. |self._expected_messages| is a list of these messages.
|
|
self._expected_messages = []
|
|
# We store our command definitions under this root.
|
|
self._temp_dir_path = None
|
|
# Spin up our mock server.
|
|
self._gcd = fake_gcd_helper.FakeGCDHelper()
|
|
self._gcd.start()
|
|
# Create the command definition we want to use.
|
|
self._temp_dir_path = tempfile.mkdtemp()
|
|
commands_dir = os.path.join(self._temp_dir_path, 'commands')
|
|
os.mkdir(commands_dir)
|
|
command_definition_path = os.path.join(
|
|
commands_dir, '%s.json' % TEST_COMMAND_CATEGORY)
|
|
with open(command_definition_path, 'w') as f:
|
|
f.write(json.dumps(TEST_COMMAND_DEFINITION))
|
|
utils.run('chown -R buffet:buffet %s' % self._temp_dir_path)
|
|
logging.debug('Created test commands definition: %s',
|
|
command_definition_path)
|
|
# Create client proxies for interacting with oyr fake server.
|
|
self._registration_client = registration.RegistrationClient(
|
|
server_url=buffet_config.LOCAL_SERVICE_URL,
|
|
api_key=buffet_config.TEST_API_KEY)
|
|
self._device_client = devices.DevicesClient(
|
|
server_url=buffet_config.LOCAL_SERVICE_URL,
|
|
api_key=buffet_config.TEST_API_KEY)
|
|
self._oauth_client = oauth.OAuthClient(
|
|
server_url=buffet_config.LOCAL_SERVICE_URL,
|
|
api_key=buffet_config.TEST_API_KEY)
|
|
self._fail_control_client = fail_control.FailControlClient(
|
|
server_url=buffet_config.LOCAL_SERVICE_URL,
|
|
api_key=buffet_config.TEST_API_KEY)
|
|
self._command_client = commands.CommandsClient(
|
|
server_url=buffet_config.LOCAL_SERVICE_URL,
|
|
api_key=buffet_config.TEST_API_KEY)
|
|
self._config = buffet_config.BuffetConfig(
|
|
log_verbosity=3,
|
|
test_definitions_dir=self._temp_dir_path)
|
|
|
|
|
|
def check_buffet_status_is(self, expected_status,
|
|
expected_device_id='',
|
|
timeout_seconds=0):
|
|
"""Assert that buffet has the given registration status.
|
|
|
|
Optionally, a timeout can be specified to wait until the
|
|
status changes.
|
|
|
|
@param expected_device_id: device id created during registration.
|
|
@param expected_status: the status to wait for.
|
|
@param timeout_seconds: number of seconds to wait for status to change.
|
|
|
|
"""
|
|
buffet = buffet_dbus_helper.BuffetDBusHelper()
|
|
start_time = time.time()
|
|
while True:
|
|
actual_status = buffet.status
|
|
actual_device_id = buffet.device_id
|
|
if (actual_status == expected_status and
|
|
actual_device_id == expected_device_id):
|
|
return
|
|
time_spent = time.time() - start_time
|
|
if time_spent > timeout_seconds:
|
|
if actual_status != expected_status:
|
|
raise error.TestFail('Buffet should be %s, but is %s '
|
|
'(waited %.1f seconds).' %
|
|
(expected_status, actual_status,
|
|
time_spent))
|
|
if actual_device_id != expected_device_id:
|
|
raise error.TestFail('Device ID should be %s, but is %s '
|
|
'(waited %.1f seconds).' %
|
|
(expected_device_id, actual_device_id,
|
|
time_spent))
|
|
time.sleep(0.5)
|
|
|
|
|
|
def check_buffet_is_polling(self, device_id, timeout_seconds=30):
|
|
"""Assert that buffet is polling for new commands.
|
|
|
|
@param device_id: string device id created during registration.
|
|
@param timeout_seconds: number of seconds to wait for polling
|
|
to start.
|
|
|
|
"""
|
|
new_command_message = ('This is message %d' %
|
|
len(self._expected_messages))
|
|
command_resource = {
|
|
'name': '%s.%s' % (TEST_COMMAND_CATEGORY, TEST_COMMAND_NAME),
|
|
'deviceId': device_id,
|
|
'parameters': {TEST_COMMAND_PARAM: new_command_message}
|
|
}
|
|
self._expected_messages.append(new_command_message)
|
|
self._command_client.create_command(device_id, command_resource)
|
|
# Confirm that the command eventually appears on buffet.
|
|
buffet = buffet_dbus_helper.BuffetDBusHelper()
|
|
polling_interval_seconds = 0.5
|
|
start_time = time.time()
|
|
while time.time() - start_time < timeout_seconds:
|
|
objects = dbus_util.dbus2primitive(
|
|
buffet.object_manager.GetManagedObjects())
|
|
cmds = [interfaces[buffet_dbus_helper.COMMAND_INTERFACE]
|
|
for path, interfaces in objects.iteritems()
|
|
if buffet_dbus_helper.COMMAND_INTERFACE in interfaces]
|
|
messages = [cmd['Parameters'][TEST_COMMAND_PARAM] for cmd in cmds
|
|
if (cmd['Name'] == '%s.%s' % (TEST_COMMAND_CATEGORY,
|
|
TEST_COMMAND_NAME))]
|
|
# |cmds| is a list of property sets
|
|
if len(messages) != len(self._expected_messages):
|
|
# Still waiting for our pending command to show up.
|
|
time.sleep(polling_interval_seconds)
|
|
continue
|
|
logging.debug('Finally saw the right number of commands over '
|
|
'DBus: %r', cmds)
|
|
if sorted(messages) != sorted(self._expected_messages):
|
|
raise error.TestFail(
|
|
'Expected commands with messages=%r but got %r.' %
|
|
(self._expected_messages, messages))
|
|
logging.info('Buffet has DBus proxies for commands with '
|
|
'messages: %r', self._expected_messages)
|
|
return
|
|
raise error.TestFail('Timed out waiting for Buffet to expose '
|
|
'pending commands with messages: %r' %
|
|
self._expected_messages)
|
|
|
|
|
|
def register_with_server(self):
|
|
"""Make buffet register with the cloud server.
|
|
|
|
This includes the whole registration flow and ends with buffet
|
|
obtained an access token for future interactions. The status
|
|
is guaranteed to be STATUS_CONNECTED when this
|
|
method returns.
|
|
|
|
@return string: the device_id obtained during registration.
|
|
|
|
"""
|
|
ticket = self._registration_client.create_registration_ticket()
|
|
logging.info('Created ticket: %r', ticket)
|
|
buffet = buffet_dbus_helper.BuffetDBusHelper()
|
|
buffet.manager.UpdateDeviceInfo(dbus.String(TEST_NAME),
|
|
dbus.String(TEST_DESCRIPTION),
|
|
dbus.String(TEST_LOCATION))
|
|
device_id = dbus_util.dbus2primitive(
|
|
buffet.manager.RegisterDevice(dbus.String(ticket['id'])))
|
|
# Confirm that registration has populated some fields.
|
|
device_resource = self._device_client.get_device(device_id)
|
|
logging.debug('Got device resource=%r', device_resource)
|
|
_assert_has(device_resource, 'name', TEST_NAME,
|
|
'device resource')
|
|
_assert_has(device_resource, 'modelManifestId', 'AATST',
|
|
'device resource')
|
|
logging.info('Registration successful')
|
|
self.check_buffet_status_is(STATUS_CONNECTED,
|
|
expected_device_id=device_id,
|
|
timeout_seconds=5)
|
|
return device_id
|
|
|
|
|
|
def restart_buffet(self, reset_state):
|
|
"""Function for restarting the buffet daemon.
|
|
|
|
@param reset_state: If True, all local buffet state will be deleted.
|
|
"""
|
|
self._config.restart_with_config(clean_state=reset_state)
|
|
|
|
|
|
def close(self):
|
|
"""Cleanup to be used when done with this instance."""
|
|
buffet_config.naive_restart()
|
|
self._gcd.close()
|
|
if self._temp_dir_path is not None:
|
|
shutil.rmtree(self._temp_dir_path, True)
|