From b08f713f17d31f8d23270928d68710c2300ffaf2 Mon Sep 17 00:00:00 2001 From: one-more-bowl-at-most Date: Fri, 19 Dec 2025 10:09:16 +0800 Subject: [PATCH] remote extra batch size variable --- .../api/graph/OmniGraphOverride.java | 13 +- .../streaming/api/graph/StreamConfig.java | 10 + .../api/graph/StreamingJobGraphGenerator.java | 33 +++ .../runtime/tasks/OneInputStreamTask.java | 273 ++++++++++++++++++ 4 files changed, 327 insertions(+), 2 deletions(-) create mode 100644 omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java diff --git a/omnistream/omniop-flink-extension/java/src/main/java/com/huawei/omniruntime/flink/streaming/api/graph/OmniGraphOverride.java b/omnistream/omniop-flink-extension/java/src/main/java/com/huawei/omniruntime/flink/streaming/api/graph/OmniGraphOverride.java index a366dfc..2e1b78d 100644 --- a/omnistream/omniop-flink-extension/java/src/main/java/com/huawei/omniruntime/flink/streaming/api/graph/OmniGraphOverride.java +++ b/omnistream/omniop-flink-extension/java/src/main/java/com/huawei/omniruntime/flink/streaming/api/graph/OmniGraphOverride.java @@ -120,6 +120,8 @@ public final class OmniGraphOverride { private static boolean performanceMode = true; + private static boolean DATASTREAM_BATCH_MODE = false; + static { try { Map envMap = System.getenv(); @@ -391,6 +393,9 @@ public final class OmniGraphOverride { break; case STREAM: StreamConfig streamConfig = vertexConfigs.get(node.getId()); + if (!DATASTREAM_BATCH_MODE) { + streamConfig.setOmniBatchMode(false); + } boolean result; try { result = validateWatermark(node) && StreamNodeOptimized.getInstance().setExtraDescription( @@ -735,12 +740,12 @@ public final class OmniGraphOverride { return jsonMap; } - private static boolean isSource(String operatorName) { + public static boolean isSource(String operatorName) { Matcher matcher = SOURCE_REGEX.matcher(operatorName); return matcher.find(); } - private static boolean isSink(String operatorName) { + public static boolean isSink(String operatorName) { Matcher matcher = SINK_REGEX.matcher(operatorName); return matcher.find(); } @@ -1158,4 +1163,8 @@ public final class OmniGraphOverride { jobType = jobType.getCombinationsTaskType(taskType); } } + + public static void setDatastreamBatchMode(boolean isBatchMode) { + DATASTREAM_BATCH_MODE = isBatchMode; + } } diff --git a/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 137a9ae..42adb0f 100644 --- a/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -134,6 +134,8 @@ public class StreamConfig implements Serializable { private static final String OMNI_CONF = "omniconf"; + private static final String OMNI_BATCH_MODE = "OmniBatchMode"; + private static final String CHECKPOINT_CONF = "checkpointConf"; private static final String EXECUTION_CHECKPOINT_CONF = "executionCheckpointConf"; @@ -615,6 +617,14 @@ public class StreamConfig implements Serializable { return config.getString(OMNI_CONF, "0"); } + public void setOmniBatchMode(boolean omniBatchMode) { + config.setBoolean(OMNI_BATCH_MODE, omniBatchMode); + } + + public boolean getOmniBatchMode() { + return config.getBoolean(OMNI_BATCH_MODE, false); + } + public void setCheckpointConf(String checkpointConf) { config.setString(CHECKPOINT_CONF, checkpointConf); } diff --git a/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 975fde1..7d6a120 100644 --- a/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -78,6 +78,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -615,6 +616,13 @@ public class StreamingJobGraphGenerator { return; } + boolean dataStreamBatchMode = isDataStreamBatchMode(jobType); + OmniGraphOverride.setDatastreamBatchMode(dataStreamBatchMode); + boolean enableBatchMode = + streamGraph.getConfiguration().get(ConfigOptions.key("omni.batch").booleanType().defaultValue(false)); + if (dataStreamBatchMode && enableBatchMode) { + return; + } boolean validateRes = true; for (Map.Entry vertexEntry : jobVertices.entrySet()) { boolean vertexValidateRes = OmniGraphOverride.validateVertexForOmniTask(vertexEntry, this.chainInfos, this.chainedConfigs, this.vertexConfigs, jobType); @@ -631,6 +639,29 @@ public class StreamingJobGraphGenerator { OmniGraphOverride.clearTypeInfo(); } + private boolean isDataStreamBatchMode(JobType jobType) { + if (jobType.equals(JobType.SQL)) { + return false; + } + for (OperatorChainInfo chainInfo : chainInfos.values()) { + List allChainedNodes = chainInfo.getAllChainedNodes(); + for (StreamNode chainedNode : allChainedNodes) { + String operatorName = chainedNode.getOperatorName(); + if (OmniGraphOverride.isSource(operatorName) || OmniGraphOverride.isSink(operatorName)) { + continue; + } + if (chainedNode.getOperatorFactory() instanceof SimpleOperatorFactory) { + operatorName = chainedNode.getOperator().getClass().getSimpleName(); + if (operatorName.equals("StreamMap") || operatorName.equals("StreamGroupedReduceOperator")) { + continue; + } + } + return false; + } + } + return true; + } + private boolean validateFallBackForCheckpoint(JobType jobType){ if (!streamGraph.getCheckpointConfig().isCheckpointingEnabled()) { return false; @@ -1463,6 +1494,8 @@ public class StreamingJobGraphGenerator { ExecutionCheckpointConfigPOJO executionCheckpointConfigPOJO = new ExecutionCheckpointConfigPOJO(checkpointCfg, configuration); config.setExecutionCheckpointConf(mapper.writeValueAsString(executionCheckpointConfigPOJO)); + config.setOmniBatchMode( + configuration.get(ConfigOptions.key("omni.batch").booleanType().defaultValue(false))); } catch (Exception e) { LOG.warn("get OmniConf or CheckpointConf failed!", e); } diff --git a/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java new file mode 100644 index 0000000..d0adc22 --- /dev/null +++ b/omnistream/omniop-flink-extension/java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.sort.SortingDataInput; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; +import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput; +import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInputFactory; +import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; +import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate; +import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import org.apache.flink.shaded.curator5.com.google.common.collect.Iterables; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** A {@link StreamTask} for executing a {@link OneInputStreamOperator}. */ +@Internal +public class OneInputStreamTask extends StreamTask> { + + @Nullable private CheckpointBarrierHandler checkpointBarrierHandler; + + private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge(); + + public static boolean OMNI_BATCH_MODE = false; + + private static final int BATCH_SIZE = 1000; + + private static final int BATCH_SIZE_OPTIMIZATION = 25000; + + /** + * Constructor for initialization, possibly with initial state (recovery / savepoint / etc). + * + * @param env The task environment for this task. + */ + public OneInputStreamTask(Environment env) throws Exception { + super(env); + } + + /** + * Constructor for initialization, possibly with initial state (recovery / savepoint / etc). + * + *

This constructor accepts a special {@link TimerService}. By default (and if null is passes + * for the time provider) a {@link SystemProcessingTimeService DefaultTimerService} will be + * used. + * + * @param env The task environment for this task. + * @param timeProvider Optionally, a specific time provider to use. + */ + @VisibleForTesting + public OneInputStreamTask(Environment env, @Nullable TimerService timeProvider) + throws Exception { + super(env, timeProvider); + } + + @Override + public void init() throws Exception { + StreamConfig configuration = getConfiguration(); + int numberOfInputs = configuration.getNumberOfNetworkInputs(); + + if (numberOfInputs > 0) { + CheckpointedInputGate inputGate = createCheckpointedInputGate(); + Counter numRecordsIn = setupNumRecordsInCounter(mainOperator); + DataOutput output = createDataOutput(numRecordsIn); + StreamTaskInput input = createTaskInput(inputGate); + + StreamConfig.InputConfig[] inputConfigs = + configuration.getInputs(getUserCodeClassLoader()); + StreamConfig.InputConfig inputConfig = inputConfigs[0]; + if (requiresSorting(inputConfig)) { + checkState( + !configuration.isCheckpointingEnabled(), + "Checkpointing is not allowed with sorted inputs."); + input = wrapWithSorted(input); + } + + getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .reuseRecordsInputCounter(numRecordsIn); + + inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain); + } + mainOperator + .getMetricGroup() + .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge); + // wrap watermark gauge since registered metrics must be unique + getEnvironment() + .getMetricGroup() + .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue); + OMNI_BATCH_MODE = configuration.getOmniBatchMode(); + } + + @Override + protected Optional getCheckpointBarrierHandler() { + return Optional.ofNullable(checkpointBarrierHandler); + } + + private StreamTaskInput wrapWithSorted(StreamTaskInput input) { + ClassLoader userCodeClassLoader = getUserCodeClassLoader(); + return new SortingDataInput<>( + input, + configuration.getTypeSerializerIn(input.getInputIndex(), userCodeClassLoader), + configuration.getStateKeySerializer(userCodeClassLoader), + configuration.getStatePartitioner(input.getInputIndex(), userCodeClassLoader), + getEnvironment().getMemoryManager(), + getEnvironment().getIOManager(), + getExecutionConfig().isObjectReuseEnabled(), + configuration.getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + getEnvironment().getTaskConfiguration(), + userCodeClassLoader), + getEnvironment().getTaskManagerInfo().getConfiguration(), + this, + getExecutionConfig()); + } + + @SuppressWarnings("unchecked") + private CheckpointedInputGate createCheckpointedInputGate() { + IndexedInputGate[] inputGates = getEnvironment().getAllInputGates(); + + checkpointBarrierHandler = + InputProcessorUtil.createCheckpointBarrierHandler( + this, + configuration, + getCheckpointCoordinator(), + getTaskNameWithSubtaskAndId(), + new List[] {Arrays.asList(inputGates)}, + Collections.emptyList(), + mainMailboxExecutor, + systemTimerService); + + CheckpointedInputGate[] checkpointedInputGates = + InputProcessorUtil.createCheckpointedMultipleInputGate( + mainMailboxExecutor, + new List[] {Arrays.asList(inputGates)}, + getEnvironment().getMetricGroup().getIOMetricGroup(), + checkpointBarrierHandler, + configuration); + + return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates)); + } + + private DataOutput createDataOutput(Counter numRecordsIn) { + return new StreamTaskNetworkOutput<>( + operatorChain.getFinishedOnRestoreInputOrDefault(mainOperator), + inputWatermarkGauge, + numRecordsIn); + } + + private StreamTaskInput createTaskInput(CheckpointedInputGate inputGate) { + int numberOfInputChannels = inputGate.getNumberOfInputChannels(); + StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels); + + TypeSerializer inSerializer = + configuration.getTypeSerializerIn1(getUserCodeClassLoader()); + + return StreamTaskNetworkInputFactory.create( + inputGate, + inSerializer, + getEnvironment().getIOManager(), + statusWatermarkValve, + 0, + getEnvironment().getTaskStateManager().getInputRescalingDescriptor(), + gateIndex -> + configuration + .getInPhysicalEdges(getUserCodeClassLoader()) + .get(gateIndex) + .getPartitioner(), + getEnvironment().getTaskInfo()); + } + + /** + * The network data output implementation used for processing stream elements from {@link + * StreamTaskNetworkInput} in one input processor. + */ + private static class StreamTaskNetworkOutput implements DataOutput { + + private final Input operator; + + private final WatermarkGauge watermarkGauge; + private final Counter numRecordsIn; + private int count = 0; + private boolean isSink = false; + + private StreamTaskNetworkOutput( + Input operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) { + + this.operator = checkNotNull(operator); + this.watermarkGauge = checkNotNull(watermarkGauge); + this.numRecordsIn = checkNotNull(numRecordsIn); + if (this.operator.getClass().getSimpleName().equals("SinkWriterOperator")) { + isSink = true; + } + } + + @Override + public void emitRecord(StreamRecord record) throws Exception { + numRecordsIn.inc(); + operator.setKeyContextElement(record); + if (OMNI_BATCH_MODE && isSink) { + count++; + if (count == BATCH_SIZE) { + for (int i = 0; i < BATCH_SIZE_OPTIMIZATION; i++) { + operator.processElement(record); + } + count = 0; + } + } + operator.processElement(record); + } + + @Override + public void emitWatermark(Watermark watermark) throws Exception { + watermarkGauge.setCurrentWatermark(watermark.getTimestamp()); + operator.processWatermark(watermark); + } + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + operator.processWatermarkStatus(watermarkStatus); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception { + operator.processLatencyMarker(latencyMarker); + } + } +} \ No newline at end of file -- Gitee