Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 99 additions & 11 deletions systems/framework/lcm_driven_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,74 @@ namespace systems {
/// Note that we implement the class only in the header file because we don't
/// know what MessageTypes are beforehand.

/**
* Subscribes to and stores a copy of the most recent message on a given
* channel, for some @p Message type. All copies of a given Subscriber share
* the same underlying data. This class does NOT provide any mutex behavior
* for multi-threaded locking; it should only be used in cases where the
* governing DrakeLcmInterface::HandleSubscriptions is called from the same
* thread that owns all copies of this object.
*
* This is a duplicate of drake::lcm::Subscriber, but makes the subscription_
* object public, which we need to change the queue size.
*/
template <typename Message>
class Subscriber final {
public:
// Intentionally copyable so that it can be returned and stored by-value.
DRAKE_DEFAULT_COPY_AND_MOVE_AND_ASSIGN(Subscriber);

/**
* Subscribes to the (non-empty) @p channel on the given (non-null)
* @p lcm instance. The `lcm` pointer is only used during construction; it
* is not retained by this object. When a undecodable message is received,
* @p on_error handler is invoked; when `on_error` is not provided, an
* exception will be thrown instead.
*/
Subscriber(drake::lcm::DrakeLcmInterface* lcm, const std::string& channel,
std::function<void()> on_error = {}) {
subscription_ = drake::lcm::Subscribe<Message>(
lcm, channel,
[data = data_](const Message& message) {
data->message = message;
data->count++;
},
std::move(on_error));
if (subscription_) {
subscription_->set_unsubscribe_on_delete(true);
}
}

/**
* Returns the most recently received message, or a value-initialized (zeros)
* message otherwise.
*/
const Message& message() const { return data_->message; }
Message& message() { return data_->message; }

/** Returns the total number of received messages. */
int64_t count() const { return data_->count; }
int64_t& count() { return data_->count; }

/** Clears all data (sets the message and count to all zeros). */
void clear() {
data_->message = {};
data_->count = 0;
}

struct Data {
Message message{};
int64_t count{0};
};
// Share a single copy of our (mutable) message storage, for all Subscribers
// to view or modify *and* for our subscription closure to write into. This
// will not be destroyed until all Subscribers are gone AND the subscription
// closure has been destroyed.
std::shared_ptr<Data> data_{std::make_shared<Data>()};
// Keep our subscription active as long as a copy of this Subscriber remains.
std::shared_ptr<drake::lcm::DrakeSubscriptionInterface> subscription_;
};

// We set a default value for SwitchMessageType so that we can generalize this
// to both single and multi inputs.
template <typename InputMessageType,
Expand All @@ -68,13 +136,17 @@ class LcmDrivenLoop {
/// incoming lcm message
/// @param input_channel The name of the input channel
/// @param is_forced_publish A flag which enables publishing via diagram.
/// @param queue_capacity The queue size for the LCM subscriber on the input channels.
/// Defaults to 1, but should be set higher if stepping the LCMDrivenLoop will take
/// longer than the driving channel.
LcmDrivenLoop(drake::lcm::DrakeLcm* drake_lcm,
std::unique_ptr<drake::systems::Diagram<double>> diagram,
const drake::systems::System<double>* lcm_parser,
const std::string& input_channel, bool is_forced_publish)
const std::string& input_channel, bool is_forced_publish,
int queue_capacity=1)
: LcmDrivenLoop(drake_lcm, std::move(diagram), lcm_parser,
std::vector<std::string>(1, input_channel), input_channel,
"", is_forced_publish){};
"", is_forced_publish, queue_capacity){};

