Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazily load streams in meta streams #1277

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 104 additions & 33 deletions server/streamreader/meta_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,88 @@ MetaStream::MetaStream(PcmStream::Listener* pcmListener, const std::vector<std::
: PcmStream(pcmListener, ioc, server_settings, uri), first_read_(true)
{
auto path_components = utils::string::split(uri.path, '/');

for (const auto& stream : streams)
{
addStream(stream);
}

updateActiveStream();
}

bool MetaStream::isAllowed(const PcmStream& stream) const
{
auto path_components = utils::string::split(uri_.path, '/');
for (const auto& component : path_components)
{
if (component.empty())
continue;

bool found = false;
for (const auto& stream : streams)
if (component == WILDCARD || stream.getName() == component)
{
if (stream->getName() == component)
{
streams_.push_back(stream);
stream->addListener(this);
found = true;
break;
}
return true;
}
if (!found)
throw SnapException("Unknown stream: \"" + component + "\"");
}

if (streams_.empty())
throw SnapException("Meta stream '" + getName() + "' must contain at least one stream");
return false;
}

active_stream_ = streams_.front();
resampler_ = make_unique<Resampler>(active_stream_->getSampleFormat(), sampleFormat_);
void MetaStream::addStream(std::shared_ptr<PcmStream> stream)
{
if (isAllowed(*stream))
{
stream->addListener(this);
streams_.push_back(std::move(stream));
updateActiveStream();
}
}

void MetaStream::removeStream(const PcmStream& stream)
{
auto iter = std::find_if(streams_.begin(), streams_.end(), [id = stream.getId()](const auto& s) { return s->getId() == id; });
if (iter != streams_.end())
{
streams_.erase(iter);
updateActiveStream();
}
}

void MetaStream::updateActiveStream()
{
auto compareStreamOrder = [this](const std::shared_ptr<PcmStream>& first, const std::shared_ptr<PcmStream>& second)
{
if (first->getName() == second->getName())
return false;

auto path_components = utils::string::split(uri_.path, '/');
for (const auto& component : path_components)
{
if (component == first->getName())
return true;
if (component == second->getName())
return false;
if (component == WILDCARD)
return false;
}
return false;
};

std::lock_guard<std::recursive_mutex> lock(active_mutex_);
if (!streams_.empty())
{
auto new_active = std::min_element(streams_.begin(), streams_.end(), compareStreamOrder);
if (!active_stream_ || active_stream_->getId() != ((*new_active)->getId()))
{
active_stream_ = *streams_.begin();
resampler_ = make_unique<Resampler>(active_stream_->getSampleFormat(), sampleFormat_);
}
}
else
{
active_stream_ = nullptr;
resampler_ = nullptr;
}
}

MetaStream::~MetaStream()
{
Expand Down Expand Up @@ -133,7 +188,8 @@ void MetaStream::onStateChanged(const PcmStream* pcmStream, ReaderState state)
}
}

switch_stream(streams_.front());
if (!streams_.empty())
switch_stream(*streams_.begin());
setState(ReaderState::kIdle);
}

Expand Down Expand Up @@ -212,86 +268,100 @@ void MetaStream::onResync(const PcmStream* pcmStream, double ms)
void MetaStream::setShuffle(bool shuffle, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setShuffle(shuffle, std::move(handler));
if (active_stream_)
active_stream_->setShuffle(shuffle, std::move(handler));
}

void MetaStream::setLoopStatus(LoopStatus status, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setLoopStatus(status, std::move(handler));
if (active_stream_)
active_stream_->setLoopStatus(status, std::move(handler));
}

void MetaStream::setVolume(uint16_t volume, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setVolume(volume, std::move(handler));
if (active_stream_)
active_stream_->setVolume(volume, std::move(handler));
}

void MetaStream::setMute(bool mute, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setMute(mute, std::move(handler));
if (active_stream_)
active_stream_->setMute(mute, std::move(handler));
}

void MetaStream::setRate(float rate, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setRate(rate, std::move(handler));
if (active_stream_)
active_stream_->setRate(rate, std::move(handler));
}


