1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so the expression can carry its own formatting: the `Debug` impl in this
/// file prints the raw token stream, while the `Display` impl prints a simplified `q!(...)` form.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42 fn from(expr: syn::Expr) -> Self {
43 Self(Box::new(expr))
44 }
45}
46
47impl Deref for DebugExpr {
48 type Target = syn::Expr;
49
50 fn deref(&self) -> &Self::Target {
51 &self.0
52 }
53}
54
55impl ToTokens for DebugExpr {
56 fn to_tokens(&self, tokens: &mut TokenStream) {
57 self.0.to_tokens(tokens);
58 }
59}
60
61impl Debug for DebugExpr {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0.to_token_stream())
64 }
65}
66
67impl Display for DebugExpr {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let original = self.0.as_ref().clone();
70 let simplified = simplify_q_macro(original);
71
72 write!(f, "q!({})", quote::quote!(#simplified))
75 }
76}
77
78fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80 let mut simplifier = QMacroSimplifier::new();
83 simplifier.visit_expr_mut(&mut expr);
84
85 if let Some(simplified) = simplifier.simplified_result {
87 simplified
88 } else {
89 expr
90 }
91}
92
/// Expression visitor used by `simplify_q_macro`.
///
/// It walks an expression looking for a call whose path starts with
/// `stageleft::runtime_support` and whose final segment contains `_type_hint`, and records the
/// closure found among that call's arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure, once one has been found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
98
99impl QMacroSimplifier {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105impl VisitMut for QMacroSimplifier {
106 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107 if self.simplified_result.is_some() {
109 return;
110 }
111
112 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113 && self.is_stageleft_runtime_support_call(&path_expr.path)
115 && let Some(closure) = self.extract_closure_from_args(&call.args)
117 {
118 self.simplified_result = Some(closure);
119 return;
120 }
121
122 syn::visit_mut::visit_expr_mut(self, expr);
125 }
126}
127
128impl QMacroSimplifier {
129 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130 if let Some(last_segment) = path.segments.last() {
132 let fn_name = last_segment.ident.to_string();
133 fn_name.contains("_type_hint")
135 && path.segments.len() > 2
136 && path.segments[0].ident == "stageleft"
137 && path.segments[1].ident == "runtime_support"
138 } else {
139 false
140 }
141 }
142
143 fn extract_closure_from_args(
144 &self,
145 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146 ) -> Option<syn::Expr> {
147 for arg in args {
149 if let syn::Expr::Closure(_) = arg {
150 return Some(arg.clone());
151 }
152 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154 return Some(closure_expr);
155 }
156 }
157 None
158 }
159
160 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161 let mut visitor = ClosureFinder {
162 found_closure: None,
163 prefer_inner_blocks: true,
164 };
165 visitor.visit_expr(expr);
166 visitor.found_closure
167 }
168}
169
/// Read-only visitor that stops at the first closure-bearing expression it finds.
struct ClosureFinder {
    /// The expression recorded by the search; `None` while nothing has been found.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement that is itself a block containing a closure is recorded
    /// as a whole (instead of the bare closure inside it).
    prefer_inner_blocks: bool,
}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178 if self.found_closure.is_some() {
180 return;
181 }
182
183 match expr {
184 syn::Expr::Closure(_) => {
185 self.found_closure = Some(expr.clone());
186 }
187 syn::Expr::Block(block) if self.prefer_inner_blocks => {
188 for stmt in &block.block.stmts {
190 if let syn::Stmt::Expr(stmt_expr, _) = stmt
191 && let syn::Expr::Block(_) = stmt_expr
192 {
193 let mut inner_visitor = ClosureFinder {
195 found_closure: None,
196 prefer_inner_blocks: false, };
198 inner_visitor.visit_expr(stmt_expr);
199 if inner_visitor.found_closure.is_some() {
200 self.found_closure = Some(stmt_expr.clone());
202 return;
203 }
204 }
205 }
206
207 visit::visit_expr(self, expr);
209
210 if self.found_closure.is_some() {
213 }
215 }
216 _ => {
217 visit::visit_expr(self, expr);
219 }
220 }
221 }
222}
223
/// Newtype around a boxed [`syn::Type`] whose `Debug` impl in this file prints the type's
/// token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231 fn from(t: syn::Type) -> Self {
232 Self(Box::new(t))
233 }
234}
235
236impl Deref for DebugType {
237 type Target = syn::Type;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0
241 }
242}
243
244impl ToTokens for DebugType {
245 fn to_tokens(&self, tokens: &mut TokenStream) {
246 self.0.to_tokens(tokens);
247 }
248}
249
250impl Debug for DebugType {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.to_token_stream())
253 }
254}
255
/// Instantiation state attached to network-like IR nodes.
pub enum DebugInstantiate {
    /// Not instantiated yet. `HydroRoot::compile_network` replaces this state with
    /// [`DebugInstantiate::Finalized`].
    Building,
    /// Instantiated: holds the sink/source expressions and the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
260
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot callback; `HydroRoot::connect_network` takes it out of the `Option` and calls it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275 fn from(f: DebugInstantiateFinalized) -> Self {
276 Self::Finalized(Box::new(f))
277 }
278}
279
280impl Debug for DebugInstantiate {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "<network instantiate>")
283 }
284}
285
impl Hash for DebugInstantiate {
    /// Writes nothing to the hasher, so this value never influences the hash of a structure
    /// that contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
291
292impl Clone for DebugInstantiate {
293 fn clone(&self) -> Self {
294 match self {
295 DebugInstantiate::Building => DebugInstantiate::Building,
296 DebugInstantiate::Finalized(_) => {
297 panic!("DebugInstantiate::Finalized should not be cloned")
298 }
299 }
300 }
301}
302
/// State of a `HydroSource::ClusterMembers` source.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Initial state; `HydroRoot::compile_network` replaces it with one of the other variants.
    Uninit,
    /// Set the first time a given (observing location, cluster) pair is seen during
    /// `compile_network`; holds the spliced membership-stream expression.
    Stream(DebugExpr),
    /// Set when the (observing location, cluster) pair was already seen; holds that pair.
    Tee(LocationId, LocationId),
}
323
/// The kinds of input a source node can read from.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (NOTE(review): presumably a stream expression — confirm).
    Stream(DebugExpr),
    /// Source fed by an external network; carries no data of its own.
    ExternalNetwork(),
    /// Source described by an expression (NOTE(review): presumably an iterator expression — confirm).
    Iter(DebugExpr),
    /// Carries no data (NOTE(review): semantics not visible here — confirm).
    Spin(),
    /// Membership of the cluster identified by the `LocationId`, plus its compile state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded input named by an identifier; `HydroRoot::compile_network` registers it through
    /// `register_embedded_input`.
    Embedded(syn::Ident),
}
334
/// Sink for generated DFIR code, organised per location.
///
/// Each method receives the identifiers of its input and output plus the locations involved
/// and is expected to add the corresponding statements to the right [`FlatGraphBuilder`].
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Implementation-specific flag; the `SecondaryMap` implementation in this file returns
    /// `false`. NOTE(review): exact meaning is not visible here — confirm with callers.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder used for `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the statement connecting `in_ident` (at `in_location`, of kind `in_kind`) to
    /// `out_ident` for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the statement connecting `in_ident` to `out_ident` for a yield-from-tick operation.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the statement connecting `in_ident` to `out_ident` for a begin-atomic operation.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the statement connecting `in_ident` to `out_ident` for an end-atomic operation.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the statement connecting `in_ident` to `out_ident` at `location` for an
    /// observe-nondeterminism operation.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network edge: a sender at `from` that feeds `input_ident` into
    /// `sink` (optionally through `serialize`) and a receiver at `to` that defines `out_ident`
    /// from `source` (optionally through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`, optionally mapped
    /// through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender at `on` that feeds `input_ident` into `sink_expr`, optionally mapped
    /// through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
