Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@
public @interface Output {

String value() default "__default__";

boolean iterator() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@
*/
package org.talend.sdk.component.api.processor;

import java.util.Iterator;

public interface OutputEmitter<T> {

void emit(T value);

default void setIterator(Iterator<T> iterator) {
};

default Iterator<T> getIterator() {
return null;
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright (C) 2006-2026 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.talend.sdk.component.api.processor;

import java.util.Iterator;

public interface OutputIterator<T> {

void setIterator(Iterator<T> iterator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.talend.sdk.component.api.processor.Input;
import org.talend.sdk.component.api.processor.LastGroup;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.processor.OutputIterator;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.base.LifecycleImpl;
Expand Down Expand Up @@ -153,10 +155,19 @@ public void beforeGroup() {

private BiFunction<InputFactory, OutputFactory, Object> buildProcessParamBuilder(final Parameter parameter) {
if (parameter.isAnnotationPresent(Output.class)) {
return (inputs, outputs) -> {
final String name = parameter.getAnnotation(Output.class).value();
return outputs.create(name);
};
final Output annotation = parameter.getAnnotation(Output.class);
if (annotation.iterator()) {
return (inputs, outputs) -> {
final String name = parameter.getAnnotation(Output.class).value();
final OutputEmitter outputEmitter = outputs.create(name);
return (OutputIterator<Object>) outputEmitter::setIterator;
};
} else {
return (inputs, outputs) -> {
final String name = parameter.getAnnotation(Output.class).value();
return outputs.create(name);
};
}
}

final Class<?> parameterType = parameter.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.talend.sdk.component.api.processor.LastGroup;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.processor.OutputIterator;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.standalone.DriverRunner;
import org.talend.sdk.component.api.standalone.RunAtDriver;
Expand Down Expand Up @@ -258,7 +259,7 @@ private boolean validOutputParam(final Parameter p) {
return false;
}
final ParameterizedType pt = ParameterizedType.class.cast(p.getParameterizedType());
return OutputEmitter.class == pt.getRawType();
return OutputEmitter.class == pt.getRawType() || OutputIterator.class == pt.getRawType();
}

private Stream<Class<? extends Annotation>> getPartitionMapperMethods(final boolean infinite) {
Expand Down
12 changes: 12 additions & 0 deletions component-runtime-testing/component-runtime-junit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@
<method>*get*</method>
<justification>we don't need this method</justification>
</difference>
<difference>
<className>org/talend/sdk/component/junit5/ComponentExtension</className>
<differenceType>4001</differenceType>
<to>org/junit/jupiter/api/extension/TestInstantiationAwareExtension</to>
<justification>org.junit.jupiter.api.extension.TestInstantiationAwareExtension has been removed by junit5, not by our code</justification>
</difference>
<difference>
<className>org/talend/sdk/component/junit5/MavenDecrypterExtension</className>
<differenceType>4001</differenceType>
<to>org/junit/jupiter/api/extension/TestInstantiationAwareExtension</to>
<justification>org.junit.jupiter.api.extension.TestInstantiationAwareExtension has been removed by junit5, not by our code</justification>
</difference>
</ignored>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.talend.sdk.component.runtime.record.RecordConverters;

import lombok.RequiredArgsConstructor;
import lombok.Setter;

public abstract class BaseIOHandler {

Expand Down Expand Up @@ -77,7 +78,7 @@ public <T> T getValue(final String connectorName) {
}

public boolean hasMoreData() {
return connections.entrySet().stream().anyMatch(e -> e.getValue().hasNext());
return connections.values().stream().anyMatch(IO::hasNext);
}

protected String getActualName(final String name) {
Expand All @@ -91,19 +92,24 @@ static class IO<T> {

private final Class<T> type;

@Setter
private Iterator<T> source;

private void reset() {
values.clear();
}

boolean hasNext() {
return values.size() != 0;
return !values.isEmpty()
|| (source != null && source.hasNext());
}

T next() {
if (hasNext()) {
if (!values.isEmpty()) {
return type.cast(values.poll());
} else if (source != null && source.hasNext()) {
return type.cast(source.next());
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
*/
package org.talend.sdk.component.runtime.di;

import java.util.Iterator;
import java.util.Map;

import javax.json.JsonBuilderFactory;
import javax.json.JsonValue;
import javax.json.bind.Jsonb;
import javax.json.spi.JsonProvider;

import org.jspecify.annotations.Nullable;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.record.Schema;
import org.talend.sdk.component.runtime.output.OutputFactory;
Expand All @@ -41,15 +45,45 @@ public OutputsHandler(final Jsonb jsonb, final Map<Class<?>, Object> servicesMap
}

public OutputFactory asOutputFactory() {
return name -> value -> {
final BaseIOHandler.IO ref = connections.get(getActualName(name));
if (ref != null && value != null) {
if (value instanceof javax.json.JsonValue) {
ref.add(jsonb.fromJson(value.toString(), ref.getType()));
return name -> new OutputEmitter() {

@Override
public void emit(final Object value) {
final IO ref = connections.get(getActualName(name));
if (ref != null && value != null) {
ref.add(convert(value, ref));
}
}

@Override
public void setIterator(final Iterator iterator) {
final IO ref = connections.get(getActualName(name));
if (ref != null) {
ref.setSource(new Iterator() {

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Object next() {
Object value = iterator.next();
return convert(value, ref);
}
});
}
}

private @Nullable Object convert(final Object value, final IO ref) {
if (value == null) {
return null;
} else if (value instanceof JsonValue) {
return jsonb.fromJson(value.toString(), ref.getType());
} else if (value instanceof Record) {
ref.add(registry.find(ref.getType()).newInstance(Record.class.cast(value)));
return registry.find(ref.getType()).newInstance(Record.class.cast(value));
} else {
ref.add(jsonb.fromJson(jsonb.toJson(value), ref.getType()));
return jsonb.fromJson(jsonb.toJson(value), ref.getType());
}
}
};
Expand Down
Loading