/**
 * Back pressure statistics tracker.
 *
 * <p>The members visible in this section serve cached per-operator back pressure
 * statistics and, when they are missing or outdated, trigger a stack trace sample
 * through the {@link StackTraceSampleCoordinator}.
 */
public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {

    private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);

    /** Maximum stack trace depth for samples; passed to the coordinator when a sample is triggered. */
    static final int MAX_STACK_TRACE_DEPTH = 3;

    /**
     * Expected class name for the stack trace element that indicates back pressure.
     * NOTE(review): the code that matches samples against this name is not visible in this section.
     */
    static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";

    /**
     * Expected method name for the stack trace element that indicates back pressure.
     * NOTE(review): the code that matches samples against this name is not visible in this section.
     */
    static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking";

    /** Lock guarding trigger operations; the stats lookup synchronizes on it before triggering a sample. */
    private final Object lock = new Object();

    /** Stack trace sample coordinator used to trigger the samples. */
    private final StackTraceSampleCoordinator coordinator;
/**
 * Requests a memory segment in blocking mode and wraps it into a {@link BufferBuilder}.
 *
 * @return the buffer builder created from the requested memory segment
 * @throws IOException propagated from the memory segment request
 * @throws InterruptedException propagated from the memory segment request
 */
@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
    // "true" selects the blocking variant of the segment request.
    final MemorySegment segment = requestMemorySegment(true);
    return toBufferBuilder(segment);
}
private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException { synchronized (availableMemorySegments) { returnExcessMemorySegments(); boolean askToRecycle = owner!= null; // fillavailableMemorySegments with at least one element, wait if required while (availableMemorySegments.isEmpty()){ if (isDestroyed) { throw new IllegalStateException("Bufferpool is destroyed."); } if (numberOfRequestedMemorySegments< currentPoolSize) { final MemorySegment segment = networkBufferPool.requestMemorySegment(); if (segment != null) { numberOfRequestedMemorySegments++; return segment; } } if (askToRecycle){ owner.releaseMemory(1); } if (isBlocking){ availableMemorySegments.wait(2000); } else { return null; } } return availableMemorySegments.poll(); }
/** * Returns back pressure statistics for a operator. Automatically triggers stack trace sampling * if statistics are not available or outdated. * * @param vertex Operator to get the stats for. * @return Back pressure statistics for an operator */ public OptionalgetOperatorBackPressureStats(ExecutionJobVertex vertex) { synchronized (lock) { final OperatorBackPressureStats stats = operatorStatsCache.getIfPresent(vertex); if (stats == null || backPressureStatsRefreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) { triggerStackTraceSampleInternal(vertex); } return Optional.ofNullable(stats); } }
/** * Triggers a stack trace sample for a operator to gather the back pressure * statistics. If there is a sample in progress for the operator, the call * is ignored. * * @param vertex Operator to get the stats for. * @return Flag indicating whether a sample with triggered. */ private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) { assert(Thread.holdsLock(lock)); if (shutDown) { return false; } if (!pendingStats.contains(vertex) && !vertex.getGraph().getState().isGloballyTerminalState()) { Executor executor = vertex.getGraph().getFutureExecutor(); // Only trigger if still active job if (executor != null) { pendingStats.add(vertex); if (LOG.isDebugEnabled()) { LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); } CompletableFuturesample = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), numSamples, delayBetweenSamples, MAX_STACK_TRACE_DEPTH); sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor); return true; } } return false; }