DLP Inspect initial commit#48
DLP Inspect initial commit#48YadnikiPawar wants to merge 9 commits intoGoogleCloudPlatform:file_trackerfrom YadnikiPawar:file_tracker
Conversation
| p.apply( | ||
| "File Read Transforrm", | ||
| FileReaderTransform.newBuilder().setSubscriber(options.getSubscriber()).build()); | ||
| PCollection<KV<String, String>> nonInspectedContents = |
There was a problem hiding this comment.
Can we please return a tuple? Anywhere you have try/catch you should use multi output with error tag. I think there are quite a few places, you will need this change. Without multi output pipeline will fail to recover.
There was a problem hiding this comment.
Okay sure. I will change this code block to return tuple and will take care about such try-catch blocks.
| Util.INSPECTED) | ||
| .build(); | ||
| LOG.info("Audit Row {}", aggrRow.toString()); | ||
| LOG.info("FileTrackerTransform:MergePartialStatsRow: Audit Row {}", aggrRow.toString()); |
There was a problem hiding this comment.
I think this is AuditInspectDataTransform
There was a problem hiding this comment.
Yes the class name is AuditInspectDataTransform. But for logging I have maintained the usage of Transformation names that will be displayed on UI DAG. So here the name of this Transformation is FileTrackerTransform and the sub step is MergePartialStatsRow. Can take out MergePartialStatsRow if it doesn't seem to be right as a log message.
| String errorMessage = | ||
| String.format( | ||
| "Payload Size %s Exceeded Batch Size %s", | ||
| "DLPTransform:DLPInspect: Payload Size %s Exceeded Batch Size %s", |
There was a problem hiding this comment.
Optional- Change the log.error to warn
There was a problem hiding this comment.
Yes got that. Will run this through the team and decide accordingly.
src/main/java/com/google/swarm/tokenization/common/DLPTransform.java
Outdated
Show resolved
Hide resolved
| Row.withSchema(Util.bqAuditSchema) | ||
| .addValues(fileName, Util.getTimeStamp(),0L, "EMPTY") | ||
| .build()); | ||
| } |
There was a problem hiding this comment.
For the processElement i think error is thrown and caught in the class after DLP is called. Can we have tuple like this? This should make sure internal error is not going to crash the pipeline.
There was a problem hiding this comment.
Yes I got it. But just for clarification the try block is already handling the exception, right?
We are just adding the catch block to be sure.
Also, this class is already returning a tuple. I will add this additional TupleTag for the Errors.
Is there a need to log these in our main DLPS3ScannerPipeline.java file separately or can we leave it ?
There was a problem hiding this comment.
Also should we handle this initialization error as follows : https://github.com/GoogleCloudPlatform/dlp-dataflow-deidentification/blob/2c9bbd178ccaaa4422f89ba49fd0e8b4a0e4f26b/src/main/java/com/google/swarm/tokenization/S3Import.java#L333
There was a problem hiding this comment.
you can leave it. Ideally you will flatten all the errors and write back somewhere. processElement throws the error cand catch clock is to output without crashing the pipeline.
| } | ||
| } catch (Exception e) { | ||
| c.output(Util.readRowFailure, KV.of(fileName, e.getMessage())); | ||
| LOG.error("File Read Transform:ReadFile: Error processing the file "+ fileName +" - " + Arrays.toString(e.getStackTrace())); |
There was a problem hiding this comment.
This is where the multi output should be added. First comment added. So this class should output a Tuple.
There was a problem hiding this comment.
So should be keep the LOG as a warning here or just take out the log and keep output statement only?
Or log these in our main DLPS3ScannerPipeline.java file separately?
There was a problem hiding this comment.
I think let's log all in the main class after flatten.
src/main/java/com/google/swarm/tokenization/common/FileReaderSplitDoFn.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderSplitDoFn.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java
Outdated
Show resolved
Hide resolved
| LOG.info("File Read Transform:ConvertToGCSUri: Valid File Located: {}", file_name); | ||
| c.output(file_name); | ||
| } | ||
| } |
There was a problem hiding this comment.
Feel like this logic can be simplified. (Optional)
| Instant file_ts = Instant.parse(file_ts_string); | ||
| Instant tf_ts = new Instant(metadata.lastModifiedMillis()); | ||
| LOG.warn(file_ts.toString()); | ||
| LOG.warn(tf_ts.toString()); |
There was a problem hiding this comment.
Should have one statement and a meaningful error message. This is not helpful. I feel you have too many logs here. Let's try to summarize and output what will be useful
There was a problem hiding this comment.
Yes sure. Some parts of the code are added by Robert L. Will look into those parts and take out the unnecessary logs.
| public static TupleTag<Row> inspectData = new TupleTag<Row>() {}; | ||
| public static TupleTag<Row> auditData = new TupleTag<Row>() {}; | ||
| public static TupleTag<Row> errorData = new TupleTag<Row>() {}; | ||
| public static TupleTag<KV<String, String>> readRowSuccess = new TupleTag<KV<String, String>>() {}; |
There was a problem hiding this comment.
needs documentation explaining what the key/value pairs are whenever using such generic structs
@santhh
Please review the code changes. I have pulled and then modified the code for better understanding of the changes.