Skip to content

Commit 73eea4c

Browse files
committed
More operators, improve coverage
1 parent beca0e8 commit 73eea4c

14 files changed

+1076
-26
lines changed

README.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
ixjava
22
=================
33

4+
<a href='https://travis-ci.org/akarnokd/ixjava/builds'><img src='https://travis-ci.org/akarnokd/ixjava.svg?branch=1.x'></a>
5+
[![codecov.io](http://codecov.io/github/akarnokd/ixjava/coverage.svg?branch=1.x)](http://codecov.io/github/akarnokd/ixjava?branch=1.x)
6+
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.akarnokd/ixjava/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.github.akarnokd/ixjava)
7+
48
Interactive Extensions for Java, the dual of RxJava. Originally implemented in the Reactive4Java framework, now converted to work with RxJava.
59

610
The aim is to provide pull-based datastream support with the same naming as in RxJava mainly for the pre-Java-8 world. The Stream API in Java 8 is not exactly the same thing because Streams can be only consumed once while Iterables can be consumed many times. Google Guava features a lot of Iterable operators but without method chaining support.
@@ -10,11 +14,6 @@ and interactive dataflows.**
1014

1115
# Releases
1216

13-
<a href='https://travis-ci.org/akarnokd/ixjava/builds'><img src='https://travis-ci.org/akarnokd/ixjava.svg?branch=1.x'></a>
14-
[![codecov.io](http://codecov.io/github/akarnokd/ixjava/coverage.svg?branch=1.x)](http://codecov.io/github/akarnokd/ixjava?branch=1.x)
15-
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.akarnokd/ixjava/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.github.akarnokd/ixjava)
16-
17-
1817
**gradle**
1918

2019
```

src/main/java/ix/Ix.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,13 @@ public static <T> Ix<T> repeatValue(T value, long count) {
170170
}
171171

172172
public static <T> Ix<T> repeatValue(T value, Pred0 stopPredicate) {
173-
return new IxRepeatPredicate<T>(value, stopPredicate);
173+
return repeatValue(value, Long.MAX_VALUE, stopPredicate);
174174
}
175-
175+
176+
public static <T> Ix<T> repeatValue(T value, long count, Pred0 stopPredicate) {
177+
return new IxRepeatPredicate<T>(value, count, stopPredicate);
178+
}
179+
176180
public static <T, R> Ix<R> forloop(T seed, Pred<? super T> condition,
177181
Func1<? super T, ? extends T> next,
178182
Func1<? super T, ? extends R> selector) {
@@ -395,18 +399,21 @@ public final Ix<T> takeUntil(Pred<? super T> stopPredicate) {
395399
}
396400

397401
public final Ix<List<T>> buffer(int size) {
398-
// TODO implement
399-
throw new UnsupportedOperationException();
402+
return new IxBuffer<T>(this, size);
400403
}
401404

402405
public final Ix<List<T>> buffer(int size, int skip) {
403-
// TODO implement
404-
throw new UnsupportedOperationException();
406+
if (size == skip) {
407+
return buffer(size);
408+
}
409+
if (size < skip) {
410+
return new IxBufferSkip<T>(this, size, skip);
411+
}
412+
return new IxBufferOverlap<T>(this, size, skip);
405413
}
406414

407415
public final <K> Ix<GroupedIx<K, T>> groupBy(Func1<? super T, ? extends K> keySelector) {
408-
// TODO implement
409-
throw new UnsupportedOperationException();
416+
return groupBy(keySelector, IdentityHelper.<T>instance());
410417
}
411418

412419
public final <K, V> Ix<GroupedIx<K, V>> groupBy(Func1<? super T, ? extends K> keySelector,
@@ -416,23 +423,19 @@ public final <K, V> Ix<GroupedIx<K, V>> groupBy(Func1<? super T, ? extends K> ke
416423
}
417424

418425
public final Ix<T> repeat() {
419-
// TODO implement
420-
throw new UnsupportedOperationException();
426+
return concat(repeatValue(this));
421427
}
422428

423429
public final Ix<T> repeat(long times) {
424-
// TODO implement
425-
throw new UnsupportedOperationException();
430+
return concat(repeatValue(this, times));
426431
}
427432

428433
public final Ix<T> repeat(Pred0 predicate) {
429-
// TODO implement
430-
throw new UnsupportedOperationException();
434+
return concat(repeatValue(this, predicate));
431435
}
432436

433437
public final Ix<T> repeat(long times, Pred0 predicate) {
434-
// TODO implement
435-
throw new UnsupportedOperationException();
438+
return concat(repeatValue(this, times, predicate));
436439
}
437440

438441
public final Ix<T> publish() {
@@ -659,6 +662,9 @@ public final void print(CharSequence separator, int charsPerLine) {
659662
System.out.println();;
660663
System.out.print(s);
661664
len = s.length();
665+
} else {
666+
System.out.print(s);
667+
len += s.length();
662668
}
663669
}
664670

src/main/java/ix/IxBuffer.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2011-2016 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package ix;
18+
19+
import java.util.*;
20+
21+
final class IxBuffer<T> extends IxSource<T, List<T>> {
22+
23+
final int size;
24+
25+
public IxBuffer(Iterable<T> source, int size) {
26+
super(source);
27+
this.size = size;
28+
}
29+
30+
@Override
31+
public Iterator<List<T>> iterator() {
32+
return new BufferIterator<T>(source.iterator(), size);
33+
}
34+
35+
static final class BufferIterator<T> extends IxSourceIterator<T, List<T>> {
36+
final int size;
37+
38+
public BufferIterator(Iterator<T> it, int size) {
39+
super(it);
40+
this.size = size;
41+
}
42+
43+
@Override
44+
protected boolean moveNext() {
45+
int s = size;
46+
47+
Iterator<T> it = this.it;
48+
49+
List<T> list = new ArrayList<T>();
50+
51+
while (s != 0 && it.hasNext()) {
52+
list.add(it.next());
53+
s--;
54+
}
55+
56+
if (list.isEmpty()) {
57+
done = true;
58+
return false;
59+
}
60+
value = list;
61+
hasValue = true;
62+
if (s != 0) {
63+
done = true;
64+
}
65+
return true;
66+
}
67+
68+
69+
}
70+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2011-2016 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package ix;
18+
19+
import java.util.*;
20+
21+
import rx.functions.Action2;
22+
23+
final class IxBufferOverlap<T> extends IxSource<T, List<T>> {
24+
25+
final int size;
26+
27+
final int skip;
28+
29+
public IxBufferOverlap(Iterable<T> source, int size, int skip) {
30+
super(source);
31+
this.size = size;
32+
this.skip = skip;
33+
}
34+
35+
@Override
36+
public Iterator<List<T>> iterator() {
37+
return new BufferIterator<T>(source.iterator(), size, skip);
38+
}
39+
40+
static final class BufferIterator<T> extends IxSourceQueuedIterator<T, List<T>, List<T>>
41+
implements Action2<List<T>, T> {
42+
final int size;
43+
44+
final int skip;
45+
46+
int index;
47+
48+
public BufferIterator(Iterator<T> it, int size, int skip) {
49+
super(it);
50+
this.size = size;
51+
this.skip = skip;
52+
}
53+
54+
@SuppressWarnings("unchecked")
55+
@Override
56+
protected boolean moveNext() {
57+
Iterator<T> it = this.it;
58+
59+
int s = size;
60+
int k = skip;
61+
62+
int i = index;
63+
64+
while (it.hasNext()) {
65+
if (i == 0) {
66+
offer(new ArrayList<T>());
67+
}
68+
T v = it.next();
69+
foreach(this, v);
70+
if (++i == k) {
71+
i = 0;
72+
}
73+
74+
if (((List<T>)peek()).size() == s) {
75+
break;
76+
}
77+
}
78+
index = i;
79+
80+
List<T> list = fromObject(poll());
81+
if (list == null) {
82+
done = true;
83+
return false;
84+
}
85+
86+
value = list;
87+
hasValue = true;
88+
return true;
89+
}
90+
91+
@Override
92+
public void call(List<T> t1, T t2) {
93+
t1.add(t2);
94+
}
95+
}
96+
}

src/main/java/ix/IxBufferSkip.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2011-2016 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package ix;
18+
19+
import java.util.*;
20+
21+
final class IxBufferSkip<T> extends IxSource<T, List<T>> {
22+
23+
final int size;
24+
25+
final int skip;
26+
27+
public IxBufferSkip(Iterable<T> source, int size, int skip) {
28+
super(source);
29+
this.size = size;
30+
this.skip = skip;
31+
}
32+
33+
@Override
34+
public Iterator<List<T>> iterator() {
35+
return new BufferSkipIterator<T>(source.iterator(), size, skip);
36+
}
37+
38+
static final class BufferSkipIterator<T> extends IxSourceIterator<T, List<T>> {
39+
final int size;
40+
41+
final int skip;
42+
43+
boolean once;
44+
45+
public BufferSkipIterator(Iterator<T> it, int size, int skip) {
46+
super(it);
47+
this.size = size;
48+
this.skip = skip;
49+
}
50+
51+
@Override
52+
protected boolean moveNext() {
53+
Iterator<T> it = this.it;
54+
55+
int s = size;
56+
57+
if (once) {
58+
int k = skip - s;
59+
while (k != 0 && it.hasNext()) {
60+
it.next();
61+
k--;
62+
}
63+
64+
} else {
65+
once = true;
66+
}
67+
68+
List<T> list = new ArrayList<T>();
69+
70+
while (s != 0 && it.hasNext()) {
71+
list.add(it.next());
72+
s--;
73+
}
74+
75+
76+
if (list.isEmpty()) {
77+
done = true;
78+
return false;
79+
}
80+
value = list;
81+
hasValue = true;
82+
if (s != 0) {
83+
done = true;
84+
}
85+
return true;
86+
}
87+
88+
89+
}
90+
}

src/main/java/ix/IxDoOn.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2011-2016 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package ix;
218

319
import java.util.Iterator;

0 commit comments

Comments
 (0)