Skip to content

Commit e35198a

Browse files
committed
migrate to new API
1 parent 45bbdec commit e35198a

File tree

21 files changed

+930
-280
lines changed

21 files changed

+930
-280
lines changed

common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import org.apache.flink.streaming.api.datastream.DataStream;
2424
import org.apache.flink.streaming.api.datastream.KeyedStream;
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
2627
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
2728
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
2829

30+
import java.time.Duration;
31+
2932
/**
3033
* Example that counts the rides for each driver.
3134
*
@@ -47,7 +50,18 @@ public static void main(String[] args) throws Exception {
4750
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4851

4952
// start the data generator
50-
DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
53+
DataStream<TaxiRide> rides =
54+
env.fromSource(
55+
new TaxiRideGenerator(),
56+
new BoundedOutOfOrdernessTimestampExtractor<TaxiRide>(
57+
Duration.ofSeconds(10)) {
58+
59+
@Override
60+
public long extractTimestamp(TaxiRide taxiRide) {
61+
return taxiRide.getEventTimeMillis();
62+
}
63+
},
64+
"taxi ride");
5165

5266
// map each ride to a tuple of (driverId, 1)
5367
DataStream<Tuple2<Long, Long>> tuples =

common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,69 @@
1818

1919
package org.apache.flink.training.exercises.common.sources;
2020

21-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
23+
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
24+
import org.apache.flink.connector.datagen.source.GeneratorFunction;
2225
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
23-
import org.apache.flink.training.exercises.common.utils.DataGenerator;
2426

25-
import java.time.Duration;
2627
import java.time.Instant;
28+
import java.util.concurrent.ConcurrentLinkedDeque;
29+
import java.util.concurrent.atomic.AtomicLong;
2730

2831
/**
2932
* This source generates a data stream of TaxiFare records.
3033
*
3134
* <p>The stream is generated in order.
3235
*/
33-
public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
36+
public class TaxiFareGenerator extends DataGeneratorSource<TaxiFare> {
3437

35-
private volatile boolean running = true;
36-
private Instant limitingTimestamp = Instant.MAX;
37-
38-
/** Create a bounded TaxiFareGenerator that runs only for the specified duration. */
39-
public static TaxiFareGenerator runFor(Duration duration) {
40-
TaxiFareGenerator generator = new TaxiFareGenerator();
41-
generator.limitingTimestamp = DataGenerator.BEGINNING.plus(duration);
42-
return generator;
43-
}
44-
45-
@Override
46-
public void run(SourceContext<TaxiFare> ctx) throws Exception {
47-
48-
long id = 1;
49-
50-
while (running) {
51-
TaxiFare fare = new TaxiFare(id);
38+
private static Instant limitingTimestamp = Instant.MAX;
5239

40+
/**
41+
* Builds the deque of TaxiFare records to emit, stopping at the first fare whose start time reaches the limiting timestamp.
42+
*
43+
* @return the deque holding the generated TaxiFare records
44+
*/
45+
public static ConcurrentLinkedDeque<TaxiFare> buildTaxiFareDeque() {
46+
ConcurrentLinkedDeque<TaxiFare> taxiFareDeque = new ConcurrentLinkedDeque<>();
47+
for (int i = 1; ; i++) {
48+
TaxiFare fare = new TaxiFare(i);
5349
// don't emit events that exceed the specified limit
5450
if (fare.startTime.compareTo(limitingTimestamp) >= 0) {
5551
break;
5652
}
57-
58-
++id;
59-
ctx.collect(fare);
60-
61-
// match our event production rate to that of the TaxiRideGenerator
62-
Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT);
53+
taxiFareDeque.push(fare);
6354
}
55+
return taxiFareDeque;
56+
}
57+
58+
/** Creates a TaxiFareGenerator that emits the fares built by {@link #buildTaxiFareDeque()}. */
59+
public TaxiFareGenerator() {
60+
this(buildTaxiFareDeque());
6461
}
6562

66-
@Override
67-
public void cancel() {
68-
running = false;
63+
/**
64+
* Creates a TaxiFareGenerator that emits the fares polled from the given deque.
65+
*
66+
* @param taxiFareDeque the fares to emit; its size is passed to the underlying source as the record count
67+
*/
68+
public TaxiFareGenerator(ConcurrentLinkedDeque<TaxiFare> taxiFareDeque) {
69+
super(
70+
new GeneratorFunction<Long, TaxiFare>() {
71+
72+
private final AtomicLong id = new AtomicLong(0);
73+
private final AtomicLong maxStartTime = new AtomicLong(0);
74+
75+
@Override
76+
public TaxiFare map(Long value) throws Exception {
77+
synchronized (this) {
78+
return taxiFareDeque.poll();
79+
}
80+
}
81+
},
82+
taxiFareDeque.size(),
83+
RateLimiterStrategy.perSecond(200),
84+
TypeInformation.of(TaxiFare.class));
6985
}
7086
}

common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,69 +18,82 @@
1818

1919
package org.apache.flink.training.exercises.common.sources;
2020

21-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
23+
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
24+
import org.apache.flink.connector.datagen.source.GeneratorFunction;
2225
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
2326

2427
import java.util.ArrayList;
2528
import java.util.List;
2629
import java.util.PriorityQueue;
2730
import java.util.Random;
31+
import java.util.concurrent.ConcurrentLinkedDeque;
32+
import java.util.concurrent.atomic.AtomicLong;
2833

2934
/**
3035
* This source generates a data stream of TaxiRide records.
3136
*
3237
* <p>The stream is produced out-of-order.
3338
*/
34-
public class TaxiRideGenerator implements SourceFunction<TaxiRide> {
39+
public class TaxiRideGenerator extends DataGeneratorSource<TaxiRide> {
3540

3641
public static final int SLEEP_MILLIS_PER_EVENT = 10;
3742
private static final int BATCH_SIZE = 5;
38-
private volatile boolean running = true;
3943

40-
@Override
41-
public void run(SourceContext<TaxiRide> ctx) throws Exception {
44+
/** Creates a TaxiRideGenerator that produces START and END ride events in batches of BATCH_SIZE rides. */
45+
public TaxiRideGenerator() {
46+
super(
47+
new GeneratorFunction<Long, TaxiRide>() {
4248

43-
PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
44-
long id = 0;
45-
long maxStartTime = 0;
49+
private final AtomicLong id = new AtomicLong(0);
50+
private final AtomicLong maxStartTime = new AtomicLong(0);
51+
private final PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
52+
private final ConcurrentLinkedDeque<TaxiRide> deque =
53+
new ConcurrentLinkedDeque<>();
4654

47-
while (running) {
55+
@Override
56+
public TaxiRide map(Long value) throws Exception {
57+
synchronized (this) {
58+
if (deque.isEmpty()) {
59+
long idLocal = id.get();
60+
// generate a batch of START events
61+
List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE);
62+
for (int i = 1; i <= BATCH_SIZE; i++) {
63+
TaxiRide ride = new TaxiRide(idLocal + i, true);
64+
startEvents.add(ride);
65+
// the start times may be in order, but let's not assume that
66+
maxStartTime.set(
67+
Math.max(
68+
maxStartTime.get(), ride.getEventTimeMillis()));
69+
}
70+
// enqueue the corresponding END events
71+
for (int i = 1; i <= BATCH_SIZE; i++) {
72+
endEventQ.add(new TaxiRide(idLocal + i, false));
73+
}
4874

49-
// generate a batch of START events
50-
List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE);
51-
for (int i = 1; i <= BATCH_SIZE; i++) {
52-
TaxiRide ride = new TaxiRide(id + i, true);
53-
startEvents.add(ride);
54-
// the start times may be in order, but let's not assume that
55-
maxStartTime = Math.max(maxStartTime, ride.getEventTimeMillis());
56-
}
75+
// release the END events coming before the end of this new batch
76+
// (this allows a few END events to precede their matching START
77+
// event)
78+
while (endEventQ.peek().getEventTimeMillis()
79+
<= maxStartTime.get()) {
80+
TaxiRide ride = endEventQ.poll();
81+
deque.push(ride);
82+
}
5783

58-
// enqueue the corresponding END events
59-
for (int i = 1; i <= BATCH_SIZE; i++) {
60-
endEventQ.add(new TaxiRide(id + i, false));
61-
}
84+
// then emit the new START events (out-of-order)
85+
java.util.Collections.shuffle(startEvents, new Random(id.get()));
86+
startEvents.iterator().forEachRemaining(deque::push);
6287

63-
// release the END events coming before the end of this new batch
64-
// (this allows a few END events to precede their matching START event)
65-
while (endEventQ.peek().getEventTimeMillis() <= maxStartTime) {
66-
TaxiRide ride = endEventQ.poll();
67-
ctx.collect(ride);
68-
}
69-
70-
// then emit the new START events (out-of-order)
71-
java.util.Collections.shuffle(startEvents, new Random(id));
72-
startEvents.iterator().forEachRemaining(r -> ctx.collect(r));
73-
74-
// prepare for the next batch
75-
id += BATCH_SIZE;
76-
77-
// don't go too fast
78-
Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
79-
}
80-
}
81-
82-
@Override
83-
public void cancel() {
84-
running = false;
88+
// prepare for the next batch
89+
id.set(id.get() + BATCH_SIZE);
90+
}
91+
return deque.poll();
92+
}
93+
}
94+
},
95+
Long.MAX_VALUE,
96+
RateLimiterStrategy.perSecond(200),
97+
TypeInformation.of(TaxiRide.class));
8598
}
8699
}

common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.flink.training.exercises.testing;
2020

2121
import org.apache.flink.api.common.JobExecutionResult;
22-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
22+
import org.apache.flink.api.connector.source.Source;
2323
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
2424

25+
import java.util.function.Supplier;
26+
2527
/**
2628
* This allows the tests to be run against both the exercises and the solutions.
2729
*
@@ -39,16 +41,16 @@ public ComposedPipeline(
3941
}
4042

4143
@Override
42-
public JobExecutionResult execute(SourceFunction<IN> source, TestSink<OUT> sink)
44+
public JobExecutionResult execute(Supplier<Source<IN, ?, ?>> sourceSupplier, TestSink<OUT> sink)
4345
throws Exception {
4446

4547
JobExecutionResult result;
4648

4749
try {
48-
result = exercise.execute(source, sink);
50+
result = exercise.execute(sourceSupplier, sink);
4951
} catch (Exception e) {
5052
if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
51-
result = solution.execute(source, sink);
53+
result = solution.execute(sourceSupplier, sink);
5254
} else {
5355
throw e;
5456
}

common/src/test/java/org/apache/flink/training/exercises/testing/ComposedTwoInputPipeline.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.training.exercises.testing;
2020

2121
import org.apache.flink.api.common.JobExecutionResult;
22-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
22+
import org.apache.flink.api.connector.source.Source;
2323
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
2424

2525
/**
@@ -43,7 +43,7 @@ public ComposedTwoInputPipeline(
4343

4444
@Override
4545
public JobExecutionResult execute(
46-
SourceFunction<IN1> source1, SourceFunction<IN2> source2, TestSink<OUT> sink)
46+
Source<IN1, ?, ?> source1, Source<IN2, ?, ?> source2, TestSink<OUT> sink)
4747
throws Exception {
4848

4949
JobExecutionResult result;

common/src/test/java/org/apache/flink/training/exercises/testing/ExecutablePipeline.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
package org.apache.flink.training.exercises.testing;
2020

2121
import org.apache.flink.api.common.JobExecutionResult;
22-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
22+
import org.apache.flink.api.connector.source.Source;
23+
24+
import java.util.function.Supplier;
2325

2426
public interface ExecutablePipeline<IN, OUT> {
25-
JobExecutionResult execute(SourceFunction<IN> source, TestSink<OUT> sink) throws Exception;
27+
JobExecutionResult execute(Supplier<Source<IN, ?, ?>> sourceSupplier, TestSink<OUT> sink)
28+
throws Exception;
2629
}

common/src/test/java/org/apache/flink/training/exercises/testing/ExecutableTwoInputPipeline.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
package org.apache.flink.training.exercises.testing;
2020

2121
import org.apache.flink.api.common.JobExecutionResult;
22-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
22+
import org.apache.flink.api.connector.source.Source;
2323

2424
public interface ExecutableTwoInputPipeline<IN1, IN2, OUT> {
2525
JobExecutionResult execute(
26-
SourceFunction<IN1> source1, SourceFunction<IN2> source2, TestSink<OUT> sink)
26+
Source<IN1, ?, ?> source1, Source<IN2, ?, ?> source2, TestSink<OUT> sink)
2727
throws Exception;
2828
}

0 commit comments

Comments
 (0)