1515 */
1616package org .openrewrite ;
1717
18+ import com .univocity .parsers .csv .CsvParser ;
19+ import com .univocity .parsers .csv .CsvParserSettings ;
1820import com .univocity .parsers .csv .CsvWriter ;
1921import com .univocity .parsers .csv .CsvWriterSettings ;
2022import org .jspecify .annotations .Nullable ;
 * // Plain CSV
 * new CsvDataTableStore(outputDir)
 *
 * // GZIP compressed with repository columns (write-only)
 * new CsvDataTableStore(outputDir,
 *     path -> new GZIPOutputStream(Files.newOutputStream(path)),
 *     ".csv.gz",
 *     Map.of("repositoryOrigin", origin, "repositoryPath", path),
 *     Map.of("org1", orgValue))
 *
 * // GZIP compressed with read-back support
 * new CsvDataTableStore(outputDir,
 *     path -> new GZIPOutputStream(Files.newOutputStream(path)),
 *     path -> new GZIPInputStream(Files.newInputStream(path)),
 *     ".csv.gz",
 *     Map.of("repositoryOrigin", origin, "repositoryPath", path),
 *     Map.of("org1", orgValue))
 * }</pre>
 * <p>
 * Note: {@code Map.of} has unspecified iteration order; when the relative order of
 * multiple prefix/suffix columns matters, pass an insertion-ordered map such as
 * {@link java.util.LinkedHashMap} instead.
 */
