Skip to content

Commit

Permalink
main: use session only once
Browse files Browse the repository at this point in the history
  • Loading branch information
eladyn committed Jan 19, 2025
1 parent 59e8ef0 commit 298afe6
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 60 deletions.
33 changes: 15 additions & 18 deletions src/dbus_mpris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const MPRIS_PATH: &str = "/org/mpris/MediaPlayer2";
const CONTROLS_PATH: &str = "/rs/spotifyd/Controls";

pub enum ControlMessage {
SetSpirc(Arc<Spirc>),
DropSpirc,
SetSession(Arc<Spirc>, Session),
DropSession,
Shutdown,
}

Expand All @@ -52,28 +52,24 @@ pub(crate) struct DbusServer {
}

impl DbusServer {
pub fn new(
event_rx: UnboundedReceiver<PlayerEvent>,
dbus_type: DBusType,
session: Session,
) -> DbusServer {
pub fn new(event_rx: UnboundedReceiver<PlayerEvent>, dbus_type: DBusType) -> DbusServer {
let (control_tx, control_rx) = tokio::sync::mpsc::unbounded_channel();
let dbus_future = Box::pin(create_dbus_server(event_rx, control_rx, dbus_type, session));
let dbus_future = Box::pin(create_dbus_server(event_rx, control_rx, dbus_type));
DbusServer {
dbus_future,
control_tx,
}
}

pub fn set_spirc(&self, spirc: Arc<Spirc>) -> Result<(), DbusError> {
pub fn set_session(&self, spirc: Arc<Spirc>, session: Session) -> Result<(), DbusError> {
self.control_tx
.send(ControlMessage::SetSpirc(spirc))
.send(ControlMessage::SetSession(spirc, session))
.map_err(|_| DbusError::ControlChannelBroken)
}

pub fn drop_spirc(&self) -> Result<(), DbusError> {
pub fn drop_session(&self) -> Result<(), DbusError> {
self.control_tx
.send(ControlMessage::DropSpirc)
.send(ControlMessage::DropSession)
.map_err(|_| DbusError::ControlChannelBroken)
}

Expand Down Expand Up @@ -403,7 +399,6 @@ async fn create_dbus_server(
mut event_rx: UnboundedReceiver<PlayerEvent>,
mut control_rx: UnboundedReceiver<ControlMessage>,
dbus_type: DBusType,
session: Session,
) -> Result<(), DbusError> {
let (resource, conn) = match dbus_type {
DBusType::Session => connection::new_session_sync(),
Expand Down Expand Up @@ -456,6 +451,7 @@ async fn create_dbus_server(
);

let mut spirc: Option<Arc<Spirc>> = None;
let mut session: Option<Session> = None;

struct ConnectionData {
conn_id: String,
Expand All @@ -477,11 +473,10 @@ async fn create_dbus_server(

if let PlayerEvent::SessionConnected { connection_id, .. } = event {
let mut cr = crossroads.lock().await;
let spirc = spirc.clone().unwrap();
let seeked_fn = register_player_interface(
&mut cr,
spirc,
session.clone(),
spirc.clone().unwrap(),
session.clone().unwrap(),
current_state.clone(),
quit_tx.clone(),
);
Expand Down Expand Up @@ -539,17 +534,19 @@ async fn create_dbus_server(
ControlMessage::Shutdown => {
break;
},
ControlMessage::SetSpirc(new_spirc) => {
ControlMessage::SetSession(new_spirc, new_session) => {
let mut cr = crossroads.lock().await;
register_controls_interface(&mut cr, new_spirc.clone());
spirc = Some(new_spirc);
session = Some(new_session);
}
ControlMessage::DropSpirc => {
ControlMessage::DropSession => {
let mut cr = crossroads.lock().await;
conn.release_name(&mpris_name).await?;
cr.remove::<()>(&MPRIS_PATH.into());
cr.remove::<()>(&CONTROLS_PATH.into());
spirc = None;
session = None;
}
}
}
Expand Down
82 changes: 60 additions & 22 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,24 @@ use futures::{
Future, FutureExt, StreamExt,
};
use librespot_connect::{config::ConnectConfig, spirc::Spirc};
use librespot_core::{authentication::Credentials, config::DeviceType, session::Session, Error};
use librespot_core::{
authentication::Credentials, cache::Cache, config::DeviceType, session::Session, Error,
SessionConfig,
};
use librespot_discovery::Discovery;
use librespot_playback::{mixer::Mixer, player::Player};
use librespot_playback::{
audio_backend::Sink,
config::{AudioFormat, PlayerConfig},
mixer::Mixer,
player::Player,
};
use log::error;
use std::pin::Pin;
use std::sync::Arc;

#[cfg(not(feature = "dbus_mpris"))]
type DbusServer = Pending<()>;

pub struct SpotifydState {
pub device_name: String,
pub player_event_program: Option<String>,
}

pub(crate) enum CredentialsProvider {
Discovery(Peekable<Discovery>),
SpotifyCredentials(Credentials),
Expand Down Expand Up @@ -61,40 +64,72 @@ impl CredentialsProvider {
}

pub(crate) struct MainLoop {
pub(crate) spotifyd_state: SpotifydState,
pub(crate) session_config: SessionConfig,
pub(crate) player_config: PlayerConfig,
pub(crate) cache: Option<Cache>,
pub(crate) mixer: Arc<dyn Mixer>,
pub(crate) session: Session,
pub(crate) player: Arc<Player>,
pub(crate) backend: fn(Option<String>, AudioFormat) -> Box<dyn Sink>,
pub(crate) audio_device: Option<String>,
pub(crate) audio_format: AudioFormat,
pub(crate) has_volume_ctrl: bool,
pub(crate) initial_volume: Option<u16>,
pub(crate) shell: String,
pub(crate) device_type: DeviceType,
pub(crate) device_name: String,
pub(crate) player_event_program: Option<String>,
#[cfg_attr(not(feature = "dbus_mpris"), allow(unused))]
pub(crate) use_mpris: bool,
#[cfg_attr(not(feature = "dbus_mpris"), allow(unused))]
pub(crate) dbus_type: DBusType,
pub(crate) credentials_provider: CredentialsProvider,
}

struct ConnectionInfo<SpircTask: Future<Output = ()>> {
spirc: Spirc,
#[cfg_attr(not(feature = "dbus_mpris"), expect(unused))]
session: Session,
player: Arc<Player>,
spirc_task: SpircTask,
}

impl MainLoop {
async fn get_connection(&mut self) -> Result<(Spirc, impl Future<Output = ()>), Error> {
async fn get_connection(&mut self) -> Result<ConnectionInfo<impl Future<Output = ()>>, Error> {
let creds = self.credentials_provider.get_credentials().await;

let session = Session::new(self.session_config.clone(), self.cache.clone());
let player = {
let audio_device = self.audio_device.clone();
let audio_format = self.audio_format;
let backend = self.backend;
Player::new(
self.player_config.clone(),
session.clone(),
self.mixer.get_soft_volume(),
move || backend(audio_device, audio_format),
)
};

// TODO: expose is_group
Spirc::new(
ConnectConfig {
name: self.spotifyd_state.device_name.clone(),
name: self.device_name.clone(),
device_type: self.device_type,
is_group: false,
initial_volume: self.initial_volume,
has_volume_ctrl: self.has_volume_ctrl,
},
self.session.clone(),
session.clone(),
creds,
self.player.clone(),
player.clone(),
self.mixer.clone(),
)
.await
.map(|(spirc, spirc_task)| ConnectionInfo {
spirc,
session,
player,
spirc_task,
})
}

pub(crate) async fn run(mut self) {
Expand All @@ -107,15 +142,14 @@ impl MainLoop {
#[cfg(feature = "dbus_mpris")]
let mpris_event_tx = if self.use_mpris {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
*dbus_server.as_mut() =
Either::Left(DbusServer::new(rx, self.dbus_type, self.session.clone()));
*dbus_server.as_mut() = Either::Left(DbusServer::new(rx, self.dbus_type));
Some(tx)
} else {
None
};

'mainloop: loop {
let (spirc, spirc_task) = tokio::select!(
let connection = tokio::select!(
_ = &mut ctrl_c => {
if let CredentialsProvider::Discovery(stream) = self.credentials_provider {
let _ = stream.into_inner().shutdown().await;
Expand All @@ -133,13 +167,17 @@ impl MainLoop {
}
);

let spirc_task = connection.spirc_task;
tokio::pin!(spirc_task);

let shared_spirc = Arc::new(spirc);
let shared_spirc = Arc::new(connection.spirc);

#[cfg(feature = "dbus_mpris")]
if let Either::Left(mut dbus_server) = Either::as_pin_mut(dbus_server.as_mut()) {
if let Err(err) = dbus_server.as_mut().set_spirc(shared_spirc.clone()) {
if let Err(err) = dbus_server
.as_mut()
.set_session(shared_spirc.clone(), connection.session)
{
error!("failed to configure dbus server: {err}");
let _ = shared_spirc.shutdown();
break 'mainloop;
Expand All @@ -148,7 +186,7 @@ impl MainLoop {

let mut running_event_program = Box::pin(Fuse::terminated());

let mut event_channel = self.player.get_player_event_channel();
let mut event_channel = connection.player.get_player_event_channel();

loop {
tokio::select!(
Expand Down Expand Up @@ -187,7 +225,7 @@ impl MainLoop {
if let Some(ref tx) = mpris_event_tx {
tx.send(event.clone()).unwrap();
}
if let Some(ref cmd) = self.spotifyd_state.player_event_program {
if let Some(ref cmd) = self.player_event_program {
match spawn_program_on_event(&self.shell, cmd, event) {
Ok(child) => running_event_program = Box::pin(child.wait().fuse()),
Err(e) => error!("{}", e),
Expand All @@ -207,7 +245,7 @@ impl MainLoop {
}
#[cfg(feature = "dbus_mpris")]
if let Either::Left(dbus_server) = Either::as_pin_mut(dbus_server.as_mut()) {
if let Err(err) = dbus_server.drop_spirc() {
if let Err(err) = dbus_server.drop_session() {
error!("failed to reconfigure dbus server: {err}");
break 'mainloop;
}
Expand Down
30 changes: 10 additions & 20 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use crate::{
};
#[cfg(feature = "dbus_keyring")]
use keyring::Entry;
use librespot_core::{authentication::Credentials, cache::Cache, config::DeviceType, Session};
use librespot_core::{authentication::Credentials, cache::Cache, config::DeviceType};
use librespot_playback::mixer::MixerConfig;
use librespot_playback::{
audio_backend::{Sink, BACKENDS},
config::AudioFormat,
mixer::{self, Mixer},
};
use librespot_playback::{mixer::MixerConfig, player::Player};
#[allow(unused_imports)] // cfg
use log::{debug, error, info, warn};
use std::{str::FromStr, sync::Arc, thread, time::Duration};
Expand Down Expand Up @@ -121,31 +121,21 @@ pub(crate) fn initial_state(config: config::SpotifydConfig) -> main_loop::MainLo

let backend = find_backend(backend.as_ref().map(String::as_ref));

let session = Session::new(session_config, cache);
let player = {
let audio_device = config.audio_device;
let audio_format = config.audio_format;
Player::new(
player_config,
session.clone(),
mixer.get_soft_volume(),
move || backend(audio_device, audio_format),
)
};

main_loop::MainLoop {
credentials_provider,
mixer,
spotifyd_state: main_loop::SpotifydState {
device_name: config.device_name,
player_event_program: config.onevent,
},
session,
player,
session_config,
cache,
audio_device: config.audio_device,
audio_format: config.audio_format,
player_config,
backend,
initial_volume: config.initial_volume,
has_volume_ctrl,
shell: config.shell,
device_type,
device_name: config.device_name,
player_event_program: config.onevent,
use_mpris: config.use_mpris,
dbus_type: config.dbus_type,
}
Expand Down

0 comments on commit 298afe6

Please sign in to comment.