33import bio .terra .tanagra .api .shared .DataType ;
44import bio .terra .tanagra .indexing .job .BigQueryJob ;
55import bio .terra .tanagra .indexing .job .dataflow .beam .BigQueryBeamUtils ;
6+ import bio .terra .tanagra .indexing .job .dataflow .beam .CountUtils ;
67import bio .terra .tanagra .indexing .job .dataflow .beam .DataflowUtils ;
78import bio .terra .tanagra .query .sql .SqlField ;
89import bio .terra .tanagra .query .sql .SqlQueryField ;
910import bio .terra .tanagra .underlay .entitymodel .Attribute ;
1011import bio .terra .tanagra .underlay .entitymodel .Entity ;
12+ import bio .terra .tanagra .underlay .entitymodel .Hierarchy ;
1113import bio .terra .tanagra .underlay .entitymodel .Relationship ;
1214import bio .terra .tanagra .underlay .entitymodel .entitygroup .CriteriaOccurrence ;
1315import bio .terra .tanagra .underlay .indextable .ITEntityMain ;
16+ import bio .terra .tanagra .underlay .indextable .ITHierarchyAncestorDescendant ;
1417import bio .terra .tanagra .underlay .indextable .ITInstanceLevelDisplayHints ;
1518import bio .terra .tanagra .underlay .indextable .ITRelationshipIdPairs ;
1619import bio .terra .tanagra .underlay .serialization .SZIndexer ;
1720import com .google .api .services .bigquery .model .TableRow ;
1821import jakarta .annotation .Nullable ;
1922import java .io .Serializable ;
23+ import java .util .stream .StreamSupport ;
2024import org .apache .beam .sdk .Pipeline ;
2125import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO ;
2226import org .apache .beam .sdk .transforms .Count ;
2327import org .apache .beam .sdk .transforms .Distinct ;
2428import org .apache .beam .sdk .transforms .Filter ;
29+ import org .apache .beam .sdk .transforms .FlatMapElements ;
2530import org .apache .beam .sdk .transforms .MapElements ;
2631import org .apache .beam .sdk .transforms .Max ;
2732import org .apache .beam .sdk .transforms .Min ;
@@ -48,6 +53,8 @@ public class WriteInstanceLevelDisplayHints extends BigQueryJob {
4853 private final @ Nullable ITRelationshipIdPairs occurrenceCriteriaRelationshipIdPairsTable ;
4954 private final @ Nullable ITRelationshipIdPairs occurrencePrimaryRelationshipIdPairsTable ;
5055 private final ITInstanceLevelDisplayHints indexTable ;
56+ private final @ Nullable Hierarchy hierarchy ;
57+ private final @ Nullable ITHierarchyAncestorDescendant ancestorDescendantTable ;
5158
5259 @ SuppressWarnings ("checkstyle:ParameterNumber" )
5360 public WriteInstanceLevelDisplayHints (
@@ -59,7 +66,9 @@ public WriteInstanceLevelDisplayHints(
5966 ITEntityMain primaryEntityIndexTable ,
6067 @ Nullable ITRelationshipIdPairs occurrenceCriteriaRelationshipIdPairsTable ,
6168 @ Nullable ITRelationshipIdPairs occurrencePrimaryRelationshipIdPairsTable ,
62- ITInstanceLevelDisplayHints indexTable ) {
69+ ITInstanceLevelDisplayHints indexTable ,
70+ @ Nullable Hierarchy hierarchy ,
71+ @ Nullable ITHierarchyAncestorDescendant ancestorDescendantTable ) {
6372 super (indexerConfig );
6473 this .criteriaOccurrence = criteriaOccurrence ;
6574 this .occurrenceEntity = occurrenceEntity ;
@@ -69,6 +78,8 @@ public WriteInstanceLevelDisplayHints(
6978 this .occurrenceCriteriaRelationshipIdPairsTable = occurrenceCriteriaRelationshipIdPairsTable ;
7079 this .occurrencePrimaryRelationshipIdPairsTable = occurrencePrimaryRelationshipIdPairsTable ;
7180 this .indexTable = indexTable ;
81+ this .hierarchy = hierarchy ;
82+ this .ancestorDescendantTable = ancestorDescendantTable ;
7283 }
7384
7485 @ Override
@@ -119,8 +130,8 @@ public void run(boolean isDryRun) {
119130 readInRelationshipIdPairs (
120131 pipeline , occCriIdPairsSql , entityAIdColumnName , entityBIdColumnName );
121132
122- // Build a query to select all occurrence-criteria id pairs, and the pipeline steps to read the
123- // results and build a (occurrence id, criteria id) KV PCollection.
133+ // Build a query to select all occurrence-primary id pairs, and the pipeline steps to read the
134+ // results and build a (occurrence id, primary id) KV PCollection.
124135 String occPriIdPairsSql =
125136 getQueryRelationshipIdPairs (
126137 entityAIdColumnName ,
@@ -134,17 +145,32 @@ public void run(boolean isDryRun) {
134145 readInRelationshipIdPairs (
135146 pipeline , occPriIdPairsSql , entityAIdColumnName , entityBIdColumnName );
136147
148+ PCollection <KV <Long , Long >> rollupOccCriIdPairKVs = null ;
149+ if (hierarchy != null
150+ && criteriaOccurrence .hasRollupInstanceLevelDisplayHints (occurrenceEntity )) {
151+ PCollection <KV <Long , Long >> descendantAncestorRelationshipsPC =
152+ BigQueryBeamUtils .readDescendantAncestorRelationshipsFromBQ (
153+ pipeline , ancestorDescendantTable );
154+
155+ // Expand the set of occurrences to include a repeat for each ancestor.
156+ rollupOccCriIdPairKVs =
157+ CountUtils .repeatOccurrencesForHints (occCriIdPairKVs , descendantAncestorRelationshipsPC );
158+ }
159+ final PCollection <KV <Long , Long >> finalRollupOccCriIdPairKVs = rollupOccCriIdPairKVs ;
160+
137161 criteriaOccurrence
138162 .getAttributesWithInstanceLevelDisplayHints (occurrenceEntity )
139163 .forEach (
140- attribute -> {
164+ (attribute , rollup ) -> {
165+ PCollection <KV <Long , Long >> idPairsKVs =
166+ rollup ? finalRollupOccCriIdPairKVs : occCriIdPairKVs ;
141167 if (attribute .isValueDisplay ()) {
142168 LOGGER .info ("enum val hint: {}" , attribute .getName ());
143- enumValHint (occCriIdPairKVs , occPriIdPairKVs , occIdRowKVs , attribute );
169+ enumValHint (idPairsKVs , occPriIdPairKVs , occIdRowKVs , attribute );
144170 } else if (DataType .INT64 .equals (attribute .getDataType ())
145171 || DataType .DOUBLE .equals (attribute .getDataType ())) {
146172 LOGGER .info ("numeric range hint: {}" , attribute .getName ());
147- numericRangeHint (occCriIdPairKVs , occIdRowKVs , attribute );
173+ numericRangeHint (idPairsKVs , occIdRowKVs , attribute );
148174 } // TODO: Calculate display hints for other data types.
149175 });
150176
@@ -287,13 +313,15 @@ private void numericRangeHint(
287313 occIdAndNumValCriId
288314 .apply (Filter .by (cogb -> cogb .getValue ().getAll (numValTag ).iterator ().hasNext ()))
289315 .apply (
290- MapElements .into (
316+ FlatMapElements .into (
291317 TypeDescriptors .kvs (TypeDescriptors .longs (), TypeDescriptors .doubles ()))
292318 .via (
293- cogb ->
294- KV .of (
295- cogb .getValue ().getOnly (criIdTag ),
296- cogb .getValue ().getOnly (numValTag ))));
319+ cogb -> {
320+ Iterable <Long > criIds = cogb .getValue ().getAll (criIdTag );
321+ return StreamSupport .stream (criIds .spliterator (), false )
322+ .map ((Long criId ) -> KV .of (criId , cogb .getValue ().getOnly (numValTag )))
323+ .toList ();
324+ }));
297325
298326 // Compute numeric range for each criteriaId.
299327 PCollection <IdNumericRange > numericRanges = numericRangeHint (criteriaValuePairs );
@@ -361,23 +389,28 @@ private void enumValHint(
361389 .and (criIdTag , occCriIdPairs )
362390 .and (priIdTag , occPriIdPairs )
363391 .apply (CoGroupByKey .create ());
392+
364393 PCollection <KV <IdEnumValue , Long >> criteriaEnumPrimaryPairs =
365394 occIdAndAttrsCriIdPriId
366395 .apply (Filter .by (cogb -> cogb .getValue ().getAll (occAttrsTag ).iterator ().hasNext ()))
367396 .apply (
368- MapElements .into (
397+ FlatMapElements .into (
369398 TypeDescriptors .kvs (
370399 new TypeDescriptor <IdEnumValue >() {}, TypeDescriptors .longs ()))
371400 .via (
372401 cogb -> {
373- Long criId = cogb .getValue ().getOnly (criIdTag );
402+ Iterable < Long > criIds = cogb .getValue ().getAll (criIdTag );
374403 Long priId = cogb .getValue ().getOnly (priIdTag );
375404
376405 TableRow occAttrs = cogb .getValue ().getOnly (occAttrsTag );
377406 String enumValue = (String ) occAttrs .get (enumValColName );
378407 String enumDisplay = (String ) occAttrs .get (enumDisplayColName );
379408
380- return KV .of (new IdEnumValue (criId , enumValue , enumDisplay ), priId );
409+ return StreamSupport .stream (criIds .spliterator (), false )
410+ .map (
411+ (Long criId ) ->
412+ KV .of (new IdEnumValue (criId , enumValue , enumDisplay ), priId ))
413+ .toList ();
381414 }));
382415
383416 // Compute enum values and counts for each criteriaId.
0 commit comments