基础知识思考整理
http://www.jb51.cc/article/p-cudsbmht-bep.html

主要是Storm流计算的简单的环境的搭建和简单的demo的运行。
当前系统和系统中已经预装的软件:
其中软件包括jdk1.7.0_79、python2.7.6、zookeeper3.3.5

如果没装的话,可以大致按照[1]这个安装这些软件。

安装ZeroMQ和jzmq

Storm底层是基于ZeroMQ消息队列的,所以还需要安装ZeroMQ和其java版本,但是在编译的过程中可能会遇到一些问题,主要是配置和编译的问题,可以通过[2]中的方式解决,同时按照[3]来进行相关配置。

安装Storm

参照[1][3]可以完成安装,主要是下载和相应的文件的配置。

测试安装

以上完成整个单机版本Storm的环境的配置,下面是Storm自带的examples中的storm-starter例子(也就是WordCountTopology)的运行。
启动Storm
cd /path/to/zookeeper/bin/
./zkServer.sh start #启动zookeeper
cd /path/to/storm/bin
./storm nimbus &
./storm supervisor &
./storm ui &
启动完成后利用jps查看后台进程:

运行demo

确保可以成功启动后来进入storm-starter进行WordCountTopology例子的运行:
实际下面的过程就是strom-starter中README.markdown中的说明和步骤:
前面熟悉平台和相应例子代码的过程先略过,来看利用Maven运行storm-starter:
安装Maven3.x
对Storm首先进行一个本地构建

# Must be run from the top-level directory of the Storm code repository
$ mvn clean install -DskipTests=true

上面的命令在本地构建Storm并将jar包安装到$HOME/.m2/repository中,后面运行maven命令的时候,Maven能够找到相应的Storm版本等信息。
将storm-starter打包,便于提交和运行

$ mvn package

生成的包存于storm-starter的target目录下,命名为storm-starter-版本.jar
后面进行提交的时候可以使用这个包进行提交。

# Example 1: Run the ExclamationTopology in local mode (LocalCluster)
$ storm jar target/storm-starter-*.jar org.apache.storm.starter.ExclamationTopology

# Example 2: Run the RollingTopWords in remote/cluster mode,
# under the name "production-topology"
$ storm jar storm-starter-*.jar org.apache.storm.starter.RollingTopWords production-topology remote

运行wordCountTopology例子:

cd /path/to/storm/bin
./storm jar path/to/generated.jar org.apache.storm.starter.WordCountTopology

运行结果,输出比较多,整理其中一个例子查看:

可以看到这是“I am at two with nature”的一个统计词频的结果。

源码和相应的注释如下:

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License,Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing,software * distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.starter.spout.RandomSentencespout;

import java.util.HashMap;
import java.util.Map;

/** * This topology demonstrates Storm's stream groupings and multilang capabilities. */
public class WordCountTopology {
    //分词 bolt
    //基于shellBolt实现的SplitSentence类 方便使用多语言 如 python 等
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    //用pyton写的一个Bolt 构造函数 调用其它语言写的bolt
    public SplitSentence() {
      super("python","splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String,Object> getComponentConfiguration() {
      return null;
    }
  }
    //词频统计 bolt
    //用java实现,所以基类用BaseBasicBolt就够了
  public static class WordCount extends BaseBasicBolt {

    Map<String,Integer> counts = new HashMap<String,Integer>();

    @Override
    public void execute(Tuple tuple,BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word,count);
      //发射单词和计数值
      collector.emit(new Values(word,count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word","count"));
    }
  }

  //启动main函数
  //
  public static void main(String[] args) throws Exception {
    //创建一个topology
    TopologyBuilder builder = new TopologyBuilder();
    //创建spout RandomSentencespout是一个随机句子产生器
    builder.setspout("spout",new RandomSentencespout(),5);
    //创建split bolt
    builder.setBolt("split",new SplitSentence(),8).shuffleGrouping("spout");
    //创建count bolt
    builder.setBolt("count",new WordCount(),12).fieldsGrouping("split",new Fields("word"));


    //提交运行
    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0],conf,builder.createtopology());
    }
    else {
      //最大并行度
      conf.setMaxTaskParallelism(3);
      //提交
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count",builder.createtopology());
      Thread.sleep(10000);
      cluster.shutdown();
    }
  }
}

其中调用的python文件如下:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License,Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self,tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

参考

