博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
生产BackPressure 的代码
阅读量:6504 次
发布时间:2019-06-24

本文共 3958 字,大约阅读时间需要 13 分钟。

public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class); /** Maximum stack trace depth for samples. */ static final int MAX_STACK_TRACE_DEPTH = 3; /** Expected class name for back pressure indicating stack trace element. */ static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool"; /** Expected method name for back pressure indicating stack trace element. */ static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking"; /** Lock guarding trigger operations. */ private final Object lock = new Object(); /* Stack trace sample coordinator. */ private final StackTraceSampleCoordinator coordinator;
@Override public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
return toBufferBuilder(requestMemorySegment(true)); }
private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException {
   synchronized (availableMemorySegments) {
      returnExcessMemorySegments();       boolean askToRecycle = owner!= null;       // fillavailableMemorySegments with at least one element, wait if required       while (availableMemorySegments.isEmpty()){
         if (isDestroyed) {
            throw new IllegalStateException("Bufferpool is destroyed.");          }          if (numberOfRequestedMemorySegments< currentPoolSize) {
            final MemorySegment segment = networkBufferPool.requestMemorySegment();             if (segment != null) {
               numberOfRequestedMemorySegments++;                return segment;             }          }          if (askToRecycle){
            owner.releaseMemory(1);          }          if (isBlocking){
            availableMemorySegments.wait(2000);          }          else {
            return null;          }       }       return availableMemorySegments.poll();    }
/**  * Returns back pressure statistics for a operator. Automatically triggers stack trace sampling  * if statistics are not available or outdated.  *  * @param vertex Operator to get the stats for.  * @return Back pressure statistics for an operator  */ public Optional
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
synchronized (lock) {
final OperatorBackPressureStats stats = operatorStatsCache.getIfPresent(vertex); if (stats == null || backPressureStatsRefreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) {
triggerStackTraceSampleInternal(vertex); } return Optional.ofNullable(stats); } }
/**  * Triggers a stack trace sample for a operator to gather the back pressure  * statistics. If there is a sample in progress for the operator, the call  * is ignored.  *  * @param vertex Operator to get the stats for.  * @return Flag indicating whether a sample with triggered.  */ private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
assert(Thread.holdsLock(lock)); if (shutDown) {
return false; } if (!pendingStats.contains(vertex) && !vertex.getGraph().getState().isGloballyTerminalState()) {
Executor executor = vertex.getGraph().getFutureExecutor(); // Only trigger if still active job if (executor != null) {
pendingStats.add(vertex); if (LOG.isDebugEnabled()) {
LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); } CompletableFuture
sample = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), numSamples, delayBetweenSamples, MAX_STACK_TRACE_DEPTH); sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor); return true; } } return false; }

转载地址:http://oxmyo.baihongyu.com/

你可能感兴趣的文章
android实现图片识别的几种方法
查看>>
mvc学习地址
查看>>
masonry 基本用法
查看>>
Word产品需求文档,已经过时了【转】
查看>>
dtoj#4299. 图(graph)
查看>>
关于网站的一些js和css常见问题的记录
查看>>
zabbix-3.4 触发器
查看>>
换用代理IP的Webbrowser方法
查看>>
【视频编解码·学习笔记】7. 熵编码算法:基础知识 & 哈夫曼编码
查看>>
spark集群安装部署
查看>>
MySql 查询表字段数
查看>>
mariadb 内存占用优化
查看>>
Centos7安装编译安装zabbix2.219及mariadb-5.5.46
查看>>
怎么获得combobox的valueField值
查看>>
Console-算法[if,while]-一输入两个正整数m和n,求其最大公约数和最小公倍数
查看>>
浅谈网络协议(四) IP的由来--DHCP与PXE
查看>>
jre与jdk的区别
查看>>
全景图的种类
查看>>
git 维护
查看>>
jfinal框架下使用c3P0连接池连接sql server 2008
查看>>