// Control commands
void MetaStream::setPosition(std::chrono::milliseconds position, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setPosition(position, std::move(handler));
if (active_stream_)
active_stream_->setPosition(position, std::move(handler));
}

void MetaStream::seek(std::chrono::milliseconds offset, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->seek(offset, std::move(handler));
if (active_stream_)
active_stream_->seek(offset, std::move(handler));
}

void MetaStream::next(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->next(std::move(handler));
if (active_stream_)
active_stream_->next(std::move(handler));
}

void MetaStream::previous(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->previous(std::move(handler));
if (active_stream_)
active_stream_->previous(std::move(handler));
}

void MetaStream::pause(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->pause(std::move(handler));
if (active_stream_)
active_stream_->pause(std::move(handler));
}

void MetaStream::playPause(ResultHandler handler)
{
LOG(DEBUG, LOG_TAG) << "PlayPause\n";
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (active_stream_->getState() == ReaderState::kIdle)
play(handler);
else
active_stream_->playPause(std::move(handler));
if (active_stream_)
{
if (active_stream_->getState() == ReaderState::kIdle)
play(handler);
else
active_stream_->playPause(std::move(handler));
}
}

void MetaStream::stop(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->stop(std::move(handler));
if (active_stream_)
active_stream_->stop(std::move(handler));
}

void MetaStream::play(ResultHandler handler)
{
LOG(DEBUG, LOG_TAG) << "Play\n";
std::lock_guard<std::recursive_mutex> lock(mutex_);
if ((active_stream_->getProperties().can_play) && (active_stream_->getProperties().playback_status != PlaybackStatus::kPlaying))
if ((active_stream_) && (active_stream_->getProperties().can_play) && (active_stream_->getProperties().playback_status != PlaybackStatus::kPlaying))
return active_stream_->play(std::move(handler));

for (const auto& stream : streams_)
Expand All @@ -303,7 +373,8 @@ void MetaStream::play(ResultHandler handler)
}

// call play on the active stream to get the handler called
active_stream_->play(std::move(handler));
if (active_stream_)
active_stream_->play(std::move(handler));
}


Expand Down
9 changes: 9 additions & 0 deletions server/streamreader/meta_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ namespace streamreader
*/
class MetaStream : public PcmStream, public PcmStream::Listener
{
public:
static inline const std::string WILDCARD = "*";

public:
/// ctor. Encoded PCM data is passed to the PcmStream::Listener
MetaStream(PcmStream::Listener* pcmListener, const std::vector<std::shared_ptr<PcmStream>>& streams, boost::asio::io_context& ioc,
Expand All @@ -49,6 +52,9 @@ class MetaStream : public PcmStream, public PcmStream::Listener
void start() override;
void stop() override;

void addStream(std::shared_ptr<PcmStream> stream);
void removeStream(const PcmStream& stream);

// Setter for properties
void setShuffle(bool shuffle, ResultHandler handler) override;
void setLoopStatus(LoopStatus status, ResultHandler handler) override;
Expand All @@ -67,6 +73,9 @@ class MetaStream : public PcmStream, public PcmStream::Listener
void play(ResultHandler handler) override;

protected:
bool isAllowed(const PcmStream& stream) const;
void updateActiveStream();

/// Implementation of PcmStream::Listener
void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override;
void onStateChanged(const PcmStream* pcmStream, ReaderState state) override;
Expand Down
9 changes: 9 additions & 0 deletions server/streamreader/stream_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
{
if (s->getName() == stream->getName())
throw SnapException("Stream with name \"" + stream->getName() + "\" already exists");

if (auto meta = dynamic_cast<MetaStream*>(s.get()))
meta->addStream(stream);
}
streams_.push_back(stream);
}
Expand All @@ -161,6 +164,12 @@ void StreamManager::removeStream(const std::string& name)
{
(*iter)->stop();
streams_.erase(iter);

for (const auto& s : streams_)
{
if (auto meta = dynamic_cast<MetaStream*>(s.get()))
meta->removeStream(**iter);
}
}
}

Expand Down