Skip to content

Commit 4067a8f

Browse files
authored
Merge pull request #718 from makarandhinge/fix/issue-418-refactor-benchmark-jobs
Refactor benchmark jobs and remove hardcoded paths (Fixes #418)
2 parents 05f7a82 + 60ac8ef commit 4067a8f

File tree

5 files changed

+181
-15
lines changed

5 files changed

+181
-15
lines changed

wayang-benchmark/src/main/java/org/apache/wayang/apps/grep/Grep.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,19 @@ public static void wayangFlink(String input, String output){
9898
}
9999

100100
public static void main(String... args) throws Exception {
101+
if (args.length < 3) {
102+
System.err.println("Usage: <size> <platform> <input-file> [output-file]");
103+
System.err.println(" size: dataset size indicator");
104+
System.err.println(" platform: so|pure-java|pure-spark|pure-flink|wayang-java|wayang-spark|wayang-flink");
105+
System.err.println(" input-file: full path to input file");
106+
System.err.println(" output-file: full path to output file (optional, defaults to <input-file>.out)");
107+
System.exit(1);
108+
}
109+
101110
int size = Integer.parseInt(args[0]);
102111
String platform = args[1];
103-
104-
String input = args[2]+"/python/src/pywy/tests/resources/10e"+size+"MB.input";
105-
String output = args[2]+"/lala.out";
112+
String input = args[2];
113+
String output = args.length > 3 ? args[3] : args[2] + ".out";
106114

107115
String[] command = {"rm", "-r", output};
108116
Process process = Runtime.getRuntime().exec(command);
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.apps.tpch;
20+
21+
import org.apache.wayang.api.JavaPlanBuilder;
22+
import org.apache.wayang.apps.tpch.data.LineItemTuple;
23+
import org.apache.wayang.apps.tpch.data.q1.GroupKey;
24+
import org.apache.wayang.apps.tpch.data.q1.ReturnTuple;
25+
import org.apache.wayang.core.api.Configuration;
26+
import org.apache.wayang.core.api.WayangContext;
27+
import org.apache.wayang.java.Java;
28+
import org.apache.wayang.spark.Spark;
29+
30+
import java.util.Collection;
31+
32+
/**
33+
* TPC-H Query 1 implementation using JavaPlanBuilder API.
34+
* This is the modern, fluent API version. Compare with {@link TPCHQ1WithJavaNative}
35+
* to see the differences between the native operator API and the JavaPlanBuilder API.
36+
*/
37+
public class TPCHQ1WithPlanBuilder {
38+
39+
/**
40+
* Executes TPC-H Query 1, which is as follows:
41+
* <pre>
42+
* select
43+
* l_returnflag,
44+
* l_linestatus,
45+
* sum(l_quantity) as sum_qty,
46+
* sum(l_extendedprice) as sum_base_price,
47+
* sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
48+
* sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
49+
* avg(l_quantity) as avg_qty,
50+
* avg(l_extendedprice) as avg_price,
51+
* avg(l_discount) as avg_disc,
52+
* count(*) as count_order
53+
* from
54+
* lineitem
55+
* where
56+
* l_shipdate <= date '1998-12-01' - interval '[DELTA]' day (3)
57+
* group by
58+
* l_returnflag,
59+
* l_linestatus
60+
* order by
61+
* l_returnflag,
62+
* l_linestatus;
63+
* </pre>
64+
*
65+
* @param wayangContext the Wayang context
66+
* @param lineItemUrl URL to the lineitem CSV file
67+
* @param delta the {@code [DELTA]} parameter
68+
* @return Collection of query results
69+
*/
70+
private static Collection<ReturnTuple> executeQ1(WayangContext wayangContext, String lineItemUrl, final int delta) {
71+
final int maxShipdate = LineItemTuple.Parser.parseDate("1998-12-01") - delta;
72+
73+
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
74+
.withJobName("TPC-H Q1")
75+
.withUdfJarOf(TPCHQ1WithPlanBuilder.class);
76+
77+
return planBuilder
78+
// Read the lineitem table
79+
.readTextFile(lineItemUrl).withName("Load lineitem")
80+
81+
// Parse the rows
82+
.map(line -> new LineItemTuple.Parser().parse(line))
83+
.withName("Parse lineitem")
84+
85+
// Filter by shipdate
86+
.filter(tuple -> tuple.L_SHIPDATE <= maxShipdate)
87+
.withName("Filter by shipdate")
88+
89+
// Project the queried attributes
90+
.map(lineItemTuple -> new ReturnTuple(
91+
lineItemTuple.L_RETURNFLAG,
92+
lineItemTuple.L_LINESTATUS,
93+
lineItemTuple.L_QUANTITY,
94+
lineItemTuple.L_EXTENDEDPRICE,
95+
lineItemTuple.L_EXTENDEDPRICE * (1 - lineItemTuple.L_DISCOUNT),
96+
lineItemTuple.L_EXTENDEDPRICE * (1 - lineItemTuple.L_DISCOUNT) * (1 + lineItemTuple.L_TAX),
97+
lineItemTuple.L_QUANTITY,
98+
lineItemTuple.L_EXTENDEDPRICE,
99+
lineItemTuple.L_DISCOUNT,
100+
1))
101+
.withName("Project attributes")
102+
103+
// Aggregation: group by returnflag and linestatus
104+
.reduceByKey(
105+
returnTuple -> new GroupKey(returnTuple.L_RETURNFLAG, returnTuple.L_LINESTATUS),
106+
(t1, t2) -> {
107+
t1.SUM_QTY += t2.SUM_QTY;
108+
t1.SUM_BASE_PRICE += t2.SUM_BASE_PRICE;
109+
t1.SUM_DISC_PRICE += t2.SUM_DISC_PRICE;
110+
t1.SUM_CHARGE += t2.SUM_CHARGE;
111+
t1.AVG_QTY += t2.AVG_QTY;
112+
t1.AVG_PRICE += t2.AVG_PRICE;
113+
t1.AVG_DISC += t2.AVG_DISC;
114+
t1.COUNT_ORDER += t2.COUNT_ORDER;
115+
return t1;
116+
})
117+
.withName("Aggregate")
118+
119+
// Finalize AVG operations
120+
.map(t -> {
121+
t.AVG_QTY /= t.COUNT_ORDER;
122+
t.AVG_PRICE /= t.COUNT_ORDER;
123+
t.AVG_DISC /= t.COUNT_ORDER;
124+
return t;
125+
})
126+
.withName("Finalize aggregation")
127+
128+
// Execute and collect results
129+
.collect();
130+
}
131+
132+
public static void main(String[] args) {
133+
if (args.length == 0) {
134+
System.err.print("Usage: <platform1>[,<platform2>]* <query number> <query parameters>*");
135+
System.exit(1);
136+
}
137+
138+
WayangContext wayangContext = new WayangContext(new Configuration());
139+
for (String platform : args[0].split(",")) {
140+
switch (platform) {
141+
case "java":
142+
wayangContext.register(Java.basicPlugin());
143+
break;
144+
case "spark":
145+
wayangContext.register(Spark.basicPlugin());
146+
break;
147+
default:
148+
System.err.format("Unknown platform: \"%s\"\n", platform);
149+
System.exit(3);
150+
return;
151+
}
152+
}
153+
154+
Collection<ReturnTuple> results;
155+
switch (Integer.parseInt(args[1])) {
156+
case 1:
157+
results = executeQ1(wayangContext, args[2], Integer.parseInt(args[3]));
158+
break;
159+
default:
160+
System.err.println("Unsupported query number.");
161+
System.exit(2);
162+
return;
163+
}
164+
165+
// Print results
166+
results.forEach(System.out::println);
167+
}
168+
}

wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,17 @@
1818

1919
package org.apache.wayang.apps.wordcount;
2020

21+
import org.apache.wayang.api.JavaPlanBuilder;
2122
import org.apache.wayang.basic.data.Tuple2;
2223
import org.apache.wayang.core.api.Configuration;
2324
import org.apache.wayang.core.api.WayangContext;
24-
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
25-
import org.apache.wayang.core.util.ReflectionUtils;
2625
import org.apache.wayang.java.Java;
27-
import org.apache.wayang.java.platform.JavaPlatform;
2826
import org.apache.wayang.spark.Spark;
29-
import org.apache.wayang.api.JavaPlanBuilder;
30-
3127

3228
import java.io.IOException;
3329
import java.net.URISyntaxException;
3430
import java.util.Arrays;
3531
import java.util.Collection;
36-
import java.util.LinkedList;
37-
import java.util.List;
38-
import java.util.OptionalDouble;
3932

4033
public class Main {
4134

wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCount.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public static void main(String[] args){
4242
WayangContext wayangContext = new WayangContext(new Configuration())
4343
.withPlugin(Java.basicPlugin())
4444
.withPlugin(Spark.basicPlugin());
45-
// .withPlugin(Flink.basicPlugin());
4645

4746
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
4847
.withJobName("WordCount")
@@ -55,7 +54,6 @@ public static void main(String[] args){
5554

5655
/* Split each line by non-word characters */
5756
.flatMap(line -> Arrays.asList(line.split("\\W+")))
58-
// .withSelectivity(1, 100, 0.9)
5957
.withName("Split words")
6058

6159
/* Filter empty tokens */

wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountParquet.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import org.apache.wayang.api.JavaPlanBuilder;
2222
import org.apache.wayang.basic.data.Tuple2;
23-
import org.apache.wayang.basic.operators.ParquetSource;
23+
import org.apache.wayang.core.api.Configuration;
2424
import org.apache.wayang.core.api.WayangContext;
2525
import org.apache.wayang.java.Java;
2626
import org.apache.wayang.spark.Spark;
@@ -90,4 +90,3 @@ public static void main(String[] args){
9090
wordcounts.forEach(wc -> System.out.printf("%dx %s\n", wc.field1, wc.field0));
9191
}
9292
}
93-

0 commit comments

Comments
 (0)