欢迎来到得力文库 - 分享文档赚钱的网站! | 帮助中心 好文档才是您的得力助手!
得力文库 - 分享文档赚钱的网站
全部分类
  • 研究报告>
  • 管理文献>
  • 标准材料>
  • 技术资料>
  • 教育专区>
  • 应用文书>
  • 生活休闲>
  • 考试试题>
  • pptx模板>
  • 工商注册>
  • 期刊短文>
  • 图片设计>
  • ImageVerifierCode 换一换

    storm进行大数据实时分析(共20页).docx

    • 资源ID:13433926       资源大小:41.99KB        全文页数:19页
    • 资源格式: DOCX        下载积分:20金币
    快捷下载 游客一键下载
    会员登录下载
    微信登录下载
    三方登录下载: 微信开放平台登录   QQ登录  
    二维码
    微信扫一扫登录
    下载资源需要20金币
    邮箱/手机:
    温馨提示:
    快捷下载时,用户名和密码都是您填写的邮箱或者手机号,方便查询和重复下载(系统自动生成)。
    如填写123,账号就是123,密码也是123。
    支付方式: 支付宝    微信支付   
    验证码:   换一换

     
    账号:
    密码:
    验证码:   换一换
      忘记密码?
        
    友情提示
    2、PDF文件下载后,可能会被浏览器默认打开,此种情况可以点击浏览器菜单,保存网页到桌面,就可以正常下载了。
    3、本站不支持迅雷下载,请使用电脑自带的IE浏览器,或者360浏览器、谷歌浏览器下载即可。
    4、本站资源下载后的文档和图纸-无水印,预览文档经过压缩,下载后原文更清晰。
    5、试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓。

    storm进行大数据实时分析(共20页).docx

    精选优质文档-倾情为你奉上使用Storm实现实时大数据分析!发表于2012-12-24 16:54| 6074次阅读| 来源Dr.Dobb's| 20 条评论| 作者Shruthi Kumar、Siddharth Patankar摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobbs上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译、整理。简单和明了,Storm让大数据分析变得轻松加愉快。当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是  Twitter开发,通常被比作“实时的Hadoop”。然而Storm远比Hadoop来的简单,因为用它处理大数据不会带来新老技术的交替。Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工作。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 便触发一个trigger并把相关的数据存入数据库。Storm对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:· 易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。· 每条信息的处理都可以得到保证。· Storm集群管理简易。· Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。· 尽管通常使用Java,Storm中的topology可以用任何语言设计。当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤:· 从Storm官方下载· 将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。Storm组件Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。主节点:主节点通常运行一个后台程序 Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。工作节点:工作节点同样会运行一个后台程序 Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。ZookeeperZookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。Spout:简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。Bolt:Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。Stream Groupings:Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有的6个Stream Grouping类型:1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。项目实施当下情况我们需要给Spout和Bolt设计一种能够处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件并且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本并且覆盖之前的tuple(可以被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就可以发现所有可能超临界的记录。下一节将对用例进行详细介绍。临界分析这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。· 瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。· 时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟类,时速超过80KM两次及以上的车辆。Listing One显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。AB 12360North cityBC 12370South cityCD 23440South cityDE 12340East  cityEF 12390South cityGH 12350West  city这里将创建一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。XML文件和日志文件都存放在Spout可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。Figure 1:Storm中建立的topology,用以实现数据实时处理如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,然后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。Spout的实现Spout以日志文件和XML描述文件作为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)Figure2:数据从日志文件到Spout的流程图Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。Listing Two:用以描述日志文件的XML文件。1. <TUPLEINFO> 2. <FIELDLIST> 3. <FIELD> 4. <COLUMNNAME>vehicle_number</COLUMNNAME> 5. <COLUMNTYPE>string</COLUMNTYPE> 6. </FIELD> 7.  8. <FIELD>9. <COLUMNNAME>speed</COLUMNNAME> 10. <COLUMNTYPE>int</COLUMNTYPE> 11. </FIELD> 12.  13. <FIELD> 14. <COLUMNNAME>location</COLUMNNAME> 15. <COLUMNTYPE>string</COLUMNTYPE> 16. </FIELD> 17. </FIELDLIST> 18. <DELIMITER>,</DELIMITER> 19. </TUPLEINFO>   通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过序列化XML时建立。Spout的实现步骤:· 对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。· 在数据得到了字段的说明后,将其转换成tuple。· 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。Spout的具体编码在Listing Three中显示。Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   2.    3.            _collector = collector;   4.          try   5.             6.          fileReader  =  new BufferedReader(new FileReader(new File(file);  7.            8.          catch (FileNotFoundException e)  9.            10.          System.exit(1);   11.            12.                                                           13.  14. public void nextTuple()  15.  16.          protected void ListenFile(File file)  17.            18.          Utils.sleep(2000);  19.          RandomAccessFile access = null;  20.          String line = null;   21.             try   22.               23.                 while (line = access.readLine() != null)  24.                   25.                     if (line !=null)  26.                        27.                          String fields=null;  28.                           if (tupleInfo.getDelimiter().equals("|")  fields = line.split(""+tupleInfo.getDelimiter();   29.                           else   30.                           fields = line.split  (tupleInfo.getDelimiter();   31.                           if (tupleInfo.getFieldList().size() = fields.length)  _collector.emit(new Values(fields);  32.                       33.                  34.               35.             catch (IOException ex)   36.               37.  38.  39. public void declareOutputFields(OutputFieldsDeclarer declarer)  40.  41.       String fieldsArr = new String tupleInfo.getFieldList().size();  42.       for(int i=0; i<tupleInfo.getFieldList().size(); i+)  43.         44.               fieldsArri = tupleInfo.getFieldList().get(i).getColumnName();  45.         46. declarer.declare(new Fields(fieldsArr);  47.       declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。Bolt的实现Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。Figure 3:Spout到Bolt的数据流程。ThresholdCalculatorBoltSpout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:临界值检查· 临界值栏数检查(拆分成字段的数目)· 临界值数据类型(拆分后字段的类型)· 临界值出现的频数· 临界值时间段检查Listing Four中的类,定义用来保存这些值。Listing Four:ThresholdInfo类1. public class ThresholdInfo implementsSerializable  2.  3.     4.         private String action;   5.         private String rule;   6.         private Object thresholdValue;  7.         private int thresholdColNumber;   8.         private Integer timeWindow;   9.         private int frequencyOfOccurence;   10.    基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。Listing Five:临界值检测代码段1. public void execute(Tuple tuple, BasicOutputCollector collector)   2.  3.     if(tuple!=null)   4.       5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();   7.         Object thresholdValue = thresholdInfo.getThresholdValue();   8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   9.         Integer timeWindow = thresholdInfo.getTimeWindow();  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();  11.          if(thresholdDataType.equalsIgnoreCase("string")  12.            13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  14.              String frequencyChkOp = thresholdInfo.getAction();  15.              if(timeWindow!=null)  16.                17.                  long curTime = System.currentTimeMillis();  18.                  long diffInMinutes = (curTime-startTime)/(1000);  19.                  if(diffInMinutes>=timeWindow)  20.                    21.                      if(frequencyChkOp.equals("=")  22.                        23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()  24.                             25.                               count.incrementAndGet();  26.                               if(count.get() > frequency)  27.                                   splitAndEmit(inputTupleList,collector);  28.                             29.                        30.                      else if(frequencyChkOp.equals("!=")  31.                        32.                          if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()  33.                            34.                               count.incrementAndGet();  35.                               if(count.get() > frequency)  36.                                   splitAndEmit(inputTupleList,collector);  37.                             38.                         39.                       else                         System.out.println("Operator not supported");   40.                     41.                 42.               else 43.                 44.                   if(frequencyChkOp.equals("=")  45.                     46.                       if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()  47.                         48.                           count.incrementAndGet();  49.                           if(count.get() > frequency)  50.                               splitAndEmit(inputTupleList,collector);  51.                             52.                     53.                   else if(frequencyChkOp.equals("!=")  54.                     55.                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()  56.                          57.                            count.incrementAndGet();  58.                            if(count.get() > frequency)  59.                                splitAndEmit(inputTupleList,collector);  60.                             61.                      62.                  63.               64.             else if(thresholdDataType.equalsIgnoreCase("int") |                     thresholdDataType.equalsIgnoreCase("double") |                     thresholdDataType.equalsIgnoreCase("float") |                     thresholdDataType.equalsIgnoreCase("long") |                     thresholdDataType.equalsIgnoreCase("short")  65.               66.                 String frequencyChkOp = thresholdInfo.getAction();  67.                 if(timeWindow!=null)  68.                   69.                      long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString();  70.                      long curTime = System.currentTimeMillis();  71.                      long diffInMinutes = (curTime-startTime)/(1000);  72.                      System.out.println("Difference in minutes="+diffInMinutes);  73.                      if(diffInMinutes>=timeWindow)  74.                        75.                           if(frequencyChkOp.equals("<")  76.                             77.                               if(valueToCheck < Double.parseDouble(thresholdValue.toString()  78.                                 79.                                    count.incrementAndGet();  80.                                    if(count.get() > frequency)  81.                                        splitAndEmit(inputTupleList,collector);  82.                                 83.                             84.                           else if(frequencyChkOp.equals(">")  85.                             86.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()  87.                                   88.                                    count.incrementAndGet();  89.                                    if(count.get() > frequency)  90.                                        splitAndEmit(inputTupleList,collector);  91.                                  92.                              93.                            else if(frequencyChkOp.equals("=")  94.                              95.                               if(valueToCheck = Double.parseDouble(thresholdValue.toString()  96.                                 97.                                   count.incrementAndGet();  98.                                   if(count.get() > frequency)  99.                                       splitAndEmit(inputTupleList,collector);  100.                                  101.                              102.                            else if(frequencyChkOp.equals("!=")  103.                              104.     . . .  105.                               106.                          107.                108.       else 109.           splitAndEmit(null,collector);  110.         111.       else 112.        113.            System.err.println("Emitting null in bolt");  114.            splitAndEmit(null,collector);  115.       116.  经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。DBWriterBolt经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。Listing Six:建表编码。1. public void prepare( Map StormConf, TopologyContext context )   2.          3.     try   4.       5.         Class.forName(dbClass);  6.        7.     catch (ClassNotFoundException e)   8.       9.         System.out.println("Driver not found");  10.         e.printStackTrace();  11.       12.    13.     try   14.       15.        connection driverManager.getConnection(   16.            "jdbc:mysql:/"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  18.    19.        StringBuilder createQuery = new StringBuilder(  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");  21.        for(Field fields : tupleInfo.getFieldList()  22.          23.            if(fields.getColumnType().equalsIgnoreCase("String")  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),")

    注意事项

    本文(storm进行大数据实时分析(共20页).docx)为本站会员(飞****2)主动上传,得力文库 - 分享文档赚钱的网站仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知得力文库 - 分享文档赚钱的网站(点击联系客服),我们立即给予删除!

    温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载不扣分。




    关于得利文库 - 版权申诉 - 用户使用规则 - 积分规则 - 联系我们

    本站为文档C TO C交易模式,本站只提供存储空间、用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。本站仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知得利文库网,我们立即给予删除!客服QQ:136780468 微信:18945177775 电话:18904686070

    工信部备案号:黑ICP备15003705号-8 |  经营许可证:黑B2-20190332号 |   黑公网安备:91230400333293403D

    © 2020-2023 www.deliwenku.com 得利文库. All Rights Reserved 黑龙江转换宝科技有限公司 

    黑龙江省互联网违法和不良信息举报
    举报电话:0468-3380021 邮箱:hgswwxb@163.com  

    收起
    展开