2828
import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -90,6 +91,10 @@ public class CsvDataTableStore implements DataTableStore, AutoCloseable {
9091 private final String fileExtension ;
9192 private final Map <String , String > prefixColumns ;
9293 private final Map <String , String > suffixColumns ;
94+ private static final ObjectMapper ROW_MAPPER = new ObjectMapper ()
95+ .registerModule (new ParameterNamesModule ())
96+ .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
97+
9398 private final ConcurrentHashMap <String , BucketWriter > writers = new ConcurrentHashMap <>();
9499 private final ConcurrentHashMap <String , RowMetadata > rowMetadata = new ConcurrentHashMap <>();
95100 private final ConcurrentHashMap <String , DataTable <?>> knownTables = new ConcurrentHashMap <>();
@@ -180,15 +185,30 @@ private static InputStream defaultInputStream(Path path) {
180185 @ Override
181186 public <Row > void insertRow (DataTable <Row > dataTable , ExecutionContext ctx , Row row ) {
182187 String metaKey = metaKey (dataTable .getName (), dataTable .getGroup ());
183- rowMetadata .computeIfAbsent (metaKey , k -> RowMetadata .from (dataTable ));
188+ rowMetadata .computeIfAbsent (metaKey , k -> RowMetadata .of (dataTable . getType () ));
184189 knownTables .putIfAbsent (fileKey (dataTable ), dataTable );
185190 String fileKey = fileKey (dataTable );
186191 BucketWriter writer = writers .computeIfAbsent (fileKey , k -> createBucketWriter (dataTable ));
187192 writer .writeRow (row );
188193 }
189194
195+ @ Deprecated
190196 @ Override
191197 public Stream <?> getRows (String dataTableName , @ Nullable String group ) {
198+ RowMetadata meta = rowMetadata .get (metaKey (dataTableName , group ));
199+ return readRows (dataTableName , group , meta );
200+ }
201+
202+ @ SuppressWarnings ("unchecked" )
203+ @ Override
204+ public <Row > Stream <Row > getRows (Class <? extends DataTable <Row >> dataTableClass , @ Nullable String group ) {
205+ Class <Row > rowType = (Class <Row >) ((ParameterizedType ) dataTableClass .getGenericSuperclass ())
206+ .getActualTypeArguments ()[0 ];
207+ return readRows (dataTableClass .getName (), group , RowMetadata .of (rowType ));
208+ }
209+
210+ @ SuppressWarnings ("unchecked" )
211+ private <T > Stream <T > readRows (String dataTableName , @ Nullable String group , @ Nullable RowMetadata meta ) {
192212 // Close (not just flush) matching writers so that compression trailers
193213 // (e.g., GZIP footer) are written, making the files fully readable.
194214 // Removed writers will be lazily re-created in append mode on the next insertRow().
@@ -203,8 +223,6 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
203223 }
204224 }
205225
206- RowMetadata meta = rowMetadata .get (metaKey (dataTableName , group ));
207-
208226 List <Object > allRows = new ArrayList <>();
209227 //noinspection DataFlowIssue
210228 File [] files = outputDir .toFile ().listFiles ((dir , name ) -> name .endsWith (fileExtension ));
@@ -241,6 +259,7 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
241259
242260 try (InputStream is = inputStreamFactory .apply (file .toPath ())) {
243261 CsvParserSettings settings = new CsvParserSettings ();
262+ settings .setMaxCharsPerColumn (-1 );
244263 settings .setHeaderExtractionEnabled (true );
245264 settings .getFormat ().setComment ('#' );
246265 CsvParser parser = new CsvParser (settings );
@@ -265,7 +284,7 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
265284 }
266285 }
267286
268- return allRows .stream ();
287+ return ( Stream < T >) allRows .stream ();
269288 }
270289
271290 @ Override
@@ -385,44 +404,35 @@ private static String metaKey(String dataTableName, @Nullable String group) {
385404 }
386405
387406 /**
388- * Holds the row class and its @Column field names so that
389- * String[] rows read from CSV can be deserialized back to typed objects
390- * via Jackson's {@link ObjectMapper#convertValue} .
407+ * Caches the {@link Column @Column} field names for a row class so they
408+ * are only computed once, and converts CSV {@code String[]} rows back to
409+ * typed objects via Jackson .
391410 */
392411 private static class RowMetadata {
393- private static final ObjectMapper MAPPER = new ObjectMapper ()
394- .registerModule (new ParameterNamesModule ())
395- .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
396-
397- final String rowClassName ;
412+ final Class <?> rowClass ;
398413 final List <String > fieldNames ;
399414
400- RowMetadata (String rowClassName , List <String > fieldNames ) {
401- this .rowClassName = rowClassName ;
415+ private RowMetadata (Class <?> rowClass , List <String > fieldNames ) {
416+ this .rowClass = rowClass ;
402417 this .fieldNames = fieldNames ;
403418 }
404419
405- static RowMetadata from (DataTable <?> dataTable ) {
406- Class <?> rowClass = dataTable .getType ();
420+ static RowMetadata of (Class <?> rowClass ) {
407421 List <String > names = new ArrayList <>();
408422 for (Field f : rowClass .getDeclaredFields ()) {
409423 if (f .isAnnotationPresent (Column .class )) {
410424 names .add (f .getName ());
411425 }
412426 }
413- return new RowMetadata (rowClass . getName () , names );
427+ return new RowMetadata (rowClass , names );
414428 }
415429
416430 Object toRow (String [] values ) {
417431 Map <String , String > map = new LinkedHashMap <>();
418432 for (int i = 0 ; i < fieldNames .size (); i ++) {
419433 map .put (fieldNames .get (i ), i < values .length ? values [i ] : "" );
420434 }
421- try {
422- return MAPPER .convertValue (map , Class .forName (rowClassName ));
423- } catch (ClassNotFoundException e ) {
424- throw new IllegalStateException ("Row class not found: " + rowClassName , e );
425- }
435+ return ROW_MAPPER .convertValue (map , rowClass );
426436 }
427437 }
428438
@@ -486,7 +496,7 @@ public static String sanitize(String value) {
486496 prefix = prefix .substring (0 , lastDash );
487497 }
488498 }
489- String hash = sha256Prefix (value , 4 );
499+ String hash = sha256Prefix (value );
490500 return prefix + "-" + hash ;
491501 }
492502
@@ -528,18 +538,18 @@ public static String sanitize(String value) {
528538 return new DataTableDescriptor (name , name , instanceName , "" , group , Collections .emptyList ());
529539 }
530540
531- private static String sha256Prefix (String input , int hexChars ) {
541+ private static String sha256Prefix (String input ) {
532542 try {
533543 MessageDigest digest = MessageDigest .getInstance ("SHA-256" );
534544 byte [] hash = digest .digest (input .getBytes (StandardCharsets .UTF_8 ));
535545 StringBuilder hex = new StringBuilder ();
536546 for (byte b : hash ) {
537547 hex .append (String .format ("%02x" , b ));
538- if (hex .length () >= hexChars ) {
548+ if (hex .length () >= 4 ) {
539549 break ;
540550 }
541551 }
542- return hex .substring (0 , Math .min (hexChars , hex .length ()));
552+ return hex .substring (0 , Math .min (4 , hex .length ()));
543553 } catch (NoSuchAlgorithmException e ) {
544554 throw new IllegalStateException (e );
545555 }
0 commit comments