diff --git a/systems/framework/lcm_driven_loop.h b/systems/framework/lcm_driven_loop.h index e84a473d26..b4857a17eb 100644 --- a/systems/framework/lcm_driven_loop.h +++ b/systems/framework/lcm_driven_loop.h @@ -53,6 +53,74 @@ namespace systems { /// Note that we implement the class only in the header file because we don't /// know what MessageTypes are beforehand. +/** + * Subscribes to and stores a copy of the most recent message on a given + * channel, for some @p Message type. All copies of a given Subscriber share + * the same underlying data. This class does NOT provide any mutex behavior + * for multi-threaded locking; it should only be used in cases where the + * governing DrakeLcmInterface::HandleSubscriptions is called from the same + * thread that owns all copies of this object. + * + * This is a duplicate of drake::lcm::Subscriber, but makes the subscription_ + * object public, which we need to change the queue size. + */ +template +class Subscriber final { + public: + // Intentionally copyable so that it can be returned and stored by-value. + DRAKE_DEFAULT_COPY_AND_MOVE_AND_ASSIGN(Subscriber); + + /** + * Subscribes to the (non-empty) @p channel on the given (non-null) + * @p lcm instance. The `lcm` pointer is only used during construction; it + * is not retained by this object. When a undecodable message is received, + * @p on_error handler is invoked; when `on_error` is not provided, an + * exception will be thrown instead. + */ + Subscriber(drake::lcm::DrakeLcmInterface* lcm, const std::string& channel, + std::function on_error = {}) { + subscription_ = drake::lcm::Subscribe( + lcm, channel, + [data = data_](const Message& message) { + data->message = message; + data->count++; + }, + std::move(on_error)); + if (subscription_) { + subscription_->set_unsubscribe_on_delete(true); + } + } + + /** + * Returns the most recently received message, or a value-initialized (zeros) + * message otherwise. + */ + const Message& message() const { return data_->message; } + Message& message() { return data_->message; } + + /** Returns the total number of received messages. */ + int64_t count() const { return data_->count; } + int64_t& count() { return data_->count; } + + /** Clears all data (sets the message and count to all zeros). */ + void clear() { + data_->message = {}; + data_->count = 0; + } + + struct Data { + Message message{}; + int64_t count{0}; + }; + // Share a single copy of our (mutable) message storage, for all Subscribers + // to view or modify *and* for our subscription closure to write into. This + // will not be destroyed until all Subscribers are gone AND the subscription + // closure has been destroyed. + std::shared_ptr data_{std::make_shared()}; + // Keep our subscription active as long as a copy of this Subscriber remains. + std::shared_ptr subscription_; +}; + // We set a default value for SwitchMessageType so that we can generalize this // to both single and multi inputs. template > diagram, const drake::systems::System* lcm_parser, - const std::string& input_channel, bool is_forced_publish) + const std::string& input_channel, bool is_forced_publish, + int queue_capacity=1) : LcmDrivenLoop(drake_lcm, std::move(diagram), lcm_parser, std::vector(1, input_channel), input_channel, - "", is_forced_publish){}; + "", is_forced_publish, queue_capacity){}; /// Constructor for multi-input LcmDrivenLoop /// @param drake_lcm DrakeLcm @@ -85,12 +157,16 @@ class LcmDrivenLoop { /// @param active_channel The name of the initial active input channel /// @param switch_channel The name of the switch channel /// @param is_forced_publish A flag which enables publishing via diagram. + /// @param queue_capacity The queue size for the LCM subscriber on the input channels. + /// Defaults to 1, but should be set higher if stepping the LCMDrivenLoop will take + /// longer than the driving channel. LcmDrivenLoop(drake::lcm::DrakeLcm* drake_lcm, std::unique_ptr> diagram, const drake::systems::System* lcm_parser, std::vector input_channels, const std::string& active_channel, const std::string& switch_channel, bool is_forced_publish, + int queue_capacity=1, const std::string& backup_drive_channel = "") : drake_lcm_(drake_lcm), lcm_parser_(lcm_parser), @@ -108,7 +184,7 @@ class LcmDrivenLoop { DRAKE_DEMAND(!input_channels.empty()); if (input_channels.size() > 1) { DRAKE_DEMAND(!switch_channel.empty()); - switch_sub_ = std::make_unique>( + switch_sub_ = std::make_unique>( drake_lcm_, switch_channel); } @@ -116,7 +192,8 @@ class LcmDrivenLoop { for (const auto& name : input_channels) { std::cout << "Constructing subscriber for " << name << std::endl; name_to_input_sub_map_.insert(std::make_pair( - name, drake::lcm::Subscriber(drake_lcm_, name))); + name, Subscriber(drake_lcm_, name))); + name_to_input_sub_map_.at(name).subscription_->set_queue_capacity(queue_capacity); } // Make sure input_channels contains active_channel, and then set initial @@ -133,7 +210,7 @@ class LcmDrivenLoop { active_channel_ = active_channel; if (!backup_drive_channel.empty()) { - state_sub_ = std::make_unique>( + state_sub_ = std::make_unique>( drake_lcm_, backup_drive_channel); } }; @@ -226,6 +303,11 @@ class LcmDrivenLoop { is_new_state_message; }); + // Pump drake's LCM subscribers to empty their internal queues + // until all LCM buffers are up-to-date. + // Addresses https://github.com/RobotLocomotion/drake/issues/15234 + while (drake_lcm_->HandleSubscriptions(0) > 0); + // Update the diagram context when there is new input message if (is_new_input_message || too_long_between_input_messages_) { // Write the InputMessageType message into the context if lcm_parser is @@ -323,11 +405,9 @@ class LcmDrivenLoop { std::string diagram_name_ = "diagram"; std::string active_channel_; - std::unique_ptr> switch_sub_ = - nullptr; - std::map> - name_to_input_sub_map_; - std::unique_ptr> state_sub_; + std::unique_ptr> switch_sub_ = nullptr; + std::map> name_to_input_sub_map_; + std::unique_ptr> state_sub_; bool is_forced_publish_; bool too_long_between_input_messages_ = false; @@ -335,4 +415,4 @@ class LcmDrivenLoop { }; } // namespace systems -} // namespace dairlib \ No newline at end of file +} // namespace dairlib