[1] http://www.cnblogs.com/stone_w/p/4487897.html
[2] https://my.oschina.net/mingdongcheng/blog/43009
[3] http://blog.csdn.net/zz430581/article/details/37505707
[4] http://blog.csdn.net/nb_vol_1/article/details/46287625
[5] http://blog.csdn.net/nb_vol_1/article/details/46287625

【预研】Ubuntu 12.04 64 bit Storm的单机版本的安装和运行的更多相关文章

  1. IOs Cordova长按显示文本选择放大镜即使禁用文本选择,如何删除?

    是否有任何可能导致此问题的插件?任何帮助深表感谢.Cordova插件:>com.mbppower.camerapreview>cordova-plugin-statusbar>cordova-plugin-whitelist>离子插件键盘>org.apache.cordova.camera>org.apache.cordova.console>org.apache.cordova.device>org.apache.cordova.dialogs>org.apache.cordova.file>org.a

  2. [翻译]Swift编程语言——高级操作符

    高级操作符在前面的基本操作符之外,为了做更复杂的值操作,Swift还提供了若干高级操作符。不同于C中的算术操作符,Swfit中的算术操作符不会默认溢出。Swift使得为这些自定义的类型量身打造标准操作符的实现变得很轻松。预定义操作符没有任何限制,Swift提供了定制中缀、前缀、后缀和指派操作符的自由。Swfit提供所有的C支持的按位操作符,下文有具体描述。CSS颜色值#CC6699依据Swift的十六进制表示法被写作0xCC6699。)有符号整型用它们的第一个bit来表示正负。

  3. android – org.apache.cordova.api不存在. PhoneGap 3.0

    我正在尝试将VideoPlayer插件(https://github.com/macdonst/VideoPlayer)添加到我的phonegapAndroid应用程序中.在编译时遇到问题:第25行:解决方法将您的导入更改为:

  4. 如何将android客户端连接到我的笔记本电脑内的Apache服务器(php)的localhost?

    我的笔记本电脑中的localhost-127.0.0.1或android10.0.0.1中的localhost?>那么,如果我想从android访问localhost来调用PHP来运行?哪个ip地址/url我需要放在Android应用程序?我需要在httpconfig中为XAMPP修改任何内容吗?解决方法使用ipconfig在笔记本电脑中找到您的IP地址.在手机中使用该地址而不是127.0.0.1.

  5. android – 在android工作室中的proguard错误

    我想在我的应用程序中使用proguard,我启用它但是当我想生成apk文件时,它给了我这个错误:我正在使用最新版本的sdk23,这是我的gradle文件:怎么了?我在这段代码中做错了什么?谢谢解决方法只需在proguard上添加:

  6. 无法修复Android Proguard返回错误代码1错误

    当我尝试在我的Android应用程序中使用proguard时只需添加到我的project.properties文件,APK导出失败并显示消息Proguard返回错误代码1这是我的project.properties文件这是错误堆栈:解决方法将这些行添加到proguard配置文件(proguard-android.txt)见ProguardTroubleshooting请注意,如果您使用您的配置文件

  7. Phonegap 2.4 Android Proguard配置

    有人有主意吗???

  8. android – 如何在sharedPreferences中分析ANR

    在sharedPreferences中遇到ANR,不知道如何定位问题.以下是trace的三个部分,其他大多数线程都是“WAIT”或“TIMED_WAIT”.由于countdownlatch.await(),“主”线程被阻止.第二个线程“pool-1-thread-1”等待fsync.最后一个是试图读一些东西.我认为第二个线程已经阻塞了主线程,因为如果这个无法完成,它将不会调用countdownla

  9. Android无法访问org.apache.http.client.HttpClient

    我正在使用androidstudio创建一个向服务器发出GET请求的应用程序.我的代码是这样的:问题是AndroidStudio标记了这一行有错误:说“无法访问org.apache.http.client.HttpClient”这是我的gradle文件:解决方法在AndroidSDK23中不推荐使用HttpClient,因为它推断,您可以在HttpURLConnection中迁移代码https:/

  10. Android L – 没有对等证书

    我开发了一个小应用程序,使用带有自签名证书的SSL连接到我的服务器.为了使它工作,我使用BouncyCastleProvider将我的证书加载到自定义密钥库中,并在我的自定义SSLSocketFactory中导入证书.Everythink在android2.3(最小sdk)到4.4.4之间运行良好.但在androidL(预览版)中,我的应用失败了:TueAug1214:34:40BRT2014:j

随机推荐

  1. crontab发送一个月份的电子邮件

    ubuntu14.04邮件服务器:Postfixroot收到来自crontab的十几封电子邮件.这些邮件包含PHP警告.>我已经解决了这些警告的原因.>我已修复每个cronjobs不发送电子邮件(输出发送到>/dev/null2>&1)>我删除了之前的所有电子邮件/var/mail/root/var/spool/mail/root但我仍然每小时收到十几封电子邮件.这些电子邮件来自cronjobs,

  2. 模拟两个ubuntu服务器计算机之间的慢速连接

    我想模拟以下场景:假设我有4台ubuntu服务器机器A,B,C和D.我想在机器A和机器C之间减少20%的网络带宽,在A和B之间减少10%.使用网络模拟/限制工具来做到这一点?

  3. ubuntu-12.04 – 如何在ubuntu 12.04中卸载从源安装的redis?

    我从源代码在Ubuntu12.04上安装了redis-server.但在某些时候它无法完全安装,最后一次makeinstallcmd失败.然后我刚刚通过apt包安装.现在我很困惑哪个安装正在运行哪个conf文件?实际上我想卸载/删除通过源安装的所有内容,只是想安装一个包.转到源代码树并尝试以下命令:如果这不起作用,您可以列出软件自行安装所需的步骤:

  4. ubuntu – “apt-get source”无法找到包但“apt-get install”和“apt-get cache”可以找到它

    我正在尝试下载软件包的源代码,但是当我运行时它无法找到.但是当我运行apt-cache搜索squid3时,它会找到它.它也适用于apt-getinstallsquid3.我使用的是Ubuntu11.04服务器,这是我的/etc/apt/sources.list我已经多次更新了.我尝试了很多不同的debs,并没有发现任何其他地方的错误.这里的问题是你的二进制包(deb)与你的源包(deb-src)不

  5. ubuntu – 有没有办法检测nginx何时完成正常关闭?

    &&touchrestarted),因为即使Nginx没有完成其关闭,touch命令也会立即执行.有没有好办法呢?这样的事情怎么样?因此,pgrep将查找任何Nginx进程,而while循环将让它坐在那里直到它们全部消失.你可以改变一些有用的东西,比如睡1;/etc/init.d/Nginx停止,以便它会休眠一秒钟,然后尝试使用init.d脚本停止Nginx.你也可以在某处放置一个计数器,这样你就可以在需要太长时间时发出轰击信号.

  6. ubuntu – 如何将所有外发电子邮件从postfix重定向到单个地址进行测试

    我正在为基于Web的应用程序设置测试服务器,该应用程序发送一些电子邮件通知.有时候测试是使用真实的客户数据进行的,因此我需要保证服务器在我们测试时无法向真实客户发送电子邮件.我想要的是配置postfix,以便它接收任何外发电子邮件并将其重定向到一个电子邮件地址,而不是传递到真正的目的地.我正在运行ubuntu服务器9.10.先感谢您设置本地用户以接收所有被困邮件:你需要在main.cf中添加:然后

  7. ubuntu – vagrant无法连接到虚拟框

    当我使用基本的Vagrantfile,只配置了两条线:我看到我的虚拟框打开,但是我的流氓日志多次显示此行直到超时:然后,超时后的一段时间,虚拟框框终于要求我登录,但是太久了!所以我用流氓/流氓记录.然后在我的物理机器上,如果我“流氓ssh”.没有事情发生,直到:怎么了?

  8. ubuntu – Nginx – 转发HTTP AUTH – 用户?

    我和Nginx和Jenkins有些麻烦.我尝试使用Nginx作为Jenkins实例的反向代理,使用HTTP基本身份验证.它到目前为止工作,但我不知道如何传递带有AUTH用户名的标头?}尝试将此指令添加到您的位置块

  9. Debian / Ubuntu – 删除后如何恢复/ var / cache / apt结构?

    我在ubuntu服务器上的空间不足,所以我做了这个命令以节省空间但是现在在尝试使用apt时,我会收到以下错误:等等显然我删除了一些目录结构.有没有办法做apt-getrebuild-var-tree或类似的?

  10. 检查ubuntu上安装的rubygems版本?

    如何查看我的ubuntu盒子上安装的rubygems版本?只是一个想法,列出已安装的软件包和grep为ruby或宝石或其他:)dpkg–get-selections

返回
顶部