428
/// Plain code-generation backend: one [`FlatGraphBuilder`] per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This backend always reports `false`.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the key of `location`'s root, creating a default one
    /// when the slot is empty.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" when the map cannot provide an entry for the key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Forwards `in_ident` to `out_ident`. Bounded singleton, optional and keyed-singleton
    /// inputs are routed through `persist::<'static>()` and must be at the top level.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Emits a plain alias `out_ident = in_ident` at the root of `in_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a plain alias `out_ident = in_ident` at the root of `in_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a plain alias `out_ident = in_ident` at the root of `in_location`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a plain alias `out_ident = in_ident` at `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits the sender statement (tagged `send<tag_id>`) at `from` and the receiver statement
    /// (tagged `recv<tag_id>`) at `to`, inserting `map(...)` stages for the optional
    /// serialize/deserialize expressions.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        // Sending half.
        let sender = self.get_dfir_mut(from);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        // Receiving half.
        let receiver = self.get_dfir_mut(to);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#de);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a receiver statement (tagged `recv<tag_id>`) at `on` that defines `out_ident`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver = self.get_dfir_mut(on);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#de);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a sender statement (tagged `send<tag_id>`) at `on` that consumes `input_ident`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender = self.get_dfir_mut(on);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }
}
662
/// Selects what `emit_core` does at each IR element: generate DFIR, or hand the element to a
/// caller-supplied callback.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Generate DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke callbacks instead: `L` receives each root and `N` each node, both together with
    /// the running statement-id counter.
    Callback(L, N),
}
672
/// A terminal (output) element of the IR; every variant consumes exactly one `input` node.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Applies `f` to each element of `input` (emitted as `for_each(f)`).
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port of an external location.
    SendExternal {
        /// Key of the destination external.
        to_external_key: LocationKey,
        /// Port on the destination external.
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        /// Optional serialization stage applied before the sink.
        serialize_fn: Option<DebugExpr>,
        /// Instantiation state, finalized by `compile_network`.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the sink expression (emitted as `dest_sink(sink)`).
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Binds `input` to the identifier derived from `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the embedded output named `ident` (emitted as `for_each(&mut ident)`).
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
709
710impl HydroRoot {
711 #[cfg(feature = "build")]
712 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
713 pub fn compile_network<'a, D>(
714 &mut self,
715 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
716 seen_tees: &mut SeenSharedNodes,
717 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
718 processes: &SparseSecondaryMap<LocationKey, D::Process>,
719 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
720 externals: &SparseSecondaryMap<LocationKey, D::External>,
721 env: &mut D::InstantiateEnv,
722 ) where
723 D: Deploy<'a>,
724 {
725 let refcell_extra_stmts = RefCell::new(extra_stmts);
726 let refcell_env = RefCell::new(env);
727 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
728 self.transform_bottom_up(
729 &mut |l| {
730 if let HydroRoot::SendExternal {
731 input,
732 to_external_key,
733 to_port_id,
734 to_many,
735 unpaired,
736 instantiate_fn,
737 ..
738 } = l
739 {
740 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
741 DebugInstantiate::Building => {
742 let to_node = externals
743 .get(*to_external_key)
744 .unwrap_or_else(|| {
745 panic!("A external used in the graph was not instantiated: {}", to_external_key)
746 })
747 .clone();
748
749 match input.metadata().location_id.root() {
750 &LocationId::Process(process_key) => {
751 if *to_many {
752 (
753 (
754 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
755 parse_quote!(DUMMY),
756 ),
757 Box::new(|| {}) as Box<dyn FnOnce()>,
758 )
759 } else {
760 let from_node = processes
761 .get(process_key)
762 .unwrap_or_else(|| {
763 panic!("A process used in the graph was not instantiated: {}", process_key)
764 })
765 .clone();
766
767 let sink_port = from_node.next_port();
768 let source_port = to_node.next_port();
769
770 if *unpaired {
771 use stageleft::quote_type;
772 use tokio_util::codec::LengthDelimitedCodec;
773
774 to_node.register(*to_port_id, source_port.clone());
775
776 let _ = D::e2o_source(
777 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
778 &to_node, &source_port,
779 &from_node, &sink_port,
780 "e_type::<LengthDelimitedCodec>(),
781 format!("{}_{}", *to_external_key, *to_port_id)
782 );
783 }
784
785 (
786 (
787 D::o2e_sink(
788 &from_node,
789 &sink_port,
790 &to_node,
791 &source_port,
792 format!("{}_{}", *to_external_key, *to_port_id)
793 ),
794 parse_quote!(DUMMY),
795 ),
796 if *unpaired {
797 D::e2o_connect(
798 &to_node,
799 &source_port,
800 &from_node,
801 &sink_port,
802 *to_many,
803 NetworkHint::Auto,
804 )
805 } else {
806 Box::new(|| {}) as Box<dyn FnOnce()>
807 },
808 )
809 }
810 }
811 LocationId::Cluster(_) => todo!(),
812 _ => panic!()
813 }
814 },
815
816 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
817 };
818
819 *instantiate_fn = DebugInstantiateFinalized {
820 sink: sink_expr,
821 source: source_expr,
822 connect_fn: Some(connect_fn),
823 }
824 .into();
825 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
826 let element_type = match &input.metadata().collection_kind {
827 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
828 _ => panic!("Embedded output must have Stream collection kind"),
829 };
830 let location_key = match input.metadata().location_id.root() {
831 LocationId::Process(key) | LocationId::Cluster(key) => *key,
832 _ => panic!("Embedded output must be on a process or cluster"),
833 };
834 D::register_embedded_output(
835 &mut refcell_env.borrow_mut(),
836 location_key,
837 ident,
838 &element_type,
839 );
840 }
841 },
842 &mut |n| {
843 if let HydroNode::Network {
844 name,
845 networking_info,
846 input,
847 instantiate_fn,
848 metadata,
849 ..
850 } = n
851 {
852 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
853 DebugInstantiate::Building => instantiate_network::<D>(
854 &mut refcell_env.borrow_mut(),
855 input.metadata().location_id.root(),
856 metadata.location_id.root(),
857 processes,
858 clusters,
859 name.as_deref(),
860 networking_info,
861 ),
862
863 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
864 };
865
866 *instantiate_fn = DebugInstantiateFinalized {
867 sink: sink_expr,
868 source: source_expr,
869 connect_fn: Some(connect_fn),
870 }
871 .into();
872 } else if let HydroNode::ExternalInput {
873 from_external_key,
874 from_port_id,
875 from_many,
876 codec_type,
877 port_hint,
878 instantiate_fn,
879 metadata,
880 ..
881 } = n
882 {
883 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
884 DebugInstantiate::Building => {
885 let from_node = externals
886 .get(*from_external_key)
887 .unwrap_or_else(|| {
888 panic!(
889 "A external used in the graph was not instantiated: {}",
890 from_external_key,
891 )
892 })
893 .clone();
894
895 match metadata.location_id.root() {
896 &LocationId::Process(process_key) => {
897 let to_node = processes
898 .get(process_key)
899 .unwrap_or_else(|| {
900 panic!("A process used in the graph was not instantiated: {}", process_key)
901 })
902 .clone();
903
904 let sink_port = from_node.next_port();
905 let source_port = to_node.next_port();
906
907 from_node.register(*from_port_id, sink_port.clone());
908
909 (
910 (
911 parse_quote!(DUMMY),
912 if *from_many {
913 D::e2o_many_source(
914 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
915 &to_node, &source_port,
916 codec_type.0.as_ref(),
917 format!("{}_{}", *from_external_key, *from_port_id)
918 )
919 } else {
920 D::e2o_source(
921 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922 &from_node, &sink_port,
923 &to_node, &source_port,
924 codec_type.0.as_ref(),
925 format!("{}_{}", *from_external_key, *from_port_id)
926 )
927 },
928 ),
929 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
930 )
931 }
932 LocationId::Cluster(_) => todo!(),
933 _ => panic!()
934 }
935 },
936
937 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
938 };
939
940 *instantiate_fn = DebugInstantiateFinalized {
941 sink: sink_expr,
942 source: source_expr,
943 connect_fn: Some(connect_fn),
944 }
945 .into();
946 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
947 let element_type = match &metadata.collection_kind {
948 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
949 _ => panic!("Embedded source must have Stream collection kind"),
950 };
951 let location_key = match metadata.location_id.root() {
952 LocationId::Process(key) | LocationId::Cluster(key) => *key,
953 _ => panic!("Embedded source must be on a process or cluster"),
954 };
955 D::register_embedded_input(
956 &mut refcell_env.borrow_mut(),
957 location_key,
958 ident,
959 &element_type,
960 );
961 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
962 match state {
963 ClusterMembersState::Uninit => {
964 let at_location = metadata.location_id.root().clone();
965 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
966 if refcell_seen_cluster_members.borrow_mut().insert(key) {
967 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
969 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
970 &(),
971 );
972 *state = ClusterMembersState::Stream(expr.into());
973 } else {
974 *state = ClusterMembersState::Tee(at_location, location_id.clone());
976 }
977 }
978 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
979 panic!("cluster members already finalized");
980 }
981 }
982 }
983 },
984 seen_tees,
985 false,
986 );
987 }
988
989 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
990 self.transform_bottom_up(
991 &mut |l| {
992 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
993 match instantiate_fn {
994 DebugInstantiate::Building => panic!("network not built"),
995
996 DebugInstantiate::Finalized(finalized) => {
997 (finalized.connect_fn.take().unwrap())();
998 }
999 }
1000 }
1001 },
1002 &mut |n| {
1003 if let HydroNode::Network { instantiate_fn, .. }
1004 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1005 {
1006 match instantiate_fn {
1007 DebugInstantiate::Building => panic!("network not built"),
1008
1009 DebugInstantiate::Finalized(finalized) => {
1010 (finalized.connect_fn.take().unwrap())();
1011 }
1012 }
1013 }
1014 },
1015 seen_tees,
1016 false,
1017 );
1018 }
1019
1020 pub fn transform_bottom_up(
1021 &mut self,
1022 transform_root: &mut impl FnMut(&mut HydroRoot),
1023 transform_node: &mut impl FnMut(&mut HydroNode),
1024 seen_tees: &mut SeenSharedNodes,
1025 check_well_formed: bool,
1026 ) {
1027 self.transform_children(
1028 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1029 seen_tees,
1030 );
1031
1032 transform_root(self);
1033 }
1034
1035 pub fn transform_children(
1036 &mut self,
1037 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1038 seen_tees: &mut SeenSharedNodes,
1039 ) {
1040 match self {
1041 HydroRoot::ForEach { input, .. }
1042 | HydroRoot::SendExternal { input, .. }
1043 | HydroRoot::DestSink { input, .. }
1044 | HydroRoot::CycleSink { input, .. }
1045 | HydroRoot::EmbeddedOutput { input, .. } => {
1046 transform(input, seen_tees);
1047 }
1048 }
1049 }
1050
1051 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1052 match self {
1053 HydroRoot::ForEach {
1054 f,
1055 input,
1056 op_metadata,
1057 } => HydroRoot::ForEach {
1058 f: f.clone(),
1059 input: Box::new(input.deep_clone(seen_tees)),
1060 op_metadata: op_metadata.clone(),
1061 },
1062 HydroRoot::SendExternal {
1063 to_external_key,
1064 to_port_id,
1065 to_many,
1066 unpaired,
1067 serialize_fn,
1068 instantiate_fn,
1069 input,
1070 op_metadata,
1071 } => HydroRoot::SendExternal {
1072 to_external_key: *to_external_key,
1073 to_port_id: *to_port_id,
1074 to_many: *to_many,
1075 unpaired: *unpaired,
1076 serialize_fn: serialize_fn.clone(),
1077 instantiate_fn: instantiate_fn.clone(),
1078 input: Box::new(input.deep_clone(seen_tees)),
1079 op_metadata: op_metadata.clone(),
1080 },
1081 HydroRoot::DestSink {
1082 sink,
1083 input,
1084 op_metadata,
1085 } => HydroRoot::DestSink {
1086 sink: sink.clone(),
1087 input: Box::new(input.deep_clone(seen_tees)),
1088 op_metadata: op_metadata.clone(),
1089 },
1090 HydroRoot::CycleSink {
1091 cycle_id,
1092 input,
1093 op_metadata,
1094 } => HydroRoot::CycleSink {
1095 cycle_id: *cycle_id,
1096 input: Box::new(input.deep_clone(seen_tees)),
1097 op_metadata: op_metadata.clone(),
1098 },
1099 HydroRoot::EmbeddedOutput {
1100 ident,
1101 input,
1102 op_metadata,
1103 } => HydroRoot::EmbeddedOutput {
1104 ident: ident.clone(),
1105 input: Box::new(input.deep_clone(seen_tees)),
1106 op_metadata: op_metadata.clone(),
1107 },
1108 }
1109 }
1110
1111 #[cfg(feature = "build")]
1112 pub fn emit(
1113 &mut self,
1114 graph_builders: &mut dyn DfirBuilder,
1115 seen_tees: &mut SeenSharedNodes,
1116 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1117 next_stmt_id: &mut usize,
1118 ) {
1119 self.emit_core(
1120 &mut BuildersOrCallback::<
1121 fn(&mut HydroRoot, &mut usize),
1122 fn(&mut HydroNode, &mut usize),
1123 >::Builders(graph_builders),
1124 seen_tees,
1125 built_tees,
1126 next_stmt_id,
1127 );
1128 }
1129
    /// Emits this root after first emitting its input subtree.
    ///
    /// In builder mode the matching DFIR statement is added to the builder for the input's
    /// location; in callback mode the root callback is invoked with `self` and the statement
    /// counter instead. `next_stmt_id` is incremented once per root, except for `CycleSink`,
    /// which neither calls the callback nor advances the counter.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                // Emit the upstream subtree; its output is bound to `input_ident`.
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Placeholders are used while the network has not been instantiated.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; everything else as
                        // its element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // Cycle sinks are not reported to callbacks and do not consume a statement id.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1297
1298 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1299 match self {
1300 HydroRoot::ForEach { op_metadata, .. }
1301 | HydroRoot::SendExternal { op_metadata, .. }
1302 | HydroRoot::DestSink { op_metadata, .. }
1303 | HydroRoot::CycleSink { op_metadata, .. }
1304 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1305 }
1306 }
1307
1308 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1309 match self {
1310 HydroRoot::ForEach { op_metadata, .. }
1311 | HydroRoot::SendExternal { op_metadata, .. }
1312 | HydroRoot::DestSink { op_metadata, .. }
1313 | HydroRoot::CycleSink { op_metadata, .. }
1314 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1315 }
1316 }
1317
1318 pub fn input(&self) -> &HydroNode {
1319 match self {
1320 HydroRoot::ForEach { input, .. }
1321 | HydroRoot::SendExternal { input, .. }
1322 | HydroRoot::DestSink { input, .. }
1323 | HydroRoot::CycleSink { input, .. }
1324 | HydroRoot::EmbeddedOutput { input, .. } => input,
1325 }
1326 }
1327
1328 pub fn input_metadata(&self) -> &HydroIrMetadata {
1329 self.input().metadata()
1330 }
1331
1332 pub fn print_root(&self) -> String {
1333 match self {
1334 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1335 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1336 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1337 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1338 HydroRoot::EmbeddedOutput { ident, .. } => {
1339 format!("EmbeddedOutput({})", ident)
1340 }
1341 }
1342 }
1343
1344 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1345 match self {
1346 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1347 transform(f);
1348 }
1349 HydroRoot::SendExternal { .. }
1350 | HydroRoot::CycleSink { .. }
1351 | HydroRoot::EmbeddedOutput { .. } => {}
1352 }
1353 }
1354}
1355
/// Emits every root in `ir` and returns the graph builders that were
/// populated, keyed by location.
///
/// The shared-node bookkeeping maps and the statement-id counter (starting
/// at 0) are shared across all roots, so they are threaded through each
/// `HydroRoot::emit` call in order.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1372
/// Walks the IR through `emit_core` in callback mode instead of building
/// DFIR.
///
/// `transform_root` and `transform_node` are wrapped in a
/// `BuildersOrCallback::Callback`, which is handed to every root's
/// `emit_core`. Each callback receives the current item together with the
/// statement-id counter, which starts at 0 and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1392
1393pub fn transform_bottom_up(
1394 ir: &mut [HydroRoot],
1395 transform_root: &mut impl FnMut(&mut HydroRoot),
1396 transform_node: &mut impl FnMut(&mut HydroNode),
1397 check_well_formed: bool,
1398) {
1399 let mut seen_tees = HashMap::new();
1400 ir.iter_mut().for_each(|leaf| {
1401 leaf.transform_bottom_up(
1402 transform_root,
1403 transform_node,
1404 &mut seen_tees,
1405 check_well_formed,
1406 );
1407 });
1408}
1409
1410pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1411 let mut seen_tees = HashMap::new();
1412 ir.iter()
1413 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1414 .collect()
1415}
1416
1417type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1418thread_local! {
1419 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1420}
1421
1422pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1423 PRINTED_TEES.with(|printed_tees| {
1424 let mut printed_tees_mut = printed_tees.borrow_mut();
1425 *printed_tees_mut = Some((0, HashMap::new()));
1426 drop(printed_tees_mut);
1427
1428 let ret = f();
1429
1430 let mut printed_tees_mut = printed_tees.borrow_mut();
1431 *printed_tees_mut = None;
1432
1433 ret
1434 })
1435}
1436
1437pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1438
1439impl SharedNode {
1440 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1441 Rc::as_ptr(&self.0)
1442 }
1443}
1444
1445impl Debug for SharedNode {
1446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447 PRINTED_TEES.with(|printed_tees| {
1448 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1449 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1450
1451 if let Some(printed_tees_mut) = printed_tees_mut {
1452 if let Some(existing) = printed_tees_mut
1453 .1
1454 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1455 {
1456 write!(f, "<shared {}>", existing)
1457 } else {
1458 let next_id = printed_tees_mut.0;
1459 printed_tees_mut.0 += 1;
1460 printed_tees_mut
1461 .1
1462 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1463 drop(printed_tees_mut_borrow);
1464 write!(f, "<shared {}>: ", next_id)?;
1465 Debug::fmt(&self.0.borrow(), f)
1466 }
1467 } else {
1468 drop(printed_tees_mut_borrow);
1469 write!(f, "<shared>: ")?;
1470 Debug::fmt(&self.0.borrow(), f)
1471 }
1472 })
1473 }
1474}
1475
1476impl Hash for SharedNode {
1477 fn hash<H: Hasher>(&self, state: &mut H) {
1478 self.0.borrow_mut().hash(state);
1479 }
1480}
1481
/// Boundedness marker recorded in the `bound` field of most
/// `CollectionKind` variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BoundKind {
    /// The collection is not marked as bounded.
    Unbounded,
    /// The collection is marked as bounded; `CollectionKind::is_bounded`
    /// returns `true` for it.
    Bounded,
}
1487
/// Ordering marker recorded on stream-shaped `CollectionKind` variants
/// (`order` / `value_order`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamOrder {
    /// No ordering is recorded for the elements.
    NoOrder,
    /// The elements are recorded as totally ordered.
    TotalOrder,
}
1493
/// Retry marker recorded on stream-shaped `CollectionKind` variants
/// (`retry` / `value_retry`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamRetry {
    /// Marked as at-least-once.
    AtLeastOnce,
    /// Marked as exactly-once.
    ExactlyOnce,
}
1499
/// Boundedness marker used by `CollectionKind::KeyedSingleton`, which has
/// three states rather than the two of `BoundKind`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeyedSingletonBoundKind {
    /// Not marked as bounded.
    Unbounded,
    /// Intermediate state; `CollectionKind::is_bounded` does not treat it as
    /// bounded.
    BoundedValue,
    /// Marked as bounded; the only state for which
    /// `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
1506
1507#[derive(Clone, PartialEq, Eq, Debug)]
1508pub enum CollectionKind {
1509 Stream {
1510 bound: BoundKind,
1511 order: StreamOrder,
1512 retry: StreamRetry,
1513 element_type: DebugType,
1514 },
1515 Singleton {
1516 bound: BoundKind,
1517 element_type: DebugType,
1518 },
1519 Optional {
1520 bound: BoundKind,
1521 element_type: DebugType,
1522 },
1523 KeyedStream {
1524 bound: BoundKind,
1525 value_order: StreamOrder,
1526 value_retry: StreamRetry,
1527 key_type: DebugType,
1528 value_type: DebugType,
1529 },
1530 KeyedSingleton {
1531 bound: KeyedSingletonBoundKind,
1532 key_type: DebugType,
1533 value_type: DebugType,
1534 },
1535}
1536
1537impl CollectionKind {
1538 pub fn is_bounded(&self) -> bool {
1539 matches!(
1540 self,
1541 CollectionKind::Stream {
1542 bound: BoundKind::Bounded,
1543 ..
1544 } | CollectionKind::Singleton {
1545 bound: BoundKind::Bounded,
1546 ..
1547 } | CollectionKind::Optional {
1548 bound: BoundKind::Bounded,
1549 ..
1550 } | CollectionKind::KeyedStream {
1551 bound: BoundKind::Bounded,
1552 ..
1553 } | CollectionKind::KeyedSingleton {
1554 bound: KeyedSingletonBoundKind::Bounded,
1555 ..
1556 }
1557 )
1558 }
1559}
1560
1561#[derive(Clone)]
1562pub struct HydroIrMetadata {
1563 pub location_id: LocationId,
1564 pub collection_kind: CollectionKind,
1565 pub cardinality: Option<usize>,
1566 pub tag: Option<String>,
1567 pub op: HydroIrOpMetadata,
1568}
1569
1570impl Hash for HydroIrMetadata {
1572 fn hash<H: Hasher>(&self, _: &mut H) {}
1573}
1574
1575impl PartialEq for HydroIrMetadata {
1576 fn eq(&self, _: &Self) -> bool {
1577 true
1578 }
1579}
1580
1581impl Eq for HydroIrMetadata {}
1582
1583impl Debug for HydroIrMetadata {
1584 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1585 f.debug_struct("HydroIrMetadata")
1586 .field("location_id", &self.location_id)
1587 .field("collection_kind", &self.collection_kind)
1588 .finish()
1589 }
1590}
1591
1592#[derive(Clone)]
1595pub struct HydroIrOpMetadata {
1596 pub backtrace: Backtrace,
1597 pub cpu_usage: Option<f64>,
1598 pub network_recv_cpu_usage: Option<f64>,
1599 pub id: Option<usize>,
1600}
1601
1602impl HydroIrOpMetadata {
1603 #[expect(
1604 clippy::new_without_default,
1605 reason = "explicit calls to new ensure correct backtrace bounds"
1606 )]
1607 pub fn new() -> HydroIrOpMetadata {
1608 Self::new_with_skip(1)
1609 }
1610
1611 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1612 HydroIrOpMetadata {
1613 backtrace: Backtrace::get_backtrace(2 + skip_count),
1614 cpu_usage: None,
1615 network_recv_cpu_usage: None,
1616 id: None,
1617 }
1618 }
1619}
1620
1621impl Debug for HydroIrOpMetadata {
1622 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1623 f.debug_struct("HydroIrOpMetadata").finish()
1624 }
1625}
1626
1627impl Hash for HydroIrOpMetadata {
1628 fn hash<H: Hasher>(&self, _: &mut H) {}
1629}
1630
1631#[derive(Debug, Hash)]
1634pub enum HydroNode {
1635 Placeholder,
1636
1637 Cast {
1645 inner: Box<HydroNode>,
1646 metadata: HydroIrMetadata,
1647 },
1648
1649 ObserveNonDet {
1655 inner: Box<HydroNode>,
1656 trusted: bool, metadata: HydroIrMetadata,
1658 },
1659
1660 Source {
1661 source: HydroSource,
1662 metadata: HydroIrMetadata,
1663 },
1664
1665 SingletonSource {
1666 value: DebugExpr,
1667 first_tick_only: bool,
1668 metadata: HydroIrMetadata,
1669 },
1670
1671 CycleSource {
1672 cycle_id: CycleId,
1673 metadata: HydroIrMetadata,
1674 },
1675
1676 Tee {
1677 inner: SharedNode,
1678 metadata: HydroIrMetadata,
1679 },
1680
1681 Partition {
1682 inner: SharedNode,
1683 f: DebugExpr,
1684 is_true: bool,
1685 metadata: HydroIrMetadata,
1686 },
1687
1688 BeginAtomic {
1689 inner: Box<HydroNode>,
1690 metadata: HydroIrMetadata,
1691 },
1692
1693 EndAtomic {
1694 inner: Box<HydroNode>,
1695 metadata: HydroIrMetadata,
1696 },
1697
1698 Batch {
1699 inner: Box<HydroNode>,
1700 metadata: HydroIrMetadata,
1701 },
1702
1703 YieldConcat {
1704 inner: Box<HydroNode>,
1705 metadata: HydroIrMetadata,
1706 },
1707
1708 Chain {
1709 first: Box<HydroNode>,
1710 second: Box<HydroNode>,
1711 metadata: HydroIrMetadata,
1712 },
1713
1714 ChainFirst {
1715 first: Box<HydroNode>,
1716 second: Box<HydroNode>,
1717 metadata: HydroIrMetadata,
1718 },
1719
1720 CrossProduct {
1721 left: Box<HydroNode>,
1722 right: Box<HydroNode>,
1723 metadata: HydroIrMetadata,
1724 },
1725
1726 CrossSingleton {
1727 left: Box<HydroNode>,
1728 right: Box<HydroNode>,
1729 metadata: HydroIrMetadata,
1730 },
1731
1732 Join {
1733 left: Box<HydroNode>,
1734 right: Box<HydroNode>,
1735 metadata: HydroIrMetadata,
1736 },
1737
1738 Difference {
1739 pos: Box<HydroNode>,
1740 neg: Box<HydroNode>,
1741 metadata: HydroIrMetadata,
1742 },
1743
1744 AntiJoin {
1745 pos: Box<HydroNode>,
1746 neg: Box<HydroNode>,
1747 metadata: HydroIrMetadata,
1748 },
1749
1750 ResolveFutures {
1751 input: Box<HydroNode>,
1752 metadata: HydroIrMetadata,
1753 },
1754 ResolveFuturesOrdered {
1755 input: Box<HydroNode>,
1756 metadata: HydroIrMetadata,
1757 },
1758
1759 Map {
1760 f: DebugExpr,
1761 input: Box<HydroNode>,
1762 metadata: HydroIrMetadata,
1763 },
1764 FlatMap {
1765 f: DebugExpr,
1766 input: Box<HydroNode>,
1767 metadata: HydroIrMetadata,
1768 },
1769 Filter {
1770 f: DebugExpr,
1771 input: Box<HydroNode>,
1772 metadata: HydroIrMetadata,
1773 },
1774 FilterMap {
1775 f: DebugExpr,
1776 input: Box<HydroNode>,
1777 metadata: HydroIrMetadata,
1778 },
1779
1780 DeferTick {
1781 input: Box<HydroNode>,
1782 metadata: HydroIrMetadata,
1783 },
1784 Enumerate {
1785 input: Box<HydroNode>,
1786 metadata: HydroIrMetadata,
1787 },
1788 Inspect {
1789 f: DebugExpr,
1790 input: Box<HydroNode>,
1791 metadata: HydroIrMetadata,
1792 },
1793
1794 Unique {
1795 input: Box<HydroNode>,
1796 metadata: HydroIrMetadata,
1797 },
1798
1799 Sort {
1800 input: Box<HydroNode>,
1801 metadata: HydroIrMetadata,
1802 },
1803 Fold {
1804 init: DebugExpr,
1805 acc: DebugExpr,
1806 input: Box<HydroNode>,
1807 metadata: HydroIrMetadata,
1808 },
1809
1810 Scan {
1811 init: DebugExpr,
1812 acc: DebugExpr,
1813 input: Box<HydroNode>,
1814 metadata: HydroIrMetadata,
1815 },
1816 FoldKeyed {
1817 init: DebugExpr,
1818 acc: DebugExpr,
1819 input: Box<HydroNode>,
1820 metadata: HydroIrMetadata,
1821 },
1822
1823 Reduce {
1824 f: DebugExpr,
1825 input: Box<HydroNode>,
1826 metadata: HydroIrMetadata,
1827 },
1828 ReduceKeyed {
1829 f: DebugExpr,
1830 input: Box<HydroNode>,
1831 metadata: HydroIrMetadata,
1832 },
1833 ReduceKeyedWatermark {
1834 f: DebugExpr,
1835 input: Box<HydroNode>,
1836 watermark: Box<HydroNode>,
1837 metadata: HydroIrMetadata,
1838 },
1839
1840 Network {
1841 name: Option<String>,
1842 networking_info: crate::networking::NetworkingInfo,
1843 serialize_fn: Option<DebugExpr>,
1844 instantiate_fn: DebugInstantiate,
1845 deserialize_fn: Option<DebugExpr>,
1846 input: Box<HydroNode>,
1847 metadata: HydroIrMetadata,
1848 },
1849
1850 ExternalInput {
1851 from_external_key: LocationKey,
1852 from_port_id: ExternalPortId,
1853 from_many: bool,
1854 codec_type: DebugType,
1855 port_hint: NetworkHint,
1856 instantiate_fn: DebugInstantiate,
1857 deserialize_fn: Option<DebugExpr>,
1858 metadata: HydroIrMetadata,
1859 },
1860
1861 Counter {
1862 tag: String,
1863 duration: DebugExpr,
1864 prefix: String,
1865 input: Box<HydroNode>,
1866 metadata: HydroIrMetadata,
1867 },
1868}
1869
1870pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1871pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1872
1873impl HydroNode {
1874 pub fn transform_bottom_up(
1875 &mut self,
1876 transform: &mut impl FnMut(&mut HydroNode),
1877 seen_tees: &mut SeenSharedNodes,
1878 check_well_formed: bool,
1879 ) {
1880 self.transform_children(
1881 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1882 seen_tees,
1883 );
1884
1885 transform(self);
1886
1887 let self_location = self.metadata().location_id.root();
1888
1889 if check_well_formed {
1890 match &*self {
1891 HydroNode::Network { .. } => {}
1892 _ => {
1893 self.input_metadata().iter().for_each(|i| {
1894 if i.location_id.root() != self_location {
1895 panic!(
1896 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1897 i,
1898 i.location_id.root(),
1899 self,
1900 self_location
1901 )
1902 }
1903 });
1904 }
1905 }
1906 }
1907 }
1908
1909 #[inline(always)]
1910 pub fn transform_children(
1911 &mut self,
1912 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1913 seen_tees: &mut SeenSharedNodes,
1914 ) {
1915 match self {
1916 HydroNode::Placeholder => {
1917 panic!();
1918 }
1919
1920 HydroNode::Source { .. }
1921 | HydroNode::SingletonSource { .. }
1922 | HydroNode::CycleSource { .. }
1923 | HydroNode::ExternalInput { .. } => {}
1924
1925 HydroNode::Tee { inner, .. } => {
1926 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1927 *inner = SharedNode(transformed.clone());
1928 } else {
1929 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1930 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1931 let mut orig = inner.0.replace(HydroNode::Placeholder);
1932 transform(&mut orig, seen_tees);
1933 *transformed_cell.borrow_mut() = orig;
1934 *inner = SharedNode(transformed_cell);
1935 }
1936 }
1937
1938 HydroNode::Partition { inner, .. } => {
1939 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1940 *inner = SharedNode(transformed.clone());
1941 } else {
1942 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1943 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1944 let mut orig = inner.0.replace(HydroNode::Placeholder);
1945 transform(&mut orig, seen_tees);
1946 *transformed_cell.borrow_mut() = orig;
1947 *inner = SharedNode(transformed_cell);
1948 }
1949 }
1950
1951 HydroNode::Cast { inner, .. }
1952 | HydroNode::ObserveNonDet { inner, .. }
1953 | HydroNode::BeginAtomic { inner, .. }
1954 | HydroNode::EndAtomic { inner, .. }
1955 | HydroNode::Batch { inner, .. }
1956 | HydroNode::YieldConcat { inner, .. } => {
1957 transform(inner.as_mut(), seen_tees);
1958 }
1959
1960 HydroNode::Chain { first, second, .. } => {
1961 transform(first.as_mut(), seen_tees);
1962 transform(second.as_mut(), seen_tees);
1963 }
1964
1965 HydroNode::ChainFirst { first, second, .. } => {
1966 transform(first.as_mut(), seen_tees);
1967 transform(second.as_mut(), seen_tees);
1968 }
1969
1970 HydroNode::CrossSingleton { left, right, .. }
1971 | HydroNode::CrossProduct { left, right, .. }
1972 | HydroNode::Join { left, right, .. } => {
1973 transform(left.as_mut(), seen_tees);
1974 transform(right.as_mut(), seen_tees);
1975 }
1976
1977 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1978 transform(pos.as_mut(), seen_tees);
1979 transform(neg.as_mut(), seen_tees);
1980 }
1981
1982 HydroNode::ReduceKeyedWatermark {
1983 input, watermark, ..
1984 } => {
1985 transform(input.as_mut(), seen_tees);
1986 transform(watermark.as_mut(), seen_tees);
1987 }
1988
1989 HydroNode::Map { input, .. }
1990 | HydroNode::ResolveFutures { input, .. }
1991 | HydroNode::ResolveFuturesOrdered { input, .. }
1992 | HydroNode::FlatMap { input, .. }
1993 | HydroNode::Filter { input, .. }
1994 | HydroNode::FilterMap { input, .. }
1995 | HydroNode::Sort { input, .. }
1996 | HydroNode::DeferTick { input, .. }
1997 | HydroNode::Enumerate { input, .. }
1998 | HydroNode::Inspect { input, .. }
1999 | HydroNode::Unique { input, .. }
2000 | HydroNode::Network { input, .. }
2001 | HydroNode::Fold { input, .. }
2002 | HydroNode::Scan { input, .. }
2003 | HydroNode::FoldKeyed { input, .. }
2004 | HydroNode::Reduce { input, .. }
2005 | HydroNode::ReduceKeyed { input, .. }
2006 | HydroNode::Counter { input, .. } => {
2007 transform(input.as_mut(), seen_tees);
2008 }
2009 }
2010 }
2011
2012 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2013 match self {
2014 HydroNode::Placeholder => HydroNode::Placeholder,
2015 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2016 inner: Box::new(inner.deep_clone(seen_tees)),
2017 metadata: metadata.clone(),
2018 },
2019 HydroNode::ObserveNonDet {
2020 inner,
2021 trusted,
2022 metadata,
2023 } => HydroNode::ObserveNonDet {
2024 inner: Box::new(inner.deep_clone(seen_tees)),
2025 trusted: *trusted,
2026 metadata: metadata.clone(),
2027 },
2028 HydroNode::Source { source, metadata } => HydroNode::Source {
2029 source: source.clone(),
2030 metadata: metadata.clone(),
2031 },
2032 HydroNode::SingletonSource {
2033 value,
2034 first_tick_only,
2035 metadata,
2036 } => HydroNode::SingletonSource {
2037 value: value.clone(),
2038 first_tick_only: *first_tick_only,
2039 metadata: metadata.clone(),
2040 },
2041 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2042 cycle_id: *cycle_id,
2043 metadata: metadata.clone(),
2044 },
2045 HydroNode::Tee { inner, metadata } => {
2046 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2047 HydroNode::Tee {
2048 inner: SharedNode(transformed.clone()),
2049 metadata: metadata.clone(),
2050 }
2051 } else {
2052 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2053 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2054 let cloned = inner.0.borrow().deep_clone(seen_tees);
2055 *new_rc.borrow_mut() = cloned;
2056 HydroNode::Tee {
2057 inner: SharedNode(new_rc),
2058 metadata: metadata.clone(),
2059 }
2060 }
2061 }
2062 HydroNode::Partition {
2063 inner,
2064 f,
2065 is_true,
2066 metadata,
2067 } => {
2068 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2069 HydroNode::Partition {
2070 inner: SharedNode(transformed.clone()),
2071 f: f.clone(),
2072 is_true: *is_true,
2073 metadata: metadata.clone(),
2074 }
2075 } else {
2076 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2077 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2078 let cloned = inner.0.borrow().deep_clone(seen_tees);
2079 *new_rc.borrow_mut() = cloned;
2080 HydroNode::Partition {
2081 inner: SharedNode(new_rc),
2082 f: f.clone(),
2083 is_true: *is_true,
2084 metadata: metadata.clone(),
2085 }
2086 }
2087 }
2088 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2089 inner: Box::new(inner.deep_clone(seen_tees)),
2090 metadata: metadata.clone(),
2091 },
2092 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2093 inner: Box::new(inner.deep_clone(seen_tees)),
2094 metadata: metadata.clone(),
2095 },
2096 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2097 inner: Box::new(inner.deep_clone(seen_tees)),
2098 metadata: metadata.clone(),
2099 },
2100 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2101 inner: Box::new(inner.deep_clone(seen_tees)),
2102 metadata: metadata.clone(),
2103 },
2104 HydroNode::Chain {
2105 first,
2106 second,
2107 metadata,
2108 } => HydroNode::Chain {
2109 first: Box::new(first.deep_clone(seen_tees)),
2110 second: Box::new(second.deep_clone(seen_tees)),
2111 metadata: metadata.clone(),
2112 },
2113 HydroNode::ChainFirst {
2114 first,
2115 second,
2116 metadata,
2117 } => HydroNode::ChainFirst {
2118 first: Box::new(first.deep_clone(seen_tees)),
2119 second: Box::new(second.deep_clone(seen_tees)),
2120 metadata: metadata.clone(),
2121 },
2122 HydroNode::CrossProduct {
2123 left,
2124 right,
2125 metadata,
2126 } => HydroNode::CrossProduct {
2127 left: Box::new(left.deep_clone(seen_tees)),
2128 right: Box::new(right.deep_clone(seen_tees)),
2129 metadata: metadata.clone(),
2130 },
2131 HydroNode::CrossSingleton {
2132 left,
2133 right,
2134 metadata,
2135 } => HydroNode::CrossSingleton {
2136 left: Box::new(left.deep_clone(seen_tees)),
2137 right: Box::new(right.deep_clone(seen_tees)),
2138 metadata: metadata.clone(),
2139 },
2140 HydroNode::Join {
2141 left,
2142 right,
2143 metadata,
2144 } => HydroNode::Join {
2145 left: Box::new(left.deep_clone(seen_tees)),
2146 right: Box::new(right.deep_clone(seen_tees)),
2147 metadata: metadata.clone(),
2148 },
2149 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2150 pos: Box::new(pos.deep_clone(seen_tees)),
2151 neg: Box::new(neg.deep_clone(seen_tees)),
2152 metadata: metadata.clone(),
2153 },
2154 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2155 pos: Box::new(pos.deep_clone(seen_tees)),
2156 neg: Box::new(neg.deep_clone(seen_tees)),
2157 metadata: metadata.clone(),
2158 },
2159 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2160 input: Box::new(input.deep_clone(seen_tees)),
2161 metadata: metadata.clone(),
2162 },
2163 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2164 HydroNode::ResolveFuturesOrdered {
2165 input: Box::new(input.deep_clone(seen_tees)),
2166 metadata: metadata.clone(),
2167 }
2168 }
2169 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2170 f: f.clone(),
2171 input: Box::new(input.deep_clone(seen_tees)),
2172 metadata: metadata.clone(),
2173 },
2174 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2175 f: f.clone(),
2176 input: Box::new(input.deep_clone(seen_tees)),
2177 metadata: metadata.clone(),
2178 },
2179 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2180 f: f.clone(),
2181 input: Box::new(input.deep_clone(seen_tees)),
2182 metadata: metadata.clone(),
2183 },
2184 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2185 f: f.clone(),
2186 input: Box::new(input.deep_clone(seen_tees)),
2187 metadata: metadata.clone(),
2188 },
2189 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2190 input: Box::new(input.deep_clone(seen_tees)),
2191 metadata: metadata.clone(),
2192 },
2193 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2194 input: Box::new(input.deep_clone(seen_tees)),
2195 metadata: metadata.clone(),
2196 },
2197 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2198 f: f.clone(),
2199 input: Box::new(input.deep_clone(seen_tees)),
2200 metadata: metadata.clone(),
2201 },
2202 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2203 input: Box::new(input.deep_clone(seen_tees)),
2204 metadata: metadata.clone(),
2205 },
2206 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2207 input: Box::new(input.deep_clone(seen_tees)),
2208 metadata: metadata.clone(),
2209 },
2210 HydroNode::Fold {
2211 init,
2212 acc,
2213 input,
2214 metadata,
2215 } => HydroNode::Fold {
2216 init: init.clone(),
2217 acc: acc.clone(),
2218 input: Box::new(input.deep_clone(seen_tees)),
2219 metadata: metadata.clone(),
2220 },
2221 HydroNode::Scan {
2222 init,
2223 acc,
2224 input,
2225 metadata,
2226 } => HydroNode::Scan {
2227 init: init.clone(),
2228 acc: acc.clone(),
2229 input: Box::new(input.deep_clone(seen_tees)),
2230 metadata: metadata.clone(),
2231 },
2232 HydroNode::FoldKeyed {
2233 init,
2234 acc,
2235 input,
2236 metadata,
2237 } => HydroNode::FoldKeyed {
2238 init: init.clone(),
2239 acc: acc.clone(),
2240 input: Box::new(input.deep_clone(seen_tees)),
2241 metadata: metadata.clone(),
2242 },
2243 HydroNode::ReduceKeyedWatermark {
2244 f,
2245 input,
2246 watermark,
2247 metadata,
2248 } => HydroNode::ReduceKeyedWatermark {
2249 f: f.clone(),
2250 input: Box::new(input.deep_clone(seen_tees)),
2251 watermark: Box::new(watermark.deep_clone(seen_tees)),
2252 metadata: metadata.clone(),
2253 },
2254 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2255 f: f.clone(),
2256 input: Box::new(input.deep_clone(seen_tees)),
2257 metadata: metadata.clone(),
2258 },
2259 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2260 f: f.clone(),
2261 input: Box::new(input.deep_clone(seen_tees)),
2262 metadata: metadata.clone(),
2263 },
2264 HydroNode::Network {
2265 name,
2266 networking_info,
2267 serialize_fn,
2268 instantiate_fn,
2269 deserialize_fn,
2270 input,
2271 metadata,
2272 } => HydroNode::Network {
2273 name: name.clone(),
2274 networking_info: networking_info.clone(),
2275 serialize_fn: serialize_fn.clone(),
2276 instantiate_fn: instantiate_fn.clone(),
2277 deserialize_fn: deserialize_fn.clone(),
2278 input: Box::new(input.deep_clone(seen_tees)),
2279 metadata: metadata.clone(),
2280 },
2281 HydroNode::ExternalInput {
2282 from_external_key,
2283 from_port_id,
2284 from_many,
2285 codec_type,
2286 port_hint,
2287 instantiate_fn,
2288 deserialize_fn,
2289 metadata,
2290 } => HydroNode::ExternalInput {
2291 from_external_key: *from_external_key,
2292 from_port_id: *from_port_id,
2293 from_many: *from_many,
2294 codec_type: codec_type.clone(),
2295 port_hint: *port_hint,
2296 instantiate_fn: instantiate_fn.clone(),
2297 deserialize_fn: deserialize_fn.clone(),
2298 metadata: metadata.clone(),
2299 },
2300 HydroNode::Counter {
2301 tag,
2302 duration,
2303 prefix,
2304 input,
2305 metadata,
2306 } => HydroNode::Counter {
2307 tag: tag.clone(),
2308 duration: duration.clone(),
2309 prefix: prefix.clone(),
2310 input: Box::new(input.deep_clone(seen_tees)),
2311 metadata: metadata.clone(),
2312 },
2313 }
2314 }
2315
2316 #[cfg(feature = "build")]
2317 pub fn emit_core(
2318 &mut self,
2319 builders_or_callback: &mut BuildersOrCallback<
2320 impl FnMut(&mut HydroRoot, &mut usize),
2321 impl FnMut(&mut HydroNode, &mut usize),
2322 >,
2323 seen_tees: &mut SeenSharedNodes,
2324 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2325 next_stmt_id: &mut usize,
2326 ) -> syn::Ident {
2327 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2328
2329 self.transform_bottom_up(
2330 &mut |node: &mut HydroNode| {
2331 let out_location = node.metadata().location_id.clone();
2332 match node {
2333 HydroNode::Placeholder => {
2334 panic!()
2335 }
2336
2337 HydroNode::Cast { .. } => {
2338 match builders_or_callback {
2341 BuildersOrCallback::Builders(_) => {}
2342 BuildersOrCallback::Callback(_, node_callback) => {
2343 node_callback(node, next_stmt_id);
2344 }
2345 }
2346
2347 *next_stmt_id += 1;
2348 }
2350
2351 HydroNode::ObserveNonDet {
2352 inner,
2353 trusted,
2354 metadata,
2355 ..
2356 } => {
2357 let inner_ident = ident_stack.pop().unwrap();
2358
2359 let observe_ident =
2360 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2361
2362 match builders_or_callback {
2363 BuildersOrCallback::Builders(graph_builders) => {
2364 graph_builders.observe_nondet(
2365 *trusted,
2366 &inner.metadata().location_id,
2367 inner_ident,
2368 &inner.metadata().collection_kind,
2369 &observe_ident,
2370 &metadata.collection_kind,
2371 &metadata.op,
2372 );
2373 }
2374 BuildersOrCallback::Callback(_, node_callback) => {
2375 node_callback(node, next_stmt_id);
2376 }
2377 }
2378
2379 *next_stmt_id += 1;
2380
2381 ident_stack.push(observe_ident);
2382 }
2383
2384 HydroNode::Batch {
2385 inner, metadata, ..
2386 } => {
2387 let inner_ident = ident_stack.pop().unwrap();
2388
2389 let batch_ident =
2390 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2391
2392 match builders_or_callback {
2393 BuildersOrCallback::Builders(graph_builders) => {
2394 graph_builders.batch(
2395 inner_ident,
2396 &inner.metadata().location_id,
2397 &inner.metadata().collection_kind,
2398 &batch_ident,
2399 &out_location,
2400 &metadata.op,
2401 );
2402 }
2403 BuildersOrCallback::Callback(_, node_callback) => {
2404 node_callback(node, next_stmt_id);
2405 }
2406 }
2407
2408 *next_stmt_id += 1;
2409
2410 ident_stack.push(batch_ident);
2411 }
2412
2413 HydroNode::YieldConcat { inner, .. } => {
2414 let inner_ident = ident_stack.pop().unwrap();
2415
2416 let yield_ident =
2417 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2418
2419 match builders_or_callback {
2420 BuildersOrCallback::Builders(graph_builders) => {
2421 graph_builders.yield_from_tick(
2422 inner_ident,
2423 &inner.metadata().location_id,
2424 &inner.metadata().collection_kind,
2425 &yield_ident,
2426 &out_location,
2427 );
2428 }
2429 BuildersOrCallback::Callback(_, node_callback) => {
2430 node_callback(node, next_stmt_id);
2431 }
2432 }
2433
2434 *next_stmt_id += 1;
2435
2436 ident_stack.push(yield_ident);
2437 }
2438
2439 HydroNode::BeginAtomic { inner, metadata } => {
2440 let inner_ident = ident_stack.pop().unwrap();
2441
2442 let begin_ident =
2443 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2444
2445 match builders_or_callback {
2446 BuildersOrCallback::Builders(graph_builders) => {
2447 graph_builders.begin_atomic(
2448 inner_ident,
2449 &inner.metadata().location_id,
2450 &inner.metadata().collection_kind,
2451 &begin_ident,
2452 &out_location,
2453 &metadata.op,
2454 );
2455 }
2456 BuildersOrCallback::Callback(_, node_callback) => {
2457 node_callback(node, next_stmt_id);
2458 }
2459 }
2460
2461 *next_stmt_id += 1;
2462
2463 ident_stack.push(begin_ident);
2464 }
2465
2466 HydroNode::EndAtomic { inner, .. } => {
2467 let inner_ident = ident_stack.pop().unwrap();
2468
2469 let end_ident =
2470 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2471
2472 match builders_or_callback {
2473 BuildersOrCallback::Builders(graph_builders) => {
2474 graph_builders.end_atomic(
2475 inner_ident,
2476 &inner.metadata().location_id,
2477 &inner.metadata().collection_kind,
2478 &end_ident,
2479 );
2480 }
2481 BuildersOrCallback::Callback(_, node_callback) => {
2482 node_callback(node, next_stmt_id);
2483 }
2484 }
2485
2486 *next_stmt_id += 1;
2487
2488 ident_stack.push(end_ident);
2489 }
2490
2491 HydroNode::Source {
2492 source, metadata, ..
2493 } => {
2494 if let HydroSource::ExternalNetwork() = source {
2495 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2496 } else {
2497 let source_ident =
2498 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2499
2500 let source_stmt = match source {
2501 HydroSource::Stream(expr) => {
2502 debug_assert!(metadata.location_id.is_top_level());
2503 parse_quote! {
2504 #source_ident = source_stream(#expr);
2505 }
2506 }
2507
2508 HydroSource::ExternalNetwork() => {
2509 unreachable!()
2510 }
2511
2512 HydroSource::Iter(expr) => {
2513 if metadata.location_id.is_top_level() {
2514 parse_quote! {
2515 #source_ident = source_iter(#expr);
2516 }
2517 } else {
2518 parse_quote! {
2520 #source_ident = source_iter(#expr) -> persist::<'static>();
2521 }
2522 }
2523 }
2524
2525 HydroSource::Spin() => {
2526 debug_assert!(metadata.location_id.is_top_level());
2527 parse_quote! {
2528 #source_ident = spin();
2529 }
2530 }
2531
2532 HydroSource::ClusterMembers(target_loc, state) => {
2533 debug_assert!(metadata.location_id.is_top_level());
2534
2535 let members_tee_ident = syn::Ident::new(
2536 &format!(
2537 "__cluster_members_tee_{}_{}",
2538 metadata.location_id.root().key(),
2539 target_loc.key(),
2540 ),
2541 Span::call_site(),
2542 );
2543
2544 match state {
2545 ClusterMembersState::Stream(d) => {
2546 parse_quote! {
2547 #members_tee_ident = source_stream(#d) -> tee();
2548 #source_ident = #members_tee_ident;
2549 }
2550 },
2551 ClusterMembersState::Uninit => syn::parse_quote! {
2552 #source_ident = source_stream(DUMMY);
2553 },
2554 ClusterMembersState::Tee(..) => parse_quote! {
2555 #source_ident = #members_tee_ident;
2556 },
2557 }
2558 }
2559
2560 HydroSource::Embedded(ident) => {
2561 parse_quote! {
2562 #source_ident = source_stream(#ident);
2563 }
2564 }
2565 };
2566
2567 match builders_or_callback {
2568 BuildersOrCallback::Builders(graph_builders) => {
2569 let builder = graph_builders.get_dfir_mut(&out_location);
2570 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2571 }
2572 BuildersOrCallback::Callback(_, node_callback) => {
2573 node_callback(node, next_stmt_id);
2574 }
2575 }
2576
2577 *next_stmt_id += 1;
2578
2579 ident_stack.push(source_ident);
2580 }
2581 }
2582
2583 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2584 let source_ident =
2585 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2586
2587 match builders_or_callback {
2588 BuildersOrCallback::Builders(graph_builders) => {
2589 let builder = graph_builders.get_dfir_mut(&out_location);
2590
2591 if *first_tick_only {
2592 assert!(
2593 !metadata.location_id.is_top_level(),
2594 "first_tick_only SingletonSource must be inside a tick"
2595 );
2596 }
2597
2598 if *first_tick_only
2599 || (metadata.location_id.is_top_level()
2600 && metadata.collection_kind.is_bounded())
2601 {
2602 builder.add_dfir(
2603 parse_quote! {
2604 #source_ident = source_iter([#value]);
2605 },
2606 None,
2607 Some(&next_stmt_id.to_string()),
2608 );
2609 } else {
2610 builder.add_dfir(
2611 parse_quote! {
2612 #source_ident = source_iter([#value]) -> persist::<'static>();
2613 },
2614 None,
2615 Some(&next_stmt_id.to_string()),
2616 );
2617 }
2618 }
2619 BuildersOrCallback::Callback(_, node_callback) => {
2620 node_callback(node, next_stmt_id);
2621 }
2622 }
2623
2624 *next_stmt_id += 1;
2625
2626 ident_stack.push(source_ident);
2627 }
2628
2629 HydroNode::CycleSource { cycle_id, .. } => {
2630 let ident = cycle_id.as_ident();
2631
2632 match builders_or_callback {
2633 BuildersOrCallback::Builders(_) => {}
2634 BuildersOrCallback::Callback(_, node_callback) => {
2635 node_callback(node, next_stmt_id);
2636 }
2637 }
2638
2639 *next_stmt_id += 1;
2641
2642 ident_stack.push(ident);
2643 }
2644
2645 HydroNode::Tee { inner, .. } => {
2646 let ret_ident = if let Some(built_idents) =
2647 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2648 {
2649 match builders_or_callback {
2650 BuildersOrCallback::Builders(_) => {}
2651 BuildersOrCallback::Callback(_, node_callback) => {
2652 node_callback(node, next_stmt_id);
2653 }
2654 }
2655
2656 built_idents[0].clone()
2657 } else {
2658 let inner_ident = ident_stack.pop().unwrap();
2661
2662 let tee_ident =
2663 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2664
2665 built_tees.insert(
2666 inner.0.as_ref() as *const RefCell<HydroNode>,
2667 vec![tee_ident.clone()],
2668 );
2669
2670 match builders_or_callback {
2671 BuildersOrCallback::Builders(graph_builders) => {
2672 let builder = graph_builders.get_dfir_mut(&out_location);
2673 builder.add_dfir(
2674 parse_quote! {
2675 #tee_ident = #inner_ident -> tee();
2676 },
2677 None,
2678 Some(&next_stmt_id.to_string()),
2679 );
2680 }
2681 BuildersOrCallback::Callback(_, node_callback) => {
2682 node_callback(node, next_stmt_id);
2683 }
2684 }
2685
2686 tee_ident
2687 };
2688
2689 *next_stmt_id += 1;
2693 ident_stack.push(ret_ident);
2694 }
2695
2696 HydroNode::Partition {
2697 inner, f, is_true, ..
2698 } => {
2699 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2701 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2702 match builders_or_callback {
2703 BuildersOrCallback::Builders(_) => {}
2704 BuildersOrCallback::Callback(_, node_callback) => {
2705 node_callback(node, next_stmt_id);
2706 }
2707 }
2708
2709 let idx = if is_true { 0 } else { 1 };
2710 built_idents[idx].clone()
2711 } else {
2712 let inner_ident = ident_stack.pop().unwrap();
2715
2716 let partition_ident = syn::Ident::new(
2717 &format!("stream_{}_partition", *next_stmt_id),
2718 Span::call_site(),
2719 );
2720 let true_ident = syn::Ident::new(
2721 &format!("stream_{}_true", *next_stmt_id),
2722 Span::call_site(),
2723 );
2724 let false_ident = syn::Ident::new(
2725 &format!("stream_{}_false", *next_stmt_id),
2726 Span::call_site(),
2727 );
2728
2729 built_tees.insert(
2730 ptr,
2731 vec![true_ident.clone(), false_ident.clone()],
2732 );
2733
2734 match builders_or_callback {
2735 BuildersOrCallback::Builders(graph_builders) => {
2736 let builder = graph_builders.get_dfir_mut(&out_location);
2737 builder.add_dfir(
2738 parse_quote! {
2739 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2740 #true_ident = #partition_ident[0];
2741 #false_ident = #partition_ident[1];
2742 },
2743 None,
2744 Some(&next_stmt_id.to_string()),
2745 );
2746 }
2747 BuildersOrCallback::Callback(_, node_callback) => {
2748 node_callback(node, next_stmt_id);
2749 }
2750 }
2751
2752 if is_true { true_ident } else { false_ident }
2753 };
2754
2755 *next_stmt_id += 1;
2756 ident_stack.push(ret_ident);
2757 }
2758
2759 HydroNode::Chain { .. } => {
2760 let second_ident = ident_stack.pop().unwrap();
2762 let first_ident = ident_stack.pop().unwrap();
2763
2764 let chain_ident =
2765 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2766
2767 match builders_or_callback {
2768 BuildersOrCallback::Builders(graph_builders) => {
2769 let builder = graph_builders.get_dfir_mut(&out_location);
2770 builder.add_dfir(
2771 parse_quote! {
2772 #chain_ident = chain();
2773 #first_ident -> [0]#chain_ident;
2774 #second_ident -> [1]#chain_ident;
2775 },
2776 None,
2777 Some(&next_stmt_id.to_string()),
2778 );
2779 }
2780 BuildersOrCallback::Callback(_, node_callback) => {
2781 node_callback(node, next_stmt_id);
2782 }
2783 }
2784
2785 *next_stmt_id += 1;
2786
2787 ident_stack.push(chain_ident);
2788 }
2789
2790 HydroNode::ChainFirst { .. } => {
2791 let second_ident = ident_stack.pop().unwrap();
2792 let first_ident = ident_stack.pop().unwrap();
2793
2794 let chain_ident =
2795 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2796
2797 match builders_or_callback {
2798 BuildersOrCallback::Builders(graph_builders) => {
2799 let builder = graph_builders.get_dfir_mut(&out_location);
2800 builder.add_dfir(
2801 parse_quote! {
2802 #chain_ident = chain_first_n(1);
2803 #first_ident -> [0]#chain_ident;
2804 #second_ident -> [1]#chain_ident;
2805 },
2806 None,
2807 Some(&next_stmt_id.to_string()),
2808 );
2809 }
2810 BuildersOrCallback::Callback(_, node_callback) => {
2811 node_callback(node, next_stmt_id);
2812 }
2813 }
2814
2815 *next_stmt_id += 1;
2816
2817 ident_stack.push(chain_ident);
2818 }
2819
2820 HydroNode::CrossSingleton { right, .. } => {
2821 let right_ident = ident_stack.pop().unwrap();
2822 let left_ident = ident_stack.pop().unwrap();
2823
2824 let cross_ident =
2825 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2826
2827 match builders_or_callback {
2828 BuildersOrCallback::Builders(graph_builders) => {
2829 let builder = graph_builders.get_dfir_mut(&out_location);
2830
2831 if right.metadata().location_id.is_top_level()
2832 && right.metadata().collection_kind.is_bounded()
2833 {
2834 builder.add_dfir(
2835 parse_quote! {
2836 #cross_ident = cross_singleton();
2837 #left_ident -> [input]#cross_ident;
2838 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2839 },
2840 None,
2841 Some(&next_stmt_id.to_string()),
2842 );
2843 } else {
2844 builder.add_dfir(
2845 parse_quote! {
2846 #cross_ident = cross_singleton();
2847 #left_ident -> [input]#cross_ident;
2848 #right_ident -> [single]#cross_ident;
2849 },
2850 None,
2851 Some(&next_stmt_id.to_string()),
2852 );
2853 }
2854 }
2855 BuildersOrCallback::Callback(_, node_callback) => {
2856 node_callback(node, next_stmt_id);
2857 }
2858 }
2859
2860 *next_stmt_id += 1;
2861
2862 ident_stack.push(cross_ident);
2863 }
2864
2865 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2866 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2867 parse_quote!(cross_join_multiset)
2868 } else {
2869 parse_quote!(join_multiset)
2870 };
2871
2872 let (HydroNode::CrossProduct { left, right, .. }
2873 | HydroNode::Join { left, right, .. }) = node
2874 else {
2875 unreachable!()
2876 };
2877
2878 let is_top_level = left.metadata().location_id.is_top_level()
2879 && right.metadata().location_id.is_top_level();
2880 let left_lifetime = if left.metadata().location_id.is_top_level() {
2881 quote!('static)
2882 } else {
2883 quote!('tick)
2884 };
2885
2886 let right_lifetime = if right.metadata().location_id.is_top_level() {
2887 quote!('static)
2888 } else {
2889 quote!('tick)
2890 };
2891
2892 let right_ident = ident_stack.pop().unwrap();
2893 let left_ident = ident_stack.pop().unwrap();
2894
2895 let stream_ident =
2896 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2897
2898 match builders_or_callback {
2899 BuildersOrCallback::Builders(graph_builders) => {
2900 let builder = graph_builders.get_dfir_mut(&out_location);
2901 builder.add_dfir(
2902 if is_top_level {
2903 parse_quote! {
2906 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2907 #left_ident -> [0]#stream_ident;
2908 #right_ident -> [1]#stream_ident;
2909 }
2910 } else {
2911 parse_quote! {
2912 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2913 #left_ident -> [0]#stream_ident;
2914 #right_ident -> [1]#stream_ident;
2915 }
2916 }
2917 ,
2918 None,
2919 Some(&next_stmt_id.to_string()),
2920 );
2921 }
2922 BuildersOrCallback::Callback(_, node_callback) => {
2923 node_callback(node, next_stmt_id);
2924 }
2925 }
2926
2927 *next_stmt_id += 1;
2928
2929 ident_stack.push(stream_ident);
2930 }
2931
2932 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2933 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2934 parse_quote!(difference)
2935 } else {
2936 parse_quote!(anti_join)
2937 };
2938
2939 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2940 node
2941 else {
2942 unreachable!()
2943 };
2944
2945 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2946 quote!('static)
2947 } else {
2948 quote!('tick)
2949 };
2950
2951 let neg_ident = ident_stack.pop().unwrap();
2952 let pos_ident = ident_stack.pop().unwrap();
2953
2954 let stream_ident =
2955 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2956
2957 match builders_or_callback {
2958 BuildersOrCallback::Builders(graph_builders) => {
2959 let builder = graph_builders.get_dfir_mut(&out_location);
2960 builder.add_dfir(
2961 parse_quote! {
2962 #stream_ident = #operator::<'tick, #neg_lifetime>();
2963 #pos_ident -> [pos]#stream_ident;
2964 #neg_ident -> [neg]#stream_ident;
2965 },
2966 None,
2967 Some(&next_stmt_id.to_string()),
2968 );
2969 }
2970 BuildersOrCallback::Callback(_, node_callback) => {
2971 node_callback(node, next_stmt_id);
2972 }
2973 }
2974
2975 *next_stmt_id += 1;
2976
2977 ident_stack.push(stream_ident);
2978 }
2979
2980 HydroNode::ResolveFutures { .. } => {
2981 let input_ident = ident_stack.pop().unwrap();
2982
2983 let futures_ident =
2984 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2985
2986 match builders_or_callback {
2987 BuildersOrCallback::Builders(graph_builders) => {
2988 let builder = graph_builders.get_dfir_mut(&out_location);
2989 builder.add_dfir(
2990 parse_quote! {
2991 #futures_ident = #input_ident -> resolve_futures();
2992 },
2993 None,
2994 Some(&next_stmt_id.to_string()),
2995 );
2996 }
2997 BuildersOrCallback::Callback(_, node_callback) => {
2998 node_callback(node, next_stmt_id);
2999 }
3000 }
3001
3002 *next_stmt_id += 1;
3003
3004 ident_stack.push(futures_ident);
3005 }
3006
3007 HydroNode::ResolveFuturesOrdered { .. } => {
3008 let input_ident = ident_stack.pop().unwrap();
3009
3010 let futures_ident =
3011 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3012
3013 match builders_or_callback {
3014 BuildersOrCallback::Builders(graph_builders) => {
3015 let builder = graph_builders.get_dfir_mut(&out_location);
3016 builder.add_dfir(
3017 parse_quote! {
3018 #futures_ident = #input_ident -> resolve_futures_ordered();
3019 },
3020 None,
3021 Some(&next_stmt_id.to_string()),
3022 );
3023 }
3024 BuildersOrCallback::Callback(_, node_callback) => {
3025 node_callback(node, next_stmt_id);
3026 }
3027 }
3028
3029 *next_stmt_id += 1;
3030
3031 ident_stack.push(futures_ident);
3032 }
3033
3034 HydroNode::Map { f, .. } => {
3035 let input_ident = ident_stack.pop().unwrap();
3036
3037 let map_ident =
3038 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3039
3040 match builders_or_callback {
3041 BuildersOrCallback::Builders(graph_builders) => {
3042 let builder = graph_builders.get_dfir_mut(&out_location);
3043 builder.add_dfir(
3044 parse_quote! {
3045 #map_ident = #input_ident -> map(#f);
3046 },
3047 None,
3048 Some(&next_stmt_id.to_string()),
3049 );
3050 }
3051 BuildersOrCallback::Callback(_, node_callback) => {
3052 node_callback(node, next_stmt_id);
3053 }
3054 }
3055
3056 *next_stmt_id += 1;
3057
3058 ident_stack.push(map_ident);
3059 }
3060
3061 HydroNode::FlatMap { f, .. } => {
3062 let input_ident = ident_stack.pop().unwrap();
3063
3064 let flat_map_ident =
3065 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3066
3067 match builders_or_callback {
3068 BuildersOrCallback::Builders(graph_builders) => {
3069 let builder = graph_builders.get_dfir_mut(&out_location);
3070 builder.add_dfir(
3071 parse_quote! {
3072 #flat_map_ident = #input_ident -> flat_map(#f);
3073 },
3074 None,
3075 Some(&next_stmt_id.to_string()),
3076 );
3077 }
3078 BuildersOrCallback::Callback(_, node_callback) => {
3079 node_callback(node, next_stmt_id);
3080 }
3081 }
3082
3083 *next_stmt_id += 1;
3084
3085 ident_stack.push(flat_map_ident);
3086 }
3087
3088 HydroNode::Filter { f, .. } => {
3089 let input_ident = ident_stack.pop().unwrap();
3090
3091 let filter_ident =
3092 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3093
3094 match builders_or_callback {
3095 BuildersOrCallback::Builders(graph_builders) => {
3096 let builder = graph_builders.get_dfir_mut(&out_location);
3097 builder.add_dfir(
3098 parse_quote! {
3099 #filter_ident = #input_ident -> filter(#f);
3100 },
3101 None,
3102 Some(&next_stmt_id.to_string()),
3103 );
3104 }
3105 BuildersOrCallback::Callback(_, node_callback) => {
3106 node_callback(node, next_stmt_id);
3107 }
3108 }
3109
3110 *next_stmt_id += 1;
3111
3112 ident_stack.push(filter_ident);
3113 }
3114
3115 HydroNode::FilterMap { f, .. } => {
3116 let input_ident = ident_stack.pop().unwrap();
3117
3118 let filter_map_ident =
3119 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3120
3121 match builders_or_callback {
3122 BuildersOrCallback::Builders(graph_builders) => {
3123 let builder = graph_builders.get_dfir_mut(&out_location);
3124 builder.add_dfir(
3125 parse_quote! {
3126 #filter_map_ident = #input_ident -> filter_map(#f);
3127 },
3128 None,
3129 Some(&next_stmt_id.to_string()),
3130 );
3131 }
3132 BuildersOrCallback::Callback(_, node_callback) => {
3133 node_callback(node, next_stmt_id);
3134 }
3135 }
3136
3137 *next_stmt_id += 1;
3138
3139 ident_stack.push(filter_map_ident);
3140 }
3141
3142 HydroNode::Sort { .. } => {
3143 let input_ident = ident_stack.pop().unwrap();
3144
3145 let sort_ident =
3146 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3147
3148 match builders_or_callback {
3149 BuildersOrCallback::Builders(graph_builders) => {
3150 let builder = graph_builders.get_dfir_mut(&out_location);
3151 builder.add_dfir(
3152 parse_quote! {
3153 #sort_ident = #input_ident -> sort();
3154 },
3155 None,
3156 Some(&next_stmt_id.to_string()),
3157 );
3158 }
3159 BuildersOrCallback::Callback(_, node_callback) => {
3160 node_callback(node, next_stmt_id);
3161 }
3162 }
3163
3164 *next_stmt_id += 1;
3165
3166 ident_stack.push(sort_ident);
3167 }
3168
3169 HydroNode::DeferTick { .. } => {
3170 let input_ident = ident_stack.pop().unwrap();
3171
3172 let defer_tick_ident =
3173 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3174
3175 match builders_or_callback {
3176 BuildersOrCallback::Builders(graph_builders) => {
3177 let builder = graph_builders.get_dfir_mut(&out_location);
3178 builder.add_dfir(
3179 parse_quote! {
3180 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3181 },
3182 None,
3183 Some(&next_stmt_id.to_string()),
3184 );
3185 }
3186 BuildersOrCallback::Callback(_, node_callback) => {
3187 node_callback(node, next_stmt_id);
3188 }
3189 }
3190
3191 *next_stmt_id += 1;
3192
3193 ident_stack.push(defer_tick_ident);
3194 }
3195
3196 HydroNode::Enumerate { input, .. } => {
3197 let input_ident = ident_stack.pop().unwrap();
3198
3199 let enumerate_ident =
3200 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3201
3202 match builders_or_callback {
3203 BuildersOrCallback::Builders(graph_builders) => {
3204 let builder = graph_builders.get_dfir_mut(&out_location);
3205 let lifetime = if input.metadata().location_id.is_top_level() {
3206 quote!('static)
3207 } else {
3208 quote!('tick)
3209 };
3210 builder.add_dfir(
3211 parse_quote! {
3212 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3213 },
3214 None,
3215 Some(&next_stmt_id.to_string()),
3216 );
3217 }
3218 BuildersOrCallback::Callback(_, node_callback) => {
3219 node_callback(node, next_stmt_id);
3220 }
3221 }
3222
3223 *next_stmt_id += 1;
3224
3225 ident_stack.push(enumerate_ident);
3226 }
3227
3228 HydroNode::Inspect { f, .. } => {
3229 let input_ident = ident_stack.pop().unwrap();
3230
3231 let inspect_ident =
3232 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3233
3234 match builders_or_callback {
3235 BuildersOrCallback::Builders(graph_builders) => {
3236 let builder = graph_builders.get_dfir_mut(&out_location);
3237 builder.add_dfir(
3238 parse_quote! {
3239 #inspect_ident = #input_ident -> inspect(#f);
3240 },
3241 None,
3242 Some(&next_stmt_id.to_string()),
3243 );
3244 }
3245 BuildersOrCallback::Callback(_, node_callback) => {
3246 node_callback(node, next_stmt_id);
3247 }
3248 }
3249
3250 *next_stmt_id += 1;
3251
3252 ident_stack.push(inspect_ident);
3253 }
3254
3255 HydroNode::Unique { input, .. } => {
3256 let input_ident = ident_stack.pop().unwrap();
3257
3258 let unique_ident =
3259 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3260
3261 match builders_or_callback {
3262 BuildersOrCallback::Builders(graph_builders) => {
3263 let builder = graph_builders.get_dfir_mut(&out_location);
3264 let lifetime = if input.metadata().location_id.is_top_level() {
3265 quote!('static)
3266 } else {
3267 quote!('tick)
3268 };
3269
3270 builder.add_dfir(
3271 parse_quote! {
3272 #unique_ident = #input_ident -> unique::<#lifetime>();
3273 },
3274 None,
3275 Some(&next_stmt_id.to_string()),
3276 );
3277 }
3278 BuildersOrCallback::Callback(_, node_callback) => {
3279 node_callback(node, next_stmt_id);
3280 }
3281 }
3282
3283 *next_stmt_id += 1;
3284
3285 ident_stack.push(unique_ident);
3286 }
3287
3288 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3289 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3290 if input.metadata().location_id.is_top_level()
3291 && input.metadata().collection_kind.is_bounded()
3292 {
3293 parse_quote!(fold_no_replay)
3294 } else {
3295 parse_quote!(fold)
3296 }
3297 } else if matches!(node, HydroNode::Scan { .. }) {
3298 parse_quote!(scan)
3299 } else if let HydroNode::FoldKeyed { input, .. } = node {
3300 if input.metadata().location_id.is_top_level()
3301 && input.metadata().collection_kind.is_bounded()
3302 {
3303 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3304 } else {
3305 parse_quote!(fold_keyed)
3306 }
3307 } else {
3308 unreachable!()
3309 };
3310
3311 let (HydroNode::Fold { input, .. }
3312 | HydroNode::FoldKeyed { input, .. }
3313 | HydroNode::Scan { input, .. }) = node
3314 else {
3315 unreachable!()
3316 };
3317
3318 let lifetime = if input.metadata().location_id.is_top_level() {
3319 quote!('static)
3320 } else {
3321 quote!('tick)
3322 };
3323
3324 let input_ident = ident_stack.pop().unwrap();
3325
3326 let (HydroNode::Fold { init, acc, .. }
3327 | HydroNode::FoldKeyed { init, acc, .. }
3328 | HydroNode::Scan { init, acc, .. }) = &*node
3329 else {
3330 unreachable!()
3331 };
3332
3333 let fold_ident =
3334 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3335
3336 match builders_or_callback {
3337 BuildersOrCallback::Builders(graph_builders) => {
3338 if matches!(node, HydroNode::Fold { .. })
3339 && node.metadata().location_id.is_top_level()
3340 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3341 && graph_builders.singleton_intermediates()
3342 && !node.metadata().collection_kind.is_bounded()
3343 {
3344 let builder = graph_builders.get_dfir_mut(&out_location);
3345
3346 let acc: syn::Expr = parse_quote!({
3347 let mut __inner = #acc;
3348 move |__state, __value| {
3349 __inner(__state, __value);
3350 Some(__state.clone())
3351 }
3352 });
3353
3354 builder.add_dfir(
3355 parse_quote! {
3356 source_iter([(#init)()]) -> [0]#fold_ident;
3357 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3358 #fold_ident = chain();
3359 },
3360 None,
3361 Some(&next_stmt_id.to_string()),
3362 );
3363 } else if matches!(node, HydroNode::FoldKeyed { .. })
3364 && node.metadata().location_id.is_top_level()
3365 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3366 && graph_builders.singleton_intermediates()
3367 && !node.metadata().collection_kind.is_bounded()
3368 {
3369 let builder = graph_builders.get_dfir_mut(&out_location);
3370
3371 let acc: syn::Expr = parse_quote!({
3372 let mut __init = #init;
3373 let mut __inner = #acc;
3374 move |__state, __kv: (_, _)| {
3375 let __state = __state
3377 .entry(::std::clone::Clone::clone(&__kv.0))
3378 .or_insert_with(|| (__init)());
3379 __inner(__state, __kv.1);
3380 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3381 }
3382 });
3383
3384 builder.add_dfir(
3385 parse_quote! {
3386 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3387 },
3388 None,
3389 Some(&next_stmt_id.to_string()),
3390 );
3391 } else {
3392 let builder = graph_builders.get_dfir_mut(&out_location);
3393 builder.add_dfir(
3394 parse_quote! {
3395 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3396 },
3397 None,
3398 Some(&next_stmt_id.to_string()),
3399 );
3400 }
3401 }
3402 BuildersOrCallback::Callback(_, node_callback) => {
3403 node_callback(node, next_stmt_id);
3404 }
3405 }
3406
3407 *next_stmt_id += 1;
3408
3409 ident_stack.push(fold_ident);
3410 }
3411
3412 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3413 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3414 if input.metadata().location_id.is_top_level()
3415 && input.metadata().collection_kind.is_bounded()
3416 {
3417 parse_quote!(reduce_no_replay)
3418 } else {
3419 parse_quote!(reduce)
3420 }
3421 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3422 if input.metadata().location_id.is_top_level()
3423 && input.metadata().collection_kind.is_bounded()
3424 {
3425 todo!(
3426 "Calling keyed reduce on a top-level bounded collection is not supported"
3427 )
3428 } else {
3429 parse_quote!(reduce_keyed)
3430 }
3431 } else {
3432 unreachable!()
3433 };
3434
3435 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3436 else {
3437 unreachable!()
3438 };
3439
3440 let lifetime = if input.metadata().location_id.is_top_level() {
3441 quote!('static)
3442 } else {
3443 quote!('tick)
3444 };
3445
3446 let input_ident = ident_stack.pop().unwrap();
3447
3448 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3449 else {
3450 unreachable!()
3451 };
3452
3453 let reduce_ident =
3454 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3455
3456 match builders_or_callback {
3457 BuildersOrCallback::Builders(graph_builders) => {
3458 if matches!(node, HydroNode::Reduce { .. })
3459 && node.metadata().location_id.is_top_level()
3460 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3461 && graph_builders.singleton_intermediates()
3462 && !node.metadata().collection_kind.is_bounded()
3463 {
3464 todo!(
3465 "Reduce with optional intermediates is not yet supported in simulator"
3466 );
3467 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3468 && node.metadata().location_id.is_top_level()
3469 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3470 && graph_builders.singleton_intermediates()
3471 && !node.metadata().collection_kind.is_bounded()
3472 {
3473 todo!(
3474 "Reduce keyed with optional intermediates is not yet supported in simulator"
3475 );
3476 } else {
3477 let builder = graph_builders.get_dfir_mut(&out_location);
3478 builder.add_dfir(
3479 parse_quote! {
3480 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3481 },
3482 None,
3483 Some(&next_stmt_id.to_string()),
3484 );
3485 }
3486 }
3487 BuildersOrCallback::Callback(_, node_callback) => {
3488 node_callback(node, next_stmt_id);
3489 }
3490 }
3491
3492 *next_stmt_id += 1;
3493
3494 ident_stack.push(reduce_ident);
3495 }
3496
3497 HydroNode::ReduceKeyedWatermark {
3498 f,
3499 input,
3500 metadata,
3501 ..
3502 } => {
3503 let lifetime = if input.metadata().location_id.is_top_level() {
3504 quote!('static)
3505 } else {
3506 quote!('tick)
3507 };
3508
3509 let watermark_ident = ident_stack.pop().unwrap();
3511 let input_ident = ident_stack.pop().unwrap();
3512
3513 let chain_ident = syn::Ident::new(
3514 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3515 Span::call_site(),
3516 );
3517
3518 let fold_ident =
3519 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3520
3521 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3522 && input.metadata().collection_kind.is_bounded()
3523 {
3524 parse_quote!(fold_no_replay)
3525 } else {
3526 parse_quote!(fold)
3527 };
3528
3529 match builders_or_callback {
3530 BuildersOrCallback::Builders(graph_builders) => {
3531 if metadata.location_id.is_top_level()
3532 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3533 && graph_builders.singleton_intermediates()
3534 && !metadata.collection_kind.is_bounded()
3535 {
3536 todo!(
3537 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3538 )
3539 } else {
3540 let builder = graph_builders.get_dfir_mut(&out_location);
3541 builder.add_dfir(
3542 parse_quote! {
3543 #chain_ident = chain();
3544 #input_ident
3545 -> map(|x| (Some(x), None))
3546 -> [0]#chain_ident;
3547 #watermark_ident
3548 -> map(|watermark| (None, Some(watermark)))
3549 -> [1]#chain_ident;
3550
3551 #fold_ident = #chain_ident
3552 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3553 let __reduce_keyed_fn = #f;
3554 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3555 if let Some((k, v)) = opt_payload {
3556 if let Some(curr_watermark) = *opt_curr_watermark {
3557 if k < curr_watermark {
3558 return;
3559 }
3560 }
3561 match map.entry(k) {
3562 ::std::collections::hash_map::Entry::Vacant(e) => {
3563 e.insert(v);
3564 }
3565 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3566 __reduce_keyed_fn(e.get_mut(), v);
3567 }
3568 }
3569 } else {
3570 let watermark = opt_watermark.unwrap();
3571 if let Some(curr_watermark) = *opt_curr_watermark {
3572 if watermark <= curr_watermark {
3573 return;
3574 }
3575 }
3576 *opt_curr_watermark = opt_watermark;
3577 map.retain(|k, _| *k >= watermark);
3578 }
3579 }
3580 })
3581 -> flat_map(|(map, _curr_watermark)| map);
3582 },
3583 None,
3584 Some(&next_stmt_id.to_string()),
3585 );
3586 }
3587 }
3588 BuildersOrCallback::Callback(_, node_callback) => {
3589 node_callback(node, next_stmt_id);
3590 }
3591 }
3592
3593 *next_stmt_id += 1;
3594
3595 ident_stack.push(fold_ident);
3596 }
3597
3598 HydroNode::Network {
3599 networking_info,
3600 serialize_fn: serialize_pipeline,
3601 instantiate_fn,
3602 deserialize_fn: deserialize_pipeline,
3603 input,
3604 ..
3605 } => {
3606 let input_ident = ident_stack.pop().unwrap();
3607
3608 let receiver_stream_ident =
3609 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3610
3611 match builders_or_callback {
3612 BuildersOrCallback::Builders(graph_builders) => {
3613 let (sink_expr, source_expr) = match instantiate_fn {
3614 DebugInstantiate::Building => (
3615 syn::parse_quote!(DUMMY_SINK),
3616 syn::parse_quote!(DUMMY_SOURCE),
3617 ),
3618
3619 DebugInstantiate::Finalized(finalized) => {
3620 (finalized.sink.clone(), finalized.source.clone())
3621 }
3622 };
3623
3624 graph_builders.create_network(
3625 &input.metadata().location_id,
3626 &out_location,
3627 input_ident,
3628 &receiver_stream_ident,
3629 serialize_pipeline.as_ref(),
3630 sink_expr,
3631 source_expr,
3632 deserialize_pipeline.as_ref(),
3633 *next_stmt_id,
3634 networking_info,
3635 );
3636 }
3637 BuildersOrCallback::Callback(_, node_callback) => {
3638 node_callback(node, next_stmt_id);
3639 }
3640 }
3641
3642 *next_stmt_id += 1;
3643
3644 ident_stack.push(receiver_stream_ident);
3645 }
3646
3647 HydroNode::ExternalInput {
3648 instantiate_fn,
3649 deserialize_fn: deserialize_pipeline,
3650 ..
3651 } => {
3652 let receiver_stream_ident =
3653 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3654
3655 match builders_or_callback {
3656 BuildersOrCallback::Builders(graph_builders) => {
3657 let (_, source_expr) = match instantiate_fn {
3658 DebugInstantiate::Building => (
3659 syn::parse_quote!(DUMMY_SINK),
3660 syn::parse_quote!(DUMMY_SOURCE),
3661 ),
3662
3663 DebugInstantiate::Finalized(finalized) => {
3664 (finalized.sink.clone(), finalized.source.clone())
3665 }
3666 };
3667
3668 graph_builders.create_external_source(
3669 &out_location,
3670 source_expr,
3671 &receiver_stream_ident,
3672 deserialize_pipeline.as_ref(),
3673 *next_stmt_id,
3674 );
3675 }
3676 BuildersOrCallback::Callback(_, node_callback) => {
3677 node_callback(node, next_stmt_id);
3678 }
3679 }
3680
3681 *next_stmt_id += 1;
3682
3683 ident_stack.push(receiver_stream_ident);
3684 }
3685
3686 HydroNode::Counter {
3687 tag,
3688 duration,
3689 prefix,
3690 ..
3691 } => {
3692 let input_ident = ident_stack.pop().unwrap();
3693
3694 let counter_ident =
3695 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3696
3697 match builders_or_callback {
3698 BuildersOrCallback::Builders(graph_builders) => {
3699 let arg = format!("{}({})", prefix, tag);
3700 let builder = graph_builders.get_dfir_mut(&out_location);
3701 builder.add_dfir(
3702 parse_quote! {
3703 #counter_ident = #input_ident -> _counter(#arg, #duration);
3704 },
3705 None,
3706 Some(&next_stmt_id.to_string()),
3707 );
3708 }
3709 BuildersOrCallback::Callback(_, node_callback) => {
3710 node_callback(node, next_stmt_id);
3711 }
3712 }
3713
3714 *next_stmt_id += 1;
3715
3716 ident_stack.push(counter_ident);
3717 }
3718 }
3719 },
3720 seen_tees,
3721 false,
3722 );
3723
3724 ident_stack
3725 .pop()
3726 .expect("ident_stack should have exactly one element after traversal")
3727 }
3728
3729 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3730 match self {
3731 HydroNode::Placeholder => {
3732 panic!()
3733 }
3734 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3735 HydroNode::Source { source, .. } => match source {
3736 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3737 HydroSource::ExternalNetwork()
3738 | HydroSource::Spin()
3739 | HydroSource::ClusterMembers(_, _)
3740 | HydroSource::Embedded(_) => {} },
3742 HydroNode::SingletonSource { value, .. } => {
3743 transform(value);
3744 }
3745 HydroNode::CycleSource { .. }
3746 | HydroNode::Tee { .. }
3747 | HydroNode::YieldConcat { .. }
3748 | HydroNode::BeginAtomic { .. }
3749 | HydroNode::EndAtomic { .. }
3750 | HydroNode::Batch { .. }
3751 | HydroNode::Chain { .. }
3752 | HydroNode::ChainFirst { .. }
3753 | HydroNode::CrossProduct { .. }
3754 | HydroNode::CrossSingleton { .. }
3755 | HydroNode::ResolveFutures { .. }
3756 | HydroNode::ResolveFuturesOrdered { .. }
3757 | HydroNode::Join { .. }
3758 | HydroNode::Difference { .. }
3759 | HydroNode::AntiJoin { .. }
3760 | HydroNode::DeferTick { .. }
3761 | HydroNode::Enumerate { .. }
3762 | HydroNode::Unique { .. }
3763 | HydroNode::Sort { .. } => {}
3764 HydroNode::Map { f, .. }
3765 | HydroNode::FlatMap { f, .. }
3766 | HydroNode::Filter { f, .. }
3767 | HydroNode::FilterMap { f, .. }
3768 | HydroNode::Inspect { f, .. }
3769 | HydroNode::Partition { f, .. }
3770 | HydroNode::Reduce { f, .. }
3771 | HydroNode::ReduceKeyed { f, .. }
3772 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3773 transform(f);
3774 }
3775 HydroNode::Fold { init, acc, .. }
3776 | HydroNode::Scan { init, acc, .. }
3777 | HydroNode::FoldKeyed { init, acc, .. } => {
3778 transform(init);
3779 transform(acc);
3780 }
3781 HydroNode::Network {
3782 serialize_fn,
3783 deserialize_fn,
3784 ..
3785 } => {
3786 if let Some(serialize_fn) = serialize_fn {
3787 transform(serialize_fn);
3788 }
3789 if let Some(deserialize_fn) = deserialize_fn {
3790 transform(deserialize_fn);
3791 }
3792 }
3793 HydroNode::ExternalInput { deserialize_fn, .. } => {
3794 if let Some(deserialize_fn) = deserialize_fn {
3795 transform(deserialize_fn);
3796 }
3797 }
3798 HydroNode::Counter { duration, .. } => {
3799 transform(duration);
3800 }
3801 }
3802 }
3803
3804 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3805 &self.metadata().op
3806 }
3807
3808 pub fn metadata(&self) -> &HydroIrMetadata {
3809 match self {
3810 HydroNode::Placeholder => {
3811 panic!()
3812 }
3813 HydroNode::Cast { metadata, .. } => metadata,
3814 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3815 HydroNode::Source { metadata, .. } => metadata,
3816 HydroNode::SingletonSource { metadata, .. } => metadata,
3817 HydroNode::CycleSource { metadata, .. } => metadata,
3818 HydroNode::Tee { metadata, .. } => metadata,
3819 HydroNode::Partition { metadata, .. } => metadata,
3820 HydroNode::YieldConcat { metadata, .. } => metadata,
3821 HydroNode::BeginAtomic { metadata, .. } => metadata,
3822 HydroNode::EndAtomic { metadata, .. } => metadata,
3823 HydroNode::Batch { metadata, .. } => metadata,
3824 HydroNode::Chain { metadata, .. } => metadata,
3825 HydroNode::ChainFirst { metadata, .. } => metadata,
3826 HydroNode::CrossProduct { metadata, .. } => metadata,
3827 HydroNode::CrossSingleton { metadata, .. } => metadata,
3828 HydroNode::Join { metadata, .. } => metadata,
3829 HydroNode::Difference { metadata, .. } => metadata,
3830 HydroNode::AntiJoin { metadata, .. } => metadata,
3831 HydroNode::ResolveFutures { metadata, .. } => metadata,
3832 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3833 HydroNode::Map { metadata, .. } => metadata,
3834 HydroNode::FlatMap { metadata, .. } => metadata,
3835 HydroNode::Filter { metadata, .. } => metadata,
3836 HydroNode::FilterMap { metadata, .. } => metadata,
3837 HydroNode::DeferTick { metadata, .. } => metadata,
3838 HydroNode::Enumerate { metadata, .. } => metadata,
3839 HydroNode::Inspect { metadata, .. } => metadata,
3840 HydroNode::Unique { metadata, .. } => metadata,
3841 HydroNode::Sort { metadata, .. } => metadata,
3842 HydroNode::Scan { metadata, .. } => metadata,
3843 HydroNode::Fold { metadata, .. } => metadata,
3844 HydroNode::FoldKeyed { metadata, .. } => metadata,
3845 HydroNode::Reduce { metadata, .. } => metadata,
3846 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3847 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3848 HydroNode::ExternalInput { metadata, .. } => metadata,
3849 HydroNode::Network { metadata, .. } => metadata,
3850 HydroNode::Counter { metadata, .. } => metadata,
3851 }
3852 }
3853
3854 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3855 &mut self.metadata_mut().op
3856 }
3857
3858 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3859 match self {
3860 HydroNode::Placeholder => {
3861 panic!()
3862 }
3863 HydroNode::Cast { metadata, .. } => metadata,
3864 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3865 HydroNode::Source { metadata, .. } => metadata,
3866 HydroNode::SingletonSource { metadata, .. } => metadata,
3867 HydroNode::CycleSource { metadata, .. } => metadata,
3868 HydroNode::Tee { metadata, .. } => metadata,
3869 HydroNode::Partition { metadata, .. } => metadata,
3870 HydroNode::YieldConcat { metadata, .. } => metadata,
3871 HydroNode::BeginAtomic { metadata, .. } => metadata,
3872 HydroNode::EndAtomic { metadata, .. } => metadata,
3873 HydroNode::Batch { metadata, .. } => metadata,
3874 HydroNode::Chain { metadata, .. } => metadata,
3875 HydroNode::ChainFirst { metadata, .. } => metadata,
3876 HydroNode::CrossProduct { metadata, .. } => metadata,
3877 HydroNode::CrossSingleton { metadata, .. } => metadata,
3878 HydroNode::Join { metadata, .. } => metadata,
3879 HydroNode::Difference { metadata, .. } => metadata,
3880 HydroNode::AntiJoin { metadata, .. } => metadata,
3881 HydroNode::ResolveFutures { metadata, .. } => metadata,
3882 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3883 HydroNode::Map { metadata, .. } => metadata,
3884 HydroNode::FlatMap { metadata, .. } => metadata,
3885 HydroNode::Filter { metadata, .. } => metadata,
3886 HydroNode::FilterMap { metadata, .. } => metadata,
3887 HydroNode::DeferTick { metadata, .. } => metadata,
3888 HydroNode::Enumerate { metadata, .. } => metadata,
3889 HydroNode::Inspect { metadata, .. } => metadata,
3890 HydroNode::Unique { metadata, .. } => metadata,
3891 HydroNode::Sort { metadata, .. } => metadata,
3892 HydroNode::Scan { metadata, .. } => metadata,
3893 HydroNode::Fold { metadata, .. } => metadata,
3894 HydroNode::FoldKeyed { metadata, .. } => metadata,
3895 HydroNode::Reduce { metadata, .. } => metadata,
3896 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3897 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3898 HydroNode::ExternalInput { metadata, .. } => metadata,
3899 HydroNode::Network { metadata, .. } => metadata,
3900 HydroNode::Counter { metadata, .. } => metadata,
3901 }
3902 }
3903
3904 pub fn input(&self) -> Vec<&HydroNode> {
3905 match self {
3906 HydroNode::Placeholder => {
3907 panic!()
3908 }
3909 HydroNode::Source { .. }
3910 | HydroNode::SingletonSource { .. }
3911 | HydroNode::ExternalInput { .. }
3912 | HydroNode::CycleSource { .. }
3913 | HydroNode::Tee { .. }
3914 | HydroNode::Partition { .. } => {
3915 vec![]
3917 }
3918 HydroNode::Cast { inner, .. }
3919 | HydroNode::ObserveNonDet { inner, .. }
3920 | HydroNode::YieldConcat { inner, .. }
3921 | HydroNode::BeginAtomic { inner, .. }
3922 | HydroNode::EndAtomic { inner, .. }
3923 | HydroNode::Batch { inner, .. } => {
3924 vec![inner]
3925 }
3926 HydroNode::Chain { first, second, .. } => {
3927 vec![first, second]
3928 }
3929 HydroNode::ChainFirst { first, second, .. } => {
3930 vec![first, second]
3931 }
3932 HydroNode::CrossProduct { left, right, .. }
3933 | HydroNode::CrossSingleton { left, right, .. }
3934 | HydroNode::Join { left, right, .. } => {
3935 vec![left, right]
3936 }
3937 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3938 vec![pos, neg]
3939 }
3940 HydroNode::Map { input, .. }
3941 | HydroNode::FlatMap { input, .. }
3942 | HydroNode::Filter { input, .. }
3943 | HydroNode::FilterMap { input, .. }
3944 | HydroNode::Sort { input, .. }
3945 | HydroNode::DeferTick { input, .. }
3946 | HydroNode::Enumerate { input, .. }
3947 | HydroNode::Inspect { input, .. }
3948 | HydroNode::Unique { input, .. }
3949 | HydroNode::Network { input, .. }
3950 | HydroNode::Counter { input, .. }
3951 | HydroNode::ResolveFutures { input, .. }
3952 | HydroNode::ResolveFuturesOrdered { input, .. }
3953 | HydroNode::Fold { input, .. }
3954 | HydroNode::FoldKeyed { input, .. }
3955 | HydroNode::Reduce { input, .. }
3956 | HydroNode::ReduceKeyed { input, .. }
3957 | HydroNode::Scan { input, .. } => {
3958 vec![input]
3959 }
3960 HydroNode::ReduceKeyedWatermark {
3961 input, watermark, ..
3962 } => {
3963 vec![input, watermark]
3964 }
3965 }
3966 }
3967
3968 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3969 self.input()
3970 .iter()
3971 .map(|input_node| input_node.metadata())
3972 .collect()
3973 }
3974
3975 pub fn print_root(&self) -> String {
3976 match self {
3977 HydroNode::Placeholder => {
3978 panic!()
3979 }
3980 HydroNode::Cast { .. } => "Cast()".to_owned(),
3981 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3982 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3983 HydroNode::SingletonSource {
3984 value,
3985 first_tick_only,
3986 ..
3987 } => format!(
3988 "SingletonSource({:?}, first_tick_only={})",
3989 value, first_tick_only
3990 ),
3991 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3992 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3993 HydroNode::Partition { f, is_true, .. } => {
3994 format!("Partition({:?}, is_true={})", f, is_true)
3995 }
3996 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3997 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3998 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3999 HydroNode::Batch { .. } => "Batch()".to_owned(),
4000 HydroNode::Chain { first, second, .. } => {
4001 format!("Chain({}, {})", first.print_root(), second.print_root())
4002 }
4003 HydroNode::ChainFirst { first, second, .. } => {
4004 format!(
4005 "ChainFirst({}, {})",
4006 first.print_root(),
4007 second.print_root()
4008 )
4009 }
4010 HydroNode::CrossProduct { left, right, .. } => {
4011 format!(
4012 "CrossProduct({}, {})",
4013 left.print_root(),
4014 right.print_root()
4015 )
4016 }
4017 HydroNode::CrossSingleton { left, right, .. } => {
4018 format!(
4019 "CrossSingleton({}, {})",
4020 left.print_root(),
4021 right.print_root()
4022 )
4023 }
4024 HydroNode::Join { left, right, .. } => {
4025 format!("Join({}, {})", left.print_root(), right.print_root())
4026 }
4027 HydroNode::Difference { pos, neg, .. } => {
4028 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4029 }
4030 HydroNode::AntiJoin { pos, neg, .. } => {
4031 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4032 }
4033 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4034 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4035 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4036 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4037 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4038 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4039 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4040 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4041 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4042 HydroNode::Unique { .. } => "Unique()".to_owned(),
4043 HydroNode::Sort { .. } => "Sort()".to_owned(),
4044 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4045 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4046 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4047 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4048 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4049 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4050 HydroNode::Network { .. } => "Network()".to_owned(),
4051 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4052 HydroNode::Counter { tag, duration, .. } => {
4053 format!("Counter({:?}, {:?})", tag, duration)
4054 }
4055 }
4056 }
4057}
4058
/// Instantiates the network channel between two deployed locations.
///
/// Looks up the deployed node for `from_location` and `to_location`, reserves
/// the next port on each (sender first, then receiver), and asks the deploy
/// backend `D` for the matching sink/source expressions and connect callback.
/// The backend functions used depend on the endpoint kinds:
///
/// | from    | to      | backend calls                         |
/// |---------|---------|---------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`     |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`     |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`     |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`     |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` call followed by the closure produced by `*_connect`.
///
/// # Panics
///
/// Panics if either location is missing from `processes` / `clusters`, or if
/// either location is a `Tick` or `Atomic` location (only process and cluster
/// endpoints are handled).
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Shared lookups so each match arm only differs in the backend calls it
    // makes. The panic text for a missing node is unchanged.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster endpoints can be networked. Arm order is
        // kept so the reported endpoint matches the original precedence.
        (LocationId::Tick(_, _), _) => panic!(
            "cannot instantiate a network whose sender is a tick location; \
             expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) => panic!(
            "cannot instantiate a network whose receiver is a tick location; \
             expected a process or cluster"
        ),
        (LocationId::Atomic(_), _) => panic!(
            "cannot instantiate a network whose sender is an atomic location; \
             expected a process or cluster"
        ),
        (_, LocationId::Atomic(_)) => panic!(
            "cannot instantiate a network whose receiver is an atomic location; \
             expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
4200
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode`; skipped without the `build`
    /// feature because feature-gated fields are expected to be present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the in-memory size of `HydroRoot`; skipped without the `build`
    /// feature because feature-gated fields are expected to be present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// A plain expression must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let input = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let output = simplify_q_macro(input.clone());
        assert_eq!(output, input);
    }

    /// Snapshots the simplified form of a closure spliced from a real `q!` call.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshots the simplified form of a `q!` body that is a block ending in
    /// a closure, so the quoted code does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}