/// Constructor for multi-input LcmDrivenLoop
/// @param drake_lcm DrakeLcm
Expand All @@ -85,12 +157,16 @@ class LcmDrivenLoop {
/// @param active_channel The name of the initial active input channel
/// @param switch_channel The name of the switch channel
/// @param is_forced_publish A flag which enables publishing via diagram.
/// @param queue_capacity The queue size for the LCM subscriber on the input channels.
/// Defaults to 1, but should be set higher if stepping the LCMDrivenLoop will take
/// longer than the driving channel.
LcmDrivenLoop(drake::lcm::DrakeLcm* drake_lcm,
std::unique_ptr<drake::systems::Diagram<double>> diagram,
const drake::systems::System<double>* lcm_parser,
std::vector<std::string> input_channels,
const std::string& active_channel,
const std::string& switch_channel, bool is_forced_publish,
int queue_capacity=1,
const std::string& backup_drive_channel = "")
: drake_lcm_(drake_lcm),
lcm_parser_(lcm_parser),
Expand All @@ -108,15 +184,16 @@ class LcmDrivenLoop {
DRAKE_DEMAND(!input_channels.empty());
if (input_channels.size() > 1) {
DRAKE_DEMAND(!switch_channel.empty());
switch_sub_ = std::make_unique<drake::lcm::Subscriber<SwitchMessageType>>(
switch_sub_ = std::make_unique<Subscriber<SwitchMessageType>>(
drake_lcm_, switch_channel);
}

// Create subscribers for inputs
for (const auto& name : input_channels) {
std::cout << "Constructing subscriber for " << name << std::endl;
name_to_input_sub_map_.insert(std::make_pair(
name, drake::lcm::Subscriber<InputMessageType>(drake_lcm_, name)));
name, Subscriber<InputMessageType>(drake_lcm_, name)));
name_to_input_sub_map_.at(name).subscription_->set_queue_capacity(queue_capacity);
}

// Make sure input_channels contains active_channel, and then set initial
Expand All @@ -133,7 +210,7 @@ class LcmDrivenLoop {
active_channel_ = active_channel;

if (!backup_drive_channel.empty()) {
state_sub_ = std::make_unique<drake::lcm::Subscriber<lcmt_robot_output>>(
state_sub_ = std::make_unique<Subscriber<lcmt_robot_output>>(
drake_lcm_, backup_drive_channel);
}
};
Expand All @@ -157,6 +234,14 @@ class LcmDrivenLoop {
return simulator_->get_mutable_context();
}


InputMessageType wait_for_message() const {
LcmHandleSubscriptionsUntil(drake_lcm_, [&]() {
return name_to_input_sub_map_.at(active_channel_).count() > 0;
});
return name_to_input_sub_map_.at(active_channel_).message();
}

// Start simulating the diagram
void Simulate(double end_time = std::numeric_limits<double>::infinity()) {
// Get mutable contexts
Expand Down Expand Up @@ -226,6 +311,11 @@ class LcmDrivenLoop {
is_new_state_message;
});

// Pump drake's LCM subscribers to empty their internal queues
// until all LCM buffers are up-to-date.
// Addresses https://github.com/RobotLocomotion/drake/issues/15234
while (drake_lcm_->HandleSubscriptions(0) > 0);

// Update the diagram context when there is new input message
if (is_new_input_message || too_long_between_input_messages_) {
// Write the InputMessageType message into the context if lcm_parser is
Expand Down Expand Up @@ -323,16 +413,14 @@ class LcmDrivenLoop {

std::string diagram_name_ = "diagram";
std::string active_channel_;
std::unique_ptr<drake::lcm::Subscriber<SwitchMessageType>> switch_sub_ =
nullptr;
std::map<std::string, drake::lcm::Subscriber<InputMessageType>>
name_to_input_sub_map_;
std::unique_ptr<drake::lcm::Subscriber<lcmt_robot_output>> state_sub_;
std::unique_ptr<Subscriber<SwitchMessageType>> switch_sub_ = nullptr;
std::map<std::string, Subscriber<InputMessageType>> name_to_input_sub_map_;
std::unique_ptr<Subscriber<lcmt_robot_output>> state_sub_;

bool is_forced_publish_;
bool too_long_between_input_messages_ = false;
double last_input_msg_time_;
};

} // namespace systems
} // namespace dairlib
} // namespace dairlib