449 lines
15 KiB
C++
449 lines
15 KiB
C++
// Copyright 2015 The Chromium OS Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
#include <brillo/message_loops/base_message_loop.h>
|
|
|
|
#include <fcntl.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
#include <unistd.h>
|
|
|
|
#ifndef __APPLE__
|
|
#include <sys/sysmacros.h>
|
|
#endif
|
|
|
|
#ifndef __ANDROID_HOST__
|
|
// Used for MISC_MAJOR. Only required for the target and not always available
|
|
// for the host.
|
|
#include <linux/major.h>
|
|
#endif
|
|
|
|
#include <vector>
|
|
|
|
#include <base/bind.h>
|
|
#include <base/bind_helpers.h>
|
|
#include <base/files/file_path.h>
|
|
#include <base/files/file_util.h>
|
|
#include <base/run_loop.h>
|
|
#include <base/strings/string_number_conversions.h>
|
|
#include <base/strings/string_split.h>
|
|
|
|
#include <brillo/location_logging.h>
|
|
#include <brillo/strings/string_utils.h>
|
|
|
|
using base::Closure;
|
|
|
|
namespace {
|
|
|
|
const char kMiscMinorPath[] = "/proc/misc";
|
|
const char kBinderDriverName[] = "binder";
|
|
|
|
} // namespace
|
|
|
|
namespace brillo {
|
|
|
|
const int BaseMessageLoop::kInvalidMinor = -1;
|
|
const int BaseMessageLoop::kUninitializedMinor = -2;
|
|
|
|
BaseMessageLoop::BaseMessageLoop() {
|
|
CHECK(!base::MessageLoop::current())
|
|
<< "You can't create a base::MessageLoopForIO when another "
|
|
"base::MessageLoop is already created for this thread.";
|
|
owned_base_loop_.reset(new base::MessageLoopForIO);
|
|
base_loop_ = owned_base_loop_.get();
|
|
}
|
|
|
|
BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
|
|
: base_loop_(base_loop) {}
|
|
|
|
BaseMessageLoop::~BaseMessageLoop() {
|
|
for (auto& io_task : io_tasks_) {
|
|
DVLOG_LOC(io_task.second.location(), 1)
|
|
<< "Removing file descriptor watcher task_id " << io_task.first
|
|
<< " leaked on BaseMessageLoop, scheduled from this location.";
|
|
io_task.second.StopWatching();
|
|
}
|
|
|
|
// Note all pending canceled delayed tasks when destroying the message loop.
|
|
size_t lazily_deleted_tasks = 0;
|
|
for (const auto& delayed_task : delayed_tasks_) {
|
|
if (delayed_task.second.closure.is_null()) {
|
|
lazily_deleted_tasks++;
|
|
} else {
|
|
DVLOG_LOC(delayed_task.second.location, 1)
|
|
<< "Removing delayed task_id " << delayed_task.first
|
|
<< " leaked on BaseMessageLoop, scheduled from this location.";
|
|
}
|
|
}
|
|
if (lazily_deleted_tasks) {
|
|
LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
|
|
}
|
|
}
|
|
|
|
MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
|
|
const base::Location& from_here,
|
|
const Closure &task,
|
|
base::TimeDelta delay) {
|
|
TaskId task_id = NextTaskId();
|
|
bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
|
|
from_here,
|
|
base::Bind(&BaseMessageLoop::OnRanPostedTask,
|
|
weak_ptr_factory_.GetWeakPtr(),
|
|
task_id),
|
|
delay);
|
|
DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
|
|
<< " to run in " << delay << ".";
|
|
if (!base_scheduled)
|
|
return MessageLoop::kTaskIdNull;
|
|
|
|
delayed_tasks_.emplace(task_id,
|
|
DelayedTask{from_here, task_id, std::move(task)});
|
|
return task_id;
|
|
}
|
|
|
|
MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
|
|
const base::Location& from_here,
|
|
int fd,
|
|
WatchMode mode,
|
|
bool persistent,
|
|
const Closure &task) {
|
|
// base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
|
|
if (fd < 0)
|
|
return MessageLoop::kTaskIdNull;
|
|
|
|
base::MessagePumpForIO::Mode base_mode = base::MessagePumpForIO::WATCH_READ;
|
|
switch (mode) {
|
|
case MessageLoop::kWatchRead:
|
|
base_mode = base::MessagePumpForIO::WATCH_READ;
|
|
break;
|
|
case MessageLoop::kWatchWrite:
|
|
base_mode = base::MessagePumpForIO::WATCH_WRITE;
|
|
break;
|
|
default:
|
|
return MessageLoop::kTaskIdNull;
|
|
}
|
|
|
|
TaskId task_id = NextTaskId();
|
|
auto it_bool = io_tasks_.emplace(
|
|
std::piecewise_construct,
|
|
std::forward_as_tuple(task_id),
|
|
std::forward_as_tuple(
|
|
from_here, this, task_id, fd, base_mode, persistent, task));
|
|
// This should always insert a new element.
|
|
DCHECK(it_bool.second);
|
|
bool scheduled = it_bool.first->second.StartWatching();
|
|
DVLOG_LOC(from_here, 1)
|
|
<< "Watching fd " << fd << " for "
|
|
<< (mode == MessageLoop::kWatchRead ? "reading" : "writing")
|
|
<< (persistent ? " persistently" : " just once")
|
|
<< " as task_id " << task_id
|
|
<< (scheduled ? " successfully" : " failed.");
|
|
|
|
if (!scheduled) {
|
|
io_tasks_.erase(task_id);
|
|
return MessageLoop::kTaskIdNull;
|
|
}
|
|
|
|
#ifndef __ANDROID_HOST__
|
|
// Determine if the passed fd is the binder file descriptor. For that, we need
|
|
// to check that is a special char device and that the major and minor device
|
|
// numbers match. The binder file descriptor can't be removed and added back
|
|
// to an epoll group when there's work available to be done by the file
|
|
// descriptor due to bugs in the binder driver (b/26524111) when used with
|
|
// epoll. Therefore, we flag the binder fd and never attempt to remove it.
|
|
// This may cause the binder file descriptor to be attended with higher
|
|
// priority and cause starvation of other events.
|
|
struct stat buf;
|
|
if (fstat(fd, &buf) == 0 &&
|
|
S_ISCHR(buf.st_mode) &&
|
|
major(buf.st_rdev) == MISC_MAJOR &&
|
|
minor(buf.st_rdev) == GetBinderMinor()) {
|
|
it_bool.first->second.RunImmediately();
|
|
}
|
|
#endif
|
|
|
|
return task_id;
|
|
}
|
|
|
|
bool BaseMessageLoop::CancelTask(TaskId task_id) {
|
|
if (task_id == kTaskIdNull)
|
|
return false;
|
|
auto delayed_task_it = delayed_tasks_.find(task_id);
|
|
if (delayed_task_it == delayed_tasks_.end()) {
|
|
// This might be an IOTask then.
|
|
auto io_task_it = io_tasks_.find(task_id);
|
|
if (io_task_it == io_tasks_.end())
|
|
return false;
|
|
return io_task_it->second.CancelTask();
|
|
}
|
|
// A DelayedTask was found for this task_id at this point.
|
|
|
|
// Check if the callback was already canceled but we have the entry in
|
|
// delayed_tasks_ since it didn't fire yet in the message loop.
|
|
if (delayed_task_it->second.closure.is_null())
|
|
return false;
|
|
|
|
DVLOG_LOC(delayed_task_it->second.location, 1)
|
|
<< "Removing task_id " << task_id << " scheduled from this location.";
|
|
// We reset to closure to a null Closure to release all the resources
|
|
// used by this closure at this point, but we don't remove the task_id from
|
|
// delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
|
|
delayed_task_it->second.closure = Closure();
|
|
|
|
return true;
|
|
}
|
|
|
|
bool BaseMessageLoop::RunOnce(bool may_block) {
|
|
run_once_ = true;
|
|
base::RunLoop run_loop; // Uses the base::MessageLoopForIO implicitly.
|
|
base_run_loop_ = &run_loop;
|
|
if (!may_block)
|
|
run_loop.RunUntilIdle();
|
|
else
|
|
run_loop.Run();
|
|
base_run_loop_ = nullptr;
|
|
// If the flag was reset to false, it means a closure was run.
|
|
if (!run_once_)
|
|
return true;
|
|
|
|
run_once_ = false;
|
|
return false;
|
|
}
|
|
|
|
void BaseMessageLoop::Run() {
|
|
base::RunLoop run_loop; // Uses the base::MessageLoopForIO implicitly.
|
|
base_run_loop_ = &run_loop;
|
|
run_loop.Run();
|
|
base_run_loop_ = nullptr;
|
|
}
|
|
|
|
void BaseMessageLoop::BreakLoop() {
|
|
if (base_run_loop_ == nullptr) {
|
|
DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
|
|
return; // Message loop not running, nothing to do.
|
|
}
|
|
base_run_loop_->Quit();
|
|
}
|
|
|
|
Closure BaseMessageLoop::QuitClosure() const {
|
|
if (base_run_loop_ == nullptr)
|
|
return base::DoNothing();
|
|
return base_run_loop_->QuitClosure();
|
|
}
|
|
|
|
MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
|
|
TaskId res;
|
|
do {
|
|
res = ++last_id_;
|
|
// We would run out of memory before we run out of task ids.
|
|
} while (!res ||
|
|
delayed_tasks_.find(res) != delayed_tasks_.end() ||
|
|
io_tasks_.find(res) != io_tasks_.end());
|
|
return res;
|
|
}
|
|
|
|
void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
|
|
auto task_it = delayed_tasks_.find(task_id);
|
|
DCHECK(task_it != delayed_tasks_.end());
|
|
if (!task_it->second.closure.is_null()) {
|
|
DVLOG_LOC(task_it->second.location, 1)
|
|
<< "Running delayed task_id " << task_id
|
|
<< " scheduled from this location.";
|
|
// Mark the task as canceled while we are running it so CancelTask returns
|
|
// false.
|
|
Closure closure = std::move(task_it->second.closure);
|
|
task_it->second.closure = Closure();
|
|
closure.Run();
|
|
|
|
// If the |run_once_| flag is set, it is because we are instructed to run
|
|
// only once callback.
|
|
if (run_once_) {
|
|
run_once_ = false;
|
|
BreakLoop();
|
|
}
|
|
}
|
|
delayed_tasks_.erase(task_it);
|
|
}
|
|
|
|
void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
|
|
auto task_it = io_tasks_.find(task_id);
|
|
// Even if this task was canceled while we were waiting in the message loop
|
|
// for this method to run, the entry in io_tasks_ should still be present, but
|
|
// won't do anything.
|
|
DCHECK(task_it != io_tasks_.end());
|
|
task_it->second.OnFileReadyPostedTask();
|
|
}
|
|
|
|
int BaseMessageLoop::ParseBinderMinor(
|
|
const std::string& file_contents) {
|
|
int result = kInvalidMinor;
|
|
// Split along '\n', then along the ' '. Note that base::SplitString trims all
|
|
// white spaces at the beginning and end after splitting.
|
|
std::vector<std::string> lines =
|
|
base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
|
|
base::SPLIT_WANT_ALL);
|
|
for (const std::string& line : lines) {
|
|
if (line.empty())
|
|
continue;
|
|
std::string number;
|
|
std::string name;
|
|
if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
|
|
continue;
|
|
|
|
if (name == kBinderDriverName && base::StringToInt(number, &result))
|
|
break;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
unsigned int BaseMessageLoop::GetBinderMinor() {
|
|
if (binder_minor_ != kUninitializedMinor)
|
|
return binder_minor_;
|
|
|
|
std::string proc_misc;
|
|
if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
|
|
return binder_minor_;
|
|
binder_minor_ = ParseBinderMinor(proc_misc);
|
|
return binder_minor_;
|
|
}
|
|
|
|
BaseMessageLoop::IOTask::IOTask(const base::Location& location,
|
|
BaseMessageLoop* loop,
|
|
MessageLoop::TaskId task_id,
|
|
int fd,
|
|
base::MessagePumpForIO::Mode base_mode,
|
|
bool persistent,
|
|
const Closure& task)
|
|
: location_(location), loop_(loop), task_id_(task_id),
|
|
fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task),
|
|
fd_watcher_(FROM_HERE) {}
|
|
|
|
bool BaseMessageLoop::IOTask::StartWatching() {
|
|
// Please see MessagePumpLibevent for definition.
|
|
static_assert(std::is_same<base::MessagePumpForIO, base::MessagePumpLibevent>::value,
|
|
"MessagePumpForIO::WatchFileDescriptor is not supported "
|
|
"when MessagePumpForIO is not a MessagePumpLibevent.");
|
|
|
|
return static_cast<base::MessagePumpLibevent*>(
|
|
loop_->base_loop_->pump_.get())->WatchFileDescriptor(
|
|
fd_, persistent_, base_mode_, &fd_watcher_, this);
|
|
}
|
|
|
|
void BaseMessageLoop::IOTask::StopWatching() {
|
|
// This is safe to call even if we are not watching for it.
|
|
fd_watcher_.StopWatchingFileDescriptor();
|
|
}
|
|
|
|
void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
|
|
OnFileReady();
|
|
}
|
|
|
|
void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
|
|
OnFileReady();
|
|
}
|
|
|
|
void BaseMessageLoop::IOTask::OnFileReady() {
|
|
// For file descriptors marked with the immediate_run flag, we don't call
|
|
// StopWatching() and wait, instead we dispatch the callback immediately.
|
|
if (immediate_run_) {
|
|
posted_task_pending_ = true;
|
|
OnFileReadyPostedTask();
|
|
return;
|
|
}
|
|
|
|
// When the file descriptor becomes available we stop watching for it and
|
|
// schedule a task to run the callback from the main loop. The callback will
|
|
// run using the same scheduler used to run other delayed tasks, avoiding
|
|
// starvation of the available posted tasks if there are file descriptors
|
|
// always available. The new posted task will use the same TaskId as the
|
|
// current file descriptor watching task an could be canceled in either state,
|
|
// when waiting for the file descriptor or waiting in the main loop.
|
|
StopWatching();
|
|
bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
|
|
location_,
|
|
base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
|
|
loop_->weak_ptr_factory_.GetWeakPtr(),
|
|
task_id_));
|
|
posted_task_pending_ = true;
|
|
if (base_scheduled) {
|
|
DVLOG_LOC(location_, 1)
|
|
<< "Dispatching task_id " << task_id_ << " for "
|
|
<< (base_mode_ == base::MessagePumpForIO::WATCH_READ ?
|
|
"reading" : "writing")
|
|
<< " file descriptor " << fd_ << ", scheduled from this location.";
|
|
} else {
|
|
// In the rare case that PostTask() fails, we fall back to run it directly.
|
|
// This would indicate a bigger problem with the message loop setup.
|
|
LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
|
|
OnFileReadyPostedTask();
|
|
}
|
|
}
|
|
|
|
void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
|
|
// We can't access |this| after running the |closure_| since it could call
|
|
// CancelTask on its own task_id, so we copy the members we need now.
|
|
BaseMessageLoop* loop_ptr = loop_;
|
|
DCHECK(posted_task_pending_ = true);
|
|
posted_task_pending_ = false;
|
|
|
|
// If this task was already canceled, the closure will be null and there is
|
|
// nothing else to do here. This execution doesn't count a step for RunOnce()
|
|
// unless we have a callback to run.
|
|
if (closure_.is_null()) {
|
|
loop_->io_tasks_.erase(task_id_);
|
|
return;
|
|
}
|
|
|
|
DVLOG_LOC(location_, 1)
|
|
<< "Running task_id " << task_id_ << " for "
|
|
<< (base_mode_ == base::MessagePumpForIO::WATCH_READ ?
|
|
"reading" : "writing")
|
|
<< " file descriptor " << fd_ << ", scheduled from this location.";
|
|
|
|
if (persistent_) {
|
|
// In the persistent case we just run the callback. If this callback cancels
|
|
// the task id, we can't access |this| anymore, so we re-start watching the
|
|
// file descriptor before running the callback, unless this is a fd where
|
|
// we didn't stop watching the file descriptor when it became available.
|
|
if (!immediate_run_)
|
|
StartWatching();
|
|
closure_.Run();
|
|
} else {
|
|
// This will destroy |this|, the fd_watcher and therefore stop watching this
|
|
// file descriptor.
|
|
Closure closure_copy = std::move(closure_);
|
|
loop_->io_tasks_.erase(task_id_);
|
|
// Run the closure from the local copy we just made.
|
|
closure_copy.Run();
|
|
}
|
|
|
|
if (loop_ptr->run_once_) {
|
|
loop_ptr->run_once_ = false;
|
|
loop_ptr->BreakLoop();
|
|
}
|
|
}
|
|
|
|
bool BaseMessageLoop::IOTask::CancelTask() {
|
|
if (closure_.is_null())
|
|
return false;
|
|
|
|
DVLOG_LOC(location_, 1)
|
|
<< "Removing task_id " << task_id_ << " scheduled from this location.";
|
|
|
|
if (!posted_task_pending_) {
|
|
// Destroying the FileDescriptorWatcher implicitly stops watching the file
|
|
// descriptor. This will delete our instance.
|
|
loop_->io_tasks_.erase(task_id_);
|
|
return true;
|
|
}
|
|
// The IOTask is waiting for the message loop to run its delayed task, so
|
|
// it is not watching for the file descriptor. We release the closure
|
|
// resources now but keep the IOTask instance alive while we wait for the
|
|
// callback to run and delete the IOTask.
|
|
closure_ = Closure();
|
|
return true;
|
|
}
|
|
|
|
} // namespace brillo
|