From 2d7153eaf75a1149d1be550c21070141b6ad9610 Mon Sep 17 00:00:00 2001 From: Victoria Fischer Date: Mon, 22 Jun 2026 08:54:33 +0200 Subject: [PATCH] artifacts: fix dedupe bug --- src/artifacts/archive.rs | 56 ++++++++++++++++++++++++++---------- src/artifacts/musicbrainz.rs | 42 +++++++++++---------------- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/src/artifacts/archive.rs b/src/artifacts/archive.rs index 2b9166a..464f829 100644 --- a/src/artifacts/archive.rs +++ b/src/artifacts/archive.rs @@ -1,10 +1,11 @@ use std::{collections::HashMap, ops::{Deref, DerefMut}}; use std::fmt::Debug; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::artifacts::{Artifact, Merge, SourceID, beets::BeetsDB, mixxx::MixxxDB, musicbrainz::MBQuery, tools::DataSource}; +use crate::artifacts::{Artifact, Contents, Merge, SourceID, beets::BeetsDB, mixxx::MixxxDB, musicbrainz::MBQuery, tools::DataSource}; pub struct ArtifactRef<'a> { id: Uuid, @@ -74,8 +75,6 @@ impl Archive { pub async fn data_sync(&mut self, datasrc: &mut Src, source: SourceID) -> usize where Src::Error: Debug { let mut count = 0; - let mut new_artifacts = vec![]; - let pending = self.contents.iter_mut().filter_map(|(_, artifact)| { if !artifact.sources.contains(&source) { Some(artifact) @@ -84,14 +83,23 @@ impl Archive { } }); + let futures = futures::stream::FuturesUnordered::new(); + for artifact in pending { - match datasrc.synchronize(artifact).await { - Ok(mut new_pending) => { + futures.push(datasrc.synchronize(artifact)); + } + + let results: Vec<_> = futures.collect().await; + for result in results { + match result { + Ok(new_pending) => { count += new_pending.len() + 1; - new_artifacts.append(&mut new_pending); + for new in new_pending { + self.insert(new); + } }, Err(err) => { - log::error!("Failed to synchronize {:?}: {:?}", artifact, err); + log::error!("Failed to synchronize: {:?}", err); } } } @@ -100,14 +108,17 @@ impl Archive { } pub async fn synchronize(&mut self) -> usize { - log::info!("Synchronizing records"); + log::debug!("Synchronizing records"); let mut count = 0; + log::debug!("Synchronizing Mixxx"); count += self.data_sync(&mut MixxxDB, SourceID::Mixxx).await; + log::debug!("Synchronizing Beets"); count += self.data_sync(&mut BeetsDB, SourceID::Beets).await; + log::debug!("Synchronizing Musicbrainz"); count += self.data_sync(&mut MBQuery, SourceID::Musicbrainz).await; - log::info!("Updated {} records", count); + log::debug!("Updated {} records", count); count } @@ -116,19 +127,32 @@ impl Archive { // If we are inserting a new artifact with a complete MBID... if let Some(mbid) = artifact.mbid.clone() { let search_id = mbid; - // And that one already exists... + // If an entry already exists keyed by this MBID, merge into it if let Some(existing) = self.contents.get_mut(&search_id) { - // Update the data existing.merge(artifact); ArtifactRef { id: search_id, archive: self } } else { - // Otherwise, we have a valid ID from some source, but it isn't in the system yet, so lets just fill it up - self.contents.insert(search_id.clone(), artifact); - ArtifactRef { id: search_id, archive: self } + // Otherwise, attempt to find existing artifacts with the same contents (but no MBID) + let mut targets: Vec<(Uuid, Artifact)> = self.contents.extract_if(|_, v| { v.contents == artifact.contents }).collect(); + if let Some((target_id, mut target)) = targets.pop() { + // Merge any other extracted targets into the primary one + for (_, next) in targets { + target.merge(next); + } + // Merge the incoming artifact into the merged target + target.merge(artifact); + // Insert merged target under the canonical MBID key + self.contents.insert(search_id.clone(), target); + ArtifactRef { id: search_id, archive: self } + } else { + // No matching content found: insert under the MBID key + self.contents.insert(search_id.clone(), artifact); + ArtifactRef { id: search_id, archive: self } + } } } else { // Otherwise, we attempt to merge it in. In the end, there will somehow still be a record with this mbid - let mut targets: Vec<(Uuid, Artifact)> = self.contents.extract_if(|_, v| { *v == artifact }).collect(); + let mut targets: Vec<(Uuid, Artifact)> = self.contents.extract_if(|_, v| { v.contents == artifact.contents }).collect(); if let Some((target_id, mut target)) = targets.pop() { let next_id = if let Some(ref mbid) = artifact.mbid { // If the new artifact has an mbid, we start using that as the archive key @@ -142,6 +166,8 @@ impl Archive { target.merge(next); } target.merge(artifact); + // Re-insert the merged target back into the archive under the chosen id + self.contents.insert(next_id.clone(), target); ArtifactRef { id: next_id, archive: self } } else { let new_id = Uuid::new_v4(); diff --git a/src/artifacts/musicbrainz.rs b/src/artifacts/musicbrainz.rs index 85a505a..318044c 100644 --- a/src/artifacts/musicbrainz.rs +++ b/src/artifacts/musicbrainz.rs @@ -101,7 +101,7 @@ impl DataSource for MBQuery { let artifact_id = artifact.mbid.clone().unwrap(); log::debug!("Synchronizing {} with musicbrainz", artifact_id); match artifact.contents { - Contents::Track(ref mut target_track) => { + Contents::Track(_) => { let mb_track = Recording::fetch() .id(&artifact_id.to_string()) .with_releases().with_artists().with_annotations().execute_async().await; @@ -118,12 +118,7 @@ impl DataSource for MBQuery { ret.push(track.clone()); ret.append(&mut new_artifacts); - - artifact.sources.insert(SourceID::Musicbrainz); - - if let Contents::Track(track) = track.contents { - target_track.merge(track); - } + artifact.merge(track); }, _ => () } @@ -133,26 +128,23 @@ impl DataSource for MBQuery { async fn query(&mut self, args: &Self::Args) -> Result, Self::Error> { let mut ret = vec![]; + log::debug!("Fetching recording id {}", args.mbid); + let track = Recording::fetch() + .id(&args.mbid) + .with_releases().with_artists().with_annotations().execute_async().await; - for mbid in &args.mb_ids { - log::debug!("Fetching recording id {}", mbid); - let track = Recording::fetch() - .id(&mbid) - .with_releases().with_artists().with_annotations().execute_async().await; + let track = match track { + Ok(track) => track, + Err(err) => { + log::error!("Failed to grab musicbrainz data: {:?}", err); + return Err(err) + } + }; - let track = match track { - Ok(track) => track, - Err(err) => { - log::error!("Failed to grab musicbrainz data: {:?}", err); - continue; - } - }; + let (track, mut new_artifacts) = Self::extract_recording_data(track); - let (track, mut new_artifacts) = Self::extract_recording_data(track); - - ret.push(track); - ret.append(&mut new_artifacts); - } + ret.push(track); + ret.append(&mut new_artifacts); Ok(ret) } @@ -170,5 +162,5 @@ impl ToolDescription for MBQuery { #[derive(Debug, Default, Deserialize, Serialize, JsonSchema)] pub struct MusicbrainzQueryArgs { - pub mb_ids: Vec + pub mbid: String } \ No newline at end of file