diff --git a/db4-storage/src/generic_time_ops.rs b/db4-storage/src/generic_time_ops.rs index 49dad06cbf..b663d05272 100644 --- a/db4-storage/src/generic_time_ops.rs +++ b/db4-storage/src/generic_time_ops.rs @@ -1,11 +1,12 @@ use crate::{NodeEntryRef, segments::additions::MemAdditions, utils::Iter3}; +use std::ops::Range; + use raphtory_api::core::entities::LayerId; use raphtory_core::{ entities::{ELID, LayerIds, layers::Multiple}, storage::timeindex::{EventTime, TimeIndexOps}, }; use raphtory_itertools::FastMergeExt; -use std::ops::Range; #[derive(Clone, Debug)] pub enum LayerIter<'a> { diff --git a/db4-storage/src/lib.rs b/db4-storage/src/lib.rs index d5bc9618ba..43e1372fc3 100644 --- a/db4-storage/src/lib.rs +++ b/db4-storage/src/lib.rs @@ -156,6 +156,12 @@ impl From for LocalPOS { } } +impl From for usize { + fn from(id: LocalPOS) -> Self { + id.0 as usize + } +} + impl LocalPOS { pub fn as_vid(self, page_id: usize, max_page_len: u32) -> VID { VID(page_id * (max_page_len as usize) + (self.0 as usize)) diff --git a/db4-storage/src/pages/edge_page/bulk_writer.rs b/db4-storage/src/pages/edge_page/bulk_writer.rs new file mode 100644 index 0000000000..575a7b22a5 --- /dev/null +++ b/db4-storage/src/pages/edge_page/bulk_writer.rs @@ -0,0 +1,136 @@ +use crate::{ + LocalPOS, api::edges::EdgeSegmentOps, pages::edge_page::writer::EdgeWriter, + segments::edge::segment::MemEdgeSegment, +}; +use raphtory_api::core::entities::{ + EID, LayerId, VID, + properties::{meta::STATIC_GRAPH_LAYER_ID, prop::AsPropRef}, +}; +use raphtory_core::storage::timeindex::{AsTime, EventTime}; +use std::ops::DerefMut; + +pub struct BulkEdgeWriter< + 'a, + MP: DerefMut + std::fmt::Debug, + ES: EdgeSegmentOps, +> { + ew: EdgeWriter<'a, MP, ES>, + layers: Vec, + earliest: EventTime, + latest: EventTime, +} + +impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmentOps> + From> for BulkEdgeWriter<'a, MP, ES> +{ + fn from(value: EdgeWriter<'a, MP, ES>) -> Self { + Self { + ew: value, + layers: vec![0], + earliest: EventTime::MAX, + latest: EventTime::MIN, + } + } +} + +impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmentOps> + BulkEdgeWriter<'a, MP, ES> +{ + pub fn bulk_add_edge( + &mut self, + t: EventTime, + edge_pos: LocalPOS, + src: VID, + dst: VID, + edge_exists: bool, + layer_id: LayerId, + c_props: impl IntoIterator, + t_props: impl IntoIterator, + ) { + if !edge_exists { + if self + .ew + .writer + .insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID) + { + self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID); + } + } + + if self + .ew + .writer + .insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props) + && !self.ew.page.immut_has_edge(edge_pos, layer_id) + { + self.increment_layer_num_edges(layer_id); + } + + self.update_time(t); + + self.ew + .writer + .update_const_properties(edge_pos, src, dst, layer_id, c_props); + } + + pub fn bulk_delete_edge( + &mut self, + t: EventTime, + edge_pos: LocalPOS, + src: VID, + dst: VID, + exists: bool, + layer_id: LayerId, + ) { + if !exists { + if self + .ew + .writer + .insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID) + { + self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID); + } + } + + self.update_time(t); + if self + .ew + .writer + .delete_edge_internal(t, edge_pos, src, dst, layer_id) + && !self.ew.page.immut_has_edge(edge_pos, layer_id) + { + self.increment_layer_num_edges(layer_id); + } + } + + #[inline] + fn increment_layer_num_edges(&mut self, layer_id: LayerId) { + if self.layers.len() <= layer_id.0 { + self.layers.resize_with(layer_id.0 + 1, Default::default); + } + self.layers[layer_id.0] += 1; + } + + #[inline] + fn update_time(&mut self, t: EventTime) { + self.earliest = self.earliest.min(t); + self.latest = self.latest.max(t); + } + + #[inline(always)] + pub fn resolve_pos(&self, edge_id: EID) -> Option { + self.ew.resolve_pos(edge_id) + } +} + +impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmentOps> Drop + for BulkEdgeWriter<'a, MP, ES> +{ + fn drop(&mut self) { + for (layer_id, count) in self.layers.iter().enumerate() { + self.ew.graph_stats.increment_by(LayerId(layer_id), *count); + } + self.ew.graph_stats.update_time(self.earliest.t()); + self.ew.graph_stats.update_time(self.latest.t()); + } +} diff --git a/db4-storage/src/pages/edge_page/mod.rs b/db4-storage/src/pages/edge_page/mod.rs index d3baa81782..a1f6628c62 100644 --- a/db4-storage/src/pages/edge_page/mod.rs +++ b/db4-storage/src/pages/edge_page/mod.rs @@ -1 +1,2 @@ +pub mod bulk_writer; pub mod writer; diff --git a/db4-storage/src/pages/edge_page/writer.rs b/db4-storage/src/pages/edge_page/writer.rs index 55d8fa19a6..a843537492 100644 --- a/db4-storage/src/pages/edge_page/writer.rs +++ b/db4-storage/src/pages/edge_page/writer.rs @@ -178,6 +178,7 @@ impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmen self.page.segment_id() } + #[inline] fn increment_layer_num_edges(&self, layer_id: LayerId) { self.graph_stats.increment(layer_id); } diff --git a/db4-storage/src/pages/layer_counter.rs b/db4-storage/src/pages/layer_counter.rs index e94f8567b7..3342fdd14a 100644 --- a/db4-storage/src/pages/layer_counter.rs +++ b/db4-storage/src/pages/layer_counter.rs @@ -70,9 +70,15 @@ impl GraphStats { self.latest.get() } + #[inline(always)] pub fn increment(&self, layer_id: LayerId) -> usize { + self.increment_by(layer_id, 1) + } + + #[inline(always)] + pub fn increment_by(&self, layer_id: LayerId, count: usize) -> usize { let counter = self.get_or_create_layer(layer_id); - counter.fetch_add(1, std::sync::atomic::Ordering::Release) + counter.fetch_add(count, std::sync::atomic::Ordering::Release) } pub fn get(&self, layer_id: LayerId) -> usize { diff --git a/db4-storage/src/pages/locked/edges.rs b/db4-storage/src/pages/locked/edges.rs index 79f116ad4e..d3b94f7b4e 100644 --- a/db4-storage/src/pages/locked/edges.rs +++ b/db4-storage/src/pages/locked/edges.rs @@ -4,7 +4,11 @@ use crate::{ LocalPOS, api::edges::EdgeSegmentOps, error::StorageError, - pages::{edge_page::writer::EdgeWriter, layer_counter::GraphStats, resolve_pos}, + pages::{ + edge_page::{bulk_writer::BulkEdgeWriter, writer::EdgeWriter}, + layer_counter::GraphStats, + resolve_pos, + }, persist::strategy::PersistenceStrategy, segments::edge::segment::MemEdgeSegment, }; @@ -44,6 +48,11 @@ impl<'a, ES: EdgeSegmentOps> LockedEdgePage<'a, ES> { EdgeWriter::new(self.num_edges, self.page, self.lock.deref_mut()) } + #[inline(always)] + pub fn bulk_writer(&mut self) -> BulkEdgeWriter<'_, &mut MemEdgeSegment, ES> { + EdgeWriter::new(self.num_edges, self.page, self.lock.deref_mut()).into() + } + #[inline(always)] pub fn page_id(&self) -> usize { self.page_id diff --git a/db4-storage/src/pages/locked/nodes.rs b/db4-storage/src/pages/locked/nodes.rs index 0cb296d27e..3b394b4edf 100644 --- a/db4-storage/src/pages/locked/nodes.rs +++ b/db4-storage/src/pages/locked/nodes.rs @@ -2,7 +2,11 @@ use crate::{ LocalPOS, api::nodes::NodeSegmentOps, error::StorageError, - pages::{layer_counter::GraphStats, node_page::writer::NodeWriter, resolve_pos}, + pages::{ + layer_counter::GraphStats, + node_page::{bulk_writer::BulkNodeWriter, writer::NodeWriter}, + resolve_pos, + }, persist::strategy::PersistenceStrategy, segments::node::segment::MemNodeSegment, }; @@ -47,6 +51,11 @@ impl<'a, NS: NodeSegmentOps> LockedNodePage<'a, NS> { NodeWriter::new(self.page, self.layer_counter, self.lock.deref_mut()) } + #[inline(always)] + pub fn bulk_writer(&mut self) -> BulkNodeWriter<'_, &mut MemNodeSegment, NS> { + NodeWriter::new(self.page, self.layer_counter, self.lock.deref_mut()).into() + } + pub fn head(&mut self) -> &mut MemNodeSegment { self.lock.deref_mut() } diff --git a/db4-storage/src/pages/node_page/bulk_writer.rs b/db4-storage/src/pages/node_page/bulk_writer.rs new file mode 100644 index 0000000000..7c1b484bf7 --- /dev/null +++ b/db4-storage/src/pages/node_page/bulk_writer.rs @@ -0,0 +1,145 @@ +use std::ops::DerefMut; + +use raphtory_api::core::entities::properties::{ + meta::{NODE_ID_IDX, STATIC_GRAPH_LAYER_ID}, + prop::Prop, +}; +use raphtory_core::{ + entities::{EID, ELID, GID, LayerId, VID}, + storage::timeindex::AsTime, +}; + +use crate::{ + LocalPOS, api::nodes::NodeSegmentOps, pages::node_page::writer::NodeWriter, + segments::node::segment::MemNodeSegment, +}; + +#[derive(Debug)] +pub struct BulkNodeWriter<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> { + nw: NodeWriter<'a, MP, NS>, + layers: Vec, +} + +impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> + From> for BulkNodeWriter<'a, MP, NS> +{ + fn from(value: NodeWriter<'a, MP, NS>) -> Self { + Self { + nw: value, + layers: Vec::new(), + } + } +} + +impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> + BulkNodeWriter<'a, MP, NS> +{ + #[inline] + pub fn get_out_edge(&self, pos: LocalPOS, dst: VID, layer_id: LayerId) -> Option { + self.nw.get_out_edge(pos, dst, layer_id) + } + + #[inline(always)] + pub fn resolve_pos(&self, node_id: VID) -> Option { + self.nw.resolve_pos(node_id) + } + + #[inline(always)] + pub fn add_static_outbound_edge( + &mut self, + src_pos: LocalPOS, + dst: impl Into, + e_id: impl Into, + ) { + let e_id = e_id.into(); + self.nw.add_outbound_edge_inner::( + None, + src_pos, + dst, + e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }, + ); + } + + pub fn add_static_inbound_edge( + &mut self, + dst_pos: LocalPOS, + src: impl Into, + e_id: impl Into, + ) { + let e_id = e_id.into(); + self.nw.add_inbound_edge_inner::( + None, + dst_pos, + src, + e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }, + ); + } + + #[inline(always)] + pub fn add_outbound_edge( + &mut self, + t: Option, + src_pos: impl Into, + dst: impl Into, + e_id: impl Into, + ) { + self.nw + .add_outbound_edge_inner(t, src_pos, dst, e_id, |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }); + } + + pub fn add_inbound_edge( + &mut self, + t: Option, + dst_pos: impl Into, + src: impl Into, + e_id: impl Into, + ) { + self.nw + .add_inbound_edge_inner(t, dst_pos, src, e_id, |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }); + } + + fn update_layer_count(layer_id: LayerId, layers: &mut Vec) { + if layers.len() <= layer_id.0 { + layers.resize_with(layer_id.0 + 1, Default::default); + } + layers[layer_id.0] += 1; + } + + #[inline(always)] + pub fn update_timestamp(&mut self, t: T, pos: LocalPOS, e_id: ELID) { + self.nw.update_timestamp(t, pos, e_id); + } + + #[inline(always)] + pub fn store_node_id(&mut self, pos: LocalPOS, layer_id: LayerId, gid: GID) { + let gid = match gid { + GID::U64(id) => Prop::U64(id), + GID::Str(s) => Prop::str(s), + }; + let props = [(NODE_ID_IDX, gid)]; + self.nw + .update_c_props_inner(pos, layer_id, props, |layer_id| { + Self::update_layer_count(layer_id, &mut self.layers); + }); + } +} + +impl<'a, MP: DerefMut, ES: NodeSegmentOps> Drop + for BulkNodeWriter<'a, MP, ES> +{ + fn drop(&mut self) { + for (layer_id, count) in self.layers.iter().enumerate() { + self.nw.l_counter.increment_by(LayerId(layer_id), *count); + } + } +} diff --git a/db4-storage/src/pages/node_page/mod.rs b/db4-storage/src/pages/node_page/mod.rs index d3baa81782..a1f6628c62 100644 --- a/db4-storage/src/pages/node_page/mod.rs +++ b/db4-storage/src/pages/node_page/mod.rs @@ -1 +1,2 @@ +pub mod bulk_writer; pub mod writer; diff --git a/db4-storage/src/pages/node_page/writer.rs b/db4-storage/src/pages/node_page/writer.rs index 4b07c8b634..1bc3a2cb73 100644 --- a/db4-storage/src/pages/node_page/writer.rs +++ b/db4-storage/src/pages/node_page/writer.rs @@ -58,7 +58,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst: impl Into, e_id: impl Into, ) { - self.add_outbound_edge_inner(t, src_pos, dst, e_id); + self.add_outbound_edge_inner(t, src_pos, dst, e_id, |layer_id| { + self.l_counter.increment(layer_id); + }); } pub fn add_static_outbound_edge( @@ -73,15 +75,19 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src_pos, dst, e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer_id| { + self.l_counter.increment(layer_id); + }, ); } - fn add_outbound_edge_inner( + pub(crate) fn add_outbound_edge_inner( &mut self, t: Option, src_pos: impl Into, dst: impl Into, e_id: impl Into, + mut layer_counter: impl FnMut(LayerId) -> (), ) { let src_pos = src_pos.into(); let dst = dst.into(); @@ -95,7 +101,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(src_pos, layer_id) { - self.l_counter.increment(layer_id); + layer_counter(layer_id); } } @@ -106,7 +112,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src: impl Into, e_id: impl Into, ) { - self.add_inbound_edge_inner(t, dst_pos, src, e_id); + self.add_inbound_edge_inner(t, dst_pos, src, e_id, |layer| { + self.l_counter.increment(layer); + }); } pub fn add_static_inbound_edge( @@ -121,15 +129,19 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst_pos, src, e_id.with_layer(STATIC_GRAPH_LAYER_ID), + |layer| { + self.l_counter.increment(layer); + }, ); } - fn add_inbound_edge_inner( + pub(crate) fn add_inbound_edge_inner( &mut self, t: Option, dst_pos: impl Into, src: impl Into, e_id: impl Into, + mut layer_counter: impl FnMut(LayerId) -> (), ) { let e_id = e_id.into(); let src = src.into(); @@ -143,7 +155,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(dst_pos, layer) { - self.l_counter.increment(layer); + layer_counter(layer); } } @@ -177,11 +189,23 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri pos: LocalPOS, layer_id: LayerId, props: impl IntoIterator, + ) { + self.update_c_props_inner(pos, layer_id, props, |layer_id| { + self.l_counter.increment(layer_id); + }); + } + + pub(crate) fn update_c_props_inner( + &mut self, + pos: LocalPOS, + layer_id: LayerId, + props: impl IntoIterator, + mut layer_counter: impl FnMut(LayerId) -> (), ) { let (is_new_node, add) = self.mut_segment.update_metadata(pos, layer_id, props); self.mut_segment.increment_est_size(add); if is_new_node && !self.page.has_node(pos, layer_id) { - self.l_counter.increment(layer_id); + layer_counter(layer_id); } } diff --git a/db4-storage/src/properties/mod.rs b/db4-storage/src/properties/mod.rs index 456d6f199f..bdccd66da7 100644 --- a/db4-storage/src/properties/mod.rs +++ b/db4-storage/src/properties/mod.rs @@ -38,6 +38,7 @@ pub struct Properties { has_properties: bool, has_deletions: bool, pub additions_count: usize, + pub deletions_count: usize, } pub(crate) struct PropMutEntry<'a> { @@ -286,6 +287,10 @@ impl Properties { self.t_properties.len() } + pub fn deletions_count(&self) -> usize { + self.deletions_count + } + pub fn num_updates(&self) -> usize { self.t_properties.len() + self @@ -355,6 +360,7 @@ impl<'a> PropMutEntry<'a> { } self.properties.has_deletions = true; + self.properties.deletions_count += 1; let prop_timestamps = &mut self.properties.deletions[self.row]; prop_timestamps.set(t, edge_id.unwrap_or_default()); diff --git a/db4-storage/src/segments/edge/segment.rs b/db4-storage/src/segments/edge/segment.rs index 54e4d8ab0a..b39173ce35 100644 --- a/db4-storage/src/segments/edge/segment.rs +++ b/db4-storage/src/segments/edge/segment.rs @@ -94,6 +94,10 @@ impl MemEdgeSegment { .fetch_add(increment, Ordering::Relaxed); } + pub fn memory_tracker(&self) -> &Arc { + &self.global_memory_tracker + } + pub fn edge_meta(&self) -> &Arc { self.layers[0].meta() } diff --git a/db4-storage/src/segments/mod.rs b/db4-storage/src/segments/mod.rs index 222c8b2d71..9efd11bc84 100644 --- a/db4-storage/src/segments/mod.rs +++ b/db4-storage/src/segments/mod.rs @@ -160,6 +160,8 @@ pub struct SegmentContainer { max_page_len: u32, properties: Properties, meta: Arc, + out_count: usize, // used to count num edges + inb_count: usize, // used to count num edges } pub trait HasRow: Default + Send + Sync + Sized { @@ -178,6 +180,8 @@ impl SegmentContainer { max_page_len, properties: Default::default(), meta, + out_count: 0, + inb_count: 0, } } @@ -190,6 +194,22 @@ impl SegmentContainer { ) } + pub fn inc_out_count(&mut self, i: usize) { + self.out_count += i; + } + + pub fn inc_inb_count(&mut self, i: usize) { + self.inb_count += i; + } + + pub fn out_count(&self) -> usize { + self.out_count + } + + pub fn inb_count(&self) -> usize { + self.inb_count + } + #[inline] pub fn est_size(&self) -> usize { // TODO: this is a rough estimate and should be improved @@ -226,6 +246,10 @@ impl SegmentContainer { self.properties.t_len() } + pub fn deletions_len(&self) -> usize { + self.properties.deletions_count() + } + /// Reserves a local row for the given item position. /// If the item position already exists, it returns a mutable reference to the existing item. /// Left variant indicates that the item was already present, diff --git a/db4-storage/src/segments/node/segment.rs b/db4-storage/src/segments/node/segment.rs index d556075f4e..bd406bb536 100644 --- a/db4-storage/src/segments/node/segment.rs +++ b/db4-storage/src/segments/node/segment.rs @@ -103,6 +103,10 @@ impl MemNodeSegment { self.est_size } + pub fn memory_tracker(&self) -> &Arc { + &self.global_mem_tracker + } + pub(crate) fn increment_global_est_size(&self, increment: usize) { self.global_mem_tracker .fetch_add(increment, Ordering::Relaxed); @@ -261,11 +265,15 @@ impl MemNodeSegment { let layer = self.get_or_create_layer(layer_id); let est_size = layer.est_size(); - let add_out = layer.reserve_local_row(src_pos); - let new_entry = add_out.is_new(); - let add_out = add_out.inner(); - let is_new_edge = add_out.adj.add_edge_out(dst, e_id.eid()); - let row = add_out.row; + let (is_new_edge, row, new_entry) = { + let add_out = layer.reserve_local_row(src_pos); + let new_entry = add_out.is_new(); + let add_out = add_out.inner(); + let is_new_edge = add_out.adj.add_edge_out(dst, e_id.eid()); + let row = add_out.row; + (is_new_edge, row, new_entry) + }; + layer.inc_out_count(is_new_edge as usize); if let Some(t) = t { self.update_timestamp_inner(t, row, e_id); } @@ -290,11 +298,16 @@ impl MemNodeSegment { let layer = self.get_or_create_layer(layer_id); let est_size = layer.est_size(); - let add_in = layer.reserve_local_row(dst_pos); - let new_entry = add_in.is_new(); - let add_in = add_in.inner(); - let is_new_edge = add_in.adj.add_edge_into(src, e_id.eid()); - let row = add_in.row; + let (is_new_edge, row, new_entry) = { + let add_in = layer.reserve_local_row(dst_pos); + let new_entry = add_in.is_new(); + let add_in = add_in.inner(); + let is_new_edge = add_in.adj.add_edge_into(src, e_id.eid()); + let row = add_in.row; + (is_new_edge, row, new_entry) + }; + + layer.inc_inb_count(is_new_edge as usize); if let Some(t) = t { self.update_timestamp_inner(t, row, e_id); diff --git a/raphtory-api/src/core/entities/mod.rs b/raphtory-api/src/core/entities/mod.rs index 18dd85f92b..6427359939 100644 --- a/raphtory-api/src/core/entities/mod.rs +++ b/raphtory-api/src/core/entities/mod.rs @@ -145,7 +145,21 @@ pub struct ELID { } /// Edge id with deletion flag -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)] +#[derive( + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash, + Default, + Serialize, + Deserialize, + Pod, + Zeroable, +)] +#[repr(transparent)] pub struct EDID(pub usize); impl Debug for EDID { diff --git a/raphtory-api/src/core/entities/properties/prop/prop_type.rs b/raphtory-api/src/core/entities/properties/prop/prop_type.rs index 1807084aa5..3dd5172e86 100644 --- a/raphtory-api/src/core/entities/properties/prop/prop_type.rs +++ b/raphtory-api/src/core/entities/properties/prop/prop_type.rs @@ -1,6 +1,7 @@ use arrow_schema::DataType; use serde::{Deserialize, Serialize}; use std::{ + cell::LazyCell, collections::HashMap, fmt, fmt::{Display, Formatter}, @@ -76,6 +77,18 @@ impl Display for PropType { } } +const CONTAINER_SIZE: LazyCell = LazyCell::new(|| { + std::env::var("RAPHTORY_PROP_CONTAINER_SIZE") + .ok() + .map(|size| { + size.parse::().unwrap_or_else(|_| { + eprintln!("RAPHTORY_PROP_CONTAINER_SIZE not set or invalid, defaulting to 64"); + 64 + }) + }) + .unwrap_or(64) +}); + impl PropType { pub fn inner(&self) -> Option<&PropType> { match self { @@ -142,17 +155,17 @@ impl PropType { // This is the best guess for the size of one row of properties pub fn est_size(&self) -> usize { - const CONTAINER_SIZE: usize = 64; + let container_size = *CONTAINER_SIZE; match self { - PropType::Str => CONTAINER_SIZE, + PropType::Str => container_size, PropType::U8 | PropType::Bool => 1, PropType::U16 => 2, PropType::I32 | PropType::F32 | PropType::U32 => 4, PropType::I64 | PropType::F64 | PropType::U64 => 8, PropType::NDTime | PropType::DTime => 8, - PropType::List(p_type) => p_type.est_size() * CONTAINER_SIZE, + PropType::List(p_type) => p_type.est_size() * container_size, PropType::Map(p_map) => { - p_map.values().map(|v| v.est_size()).sum::() * CONTAINER_SIZE + p_map.values().map(|v| v.est_size()).sum::() * container_size } PropType::Decimal { .. } => 16, PropType::Empty => 0, @@ -208,7 +221,7 @@ pub struct InvalidPropertyTypeErr(pub DataType); pub mod arrow { use crate::core::entities::properties::prop::{PropType, EMPTY_MAP_FIELD_NAME}; - use arrow_schema::{DataType, TimeUnit}; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; impl From<&DataType> for PropType { fn from(value: &DataType) -> Self { @@ -244,6 +257,55 @@ pub mod arrow { } } } + + impl From<&PropType> for DataType { + fn from(value: &PropType) -> Self { + match value { + PropType::Str => DataType::Utf8View, + PropType::U8 => DataType::UInt8, + PropType::U16 => DataType::UInt16, + PropType::I32 => DataType::Int32, + PropType::I64 => DataType::Int64, + PropType::U32 => DataType::UInt32, + PropType::U64 => DataType::UInt64, + PropType::F32 => DataType::Float32, + PropType::F64 => DataType::Float64, + PropType::Decimal { scale } => { + DataType::Decimal128(38, (*scale).try_into().unwrap()) + } + PropType::Bool => DataType::Boolean, + PropType::NDTime => DataType::Timestamp(TimeUnit::Millisecond, None), + PropType::DTime => DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), + PropType::List(p_type) => DataType::LargeList( + Field::new("data", DataType::from(p_type.as_ref()), true).into(), + ), + PropType::Map(p_type) => { + let mut fields = p_type + .iter() + .map(|(name, p_type)| Field::new(name, DataType::from(p_type), true)) + .collect::>(); + fields.sort_by(|l, r| l.name().cmp(r.name())); + + if fields.is_empty() { + DataType::Struct(Fields::from_iter([Field::new( + EMPTY_MAP_FIELD_NAME, + DataType::Null, + true, + )])) + } else { + DataType::Struct(fields.into()) + } + } + PropType::Empty => DataType::Null, + } + } + } + + impl From for DataType { + fn from(value: PropType) -> Self { + DataType::from(&value) + } + } } // step through these types trees and check they are structurally the same @@ -369,6 +431,8 @@ pub fn check_for_unification(l: &PropType, r: &PropType) -> Option { #[cfg(test)] mod test { use super::*; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + use proptest::{collection::btree_map, prelude::*}; #[test] fn test_unify_types_ne() { @@ -501,4 +565,74 @@ mod test { let size = size_of::(); println!("PropError = {size}") } + + fn field_name() -> impl Strategy { + proptest::string::string_regex("[a-z][a-z0-9_]{0,6}") + .unwrap() + .prop_filter("not the empty map sentinel", |name| { + name != crate::core::entities::properties::prop::EMPTY_MAP_FIELD_NAME + }) + } + + fn canonical_data_type() -> impl Strategy { + let leaf = prop_oneof![ + Just(DataType::Boolean), + Just(DataType::Int32), + Just(DataType::Int64), + Just(DataType::UInt8), + Just(DataType::UInt16), + Just(DataType::UInt32), + Just(DataType::UInt64), + Just(DataType::Float32), + Just(DataType::Float64), + Just(DataType::Utf8View), + Just(DataType::Timestamp(TimeUnit::Millisecond, None)), + Just(DataType::Timestamp( + TimeUnit::Millisecond, + Some("UTC".into()) + )), + (0i8..=38).prop_map(|scale| DataType::Decimal128(38, scale)), + Just(DataType::Null), + ]; + + leaf.prop_recursive(4, 64, 4, |inner| { + prop_oneof![ + inner.clone().prop_map(|data_type| DataType::LargeList( + Field::new("data", data_type, true).into() + )), + btree_map(field_name(), inner, 0..4).prop_map(|fields| { + if fields.is_empty() { + DataType::Struct(Fields::from_iter([Field::new( + crate::core::entities::properties::prop::EMPTY_MAP_FIELD_NAME, + DataType::Null, + true, + )])) + } else { + DataType::Struct( + fields + .into_iter() + .map(|(name, data_type)| Field::new(name, data_type, true)) + .collect::>() + .into(), + ) + } + }), + ] + }) + } + + proptest! { + #[test] + fn data_type_to_prop_type_to_data_type_is_transitive(data_type in canonical_data_type()) { + prop_assert_eq!(DataType::from(PropType::from(&data_type)), data_type); + } + + #[test] + fn prop_type_to_data_type_to_prop_type_is_transitive(data_type in canonical_data_type()) { + let prop_type = PropType::from(&data_type); + let round_tripped: DataType = (&prop_type).into(); + + prop_assert_eq!(PropType::from(&round_tripped), prop_type); + } + } } diff --git a/raphtory-api/src/core/storage/timeindex.rs b/raphtory-api/src/core/storage/timeindex.rs index 8bcef2bfc4..91c27ea030 100644 --- a/raphtory-api/src/core/storage/timeindex.rs +++ b/raphtory-api/src/core/storage/timeindex.rs @@ -1,3 +1,4 @@ +use bytemuck::{Pod, Zeroable}; use chrono::{DateTime, Utc}; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -24,7 +25,10 @@ impl fmt::Display for TimeError { impl std::error::Error for TimeError {} -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash)] +#[derive( + Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash, Pod, Zeroable, +)] +#[repr(C)] pub struct EventTime(pub i64, pub usize); impl PartialEq for EventTime { @@ -181,11 +185,11 @@ impl<'a, L: TimeIndexOps<'a>, R: TimeIndexOps<'a, IndexType = L::IndexType>> Tim } fn first(&self) -> Option { - self.0.first().into_iter().chain(self.1.first()).min() + Iterator::min(self.0.first().into_iter().chain(self.1.first())) // rust-analyzer } fn last(&self) -> Option { - self.0.last().into_iter().chain(self.1.last()).max() + Iterator::max(self.0.last().into_iter().chain(self.1.last())) // rust-analyzer } fn iter(self) -> impl Iterator + Send + Sync + 'a { diff --git a/raphtory-core/src/storage/mod.rs b/raphtory-core/src/storage/mod.rs index 6aaf77501a..ca4d9f1dfa 100644 --- a/raphtory-core/src/storage/mod.rs +++ b/raphtory-core/src/storage/mod.rs @@ -165,7 +165,7 @@ impl PropColumn { col } - pub(crate) fn dtype(&self) -> PropType { + pub(crate) fn dtype_for_error_report(&self) -> PropType { match self { PropColumn::Empty(_) => PropType::Empty, PropColumn::Bool(_) => PropType::Bool, @@ -230,7 +230,7 @@ impl PropColumn { } (col, prop) => { Err(IllegalPropType { - expected: col.dtype(), + expected: col.dtype_for_error_report(), actual: prop.into_prop().dtype(), })?; } @@ -261,7 +261,7 @@ impl PropColumn { )?, (col, prop) => { Err(IllegalPropType { - expected: col.dtype(), + expected: col.dtype_for_error_report(), actual: prop.clone().into_prop().dtype(), })?; } @@ -308,7 +308,7 @@ impl PropColumn { } (col, prop) => { Err(IllegalPropType { - expected: col.dtype(), + expected: col.dtype_for_error_report(), actual: prop.into_prop().dtype(), })?; } diff --git a/raphtory-itertools/src/merge.rs b/raphtory-itertools/src/merge.rs index 93c5fbdec8..a455aee064 100644 --- a/raphtory-itertools/src/merge.rs +++ b/raphtory-itertools/src/merge.rs @@ -13,8 +13,7 @@ pub trait FastMergeExt: Iterator { /// /// Iterator element type is `Self::Item`. fn fast_merge_by< - F: FnMut(&::Item, &::Item) -> bool - + Clone, + F: FnMut(&::Item, &::Item) -> bool, >( self, cmp_fn: F, @@ -52,8 +51,7 @@ pub trait FastMergeExt: Iterator { /// /// Iterator element type is `Self::Item`. fn fast_merge_by_rev< - F: FnMut(&::Item, &::Item) -> bool - + Clone, + F: FnMut(&::Item, &::Item) -> bool, >( self, first: F, @@ -93,7 +91,7 @@ pub enum FastMerge> { Many(KMergeBy), } -impl + Clone> FastMerge { +impl> FastMerge { pub(crate) fn new(mut iters: impl Iterator, predicate: P) -> Self { let (lower, _) = iters.size_hint(); if lower > 2 { diff --git a/raphtory-storage/src/graph/graph.rs b/raphtory-storage/src/graph/graph.rs index 0526415cb4..1dc73c6994 100644 --- a/raphtory-storage/src/graph/graph.rs +++ b/raphtory-storage/src/graph/graph.rs @@ -18,8 +18,8 @@ use raphtory_api::core::entities::{ use raphtory_core::entities::{edges::edge_ref::EdgeRef, nodes::node_ref::NodeRef}; use std::{fmt::Debug, iter, path::Path, sync::Arc}; use storage::{ - error::StorageError, pages::SegmentCounts, state::StateIndex, Extension, GIDResolver, - GraphPropEntry, + error::StorageError, pages::SegmentCounts, persist::strategy::PersistenceStrategy, + state::StateIndex, Extension, GIDResolver, GraphPropEntry, }; use thiserror::Error; @@ -307,6 +307,10 @@ impl GraphStorage { } } + pub fn total_allocated_memory(&self) -> usize { + self.extension().estimated_size() + } + pub fn node_segment_counts(&self) -> SegmentCounts { match self { GraphStorage::Mem(storage) => storage.nodes.segment_counts(), diff --git a/raphtory/examples/eth_loader.rs b/raphtory/examples/eth_loader.rs index 565454eeb6..42adf3a58a 100644 --- a/raphtory/examples/eth_loader.rs +++ b/raphtory/examples/eth_loader.rs @@ -4,22 +4,32 @@ use raphtory::io::parquet_loaders::load_edges_from_parquet; use raphtory::{arrow_loader::df_loaders::edges::ColumnNames, errors::GraphError, prelude::*}; use std::path::{Path, PathBuf}; +#[cfg(target_os = "macos")] +use tikv_jemallocator::Jemalloc; + +#[cfg(target_os = "macos")] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; /// Load ETH data from Parquet files into a Raphtory Graph. #[cfg(feature = "io")] fn load_eth_graph(parquet_path: &Path, graph: &Graph) -> Result<(), GraphError> { // ── Static Nodes ────────────────────────────────────────────────────── - let props = (1..=20).map(|i| format!("prop_{i}")).collect::>(); - let props = props.iter().map(|s| s.as_ref()).collect::>(); load_edges_from_parquet( graph, parquet_path, - ColumnNames::new("time", None, "source", "target", None), + ColumnNames::new( + "transaction_timestamp", + None, + "transfer_sender_cluster_id", + "transfer_receiver_cluster_id", + None, + ), true, - &props, + &["receiver_address", "transfer_amount_usd"], &[], None, - Some("G500"), + Some("dac"), None, None, )?; diff --git a/raphtory/examples/snb_loader.rs b/raphtory/examples/snb_loader.rs index c528fbff59..9e71a1febf 100644 --- a/raphtory/examples/snb_loader.rs +++ b/raphtory/examples/snb_loader.rs @@ -1,9 +1,13 @@ #[cfg(feature = "io")] use raphtory::io::parquet_loaders::{load_edges_from_parquet, load_nodes_from_parquet}; +#[cfg(feature = "io")] use raphtory::{arrow_loader::df_loaders::edges::ColumnNames, errors::GraphError, prelude::*}; +use serde::Deserialize; +#[cfg(feature = "io")] use std::path::{Path, PathBuf}; /// Construct the path to a named Parquet file inside `parquet_dir`. +#[cfg(feature = "io")] fn pq(parquet_dir: &Path, name: &str) -> PathBuf { parquet_dir.join(format!("{}.parquet", name)) } @@ -15,541 +19,436 @@ use tikv_jemallocator::Jemalloc; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; -/// Load SNB data from Parquet files into a Raphtory Graph. #[cfg(feature = "io")] -fn load_snb_graph(parquet_dir: &Path, graph: &Graph) -> Result<(), GraphError> { - // ── Static Nodes ────────────────────────────────────────────────────── - - // println!("Loading Places..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "place"), - // "_time", - // None, - // "_node_id", - // None, - // Some("type"), - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ places"); - - // let fp = pq(parquet_dir, "place_IS_PART_OF_place"); - // if fp.exists() { - // load_edges_from_parquet( - // graph, - // &fp, - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_PART_OF"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ IS_PART_OF edges"); - // } - - // println!("Loading Organisations..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "organisation"), - // "_time", - // None, - // "_node_id", - // None, - // Some("type"), - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ organisations"); - - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "organisation_IS_LOCATED_IN_place"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_LOCATED_IN"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ Organisation IS_LOCATED_IN edges"); - - // println!("Loading Tags..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "tag"), - // "_time", - // None, - // "_node_id", - // Some("Tag"), - // None, - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ tags"); - - // let fp = pq(parquet_dir, "tagclass"); - // if fp.exists() { - // println!("Loading TagClasses..."); - // load_nodes_from_parquet( - // graph, - // &fp, - // "_time", - // None, - // "_node_id", - // Some("TagClass"), - // None, - // &["name", "url", "id"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ tag classes"); - // } - - // let fp = pq(parquet_dir, "tag_HAS_TYPE_tagclass"); - // if fp.exists() { - // load_edges_from_parquet( - // graph, - // &fp, - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("HAS_TYPE"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ HAS_TYPE edges"); - // } - - // let fp = pq(parquet_dir, "tagclass_IS_SUBCLASS_OF_tagclass"); - // if fp.exists() { - // load_edges_from_parquet( - // graph, - // &fp, - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_SUBCLASS_OF"), - // None, - // None, - // )?; - // graph.flush()?; - // println!(" ✓ IS_SUBCLASS_OF edges"); - // } - - // // ── Dynamic Nodes ───────────────────────────────────────────────────── - - // println!("Loading Persons..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "person"), - // "creationDate", - // None, - // "_node_id", - // Some("Person"), - // None, - // &[ - // "firstName", - // "lastName", - // "gender", - // "birthday", - // "locationIP", - // "browserUsed", - // "language", - // "email", - // "id", - // "creationDate", - // ], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ persons"); - - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "person_IS_LOCATED_IN_place"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_LOCATED_IN"), - // None, - // None, - // )?; - // graph.flush()?; - - // println!("Loading Forums..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "forum"), - // "creationDate", - // None, - // "_node_id", - // Some("Forum"), - // None, - // &["title", "id", "creationDate"], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ forums"); - - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "forum_HAS_MODERATOR_person"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("HAS_MODERATOR"), - // None, - // None, - // )?; - // graph.flush()?; - - // println!("Loading Posts..."); - // load_nodes_from_parquet( - // graph, - // &pq(parquet_dir, "post"), - // "creationDate", - // None, - // "_node_id", - // Some("Post"), - // None, - // &[ - // "imageFile", - // "locationIP", - // "browserUsed", - // "language", - // "content", - // "length", - // "id", - // "creationDate", - // ], - // &[], - // None, - // None, - // true, - // None, - // )?; - // println!(" ✓ posts"); +struct NodeParquetInput { + path: PathBuf, + time_col: String, + id_col: String, + node_type: Option, + node_type_col: Option, + property_cols: Vec, +} - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "post_HAS_CREATOR_person"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("HAS_CREATOR"), - // None, - // None, - // )?; - // graph.flush()?; +#[cfg(feature = "io")] +impl NodeParquetInput { + fn new<'a>( + path: impl AsRef, + time_col: &str, + id_col: &str, + node_type: Option<&str>, + node_type_col: Option<&str>, + property_cols: Vec<&str>, + ) -> NodeParquetInput { + NodeParquetInput { + path: path.as_ref().into(), + time_col: time_col.into(), + id_col: id_col.into(), + node_type: node_type.map(Into::into), + node_type_col: node_type_col.map(Into::into), + property_cols: property_cols.into_iter().map(|s| s.into()).collect(), + } + } + + fn path_as_string(&self) -> &str { + self.path.iter().last().and_then(|p| p.to_str()).unwrap() + } +} +#[cfg(feature = "io")] +struct EdgeParquetInput { + path: PathBuf, + time_col: String, + src_col: String, + dst_col: String, + layer: Option, + property_cols: Vec, +} - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "post_IS_LOCATED_IN_place"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("IS_LOCATED_IN"), - // None, - // None, - // )?; - // graph.flush()?; +#[cfg(feature = "io")] +impl EdgeParquetInput { + fn new( + path: impl AsRef, + time_col: &str, + src_col: &str, + dst_col: &str, + layer: Option<&str>, + property_cols: Vec<&str>, + ) -> EdgeParquetInput { + EdgeParquetInput { + path: path.as_ref().into(), + time_col: time_col.into(), + src_col: src_col.into(), + dst_col: dst_col.into(), + layer: layer.map(Into::into), + property_cols: property_cols.into_iter().map(Into::into).collect(), + } + } + + fn path_as_string(&self) -> &str { + self.path.iter().last().and_then(|p| p.to_str()).unwrap() + } +} - // load_edges_from_parquet( - // graph, - // &pq(parquet_dir, "forum_CONTAINER_OF_post"), - // ColumnNames::new("_time", None, "START_ID", "END_ID", None), - // true, - // &[], - // &[], - // None, - // Some("CONTAINER_OF"), - // None, - // None, - // )?; - // graph.flush()?; +#[cfg(feature = "io")] +fn load_snb_graph_v2( + nodes: impl IntoIterator, + edges: impl IntoIterator, + graph: &Graph, +) -> Result<(), GraphError> { + for node in nodes { + println!("Loading nodes from Parquet file with time column '{}', id column '{}', label column '{:?}', and property columns {:?}...", node.time_col, node.id_col, node.node_type, node.property_cols); + load_nodes_from_parquet( + graph, + &node.path, + &node.time_col, + None, + &node.id_col, + node.node_type.as_deref(), + node.node_type_col.as_deref(), + &node + .property_cols + .iter() + .map(|s| s.as_ref()) + .collect::>(), + &[], + None, + None, + None, + None, + None, + true, + None, + )?; + println!( + " ✓ Finished loading nodes from Parquet file with id column '{}'", + node.id_col + ); + } + + for edge in edges { + println!("Loading edges from Parquet file with time column '{}', src column '{}', dst column '{}', label column '{:?}', and property columns {:?}...", edge.time_col, edge.src_col, edge.dst_col, edge.layer, edge.property_cols); + load_edges_from_parquet( + graph, + &edge.path, + ColumnNames::new(&edge.time_col, None, &edge.src_col, &edge.dst_col, None), + true, + &edge + .property_cols + .iter() + .map(|s| s.as_ref()) + .collect::>(), + &[], + None, + edge.layer.as_deref(), + None, + None, + )?; + println!( + " ✓ Finished loading edges from Parquet file with src column '{}'", + edge.src_col + ); + } + Ok(()) +} - println!("Loading Comments..."); - load_nodes_from_parquet( - graph, - &pq(parquet_dir, "comment"), - "creationDate", - None, - "_node_id", - Some("Comment"), - None, - &[ - "locationIP", - "browserUsed", - "content", - "length", - "id", +/// Load SNB data from Parquet files into a Raphtory Graph. +#[cfg(feature = "io")] +fn load_snb_graph( + parquet_dir: &Path, + filter: Option, + graph: &Graph, +) -> Result<(), GraphError> { + let node_inputs = [ + NodeParquetInput::new( + pq(parquet_dir, "place"), + "_time", + "_node_id", + None, + Some("type"), + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "organisation"), + "_time", + "_node_id", + None, + Some("type"), + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "tag"), + "_time", + "_node_id", + Some("Tag"), + None, + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "tagclass"), + "_time", + "_node_id", + Some("TagClass"), + None, + vec!["name", "url", "id"], + ), + NodeParquetInput::new( + pq(parquet_dir, "person"), "creationDate", - ], - &[], - None, - None, - None, - None, - None, - true, - None, - )?; - println!(" ✓ comments"); - // graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_HAS_CREATOR_person"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_CREATOR"), - None, - None, - )?; - // graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_IS_LOCATED_IN_place"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("IS_LOCATED_IN"), - None, - None, - )?; - // graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_REPLY_OF_post"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("REPLY_OF"), - None, - None, - )?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_REPLY_OF_comment"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("REPLY_OF"), - None, - None, - )?; - // graph.flush()?; - - // ── Edge-only relationships ─────────────────────────────────────────── - - println!("Loading KNOWS edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_KNOWS_person"), - ColumnNames::new("creationDate", None, "START_ID", "END_ID", None), - true, - &["creationDate"], - &[], - None, - Some("KNOWS"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ KNOWS edges"); - - println!("Loading LIKES edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_LIKES_post"), - ColumnNames::new("creationDate", None, "START_ID", "END_ID", None), - true, - &["creationDate"], - &[], - None, - Some("LIKES"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ LIKES (Post) edges"); - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_LIKES_comment"), - ColumnNames::new("creationDate", None, "START_ID", "END_ID", None), - true, - &["creationDate"], - &[], - None, - Some("LIKES"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ LIKES (Comment) edges"); - - println!("Loading HAS_MEMBER edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "forum_HAS_MEMBER_person"), - ColumnNames::new("joinDate", None, "START_ID", "END_ID", None), - true, - &["joinDate"], - &[], - None, - Some("HAS_MEMBER"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ HAS_MEMBER edges"); - - println!("Loading STUDY_AT edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_STUDY_AT_organisation"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &["classYear"], - &[], - None, - Some("STUDY_AT"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ STUDY_AT edges"); - - println!("Loading WORK_AT edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_WORK_AT_organisation"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &["workFrom"], - &[], - None, - Some("WORK_AT"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ WORK_AT edges"); - - println!("Loading HAS_TAG edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "post_HAS_TAG_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_TAG"), - None, - None, - )?; - graph.flush()?; - - load_edges_from_parquet( - graph, - &pq(parquet_dir, "comment_HAS_TAG_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_TAG"), - None, - None, - )?; - graph.flush()?; + "_node_id", + Some("Person"), + None, + vec![ + "firstName", + "lastName", + "gender", + "birthday", + "locationIP", + "browserUsed", + "language", + "email", + "id", + "creationDate", + ], + ), + NodeParquetInput::new( + pq(parquet_dir, "forum"), + "creationDate", + "_node_id", + Some("Forum"), + None, + vec!["title", "id", "creationDate"], + ), + NodeParquetInput::new( + pq(parquet_dir, "post"), + "creationDate", + "_node_id", + Some("Post"), + None, + vec![ + "imageFile", + "locationIP", + "browserUsed", + "language", + "content", + "length", + "id", + "creationDate", + ], + ), + NodeParquetInput::new( + pq(parquet_dir, "comment"), + "creationDate", + "_node_id", + Some("Comment"), + None, + vec![ + "locationIP", + "browserUsed", + "content", + "length", + "id", + "creationDate", + ], + ), + ]; + + let edge_inputs = [ + EdgeParquetInput::new( + pq(parquet_dir, "place_IS_PART_OF_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_PART_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "organisation_IS_LOCATED_IN_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_LOCATED_IN"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_IS_LOCATED_IN_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_LOCATED_IN"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "post_IS_LOCATED_IN_place"), + "_time", + "START_ID", + "END_ID", + Some("IS_LOCATED_IN"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_HAS_MODERATOR_person"), + "_time", + "START_ID", + "END_ID", + Some("HAS_MODERATOR"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "post_HAS_CREATOR_person"), + "_time", + "START_ID", + "END_ID", + Some("HAS_CREATOR"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_HAS_CREATOR_person"), + "_time", + "START_ID", + "END_ID", + Some("HAS_CREATOR"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_CONTAINER_OF_post"), + "_time", + "START_ID", + "END_ID", + Some("CONTAINER_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_REPLY_OF_post"), + "_time", + "START_ID", + "END_ID", + Some("REPLY_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_REPLY_OF_comment"), + "_time", + "START_ID", + "END_ID", + Some("REPLY_OF"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_KNOWS_person"), + "creationDate", + "START_ID", + "END_ID", + Some("KNOWS"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_LIKES_post"), + "creationDate", + "START_ID", + "END_ID", + Some("LIKES"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_LIKES_comment"), + "creationDate", + "START_ID", + "END_ID", + Some("LIKES"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_HAS_MEMBER_person"), + "joinDate", + "START_ID", + "END_ID", + Some("HAS_MEMBER"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_STUDY_AT_organisation"), + "_time", + "START_ID", + "END_ID", + Some("STUDY_AT"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_WORK_AT_organisation"), + "_time", + "START_ID", + "END_ID", + Some("WORK_AT"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "post_HAS_TAG_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_TAG"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "comment_HAS_TAG_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_TAG"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "forum_HAS_TAG_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_TAG"), + vec![], + ), + EdgeParquetInput::new( + pq(parquet_dir, "person_HAS_INTEREST_tag"), + "_time", + "START_ID", + "END_ID", + Some("HAS_INTEREST"), + vec![], + ), + ]; + + let edge_inputs = edge_inputs + .into_iter() + .filter(|edge| { + filter + .as_ref() + .and_then(|filter| filter.edges.as_ref()) + .map(|e_f| e_f.iter().any(|name| edge.path_as_string().contains(name))) + .unwrap_or(true) + }) + .collect::>(); + + let node_inputs = node_inputs + .into_iter() + .filter(|node| { + filter + .as_ref() + .and_then(|filter| filter.nodes.as_ref()) + .map(|e_f| e_f.iter().any(|name| node.path_as_string().contains(name))) + .unwrap_or(true) + }) + .collect::>(); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "forum_HAS_TAG_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_TAG"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ HAS_TAG edges"); + println!( + "edge_inputs: {:?}, node_inputs: {:?}", + edge_inputs + .iter() + .map(|e| e.path_as_string()) + .collect::>(), + node_inputs + .iter() + .map(|e| e.path_as_string()) + .collect::>(), + ); - println!("Loading HAS_INTEREST edges..."); - load_edges_from_parquet( - graph, - &pq(parquet_dir, "person_HAS_INTEREST_tag"), - ColumnNames::new("_time", None, "START_ID", "END_ID", None), - true, - &[], - &[], - None, - Some("HAS_INTEREST"), - None, - None, - )?; - graph.flush()?; - println!(" ✓ HAS_INTEREST edges"); + load_snb_graph_v2(node_inputs, edge_inputs, graph)?; println!( "\n✅ Graph loaded: {} nodes, {} edges", @@ -559,18 +458,34 @@ fn load_snb_graph(parquet_dir: &Path, graph: &Graph) -> Result<(), GraphError> { Ok(()) } +#[derive(Deserialize)] +struct Filter { + nodes: Option>, + edges: Option>, +} + #[cfg(feature = "io")] fn main() { let parquet_dir = std::env::args() .nth(1) .map(|dir| PathBuf::from(dir)) .unwrap_or_else(|| panic!("Usage: snb_loader ")); - let graph_path = std::env::args() + let filter = std::env::args() .nth(2) + .map(|s| serde_json::from_str::(&s)) + .transpose() + .unwrap(); + + let graph_path = std::env::args() + .nth(3) .map(|graph| PathBuf::from(graph)) .unwrap_or_else(|| parquet_dir.join("..").join("graph")); - let graph = Graph::new_at_path(&graph_path).unwrap(); - load_snb_graph(&parquet_dir, &graph).unwrap() + let graph = if !graph_path.exists() { + Graph::new_at_path(&graph_path).unwrap() + } else { + Graph::load(&graph_path).unwrap() + }; + load_snb_graph(&parquet_dir, filter, &graph).unwrap() } #[cfg(not(feature = "io"))] diff --git a/raphtory/src/arrow_loader/df_loaders/edges.rs b/raphtory/src/arrow_loader/df_loaders/edges.rs index 6e68e9074e..581a01aaf1 100644 --- a/raphtory/src/arrow_loader/df_loaders/edges.rs +++ b/raphtory/src/arrow_loader/df_loaders/edges.rs @@ -5,9 +5,10 @@ use kdam::BarExt; use crate::{ arrow_loader::{ - dataframe::{DFChunk, DFView, SecondaryIndexCol}, + dataframe::{DFChunk, DFView}, df_loaders::{ - extract_secondary_index_col, process_shared_properties, resolve_nodes_with_cache, + extract_secondary_index_col, group_rows_by_vid_segment, process_shared_properties, + resolve_nodes_with_cache, secondary_index_at, }, layer_col::lift_layer_col, node_col::NodeCol, @@ -763,7 +764,7 @@ fn update_edge_properties>( ) { let mut t_props = vec![]; let mut c_props = vec![]; - let mut writer = shard.writer(); + let mut writer = shard.bulk_writer(); for (row, src, dst, time, secondary_index, eid, layer, exists) in zip { if let Some(eid_pos) = writer.resolve_pos(eid) { @@ -803,7 +804,7 @@ fn update_inbound_edges>( zip: impl Iterator, delete: bool, ) { - let mut writer = shard.writer(); + let mut writer = shard.bulk_writer(); for ( _row, src, @@ -859,7 +860,7 @@ fn add_and_resolve_outbound_edges< locked_page: &mut LockedNodePage<'_, NS>, delete: bool, ) { - let mut writer = locked_page.writer(); + let mut writer = locked_page.bulk_writer(); for (row, src, dst, time, secondary_index, layer) in zip { if let Some(src_pos) = writer.resolve_pos(src) { let t = EventTime(time, secondary_index); @@ -903,22 +904,6 @@ fn add_and_resolve_outbound_edges< } } -fn group_rows_by_vid_segment( - vids: &[VID], - max_segment_len: u32, - num_segments: usize, -) -> Vec> { - let mut rows_by_segment = vec![Vec::new(); num_segments]; - for (row, vid) in vids.iter().enumerate() { - let (segment_id, _) = resolve_pos(vid.index(), max_segment_len); - let rows = rows_by_segment - .get_mut(segment_id) - .expect("segment not found while grouping by vid"); - rows.push(row); - } - rows_by_segment -} - fn group_rows_by_eid_segment( eids: &[EID], max_segment_len: u32, @@ -935,19 +920,11 @@ fn group_rows_by_eid_segment( rows_by_segment } -#[inline(always)] -fn secondary_index_at(col: &SecondaryIndexCol, row: usize) -> usize { - match col { - SecondaryIndexCol::DataFrame(arr) => arr.value(row) as usize, - SecondaryIndexCol::Range(range) => range.start + row, - } -} - pub fn store_node_ids>( gid_str_cache: &[(GidRef<'_>, VID)], locked_page: &mut LockedNodePage<'_, NS>, ) { - let mut writer = locked_page.writer(); + let mut writer = locked_page.bulk_writer(); for (src_gid, vid) in gid_str_cache.iter() { if let Some(src_pos) = writer.resolve_pos(*vid) { writer.store_node_id(src_pos, STATIC_GRAPH_LAYER_ID, (*src_gid).into()); diff --git a/raphtory/src/arrow_loader/df_loaders/mod.rs b/raphtory/src/arrow_loader/df_loaders/mod.rs index 7e4cf46d18..5074b06014 100644 --- a/raphtory/src/arrow_loader/df_loaders/mod.rs +++ b/raphtory/src/arrow_loader/df_loaders/mod.rs @@ -23,6 +23,7 @@ use std::{ env, sync::atomic::{AtomicUsize, Ordering}, }; +use storage::pages::resolve_pos; pub mod edge_props; pub mod edges; @@ -357,3 +358,27 @@ fn resolve_nodes_with_cache_generic<'a, V: Send + Sync>( })?; Ok(gid_str_cache) } + +pub(crate) fn group_rows_by_vid_segment( + vids: &[VID], + max_segment_len: u32, + num_segments: usize, +) -> Vec> { + let mut rows_by_segment = vec![Vec::new(); num_segments]; + for (row, vid) in vids.iter().enumerate() { + let (segment_id, _) = resolve_pos(vid.index(), max_segment_len); + let rows = rows_by_segment + .get_mut(segment_id) + .expect("segment not found while grouping by vid"); + rows.push(row); + } + rows_by_segment +} + +#[inline(always)] +fn secondary_index_at(col: &SecondaryIndexCol, row: usize) -> usize { + match col { + SecondaryIndexCol::DataFrame(arr) => arr.value(row) as usize, + SecondaryIndexCol::Range(range) => range.start + row, + } +} diff --git a/raphtory/src/arrow_loader/df_loaders/nodes.rs b/raphtory/src/arrow_loader/df_loaders/nodes.rs index b5088c159e..1646a5f7b4 100644 --- a/raphtory/src/arrow_loader/df_loaders/nodes.rs +++ b/raphtory/src/arrow_loader/df_loaders/nodes.rs @@ -24,7 +24,10 @@ use raphtory_storage::mutation::addition_ops::{InternalAdditionOps, SessionAddit use rayon::prelude::*; use std::{ collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, + }, }; use storage::{ api::nodes::NodeSegmentOps, @@ -37,7 +40,8 @@ use crate::arrow_loader::df_loaders::build_progress_bar; use crate::arrow_loader::{ dataframe::{DFChunk, DFView}, df_loaders::{ - extract_secondary_index_col, process_shared_properties, resolve_nodes_and_type_with_cache, + extract_secondary_index_col, group_rows_by_vid_segment, process_shared_properties, + resolve_nodes_and_type_with_cache, secondary_index_at, }, layer_col::{lift_layer_col, lift_node_type_col, LayerCol}, node_col::NodeCol, @@ -48,6 +52,75 @@ use crate::arrow_loader::{ use kdam::BarExt; /// If layer_id_col is provided, then layer_col must also be provided +#[allow(clippy::too_many_arguments)] +pub fn load_nodes_from_df_prefetch< + G: StaticGraphViewOps + PropertyAdditionOps + AdditionOps + std::fmt::Debug, + I1: Iterator> + Send, +>( + df_view: DFView, + time: &str, + secondary_index: Option<&str>, + node_id: &str, + properties: &[&str], + metadata: &[&str], + shared_metadata: Option<&HashMap>, + node_type: Option<&str>, + node_type_col: Option<&str>, + graph: &G, + resolve_nodes: bool, + layer: Option<&str>, + layer_col: Option<&str>, + layer_id_col: Option<&str>, +) -> Result<(), GraphError> { + let DFView { + names, + chunks, + num_rows, + } = df_view; + + LOAD_POOL.install(|| { + rayon::scope(|s| { + let (tx, rx) = mpsc::sync_channel(2); + + s.spawn(move |_| { + let sender = tx; + for chunk in chunks { + if let Err(e) = sender.send(chunk) { + eprintln!("Error sending chunk to loader: {}", e); + break; + } + } + }); + + let df_view_prefetch = DFView { + names, + chunks: rx.into_iter(), + num_rows, + }; + + load_nodes_from_df( + df_view_prefetch, + time, + secondary_index, + node_id, + properties, + metadata, + shared_metadata, + node_type, + node_type_col, + graph, + resolve_nodes, + layer, + layer_col, + layer_id_col, + )?; + Ok::<(), GraphError>(()) + })?; + + Ok(()) + }) +} + #[allow(clippy::too_many_arguments)] pub fn load_nodes_from_df< G: StaticGraphViewOps + PropertyAdditionOps + AdditionOps + std::fmt::Debug, @@ -199,20 +272,38 @@ pub fn load_nodes_from_df< node_stats.update_time(time); }; + let max_node_segment_len = write_locked_graph + .graph() + .storage() + .nodes() + .max_segment_len() as usize; + let rows_by_segment = group_rows_by_vid_segment( + src_vids, + max_node_segment_len as u32, + write_locked_graph.nodes.len(), + ); + write_locked_graph .nodes .par_iter_mut() .enumerate() .try_for_each(|(segment_id, shard)| { - if !node_segments_touched[segment_id].load(Ordering::Relaxed) { + let node_rows = &rows_by_segment[segment_id]; + + if node_rows.is_empty() { // we need to graph a writer nevertheless as it may have old data that needs to flush if shard.segment().is_dirty() { - let mut _writer = shard.writer(); + let _writer = shard.writer(); } return Ok::<_, GraphError>(()); } // Zip all columns for iteration. - let zip = izip!(src_vids.iter(), time_col.iter(), secondary_index_col.iter(),); + let zip = node_rows.iter().map(|&row| { + let vid = &src_vids[row]; + let time = time_col[row]; + let secondary_index = secondary_index_at(&secondary_index_col, row); + (row, vid, time, secondary_index) + }); // resolve_nodes=false // assumes we are loading our own graph, via the parquet loaders, @@ -222,7 +313,7 @@ pub fn load_nodes_from_df< } let mut writer = shard.writer(); - for (row, (vid, time, secondary_index)) in zip.enumerate() { + for (row, vid, time, secondary_index) in zip { if let Some(mut_node) = writer.resolve_pos(*vid) { let t = EventTime(time, secondary_index); let layer_id = layer_col_resolved diff --git a/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs b/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs index a97d4e4fa5..737ad7c4a5 100644 --- a/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs +++ b/raphtory/src/db/api/view/internal/time_semantics/filtered_edge.rs @@ -334,7 +334,6 @@ impl<'a> FilteredEdgeStorageOps<'a> for EdgeEntryRef<'a> { ) -> impl Iterator + 'a { self.layer_ids_iter(layer_ids) .filter(move |layer_id| view.internal_filter_edge_layer(self, *layer_id)) - .map(|layer_id| layer_id) } fn filtered_additions_iter( diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index b60c44c6be..d807375218 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -3,7 +3,7 @@ use crate::{ dataframe::*, df_loaders::{ edges::{load_edges_from_df_prefetch, ColumnNames}, - nodes::{load_node_props_from_df, load_nodes_from_df}, + nodes::{load_node_props_from_df, load_nodes_from_df_prefetch}, *, }, }, @@ -89,7 +89,7 @@ pub fn load_nodes_from_parquet< schema.clone(), )?; df_view.check_cols_exist(&cols_to_check)?; - load_nodes_from_df( + load_nodes_from_df_prefetch( df_view, time, secondary_index, diff --git a/raphtory/src/parquet_encoder/mod.rs b/raphtory/src/parquet_encoder/mod.rs index f07706b9b4..c93e20788e 100644 --- a/raphtory/src/parquet_encoder/mod.rs +++ b/raphtory/src/parquet_encoder/mod.rs @@ -93,6 +93,7 @@ pub(crate) fn run_encode_indexed< encode_fn: impl Fn(II, &G, &mut Decoder, &mut S) -> Result<(), GraphError> + Sync, ) -> Result<(), GraphError> { let schema = derive_schema(meta, g.id_type(), default_fields_fn)?; + let num_digits = 8; items.try_for_each(|(chunk, items)| {