artifacts: fix dedupe bug
This commit is contained in:
+39
-13
@@ -1,10 +1,11 @@
|
|||||||
use std::{collections::HashMap, ops::{Deref, DerefMut}};
|
use std::{collections::HashMap, ops::{Deref, DerefMut}};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::artifacts::{Artifact, Merge, SourceID, beets::BeetsDB, mixxx::MixxxDB, musicbrainz::MBQuery, tools::DataSource};
|
use crate::artifacts::{Artifact, Contents, Merge, SourceID, beets::BeetsDB, mixxx::MixxxDB, musicbrainz::MBQuery, tools::DataSource};
|
||||||
|
|
||||||
pub struct ArtifactRef<'a> {
|
pub struct ArtifactRef<'a> {
|
||||||
id: Uuid,
|
id: Uuid,
|
||||||
@@ -74,8 +75,6 @@ impl Archive {
|
|||||||
pub async fn data_sync<Src: DataSource>(&mut self, datasrc: &mut Src, source: SourceID) -> usize where Src::Error: Debug {
|
pub async fn data_sync<Src: DataSource>(&mut self, datasrc: &mut Src, source: SourceID) -> usize where Src::Error: Debug {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
|
|
||||||
let mut new_artifacts = vec![];
|
|
||||||
|
|
||||||
let pending = self.contents.iter_mut().filter_map(|(_, artifact)| {
|
let pending = self.contents.iter_mut().filter_map(|(_, artifact)| {
|
||||||
if !artifact.sources.contains(&source) {
|
if !artifact.sources.contains(&source) {
|
||||||
Some(artifact)
|
Some(artifact)
|
||||||
@@ -84,14 +83,23 @@ impl Archive {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let futures = futures::stream::FuturesUnordered::new();
|
||||||
|
|
||||||
for artifact in pending {
|
for artifact in pending {
|
||||||
match datasrc.synchronize(artifact).await {
|
futures.push(datasrc.synchronize(artifact));
|
||||||
Ok(mut new_pending) => {
|
}
|
||||||
|
|
||||||
|
let results: Vec<_> = futures.collect().await;
|
||||||
|
for result in results {
|
||||||
|
match result {
|
||||||
|
Ok(new_pending) => {
|
||||||
count += new_pending.len() + 1;
|
count += new_pending.len() + 1;
|
||||||
new_artifacts.append(&mut new_pending);
|
for new in new_pending {
|
||||||
|
self.insert(new);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::error!("Failed to synchronize {:?}: {:?}", artifact, err);
|
log::error!("Failed to synchronize: {:?}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -100,14 +108,17 @@ impl Archive {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn synchronize(&mut self) -> usize {
|
pub async fn synchronize(&mut self) -> usize {
|
||||||
log::info!("Synchronizing records");
|
log::debug!("Synchronizing records");
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
|
|
||||||
|
log::debug!("Synchronizing Mixxx");
|
||||||
count += self.data_sync(&mut MixxxDB, SourceID::Mixxx).await;
|
count += self.data_sync(&mut MixxxDB, SourceID::Mixxx).await;
|
||||||
|
log::debug!("Synchronizing Beets");
|
||||||
count += self.data_sync(&mut BeetsDB, SourceID::Beets).await;
|
count += self.data_sync(&mut BeetsDB, SourceID::Beets).await;
|
||||||
|
log::debug!("Synchronizing Musicbrainz");
|
||||||
count += self.data_sync(&mut MBQuery, SourceID::Musicbrainz).await;
|
count += self.data_sync(&mut MBQuery, SourceID::Musicbrainz).await;
|
||||||
|
|
||||||
log::info!("Updated {} records", count);
|
log::debug!("Updated {} records", count);
|
||||||
|
|
||||||
count
|
count
|
||||||
}
|
}
|
||||||
@@ -116,19 +127,32 @@ impl Archive {
|
|||||||
// If we are inserting a new artifact with a complete MBID...
|
// If we are inserting a new artifact with a complete MBID...
|
||||||
if let Some(mbid) = artifact.mbid.clone() {
|
if let Some(mbid) = artifact.mbid.clone() {
|
||||||
let search_id = mbid;
|
let search_id = mbid;
|
||||||
// And that one already exists...
|
// If an entry already exists keyed by this MBID, merge into it
|
||||||
if let Some(existing) = self.contents.get_mut(&search_id) {
|
if let Some(existing) = self.contents.get_mut(&search_id) {
|
||||||
// Update the data
|
|
||||||
existing.merge(artifact);
|
existing.merge(artifact);
|
||||||
ArtifactRef { id: search_id, archive: self }
|
ArtifactRef { id: search_id, archive: self }
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, we have a valid ID from some source, but it isn't in the system yet, so lets just fill it up
|
// Otherwise, attempt to find existing artifacts with the same contents (but no MBID)
|
||||||
|
let mut targets: Vec<(Uuid, Artifact)> = self.contents.extract_if(|_, v| { v.contents == artifact.contents }).collect();
|
||||||
|
if let Some((target_id, mut target)) = targets.pop() {
|
||||||
|
// Merge any other extracted targets into the primary one
|
||||||
|
for (_, next) in targets {
|
||||||
|
target.merge(next);
|
||||||
|
}
|
||||||
|
// Merge the incoming artifact into the merged target
|
||||||
|
target.merge(artifact);
|
||||||
|
// Insert merged target under the canonical MBID key
|
||||||
|
self.contents.insert(search_id.clone(), target);
|
||||||
|
ArtifactRef { id: search_id, archive: self }
|
||||||
|
} else {
|
||||||
|
// No matching content found: insert under the MBID key
|
||||||
self.contents.insert(search_id.clone(), artifact);
|
self.contents.insert(search_id.clone(), artifact);
|
||||||
ArtifactRef { id: search_id, archive: self }
|
ArtifactRef { id: search_id, archive: self }
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, we attempt to merge it in. In the end, there will somehow still be a record with this mbid
|
// Otherwise, we attempt to merge it in. In the end, there will somehow still be a record with this mbid
|
||||||
let mut targets: Vec<(Uuid, Artifact)> = self.contents.extract_if(|_, v| { *v == artifact }).collect();
|
let mut targets: Vec<(Uuid, Artifact)> = self.contents.extract_if(|_, v| { v.contents == artifact.contents }).collect();
|
||||||
if let Some((target_id, mut target)) = targets.pop() {
|
if let Some((target_id, mut target)) = targets.pop() {
|
||||||
let next_id = if let Some(ref mbid) = artifact.mbid {
|
let next_id = if let Some(ref mbid) = artifact.mbid {
|
||||||
// If the new artifact has an mbid, we start using that as the archive key
|
// If the new artifact has an mbid, we start using that as the archive key
|
||||||
@@ -142,6 +166,8 @@ impl Archive {
|
|||||||
target.merge(next);
|
target.merge(next);
|
||||||
}
|
}
|
||||||
target.merge(artifact);
|
target.merge(artifact);
|
||||||
|
// Re-insert the merged target back into the archive under the chosen id
|
||||||
|
self.contents.insert(next_id.clone(), target);
|
||||||
ArtifactRef { id: next_id, archive: self }
|
ArtifactRef { id: next_id, archive: self }
|
||||||
} else {
|
} else {
|
||||||
let new_id = Uuid::new_v4();
|
let new_id = Uuid::new_v4();
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ impl DataSource for MBQuery {
|
|||||||
let artifact_id = artifact.mbid.clone().unwrap();
|
let artifact_id = artifact.mbid.clone().unwrap();
|
||||||
log::debug!("Synchronizing {} with musicbrainz", artifact_id);
|
log::debug!("Synchronizing {} with musicbrainz", artifact_id);
|
||||||
match artifact.contents {
|
match artifact.contents {
|
||||||
Contents::Track(ref mut target_track) => {
|
Contents::Track(_) => {
|
||||||
let mb_track = Recording::fetch()
|
let mb_track = Recording::fetch()
|
||||||
.id(&artifact_id.to_string())
|
.id(&artifact_id.to_string())
|
||||||
.with_releases().with_artists().with_annotations().execute_async().await;
|
.with_releases().with_artists().with_annotations().execute_async().await;
|
||||||
@@ -118,12 +118,7 @@ impl DataSource for MBQuery {
|
|||||||
|
|
||||||
ret.push(track.clone());
|
ret.push(track.clone());
|
||||||
ret.append(&mut new_artifacts);
|
ret.append(&mut new_artifacts);
|
||||||
|
artifact.merge(track);
|
||||||
artifact.sources.insert(SourceID::Musicbrainz);
|
|
||||||
|
|
||||||
if let Contents::Track(track) = track.contents {
|
|
||||||
target_track.merge(track);
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
_ => ()
|
_ => ()
|
||||||
}
|
}
|
||||||
@@ -133,18 +128,16 @@ impl DataSource for MBQuery {
|
|||||||
|
|
||||||
async fn query(&mut self, args: &Self::Args) -> Result<Vec<Artifact>, Self::Error> {
|
async fn query(&mut self, args: &Self::Args) -> Result<Vec<Artifact>, Self::Error> {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
|
log::debug!("Fetching recording id {}", args.mbid);
|
||||||
for mbid in &args.mb_ids {
|
|
||||||
log::debug!("Fetching recording id {}", mbid);
|
|
||||||
let track = Recording::fetch()
|
let track = Recording::fetch()
|
||||||
.id(&mbid)
|
.id(&args.mbid)
|
||||||
.with_releases().with_artists().with_annotations().execute_async().await;
|
.with_releases().with_artists().with_annotations().execute_async().await;
|
||||||
|
|
||||||
let track = match track {
|
let track = match track {
|
||||||
Ok(track) => track,
|
Ok(track) => track,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::error!("Failed to grab musicbrainz data: {:?}", err);
|
log::error!("Failed to grab musicbrainz data: {:?}", err);
|
||||||
continue;
|
return Err(err)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -152,7 +145,6 @@ impl DataSource for MBQuery {
|
|||||||
|
|
||||||
ret.push(track);
|
ret.push(track);
|
||||||
ret.append(&mut new_artifacts);
|
ret.append(&mut new_artifacts);
|
||||||
}
|
|
||||||
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
@@ -170,5 +162,5 @@ impl ToolDescription for MBQuery {
|
|||||||
|
|
||||||
#[derive(Debug, Default, Deserialize, Serialize, JsonSchema)]
|
#[derive(Debug, Default, Deserialize, Serialize, JsonSchema)]
|
||||||
pub struct MusicbrainzQueryArgs {
|
pub struct MusicbrainzQueryArgs {
|
||||||
pub mb_ids: Vec<String>
|
pub mbid: String
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user