5767public class CsvDataTableStore implements DataTableStore , AutoCloseable {
5868
5969 private final Path outputDir ;
6070 private final Function <Path , OutputStream > outputStreamFactory ;
71+ private final Function <Path , InputStream > inputStreamFactory ;
6172 private final String fileExtension ;
6273 private final Map <String , String > prefixColumns ;
6374 private final Map <String , String > suffixColumns ;
@@ -67,27 +78,55 @@ public class CsvDataTableStore implements DataTableStore, AutoCloseable {
6778 * Create a store that writes plain CSV files.
6879 */
6980 public CsvDataTableStore (Path outputDir ) {
70- this (outputDir , CsvDataTableStore ::defaultOutputStream , ".csv" ,
71- Collections .emptyMap (), Collections .emptyMap ());
81+ this (outputDir , CsvDataTableStore ::defaultOutputStream , CsvDataTableStore :: defaultInputStream ,
82+ ".csv" , Collections .emptyMap (), Collections .emptyMap ());
7283 }
7384
7485 /**
75- * Create a store with full control over output stream creation, file extension,
86+ * Create a store with control over output stream creation, file extension,
7687 * and additional static columns prepended/appended to each row.
88+ * <p>
89+ * {@link #getRows} will always return empty results from this constructor.
90+ * Use the six-argument constructor to provide a matching input stream factory
91+ * if read-back is needed.
7792 *
7893 * @param outputDir directory to write files into
7994 * @param outputStreamFactory creates an output stream for each file path (e.g., wrapping with GZIPOutputStream)
8095 * @param fileExtension file extension including dot (e.g., ".csv" or ".csv.gz")
8196 * @param prefixColumns static columns prepended to each row, in insertion order
8297 * @param suffixColumns static columns appended to each row, in insertion order
98+ * @deprecated Use the six-argument constructor that accepts an {@code inputStreamFactory}
8399 */
100+ @ Deprecated
84101 public CsvDataTableStore (Path outputDir ,
85102 Function <Path , OutputStream > outputStreamFactory ,
86103 String fileExtension ,
87104 Map <String , String > prefixColumns ,
88105 Map <String , String > suffixColumns ) {
106+ this (outputDir , outputStreamFactory , path -> new ByteArrayInputStream (new byte [0 ]),
107+ fileExtension , prefixColumns , suffixColumns );
108+ }
109+
110+ /**
111+ * Create a store with full control over output and input stream creation, file extension,
112+ * and additional static columns prepended/appended to each row.
113+ *
114+ * @param outputDir directory to write files into
115+ * @param outputStreamFactory creates an output stream for each file path (e.g., wrapping with GZIPOutputStream)
116+ * @param inputStreamFactory creates an input stream for each file path (e.g., wrapping with GZIPInputStream)
117+ * @param fileExtension file extension including dot (e.g., ".csv" or ".csv.gz")
118+ * @param prefixColumns static columns prepended to each row, in insertion order
119+ * @param suffixColumns static columns appended to each row, in insertion order
120+ */
121+ public CsvDataTableStore (Path outputDir ,
122+ Function <Path , OutputStream > outputStreamFactory ,
123+ Function <Path , InputStream > inputStreamFactory ,
124+ String fileExtension ,
125+ Map <String , String > prefixColumns ,
126+ Map <String , String > suffixColumns ) {
89127 this .outputDir = outputDir ;
90128 this .outputStreamFactory = outputStreamFactory ;
129+ this .inputStreamFactory = inputStreamFactory ;
91130 this .fileExtension = fileExtension ;
92131 this .prefixColumns = prefixColumns ;
93132 this .suffixColumns = suffixColumns ;
@@ -106,6 +145,14 @@ private static OutputStream defaultOutputStream(Path path) {
106145 }
107146 }
108147
148+ private static InputStream defaultInputStream (Path path ) {
149+ try {
150+ return Files .newInputStream (path );
151+ } catch (IOException e ) {
152+ throw new UncheckedIOException (e );
153+ }
154+ }
155+
109156 @ Override
110157 public <Row > void insertRow (DataTable <Row > dataTable , ExecutionContext ctx , Row row ) {
111158 String fileKey = fileKey (dataTable );
@@ -115,7 +162,64 @@ public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row
115162
116163 @ Override
117164 public Stream <?> getRows (String dataTableName , @ Nullable String group ) {
118- return Stream .empty ();
165+ // Flush any open writers for this data table so all rows are on disk
166+ for (BucketWriter writer : writers .values ()) {
167+ if (writer .dataTable .getName ().equals (dataTableName ) &&
168+ Objects .equals (writer .dataTable .getGroup (), group )) {
169+ writer .flush ();
170+ }
171+ }
172+
173+ List <String []> allRows = new ArrayList <>();
174+ //noinspection DataFlowIssue
175+ File [] files = outputDir .toFile ().listFiles ((dir , name ) -> name .endsWith (fileExtension ));
176+ if (files == null ) {
177+ return Stream .empty ();
178+ }
179+
180+ int prefixCount = prefixColumns .size ();
181+ int suffixCount = suffixColumns .size ();
182+
183+ for (File file : files ) {
184+ try (InputStream is = inputStreamFactory .apply (file .toPath ())) {
185+ DataTableDescriptor descriptor = readDescriptor (is );
186+ if (descriptor == null ||
187+ !descriptor .getName ().equals (dataTableName ) ||
188+ !Objects .equals (descriptor .getGroup (), group )) {
189+ continue ;
190+ }
191+ // readDescriptor consumed comment lines; now parse the remaining CSV
192+ // (header + data rows). Re-read the full file with CsvParser.
193+ } catch (IOException e ) {
194+ continue ;
195+ }
196+
197+ try (InputStream is = inputStreamFactory .apply (file .toPath ())) {
198+ CsvParserSettings settings = new CsvParserSettings ();
199+ settings .setHeaderExtractionEnabled (true );
200+ settings .getFormat ().setComment ('#' );
201+ CsvParser parser = new CsvParser (settings );
202+ parser .beginParsing (new InputStreamReader (is , StandardCharsets .UTF_8 ));
203+
204+ String [] row ;
205+ while ((row = parser .parseNext ()) != null ) {
206+ // Strip prefix and suffix columns, returning only data table columns
207+ int dataCount = row .length - prefixCount - suffixCount ;
208+ if (dataCount <= 0 ) {
209+ allRows .add (row );
210+ } else {
211+ String [] dataRow = new String [dataCount ];
212+ System .arraycopy (row , prefixCount , dataRow , 0 , dataCount );
213+ allRows .add (dataRow );
214+ }
215+ }
216+ parser .stopParsing ();
217+ } catch (IOException e ) {
218+ // Skip unreadable files
219+ }
220+ }
221+
222+ return allRows .stream ();
119223 }
120224
121225 @ Override
@@ -146,8 +250,7 @@ private BucketWriter createBucketWriter(DataTable<?> dataTable) {
146250 }
147251
148252 // Build headers: prefix + data table columns + suffix
149- List <String > headers = new ArrayList <>();
150- headers .addAll (prefixColumns .keySet ());
253+ List <String > headers = new ArrayList <>(prefixColumns .keySet ());
151254 for (ColumnDescriptor col : descriptor .getColumns ()) {
152255 headers .add (col .getName ());
153256 }
@@ -223,6 +326,14 @@ synchronized void writeRow(Object row) {
223326 csvWriter .writeRow ((Object []) values );
224327 }
225328
329+ synchronized void flush () {
330+ csvWriter .flush ();
331+ try {
332+ os .flush ();
333+ } catch (IOException ignored ) {
334+ }
335+ }
336+
226337 void close () {
227338 csvWriter .close ();
228339 try {