2323import com .sun .net .httpserver .HttpExchange ;
2424import com .sun .net .httpserver .HttpHandler ;
2525import com .sun .net .httpserver .HttpServer ;
26-
26+
2727public class Main {
2828
2929 public static void main (String [] args ) throws Exception {
@@ -35,11 +35,11 @@ public static void main(String[] args) throws Exception {
3535 server .start ();
3636
3737 // Re-register ourselves
38- Timer timer = new Timer ();
39- TimerTask task = new RegisterTask ();
38+ Timer timer = new Timer ();
39+ TimerTask task = new RegisterTask ();
4040 timer .schedule (task , 0 , 5000 );
4141 }
42-
42+
4343 static class MyHandler implements HttpHandler {
4444 @ Override
4545 public void handle (HttpExchange t ) throws IOException {
@@ -71,7 +71,8 @@ public void handle(HttpExchange t) throws IOException {
7171 try {
7272 Map <String , String > query = splitQuery (t .getRequestURI ());
7373 URI replayUrl = URI .create (query .get ("replay_url" ));
74- // boolean v2 = t.getRequestURI().getRawQuery() != null ? t.getRequestURI().getRawQuery().contains("v2") : false;
74+ // boolean v2 = t.getRequestURI().getRawQuery() != null ?
75+ // t.getRequestURI().getRawQuery().contains("v2") : false;
7576 // boolean v2 = true;
7677
7778 // Get the replay as a byte[]
@@ -87,12 +88,12 @@ public void handle(HttpExchange t) throws IOException {
8788
8889 if (replayUrl .toString ().endsWith (".bz2" )) {
8990 // Write byte[] to bunzip, get back decompressed byte[]
90- Process bz = new ProcessBuilder (new String [] {"bunzip2" }).start ();
91+ Process bz = new ProcessBuilder (new String [] { "bunzip2" }).start ();
9192
9293 // Start separate thread so we can consume output while sending input
9394 Thread thread = new Thread (() -> {
9495 try {
95- copy ( new ByteArrayInputStream ( bzIn ), bz .getOutputStream ());
96+ bz .getOutputStream (). write ( bzIn );
9697 bz .getOutputStream ().close ();
9798 } catch (IOException ex ) {
9899 ex .printStackTrace ();
@@ -119,57 +120,62 @@ public void handle(HttpExchange t) throws IOException {
119120 t .getResponseBody ().close ();
120121 }
121122
122- // String cmd = String.format("""
123- // curl --max-time 145 --fail -L %s | %s | curl -X POST -T - "localhost:5600%s" %s
124- // """,
125- // replayUrl,
126- // replayUrl.toString().endsWith(".bz2") ? "bunzip2" : "cat",
127- // v2 ? "?blob" : "",
128- // v2 ? "" : " | node processors/createParsedDataBlob.mjs"
129- // );
130- // System.err.println(cmd);
131- // // Download, unzip, parse, aggregate
132- // Process proc = new ProcessBuilder(new String[] {"bash", "-c", cmd})
133- // .start();
134- // ByteArrayOutputStream output = new ByteArrayOutputStream();
135- // ByteArrayOutputStream error = new ByteArrayOutputStream();
136- // copy(proc.getInputStream(), output);
137- // // Write error to console
138- // copy(proc.getErrorStream(), error);
139- // System.err.println(error.toString());
140- // int exitCode = proc.waitFor();
141- // if (exitCode != 0) {
142- // // We can send 200 status here and no response if expected error (read the error string)
143- // // Maybe we can pass the specific error info in the response headers
144- // int status = 500;
145- // if (error.toString().contains("curl: (28) Operation timed out")) {
146- // // Parse took too long, maybe China replay?
147- // status = 200;
148- // }
149- // if (error.toString().contains("curl: (22) The requested URL returned error: 502")) {
150- // // Google-Edge-Cache: origin retries exhausted Error: 2010
151- // // Server error, don't retry
152- // status = 200;
153- // }
154- // if (error.toString().contains("bunzip2: Data integrity error when decompressing")) {
155- // // Corrupted replay, don't retry
156- // status = 200;
157- // }
158- // if (error.toString().contains("bunzip2: Compressed file ends unexpectedly")) {
159- // // Corrupted replay, don't retry
160- // status = 200;
161- // }
162- // if (error.toString().contains("bunzip2: (stdin) is not a bzip2 file.")) {
163- // // Tried to unzip a non-bz2 file
164- // status = 200;
165- // }
166- // t.sendResponseHeaders(status, 0);
167- // t.getResponseBody().close();
168- // } else {
169- // t.sendResponseHeaders(200, output.size());
170- // output.writeTo(t.getResponseBody());
171- // t.getResponseBody().close();
172- // }
123+ // String cmd = String.format("""
124+ // curl --max-time 145 --fail -L %s | %s | curl -X POST -T - "localhost:5600%s"
125+ // %s
126+ // """,
127+ // replayUrl,
128+ // replayUrl.toString().endsWith(".bz2") ? "bunzip2" : "cat",
129+ // v2 ? "?blob" : "",
130+ // v2 ? "" : " | node processors/createParsedDataBlob.mjs"
131+ // );
132+ // System.err.println(cmd);
133+ // // Download, unzip, parse, aggregate
134+ // Process proc = new ProcessBuilder(new String[] {"bash", "-c", cmd})
135+ // .start();
136+ // ByteArrayOutputStream output = new ByteArrayOutputStream();
137+ // ByteArrayOutputStream error = new ByteArrayOutputStream();
138+ // copy(proc.getInputStream(), output);
139+ // // Write error to console
140+ // copy(proc.getErrorStream(), error);
141+ // System.err.println(error.toString());
142+ // int exitCode = proc.waitFor();
143+ // if (exitCode != 0) {
144+ // // We can send 200 status here and no response if expected error (read the
145+ // error string)
146+ // // Maybe we can pass the specific error info in the response headers
147+ // int status = 500;
148+ // if (error.toString().contains("curl: (28) Operation timed out")) {
149+ // // Parse took too long, maybe China replay?
150+ // status = 200;
151+ // }
152+ // if (error.toString().contains("curl: (22) The requested URL returned error:
153+ // 502")) {
154+ // // Google-Edge-Cache: origin retries exhausted Error: 2010
155+ // // Server error, don't retry
156+ // status = 200;
157+ // }
158+ // if (error.toString().contains("bunzip2: Data integrity error when
159+ // decompressing")) {
160+ // // Corrupted replay, don't retry
161+ // status = 200;
162+ // }
163+ // if (error.toString().contains("bunzip2: Compressed file ends unexpectedly"))
164+ // {
165+ // // Corrupted replay, don't retry
166+ // status = 200;
167+ // }
168+ // if (error.toString().contains("bunzip2: (stdin) is not a bzip2 file.")) {
169+ // // Tried to unzip a non-bz2 file
170+ // status = 200;
171+ // }
172+ // t.sendResponseHeaders(status, 0);
173+ // t.getResponseBody().close();
174+ // } else {
175+ // t.sendResponseHeaders(200, output.size());
176+ // output.writeTo(t.getResponseBody());
177+ // t.getResponseBody().close();
178+ // }
173179 }
174180 }
175181
@@ -179,33 +185,15 @@ public static Map<String, String> splitQuery(URI uri) throws UnsupportedEncoding
179185 String [] pairs = query .split ("&" );
180186 for (String pair : pairs ) {
181187 int idx = pair .indexOf ("=" );
182- query_pairs .put (URLDecoder .decode (pair .substring (0 , idx ), "UTF-8" ), URLDecoder .decode (pair .substring (idx + 1 ), "UTF-8" ));
188+ query_pairs .put (URLDecoder .decode (pair .substring (0 , idx ), "UTF-8" ),
189+ URLDecoder .decode (pair .substring (idx + 1 ), "UTF-8" ));
183190 }
184191 return query_pairs ;
185192 }
186-
187- // buffer size used for reading and writing
188- private static final int BUFFER_SIZE = 8192 ;
189-
190- /**
191- * Reads all bytes from an input stream and writes them to an output stream.
192- */
193- private static long copy (InputStream source , OutputStream sink ) throws IOException {
194- long nread = 0L ;
195- byte [] buf = new byte [BUFFER_SIZE ];
196- int n ;
197- while ((n = source .read (buf )) > 0 ) {
198- sink .write (buf , 0 , n );
199- nread += n ;
200- }
201- return nread ;
202- }
203193}
204194
205- class RegisterTask extends TimerTask
206- {
207- public void run ()
208- {
195+ class RegisterTask extends TimerTask {
196+ public void run () {
209197 if (System .getenv ().containsKey ("SERVICE_REGISTRY_HOST" )) {
210198 try {
211199 String ip = "" ;
@@ -217,16 +205,18 @@ public void run()
217205 ip = RegisterTask .shellExec ("hostname -i" );
218206 }
219207 long nproc = Runtime .getRuntime ().availableProcessors ();
220- String postCmd = "curl -X POST --max-time 60 -L " + System .getenv ().get ("SERVICE_REGISTRY_HOST" ) + "/register/parser/" + ip + ":5600" + "?size=" + nproc + "&key=" + System .getenv ().get ("RETRIEVER_SECRET" );
208+ String postCmd = "curl -X POST --max-time 60 -L " + System .getenv ().get ("SERVICE_REGISTRY_HOST" )
209+ + "/register/parser/" + ip + ":5600" + "?size=" + nproc + "&key="
210+ + System .getenv ().get ("RETRIEVER_SECRET" );
221211 System .err .println (postCmd );
222212 RegisterTask .shellExec (postCmd );
223213 } catch (Exception e ) {
224214 System .err .println (e );
225215 }
226216 }
227- }
217+ }
228218
229- public static String shellExec (String cmdCommand ) throws IOException {
219+ public static String shellExec (String cmdCommand ) throws IOException {
230220 final StringBuilder stringBuilder = new StringBuilder ();
231221 String [] cmdArr = cmdCommand .split (" " );
232222 final Process process = Runtime .getRuntime ().exec (cmdArr , null , null );
@@ -237,5 +227,4 @@ public static String shellExec(String cmdCommand) throws IOException {
237227 }
238228 return stringBuilder .toString ();
239229 }
240- }
241-
230+ }
0 commit comments