`

flume之退避算法backoff algorithm

阅读更多

什么是退避算法:

In a single channel contention based medium access control (MAC) protocols, whenever more than one station or node tries to access the medium at the same instant of time, it leads to packet collisions. If the collided stations tries to access the channel again, the packets will collide as the nodes are synchrozied in time. So the nodes need to be displaced in time. To displace them temporally, a backoff algorithm is used (example binary exponential backoff (BEB)). For example, in BEB algorithm, whenever a node's transmission is involved in a collision with another node's transmission, both nodes will choose a random waiting time and wait for this amoiunt of time before attempting again. If they are not successful in this attempt, they double their contention window and choose a randoim waiting time before transmitting again. This process will be repeated for certain number of attempts. If the nodes are not successful in their transmission after this limit, the packets will be dropped from their queue.

大致意思是,在一个共享信道的情况下,当网络上的节点在发生冲突时,每个节点节点等待一定的时间后重新发送。在二进制指数退避算法中,等待时间随着以二为底的指数增长。如果重试失败,那么下次的等待时间将会是上次的等待时间二倍。如果重试次数大于最大重试次数,那么包将从包队列中去除。

我们认识了什么是退避算法之后,来看一下flume中对退避算法的应用。从退避算法的概念可知,该算法用在网络错误,重试的情况中,例如打开一个网络链接,向网络中发送数据等。在flume中,insistentAppend和insistentOpen封装器都用到了退避算法来处理网络的发送数据和链接打开过程。我们来通过insistentAppend中的append方法例子,看一下怎么对退避算法进行运用。

 

public void append(Event evt) throws IOException, InterruptedException {
    List<IOException> exns = new ArrayList<IOException>();
    int attemptRetries = 0;
    appendRequests++;
    while (!backoff.isFailed() && isOpen.get()
        && !Thread.currentThread().isInterrupted()) {
      try {
        appendAttempts++;
        super.append(evt);
        appendSuccesses++;
        backoff.reset(); // reset backoff counter;
        return;
      } catch (InterruptedException ie) {
        throw ie;
      } catch (IOException e) {
        // this is an unexpected exception
        long waitTime = backoff.sleepIncrement();
        LOG.info("append attempt " + attemptRetries + " failed, backoff ("
            + waitTime + "ms): " + e.getMessage());
        LOG.debug(e.getMessage(), e);
        exns.add((e instanceof IOException) ? (IOException) e
            : new IOException(e));
        backoff.backoff();
        try {
          backoff.waitUntilRetryOk();
        } catch (InterruptedException e1) {
          // got an interrupted signal, bail out!
          throw e1;
        } finally {
          attemptRetries++;
          appendRetries++;
        }
      } catch (RuntimeException e) {
        // this is an unexpected exception
        LOG.info("Failed due to unexpected runtime exception "
            + "during append attempt", e);
        appendGiveups++;
        throw e;
      }
    }
    appendGiveups++;
    // failed to start
    IOException ioe = MultipleIOException.createIOException(exns);
    if (ioe == null) {
      return;
    }
    throw ioe;
  }

 通过对以上代码抽象,一般采用以下形式来运用backoff算法。

 

 while (!backoff.isFailed()) {
		      try {
		        doSomething(); //do something
		        backoff.reset(); // reset backoff counter;
		        return;
		      } catch (Exception e) {
		        backoff.backoff();
		        try {
		          backoff.waitUntilRetryOk();
		        } catch (InterruptedException e1) {       
		        } 
		     }
		 }

 

 目前在flume中主要运用了ExponentialBackoff,CappedExponentialBackoff,CumulativeCappedExponentialBackoff三种退避算法。

ExponentialBackOff是个简单的指数退避算法,仅仅让下次的等待时间是上次等待时间的2倍,当重试次数达到最大重试次数时,该任务将不能重试。

CappedExponentialBackoff对ExponentialBackOff算法作了简单的改造,该算法对每次的等待时间做了个限定,即每次的等待时间不超过某个值sleepCap。但该方法没有限定重试次数。

CumulativeCappedExponentialBackoff算法对CappedExponentialBackoff作了些改造,该算法加入了cumulativeCap变量,用来限制重试次数。在第一次backoff的时候设置failTime值为当前时间+cumulativeCap。是否可以重试由当前时间和failTime决定。当前时间小于failTime则表明还可以重试,否则,不能重试。

 

通过对以上的分析,可以得到一个Backoff算法必须提供四个接口(isFailed,backOff,waitUntilRetryOk,reset)。其中,isFailed用来判断是否可以重试,backoff用来设置等待时间,waitUntilRetryOk根据backoff设置的等待时间sleep,以便下次重试。reset的接口是在任务成功后,对backoff算法的一些变量重置。详细可以看ExponentialBackoff等源代码。

 

退避算法为我们在解决重试某项任务的时候,提供了一个比较好的等待思想。

分享到:
评论

相关推荐

    尚硅谷大数据技术之Flume

    尚硅谷大数据技术之Flume

    数据采集之Flume.xmind

    Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具 Apache Flume是Apache软件基金会(ASF)的顶级项目 Event是Flume定义的一个数据流传输的最小单元。...

    Flume1.6.0入门:安装、部署、及flume的案例

    Flume1.6.0入门:安装、部署、及flume的案例

    大数据Ambari之flume集成编译好的源码包

    这是已经编译好的flume包,可以直接用于集成在Ambari上

    flume自学文档.pdf

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...

    Flume构建高可用、可扩展的海量日志采集系统

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...

    flume-ng安装

    flume-ng安装

    让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 文档

    让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...

    apache-flume-1.8.0

    flume官网下载太慢,请从这里下载,次文件是官方网站的1.8版本,也就是支持jdk1.8的,不支持jdk1.7,如果要支持jdk1.7的,请下载我的资源里面也有,flume1.7,

    Flume集群环境搭建,flume监控

    flume集群环境搭建,详细讲解,图文并茂,包括flume信息监控和众多文章链接

    flume-ng-elasticsearch6-sink.zip

    flume1.9采集数据入存入elasticsearch6.2.4,flume1.9本身只支持低版本的elasticsearch,基于apache-flume-1.9.0-src的flume-ng-sinks/flume-ng-elasticsearch-sink源码修改,支持es6.2.4,打的包,直接替换flume/...

    Flume+kafka+Storm整合

    Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家多学习基础。 流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无...

    flume支持RabbitMQ插件

    flume支持RabbitMQ插件

    47_Flume、Logstash、Filebeat调研报告

    基于flume+kafka+实时计算引擎(storm,spark,flink)的实时计算框架目前是比较火的一个分支,在实时数据采集组件中flume扮演着极为重要角色,logtash是ELK的重要组件部分,filebeat也是一个实时采集工具;

    springboot_log4j2_flume

    Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...

    大数据技术之Flume笔记

    大数据技术之Flume笔记

    Flume学习文档(2){Flume安装部署、Flume配置文件}.docx

    Flume学习文档(2){Flume安装部署、Flume配置文件}。 记录我的学习之旅,每份文档倾心倾力,带我成我大牛,回头观望满脸笑意,望大家多多给予意见,有问题或错误,请联系 我将及时改正;借鉴文章标明出处,谢谢

    flume-ftp-source 相关jar包

    由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。

    flume-ng-1.6.0-cdh5.13.2

    CDH版本的flume Flume是Cloudera提供的一个高可用的,高可靠...当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

    实时大数据采集框架Flume详解(视频+课件+代码+工具)

    01_Flume的介绍及其架构组成 02_Flume的安装部署 03_Flume的测试运行 04_Flume中配置使用file channel及HDFS sink 05_Flume中配置HDFS文件生成大小及时间分区 06_Flume中配置Spooling Dir的使用 07_Flume中...

Global site tag (gtag.js) - Google Analytics