教你成为全栈工程师(Full Stack Developer) 三十一-利用微信搜索抓取公众号文章

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

自动收集我关注的微信公众号文章

 

我的微信里关注了数十个有关大数据的公众号,每天都会出现那个小红点让我点进去看,但是点多了就会觉得烦了,所以我要做的第一步就是自动把公众号里的新文章都收集到一块,怎么做呢?scrapy!

对!scrapy抓取!但是scrapy顺着超链接抓取web网页容易,抓取微信app里的内容就有难度了,暂时还是做不到模拟一个收集app软件。庆幸的是,腾讯和搜狗搜索结婚啦!生出了一个小宝宝:搜狗微信搜索。下面我们就借助搜狗微信搜索来实现我的目的

举个例子,我关注了一个公众号叫:大数据文摘。打开http://weixin.sogou.com/,输入“大数据文摘”,点“搜公众号”,搜索结果如下:

点击这个搜索结果,跳到了新页面

这里面显示的都是最新发布的文章

好!我们就沿着这条路线来追踪公众号的新文章

下面我们来分析一下url

第一个搜索结果页的url是:http://weixin.sogou.com/weixin?type=1&query=%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%96%87%E6%91%98&ie=utf8&_sug_=n&_sug_type_=,我们去掉query以外的参数得到:http://weixin.sogou.com/weixin?query=%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%96%87%E6%91%98,打开之后结果是一样的,ok,这个就作为我们抓取的种子入口,如果搜索其他公众号就把query参数换掉

下面分析搜索结果里怎么提取第二章页面,也就是公众号profile页的链接,我们看下搜索结果页的部分html如下:

<div target="_blank" href="http://mp.weixin.qq.com/profile?src=3&amp;timestamp=1463443372&amp;ver=1&amp;signature=lNY-ZbjfPHr40G-zyUe*Sdc9HIn2IisEo0vwpKEAV*Z*ALBYuYf2HaMUtEP*15rQzs47zSEiORN3BOWPNA2R*A==" class="wx-rb bg-blue wx-rb_v1 _item" uigs_exp_id="" onclick="gotourl('http://mp.weixin.qq.com/profile?src=3&amp;timestamp=1463443372&amp;ver=1&amp;signature=lNY-ZbjfPHr40G-zyUe*Sdc9HIn2IisEo0vwpKEAV*Z*ALBYuYf2HaMUtEP*15rQzs47zSEiORN3BOWPNA2R*A==',event,this);return true;" id="sogou_vr_11002301_box_0" uigs="sogou_vr_11002301_box_0">
<div class="img-box">
<span class="ico-bg"></span><span class="ico-r"></span><img style="visibility: visible; height: 57px; margin-left: 0px;" src="http://img01.sogoucdn.com/app/a/100520090/oIWsFt58NVJTkYWvPtICKgg8ka60" onload="vrImgLoad(this, 'fit', 57, 57)" onerror="vrImgErr(this, '/wechat/images/account/def56-56.png')" extra="err:'http://img01.sogoucdn.com/net/a/04/link?appid=100520078&amp;url=http://wx.qlogo.cn/mmhead/Q3auHgzwzM46WJlQ8GYRWPhThl25rSKJEYBm408fnEkYS9DUkiaSxGg/0/0'"></div>
<div class="txt-box">
<h3><em><!--red_beg-->大数据文摘<!--red_end--></em></h3>
<h4>
<span>微信号:<label name="em_weixinhao">BigDataDigest</label></span>
</h4>
<p class="s-p3">
<span class="sp-tit">功能介绍:</span><span class="sp-txt">普及<em><!--red_beg-->数据<!--red_end--></em>思维,传播<em><!--red_beg-->数据<!--red_end--></em>文化</span>
</p>
<p class="s-p3">
<span class="sp-tit"><script>authnamewrite('2')</script>微信认证:</span><span class="sp-txt">深圳大数据文摘科技有限公司</span>
</p>
<p class="s-p3">
<span class="sp-tit">最近文章:</span><span class="sp-txt"><a class="blue" target="_blank" id="sogou_vr_11002301_link_first_0" href="http://mp.weixin.qq.com/s?src=3&amp;timestamp=1463443372&amp;ver=1&amp;signature=fZ5HsUYiytbTgb8SekmcI3g9oizZncGBgdipWihPFh2pPnAwAwO62nX9iXNILZx0XtQB3R*3PWcgqPh1YWL*LX3qxIOf0ZpkKyhZSUkAgPmH*w71dqIB2*wfNTpVDZx5G3nh31tctf*lNqXlfXzgfPO6E60vqoqB694bPMymy*I=" title="二项式与小苹果——看牛顿如何将灵感火花拓展成知识体系">二项式与小苹果——看牛顿如何将灵感火花拓展成知识体系</a><span class="hui"><script>vrTimeHandle552write('1463440604')</script>46分钟前</span></span>
</p>
……

看这里关键的href一行:

<div target="_blank" href="http://mp.weixin.qq.com/profile?src=3&amp;timestamp=1463443372&amp;ver=1&amp;signature=lNY-ZbjfPHr40G-zyUe*Sdc9HIn2IisEo0vwpKEAV*Z*ALBYuYf2HaMUtEP*15rQzs47zSEiORN3BOWPNA2R*A==" class="wx-rb bg-blue wx-rb_v1 _item" uigs_exp_id="" onclick="gotourl('http://mp.weixin.qq.com/profile?src=3&amp;timestamp=1463443372&amp;ver=1&amp;signature=lNY-ZbjfPHr40G-zyUe*Sdc9HIn2IisEo0vwpKEAV*Z*ALBYuYf2HaMUtEP*15rQzs47zSEiORN3BOWPNA2R*A==',event,this);return true;" id="sogou_vr_11002301_box_0" uigs="sogou_vr_11002301_box_0">

这就是我们要提取的profile页链接,提取方式可以直接通配成:“url里带http://mp.weixin.qq.com/profile?src=的href属性”

 

ps:找xpath的方便方法是利用浏览器的开发者工具,比如chrome界面如下:

在Elements的标签处点右键选择:Copy->Copy XPath,就自动把xpath路径拷贝到剪切板了

 

注意:在这里我突然想到一个问题,每个公众号对应的profile页面是不是永远不变的呢?经过我的实验,这条url里的timestamp参数和signature是有对应关系的,任意一个错了都无法打开,而且每次搜索生成的链接都是不同的,所以我断定在微信搜索内容是动态生成链接的,那么这个动态链接的生命周期就不可预测了,所以为了保险起见,我们每次都从搜索入口追溯,才是万全之策

下面我们分析profile页里的文章链接,我们看profile页的部分 html如下:

<h4 class="weui_media_title" hrefs="/s?timestamp=1463443165&amp;src=3&amp;ver=1&amp;signature=dZCo9et5C6nyZfVAQAl416OW-eXJbi0VaS0QPQdvEv1tawqgsjlVYUd0oav0tUHAf38HOGU3Lskd7qqXbFg9D2mP8cv36CZ1dW0bGxbP4YyJcRdy*M*Mow6xD5YWDK8-82r9MX*4WqgbGqo4FAhZeiGTEl27YhIbaIxPiQgMbxc=">代理银行业务:通过监管列表对代理银行客户进行风险评级</h4>
<p class="weui_media_desc">为了确保银行积极的通过代理银行关系来连接美国金融市场,需要考虑如何根据现有电汇和监管列表信息,来提升可疑行为模型的成熟度。</p>
<p class="weui_media_extra_info">2016年5月17日</p>

这里面可以找到文章的内容了链接、标题、摘要、发布时间,简直太完美了

链接的提取方式可以直接通配成:h4.weui_media_title hrefs

标题的提取方式可以直接通配成:h4.weui_media_title text

摘要的提取方式可以直接通配成:p.weui_media_desc

发布时间的提取方式可以直接通配成:p.weui_media_extra_info

 

开发我的scrapy爬虫

 

如果还没有安装scrapy,请见《教你成为全栈工程师(Full Stack Developer) 三十-十分钟掌握最强大的python爬虫

创建一个scrapy工程

scrapy startproject weixin

在weixin/spiders/中创建dashujuwenzhai.py内容如下:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import scrapy
class ShareditorSpider(scrapy.Spider):
    name = "dashujuwenzhai"
    allowed_domains = ["qq.com"]
    start_urls = [
        "http://weixin.sogou.com/weixin?query=大数据文摘"
    ]
    def parse(self, response):
        print response.body
        href = response.selector.xpath('//div[@id="sogou_vr_11002301_box_0"]/@href').extract()[0]
        yield scrapy.Request(href, callback=self.parse_profile)
    def parse_profile(self, response):
        print response.body

 

执行

scrapy crawl dashujuwenzhai

即可以抓到大数据文摘的profile页面内容

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

接下来来研究profile页,抓回的页面不是普通的html页面,而是通过js渲染出来的,也就是我们看到的每一条文章的标题、摘要等都是通过js计算出来的,代码里有这么一句:

var msgList = '{&quot;list&quot;:[{&quot;comm_msg_info&quot;:{&quot;id&quot;:410106318,&quot;type&quot;:49,&quot;datetime&quot;:1463528503,&quot;fakeid&quot;:&quot;2391437564&quot;,&quot;status&quot;:2,&quot;content&quot;:&quot;&quot;},&quot;app_msg_ext_info&quot;:{&quot;title&quot;:&quot;机器人前传:达芬奇的机器狮和日耳曼装甲骑士&quot;,&quot;digest&quot;:&quot;这是一篇描述阿尔法狗和Atlas机器人祖先的文章。远在500多年前的达芬奇时代,已经有了不少关于机器人的探索。这个大天才写了大量关于自动机描述,在他的个人笔记中也充斥着各种机械发明的构思,比如弹簧驱动的汽车和机器狮子。&quot;,&quot;content&quot;:&quot;&quot;,&quot;fileid&quot;:504157567,&quot;content_url&quot;:&quot;\\/s?timestamp=1463529354&amp;amp;src=3&amp;amp;ver=1&amp;amp;signature=cG*R8qc-PGKV-aZ4q9IlJQfIHtGp5I3H63xlK-h5mBO0W2FRAzCddav9cPf*GuwUBI4x0zJzmtcoOU7sQQeMf3CfNzaTEIq4C8YwnsZQGnqnauqr2wQYvEFvAooyecPF3H6bg8OiqpSZsd5LnY*fVrZOMINmQwV8Qup*D9qvUkw=&quot;,&quot;source_url&quot;:&quot;https:\\/\\/mp.weixin.qq.com\\/s?__biz=MzA4OTYwNzk0NA==&amp;amp;mid=401744027&amp;amp;idx=1&amp;amp;sn=43699667dca4438a49db51fb3700af4f&amp;amp;scene=1&amp;amp;srcid=0517MRoAk1EzgC5iSMtvoYC5&amp;amp;pass_ticket=06ybKvJknob%2F5%2B%2FAmkUtnjcyCqWcuNxZTJapLW5QZyk7PWh1jD7ubwb5H1zXzMWB#rd&quot;,&quot;cover&quot;:&quot;http:\\/\\/mmbiz.qpic.cn\\/mmbiz\\/wc7YNPm3YxXiajPXq2Y2PWQsic1SmjCxnTicHKtwItmARwkha1RI1gH1WwTfRvEUzauWJibjuJC9oJ8eibeVlDjRkwg\\/0?wx_fmt=jpeg&quot;,&quot;subtype&quot;:0,&quot;is_multi&quot;:1,&quot;multi_app_msg_item_list&quot;:[{&quot;title&quot;:&quot;清华论坛实录|刘瑞宝:洞见数据内涵,提升公共安全研判能力&quot;,&quot;digest&quot;:&quot;本文为刘瑞宝先生于2016年3月24日在RONG—大数据与公共安全专场上所做的题为《洞见数据内涵,提升公共安全研判能力》的演讲实录。&quot;,&quot;content&quot;:&quot;&quot;,&quot;fileid&quot;:504157565,&quot;content_url&quot;:&quot;\\/s?timestamp=1463529354&amp;amp;src=3&amp;amp;ver=1&amp;amp;signature=cG*R8qc-PGKV-aZ4q9IlJQfIHtGp5I3H63xlK-h5mBO0W2FRAzCddav9cPf*GuwUBI4x0zJzmtcoOU7sQQeMf3CfNzaTEIq4C8YwnsZQGnrmdiX-aBZzJtqDGa76CoHH8gL7PEfN3ZQN5lNa4YgJUeUyE*SIna3B7W*zKWYskkU=&quot;,&quot;source_url&quot;:&quot;https:\\/\\/mp.weixin.qq.com\\/s?__biz=MzAxMzA2MDYxMw==&amp;amp;mid=2651555964&amp;amp;idx=2&amp;amp;sn=479aaf7f3b687b973ffa303d3d3be6b9&amp;amp;scene=1&amp;amp;srcid=0517C5DgLArlrdVAlQ9GIHOl&amp;amp;pass_ticket=06ybKvJknob%2F5%2B
……

当然还没有截取全,这就是文章的全部内容,写到了一个js变量里,这样就无法通过scrapy原生的response.xpath拿到,这怎么办呢?

我们来利用phantomjs来渲染,这是一个强大的工具,它是无界面的浏览器,所以渲染js速度非常快,但是也有一些缺陷,有一些浏览器渲染功能不支持,所以如果再深入可以借助selenium工具,这又是一个强大的工具,它原本是用来做web应用程序自动化测试用的,也就是可以模拟各种点击浏览等动作,那么用他来做爬虫几乎就是一个真人,本节先来研究phantomjs,有关selenium的内容后面有需求了再研究

 

安装phantomjs

wget https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-2.1.1-linux-x86_64.tar.bz2
tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2
cd phantomjs-2.1.1-linux-x86_64/
./bin/phantomjs examples/netlog.js https://www.lcsays.com/

以上输出了网路通信日志,说明没有问题

 

为了方便,可以把./bin/phantomjs拷贝到~/bin下

 

写一个phantomjs渲染脚本

var page = require('webpage').create();
var system = require('system');
page.open(system.args[1], function(status) {
    var sc = page.evaluate(function() {
        return document.body.innerHTML;
    });
    window.setTimeout(function() {
        console.log(sc);
        phantom.exit();
    }, 100);
});

创建phantomjs渲染脚本getBody.js内容如下:

 

执行

phantomjs getBody.js 'http://mp.weixin.qq.com/profile?src=3&timestamp=1463529344&ver=1&signature=lNY-ZbjfPHr40G-zyUe*Sdc9HIn2IisEo0vwpKEAV*Z*ALBYuYf2HaMUtEP*15rQ7TpyhXFL52e8W929D4nd2g==' > profile.html

这里的链接可能已经失效,请换成在搜狗微信搜索搜到某个公众号profile页面里的某一篇文章的url

打开profile.html会发现内容已经被渲染完成了,每篇文章的地方变成了:

                    <div id="WXAPPMSG410106318" class="weui_media_box appmsg" msgid="410106318">
                        <span class="weui_media_hd" style="background-image:url(http://mmbiz.qpic.cn/mmbiz/wc7YNPm3YxXiajPXq2Y2PWQsic1SmjCxnTicHKtwItmARwkha1RI1gH1WwTfRvEUzauWJibjuJC9oJ8eibeVlDjRkwg/0?wx_fmt=jpeg)" data-s="640" data-t="1463528503000" hrefs="/s?timestamp
                        <div class="weui_media_bd">
                            <h4 class="weui_media_title" hrefs="/s?timestamp=1463531541&amp;src=3&amp;ver=1&amp;signature=n187YKNZjqgxyUtJ*yFEQGG7wJOH79RQeRrjQ0RGRdKEiZmR6iM0oNE5P0DPbQEwWTnShlZ4C3JIZr9PYThxbnhuCPl2UTc5NGE0ZkARKXEhTqCe7QvAGFf8vy2QWnPKqA9iSBBgBrocHKLBAuTM
                            机器人前传:达芬奇的机器狮和日耳曼装甲骑士
                            </h4>
                            <p class="weui_media_desc">这是一篇描述阿尔法狗和Atlas机器人祖先的文章。远在500多年前的达芬奇时代,已经有了不少关于机器人的探索。这个大天才写了大量关于自动机描述,在他的个人笔记中也充斥着各种机械发明的构思,比如弹簧驱动的汽车和机器狮子。</p>
                            <p class="weui_media_extra_info">2016年5月18日</p>
                        </div>
                    </div>

  ​

这便可以通过scrapy的request.xpath提取了

 

重新完善我们的scrapy爬虫脚本

 

#!/usr/bin/python
# -*- coding: utf-8 -*-
import scrapy
import subprocess
from scrapy.http import HtmlResponse
from scrapy.selector import Selector
class ShareditorSpider(scrapy.Spider):
    name = "dashujuwenzhai"
    allowed_domains = ["qq.com"]
    start_urls = [
        "http://weixin.sogou.com/weixin?query=算法与数学之美"
    ]
    def parse(self, response):
        href = response.selector.xpath('//div[@id="sogou_vr_11002301_box_0"]/@href').extract()[0]
        cmd="~/bin/phantomjs ./getBody.js '%s'" % href
        stdout, stderr = subprocess.Popen(cmd, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE).communicate()
        response = HtmlResponse(url=href, body=stdout)
        for selector in Selector(response=response).xpath('//*[@id="history"]/div/div/div/div'):
            hrefs= selector.xpath('h4/@hrefs').extract()[0].strip()
            title = selector.xpath('h4/text()').extract()[0].strip()
            abstract = selector.xpath('//*[contains(@class, "weui_media_desc")]/text()').extract()[0].strip()
            pubtime = selector.xpath('//*[contains(@class, "weui_media_extra_info")]/text()').extract()[0].strip()
            print hrefs
            print title
            print abstract
            print pubtime
    def parse_profile(self, response):
        print response.body

这是一段我用了数天精力创造成功的一段代码,耗费了我很多体力值,所以重点讲解一下

href = response.selector.xpath('//div[@id="sogou_vr_11002301_box_0"]/@href').extract()[0]

从公众号搜索结果页里提取profile页面的链接,这个id我怀疑不久后将失效,所以如果想做完美,还得不断完善,有关xpath的使用技巧可以参考http://ejohn.org/blog/xpath-css-selectors/

cmd="~/bin/phantomjs ./getBody.js '%s'" % href
stdout, stderr = subprocess.Popen(cmd, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE).communicate()

加载phantomjs脚本getBody.js来渲染profile页面,把里面的js渲染成html

response = HtmlResponse(url=href, body=stdout)

用渲染后的html页面来创建一个HtmlResponse,用于 后面继续xpath提信息

Selector(response=response).xpath('//*[@id="history"]/div/div/div/div')

找到每一条文章模块所在的div

 

            hrefs= selector.xpath('//h4/@hrefs').extract()[0].strip()
            title = selector.xpath('h4/text()').extract()[0].strip()
            abstract = selector.xpath('//*[contains(@class, "weui_media_desc")]/text()').extract()[0].strip()
            pubtime = selector.xpath('//*[contains(@class, "weui_media_extra_info")]/text()').extract()[0].strip()

根据这个div结构提取各个字段

 

基于这个爬虫脚本,想造就怎样的神奇,就看你之后的想象力了,没有做不到,只有想不到!

 

教你成为全栈工程师(Full Stack Developer) 三十-十分钟掌握最强大的python爬虫

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

安装方法

 

执行

yum install libffi-devel
yum install openssl-devel
pip install scrapy

 

scrapy的代码会安装在

/usr/local/lib/python2.7/site-packages/scrapy

 

中文文档在

http://scrapy-chs.readthedocs.io/zh_CN/latest/

 

使用样例

 

创建文件shareditor.py如下:

 

# -*- coding: utf-8 -*-
import scrapy

class ShareditorSpider(scrapy.Spider): name = ‘shareditor’ start_urls = [‘https://www.lcsays.com/'] def parse(self, response): for href in response.css(‘a::attr(href)'): full_url = response.urljoin(href.extract()) yield scrapy.Request(full_url, callback=self.parse_question) def parse_question(self, response): yield { ‘title’: response.css(‘h1 a::text’).extract()[0], ‘link’: response.url, }

然后执行:

scrapy runspider shareditor.py

会看到抓取打印的debug信息,它爬取了www.lcsays.com网站的全部网页

是不是很容易掌握?

 

创建网络爬虫常规方法

 

上面是一个最简单的样例,真正网络爬虫需要有精细的配置和复杂的逻辑,所以介绍一下scrapy的常规创建网络爬虫的方法

执行

[root@centos7vm tmp]# scrapy startproject myfirstpro

自动创建了myfirstpro目录,进去看下内容:

[root@centos7vm tmp]# cd myfirstpro/myfirstpro/
[root@centos7vm myfirstpro]# ls
__init__.py  items.py  pipelines.py  settings.py  spiders

 

讲解一下几个文件

 

settings.py是爬虫的配置文件,讲解其中几个配置项:

 

USER_AGENT是ua,也就是发http请求时指明我是谁,因为我们的目的不纯,所以我们伪造成浏览器,改成

USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

ROBOTSTXT_OBEY表示是否遵守robots协议(被封禁的不抓取),因为我们的目的不纯,所以我们不遵守,改成

ROBOTSTXT_OBEY = False

 

DOWNLOAD_DELAY表示对同一个站点抓取延迟,也就是抓一个,歇一会,再抓一个,再歇一会,为了对对方站点冲(yi)击(bei)太(fa)大(xian),我们调整为1,也就是一秒抓一个

CONCURRENT_REQUESTS_PER_DOMAIN表示对同一个站点并发有多少个线程抓取,同样道理,我们也调整为1

CONCURRENT_REQUESTS_PER_IP同理也调整为1

 

items.py是抓取结果解析出来的结构体,一会再说

 

下面我们进入spiders目录,创建我们的第一个spider程序,如下:

import scrapy
class ShareditorSpider(scrapy.Spider):
    name = "shareditor"
    allowed_domains = ["shareditor.com"]
    start_urls = [
        "https://www.lcsays.com/"
    ]
    def parse(self, response):
        filename = response.url.split("/")[-2] + '.html'
        with open(filename, 'wb') as f:
            f.write(response.body)

 

这里面start_urls是初始抓取时的种子链接,parse方法在抓取完成后自动调用,会把抓取回来的body内容写到以.html为结尾的文件中

 

退到上一级目录执行:

[root@centos7vm myfirstpro]# scrapy crawl shareditor

 

执行完成后会多出来一个www.lcsays.com.html文件,内容就是https://www.lcsays.com/网页的内容

 

页面解析

 

下面说一下页面解析,这是所有的网络爬虫不能忽略的功能,而且是最核心的部分。python库用于页面解析的有BeautifulSoup(速度慢)和Ixml(非标准库),但两者都有一些缺点,所以scrapy基于Ixml实现了一套页面解析工具,叫做Selectors

 

Selector的使用方法非常简单,创建TestSelectors.py如下:

from scrapy.selector import Selector
from scrapy.http import HtmlResponse
body = '<html><body><span>good</span></body></html>'
span_text=Selector(text=body).xpath('//span/text()').extract()
print span_text

 

执行python TestSelectors.py输出:

[root@centos7vm tmp]# python TestSelectors.py
[u'good']

 

如果能直接拿到HtmlResponse对象,也可以直接调用:

response.selector.xpath('//span').extract()

 

有关xpath的内容,可以参考https://www.w3.org/TR/xpath/#location-paths

 

调试页面解析

 

scrapy提供了非常方便的页面解析方法,直接执行

scrapy shell https://www.lcsays.com/

进入调试终端,可以直接使用request、response等变量来做各种操作尝试

 

注意:

1. 如果想获取文本,调用selector的xpath方法后要调用extract()方法

2. 如果想找到第一个匹配xpath的内容,需要调用extract_first()方法

教你成为全栈工程师(Full Stack Developer) 二十九-在storm上运行python程序(修正)

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

部署0.9.5版本的storm集群

 

streamparse最新稳定版是基于0.9.5版本的storm的,所以我们需要把storm集群的版本回退到0.9.5,方法如下:

wget http://apache.fayea.com/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz

解压后修改conf/storm.yaml文件,添加如下配置项:

storm.zookeeper.servers:
    - "127.0.0.1"
nimbus.seeds: ["127.0.0.1"]
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

启动,执行:

[root@centos7vm apache-storm-0.9.5]# ./bin/storm nimbus &
[root@centos7vm apache-storm-0.9.5]# ./bin/storm supervisor &
[root@centos7vm apache-storm-0.9.5]# ./bin/storm ui &

启动需要花费数秒钟时间,直到打开http://localhost:8080能正常显示web页说明启动正常

 

创建wordcount并修改配置

 

确定安装的streamparse版本是

[root@centos7vm tmp]# sparse --version
sparse 2.1.4

执行

[root@centos7vm tmp]# sparse quickstart wordcount

生成了wordcount目录,进入wordcount目录修改config.json文件
请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

把nimbus和workers配置成你的storm机器(填写别名或ip,需要提前建立好ssh host的无密码登陆),比如我的是:

{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "",
            "nimbus": "centos",
            "workers": ["centos"],
            "log": {
                "path": "",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "virtualenv_root": "/root/tmp/wordcount/virtualenvs/"
        }
    }
}

 

生成jar包

 

执行

[root@centos7vm wordcount]# sparse jar

会看到生成了_build/wordcount-0.0.1-SNAPSHOT-standalone.jar文件

 

部署任务

 

执行

[root@centos7vm wordcount]# storm jar _build/wordcount-0.0.1-SNAPSHOT-standalone.jar streamparse.commands.submit_topology topologies/wordcount.clj

 

查看日志

[root@centos7vm wordcount]# tail /data/apache-storm-0.9.5/logs/worker-6700.log
2016-05-16T08:53:38.579+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt elephant: 656
2016-05-16T08:53:38.580+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt dog: 690
2016-05-16T08:53:38.585+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt zebra: 659
2016-05-16T08:53:38.588+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt dog: 691
2016-05-16T08:53:38.595+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt elephant: 657
2016-05-16T08:53:38.596+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt dog: 692
2016-05-16T08:53:38.604+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt elephant: 658
2016-05-16T08:53:38.606+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt dog: 693
2016-05-16T08:53:38.635+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt zebra: 660
2016-05-16T08:53:38.640+0800 b.s.t.ShellBolt [INFO] ShellLog pid:74811, name:count-bolt dog: 694

看到如上日志,说明正常运行了

教你成为全栈工程师(Full Stack Developer) 二十八-在storm上运行python程序

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

安装lein

 

下载安装脚本

wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein

把lein放到~/bin下并chmod +x lein增加可执行权限(默认我们的bash是会自动把~/bin加到PATH环境变量里的)

执行lein,自动下载安装所需的软件包

 

安装streamparser

 

如果没有安装pip(python包的管理工具),先安装pip

wget https://bootstrap.pypa.io/get-pip.py --no-check-certificate
python get-pip.py

这时pip就安装好了,执行

pip -h

能看到帮助信息

 

安装virtualenv命令

pip install virtualenv

 

安装python-dev,执行

yum install python-devel

 

安装streamparser,执行

pip install streamparse

 

注意:如果你是在直接使用root账户,那么需要在~/.bash_profile中添加

export LEIN_ROOT=1

这是因为在root账户下使用lein时会有警告提示并等待你输入回车才能继续,这样的话下面你执行sparse命令时会莫名其妙的被卡住

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

创建第一个项目,执行

sparse quickstart wordcount

运行:

cd wordcount
sparse run

先会有一段较长时间的编译过程,之后会不断输出log如下说明运行正常:

51774 [Thread-25] INFO  backtype.storm.task.ShellBolt - ShellLog pid:27202, name:count-bolt elephant: 1327
51776 [Thread-25] INFO  backtype.storm.task.ShellBolt - ShellLog pid:27202, name:count-bolt dog: 1335
51781 [Thread-27] INFO  backtype.storm.task.ShellBolt - ShellLog pid:27227, name:count-bolt cat: 1335
51783 [Thread-27] INFO  backtype.storm.task.ShellBolt - ShellLog pid:27227, name:count-bolt zebra: 1328
51784 [Thread-25] INFO  backtype.storm.task.ShellBolt - ShellLog pid:27202, name:count-bolt elephant: 1328

这都是在本地模拟运行,下面讲解如何部署到真正的storm集群运行

 

部署到storm并运行

 

修改config.json文件,配置好自己的storm集群,如下:

{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "",
            "nimbus": "centos",
            "workers": ["centos"],
            "log": {
                "path": "",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "virtualenv_root": "/tmp/virtualenvs/"
        }
    }
}

 

然后执行sparse submit

等待较长时间之后如果出现了:Finished submitting topology: wordcount,说明部署成功

 

可能存在的问题

 

问题1:如果你在部署的单机storm上执行sparse submit还有可能报错:

IOError: Local port: 6627 already in use, unable to open ssh tunnel to centos:6627

这是因为端口冲突,解决方案就是你找到另外一台机器来安装streamparse并创建wordcount提交就可以了

 

问题2:如果你部署的storm是1.0.1版本,而sparse版本当前还不支持storm 1.0.1,因此即使部署上去了还是会报错不能正常执行,所以只能部署一个0.9.5版本的storm或者等待sparse什么时候支持1.0.1了

修改project.clj,把

:dependencies  [[org.apache.storm/storm-core "0.9.5"]

改成

:dependencies  [[org.apache.storm/storm-core "1.0.1"]

 

教你成为全栈工程师(Full Stack Developer) 二十七-开发第一个storm任务

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

运行storm-starter

 

首先要下载storm源代码,我们服务部署的是1.0.1版本,那么我就下载同样的版本,在github.com上找到https://github.com/apache/storm/releases/tag/v1.0.1的源代码(注意,这里的源代码和上一节中降到的部署用的包是不同的,这个是未编译的原始代码,上节中的是编译好的直接可运行的包,当然你也可以用本节下载的源代码重新编译来部署),下载:

wget https://github.com/apache/storm/archive/v1.0.1.tar.gz

解压后编译代码中的样例start-storm(位于源代码的examples/storm-starter),编译方法如下(也可以加载到eclipse,用maven编译):

mvn -D maven.test.skip=true clean package

注:如果没有安装maven则先安装,具体方法百度一下

 

编译好了之后会生成target/storm-starter-1.0.1.jar文件,下面我们来在我们部署好的storm上运行这个jar包里的storm.starter.StatefulTopology类

 

执行:

storm jar examples/storm-starter/storm-starter-topologies-*.jar storm.starter.StatefulTopology statetopology

这里的storm命令是在storm部署目录的bin目录下的脚本,如果没有配置PATH环境变量,可以用绝对路径执行,如果我的storm部署在/data/apache-storm-1.0.1/,那么就执行:

/data/apache-storm-1.0.1/bin/storm jar storm-starter-1.0.1.jar storm.starter.StatefulTopology statetopology

 

如果运行成功说明你的storm是正常的,输出如下:

[root@centos7vm storm-1.0.1]# /data/apache-storm-1.0.1/bin/storm jar storm-starter-1.0.1.jar storm.starter.StatefulTopology statetopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/data/apache-storm-1.0.1 -Dstorm.log.dir=/data/apache-storm-1.0.1/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /data/apache-storm-1.0.1/lib/storm-core-1.0.1.jar:/data/apache-storm-1.0.1/lib/kryo-3.0.3.jar:/data/apache-storm-1.0.1/lib/reflectasm-1.10.1.jar:/data/apache-storm-1.0.1/lib/asm-5.0.3.jar:/data/apache-storm-1.0.1/lib/minlog-1.3.0.jar:/data/apache-storm-1.0.1/lib/objenesis-2.1.jar:/data/apache-storm-1.0.1/lib/clojure-1.7.0.jar:/data/apache-storm-1.0.1/lib/disruptor-3.3.2.jar:/data/apache-storm-1.0.1/lib/log4j-api-2.1.jar:/data/apache-storm-1.0.1/lib/log4j-core-2.1.jar:/data/apache-storm-1.0.1/lib/log4j-slf4j-impl-2.1.jar:/data/apache-storm-1.0.1/lib/slf4j-api-1.7.7.jar:/data/apache-storm-1.0.1/lib/log4j-over-slf4j-1.6.6.jar:/data/apache-storm-1.0.1/lib/servlet-api-2.5.jar:/data/apache-storm-1.0.1/lib/storm-rename-hack-1.0.1.jar:storm-starter-1.0.1.jar:/data/apache-storm-1.0.1/conf:/data/apache-storm-1.0.1/bin -Dstorm.jar=storm-starter-1.0.1.jar storm.starter.StatefulTopology statetopology
1329 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -6806752055047040892:-6736308748043757911
1492 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1687 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar storm-starter-1.0.1.jar to assigned location: /data/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-89ac13d0-dc05-4aec-ab4c-0c6fc6b7f5e0.jar
Start uploading file 'storm-starter-1.0.1.jar' to '/data/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-89ac13d0-dc05-4aec-ab4c-0c6fc6b7f5e0.jar' (62385346 bytes)
[==================================================] 62385346 / 62385346
File 'storm-starter-1.0.1.jar' uploaded to '/data/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-89ac13d0-dc05-4aec-ab4c-0c6fc6b7f5e0.jar' (62385346 bytes)
2992 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /data/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-89ac13d0-dc05-4aec-ab4c-0c6fc6b7f5e0.jar
2992 [main] INFO  o.a.s.StormSubmitter - Submitting topology statetopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-6806752055047040892:-6736308748043757911","topology.workers":1,"topology.debug":false}
3682 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: statetopology

web界面如下:

 

stateopology详细运行情况可以点击进去

 

我们用eclipse(你可以用其他IDE)导入源代码,找到main函数如下:

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomIntegerSpout());
        builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout");
        builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum");
        builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
            Utils.sleep(40000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

这个storm任务是由一个Spout(随机生成整数值)和三个Bolt(主要对数字做汇总并打印一些信息)组成,为了能看到运行的效果,我们找到部署目录里的log4j2/worker.xml配置文件,找到appenders配置,如果没有自己指定log目录,那么这里面默认应该是部署目录里的logs/workers-artifacts下

我们进入logs/workers-artifacts目录,这里面就是每个storm任务的日志目录

点网页里的stateopology

 

这里的statetopology-2-1463051043就是我们的任务id,那么在logs/workers-artifacts目录中就会有statetopology-2-1463051043目录

cd statetopology-2-1463051043

会看到又有多个目录,继续点web里的spout进入到spout的状态页面,看到如下:

 

这是由supervisor启动的executor的主机地址和port,那么我们进入到刚才的statetopology-2-1463051043目录的6700目录就是这个spout的日志啦,执行

[root@centos7vm 6700]# tail -f worker.log

日志在定期输出随机生成的数字

 

以上就是storm-starter的运行,在storm-starter项目里还有很多功能的演示,在真正使用时会经常参考

 

 

 

从零开始开发storm任务

 

下面我们准备不依赖storm的源代码,而是自己从零开始写一个storm任务,来理解一下storm的开发要点

 

打开eclipse(需要预安装mvn插件,没装的请百度一下),我们新建一个maven工程:

 

group id设置为:com.shareditor,artifact id设置为:myfirststormpro,其他都默认下一步,最终我们的工程如下:

 

这是maven为我们自动生成的helloworld,我们用maven编译一下,项目上点右键,选择Run as -> Maven build,在Goals中填clean package,点Run编译,如果编译成功会看到:

 

以上如果成功,说明maven编译没有问题

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

下一步我们来编辑storm项目依赖,打开pom.xml源文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.shareditor</groupId>
  <artifactId>myfirststormpro</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>myfirststormpro</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

 

在dependencies标签中添加如下依赖:

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.1</version>
        </dependency>

然后在com.shareditor.myfirststormpro包下创建MySpout.java,如下:

package com.shareditor.myfirststormpro;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySpout extends BaseRichSpout{
private static final Logger LOG = LoggerFactory.getLogger(MySpout.class);

private SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    this.collector = collector;
}
public void nextTuple() {
    Utils.sleep(1000);
    LOG.info("MySpout nextTuple");
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("value"));
}

}

 

创建MyBolt.java,如下:

package com.shareditor.myfirststormpro;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyBolt extends BaseBasicBolt {
private static final Logger LOG = LoggerFactory.getLogger(MyBolt.class);
public void execute(Tuple input, BasicOutputCollector collector) {
    LOG.info("MyBolt execute");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // TODO Auto-generated method stub
}

}

 

修改App.java,如下:

package com.shareditor.myfirststormpro;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Hello world!
 *
 */
public class App 
{
    public static void main(String[] args) throws Exception {
    System.out.println("main");
    
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("myspout", new MySpout());
    builder.setBolt("mybolt", new MyBolt()).shuffleGrouping("myspout");
    
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}

}

重新用maven编译项目,没问题会输出[INFO] BUILD SUCCESS,并且在myfirststormpro/target中会生成:myfirststormpro-0.0.1-SNAPSHOT.jar

 

用storm启动它,执行:

/data/apache-storm-1.0.1/bin/storm jar myfirststormpro-0.0.1-SNAPSHOT.jar com.shareditor.myfirststormpro.App myfirststormproname

 

像上面找storm-starter的方法一样找到日志,看到:

[root@centos7vm 6700]# tail -f worker.log
2016-05-13 08:48:59.100 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:49:00.112 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:49:02.308 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:49:03.310 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:49:04.312 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:49:05.314 c.s.m.MySpout [INFO] MySpout nextTuple
……

每隔一秒钟打印一行MySpout nextTuple

 

下面我们做一些修改,让他真正射点东西:

 

    public void nextTuple() {
        Utils.sleep(1000);
        LOG.info("MySpout nextTuple");
    collector.emit(new Values(10));
}</code></pre>

 

重新编译并部署后查看日志如下:

[root@centos7vm 6700]# tail -f worker.log
2016-05-13 08:52:13.558 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:52:13.566 c.s.m.MyBolt [INFO] MyBolt execute
2016-05-13 08:52:14.563 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:52:14.565 c.s.m.MyBolt [INFO] MyBolt execute
2016-05-13 08:52:15.564 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:52:15.568 c.s.m.MyBolt [INFO] MyBolt execute
2016-05-13 08:52:16.565 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:52:16.567 c.s.m.MyBolt [INFO] MyBolt execute
2016-05-13 08:52:17.566 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:52:17.569 c.s.m.MyBolt [INFO] MyBolt execute
2016-05-13 08:52:18.567 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:52:18.569 c.s.m.MyBolt [INFO] MyBolt execute

我们看到Bolt开始工作了

 

那么我们怎么知道Bolt有没有收到数据呢,继续修改Bolt如下:

    public void execute(Tuple input, BasicOutputCollector collector) {
    int value = (Integer)input.getValueByField("value");
    LOG.info("MyBolt execute receive " + value);
}</code></pre>

 

重新编译并部署后查看日志如下:

[root@centos7vm 6700]# tail -f worker.log
2016-05-13 08:58:02.565 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:58:02.567 c.s.m.MyBolt [INFO] MyBolt execute receive 10
2016-05-13 08:58:03.566 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:58:03.568 c.s.m.MyBolt [INFO] MyBolt execute receive 10
2016-05-13 08:58:04.567 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:58:04.570 c.s.m.MyBolt [INFO] MyBolt execute receive 10
2016-05-13 08:58:05.567 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:58:05.570 c.s.m.MyBolt [INFO] MyBolt execute receive 10
2016-05-13 08:58:06.569 c.s.m.MySpout [INFO] MySpout nextTuple
2016-05-13 08:58:06.573 c.s.m.MyBolt [INFO] MyBolt execute receive 10

 

试验一下storm的ack机制

在我们的Spout中添加:

	@Override
	public void ack(Object msgId) {
		LOG.info("MySpout arc");
		super.ack(msgId);
	}

重新编译部署后查看log发现没什么变化,这是为什么呢?

因为我们的bolt没有显式地调用ack函数,所以继续修改bolt,但是只有OutputCollector才能调用ack函数,因此我们再尝试一个bolt基类:BaseRichBolt,把整个代码修改成如下:

package com.shareditor.myfirststormpro;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MyBolt.class);

 private OutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
	// TODO Auto-generated method stub
	
}
public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
	this.collector = collector;
}
public void execute(Tuple input) {
	int value = (Integer)input.getValueByField("value");
	LOG.info("MyBolt execute receive " + value);
	collector.ack(input);
}

}

再次编译部署查看log,看到了打印ack的日志啦

总结一下:storm就是帮我们管理了流式计算的服务部署和数据流传递,我们只需要实现具体的业务逻辑即可

教你成为全栈工程师(Full Stack Developer) 二十六-storm安装与初识

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

安装zookeeper

 

zookeeper是storm运行强依赖

注意:

1)用supervision启动zookeeper,保证不能退出

2)用cron定期清理zookeeper的日志数据,不然磁盘会很快占满(以后再研究)

 

安装步骤:

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
tar zxvf zookeeper-3.4.8.tar.gz
cd zookeeper-3.4.8/
cp conf/zoo_sample.cfg conf/zoo.cfg

修改conf/zoo.cfg,把

dataDir=/tmp/zookeeper

改成

dataDir=/var/zookeeper

创建/var/zookeeper/myid,内容为数字1

执行

./bin/zkServer.sh start

成功启动

验证方法:

./bin/zkCli.sh

输入help命令查看帮助

 

为了让zookeeper异常退出后能自动重启,需要安装deamontools

wget http://cr.yp.to/daemontools/daemontools-0.76.tar.gz
tar zxvf daemontools-0.76.tar.gz
cd admin/daemontools-0.76/

vim src/error.h 找到:extern int errno; 改成:#include <errno.h>

执行

package/install

这时已经安装好了

[root@centos7vm daemontools-0.76]# which supervise
/usr/local/bin/supervise

创建/data/service/zookeeper/run文件,内容为:

#!/bin/bash
exec 2>&1
exec /data/zookeeper-3.4.8/bin/zkServer.sh start

增加执行权限

chmod +x /data/service/zookeeper/run

杀了之前手工启动的zookeeper,然后执行

cd /data/service/zookeeper
nohup supervise /data/service/zookeeper &

这时zookeeper被supervice启动了,尝试杀一次zookeeper后还会自动起来

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

参考:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html

 

 

安装storm

 

wget http://apache.fayea.com/storm/apache-storm-1.0.1/apache-storm-1.0.1.tar.gz
tar apache-storm-1.0.1.tar.gz
cd apache-storm-1.0.1

修改conf/storm.yaml,添加如下配置

storm.zookeeper.servers:
    - "127.0.0.1"
nimbus.seeds: ["127.0.0.1"]
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

 

启动storm,执行

 

./bin/storm nimbus &
./bin/storm supervisor &
./bin/storm ui &

 

打开web界面,http://127.0.0.1:8080

界面如下:

 

安装完成

 

讲讲storm

 

storm系统由一个nimbus节点和多个supervisor节点组成,上面因为是部署单机版本,所以只启动了一个supervisor。他们之间是通过zookeeper协调运行的,所以必须依赖zookeeper。nimbus负责分配任务和监控任务,本身不做计算,supervisor负责真正的计算任务。

storm上运行的任务和map-reduce的不同在于它运行的是一种topology任务,也就是一种有向无环图形式的任务服务。

上面配置文件中配置的supervisor.slots.ports包含了4个port,也就是这个supervisor可以监听4个端口同时并发的执行4个任务,因此在web界面里我们看到Free slots是4

在map-reduce系统上运行的任务我们叫做mapper和reducer,相对之下,在storm上运行的任务叫做spout(涛涛不绝地喷口)和bolt(螺栓),在拓扑里传递的消息叫做tuple。spout其实就是信息产生的源头,而bolt就是处理逻辑

 

下一节我们来试验一些简单的用途,来发觉storm可以用来做些什么事情

教你成为全栈工程师(Full Stack Developer) 二十五-为你的网站添加强大的搜索功能

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

elasticSearch安装

elasticSearch服务的安装请见我的上一篇文章《教你成为全栈工程师(Full Stack Developer) 二十四-ES(elasticsearch)搜索引擎安装和使用

 

elastic-bundle插件安装

 

需要在我们的symfony2项目中安装elastic-bundle插件,执行

[root@centos7vm mywebsite]# composer require friendsofsymfony/elastica-bundle

 

配置方法

 

修改app/AppKernel.php添加如下注册:

new FOS\ElasticaBundle\FOSElasticaBundle(),

 

并在app/config/config.yml中添加

fos_elastica:
    clients:
        default: { host: localhost, port: 9200 }
    indexes:
        app:
            types:
                blogpost:
                    mappings:
                        title:
                            type: string
                            index: analyzed
                            analyzer: ik_max_word
                        body:
                            type: string
                            index: analyzed
                            analyzer: ik_max_word
                    persistence:
                        driver: orm
                        model: AppBundle\Entity\BlogPost
                        provider: ~
                        listener:
                            insert: true
                            update: true
                            delete: true
                        finder: ~

 

解释一下

 

这里的clients配置的是本地elasticSearch服务的host和port

app是我们应用的名字,你可以随意改成你指定的名字,比如shareditor

blogpost是我们应用里不同数据的名字,每一个都会把索引集中建到一起

title和body是我们要搜索的域,这里我们指定了切词器是ik_max_word

persistence里配置了blogpost的读取orm,也就是BlogPost这个Entity

 

 

 

批量生成索引

执行

[root@centos7vm mywebsite]# php app/console fos:elastica:populate

ps:这一句只在第一次集成elasticSearch的时候执行,以后每当有新文章发布就会自动写入索引,无需手动更新

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

下面我们开始修改网站添加搜索功能

 

添加action

 

修改src/AppBundle/Controller/BlogController.php,增加如下方法:

    public function searchAction(Request $request)
    {
        $query = $request->get('q');
        $finder = $this->container->get('fos_elastica.finder.app.blogpost');
        $paginator = $this->get('knp_paginator');
        $results = $finder->createPaginatorAdapter($query);
        $pagination = $paginator->paginate($results, 1, 10);
        return $this->render('blog/search.html.twig', array('pagination' => $pagination,
            'query' => $query,
            'latestblogs' => BlogController::getLatestBlogs($this),
            'recommends' => BlogController::getRecommends($this)));
    }

解释一下

$request->get('q');这是从url参数里获取q这个参数来作为query词,后面网页修改里会通过form表单的input空间传递q的value

$this->container->get('fos_elastica.finder.app.blogpost');这是根据config.yml里的配置获取finder实例来做搜索用

下面利用$paginator做翻页,每页10条,默认展示第一页

 

添加路由配置

 

修改app/config/routing.yml,添加

blog_search:
    path:     /blogsearch
    defaults: { _controller: AppBundle:Blog:search }

修改网页模板

 

修改app/Resources/views/base.html.twig,增加如下代码:

    <div class="col-sm-8 col-xs-10"><h1><a href="{{ path('homepage') }}" style="text-decoration: none;color: white;">lcsays</a></h1></div>
    <div class="col-sm-3 col-xs-1">
        <form action="{{ path('blog_search') }}" style="margin-top: 10px;">
            <input type="search" name="q" placeholder="搜文章" maxlength="200" style="background-color: transparent;">
        </form>
    </div>

这其实就是在网页顶部增加了一个搜索框,这里面的input输入框的name设置成q,刚好对应了上面的$request->get('q');

 

搜索结果页

 

为了简单,我们尽量复用博客列表页,拷贝app/Resources/views/blog/list.html.twig到app/Resources/views/blog/search.html.twig,把

{% block title %}{{ subject.name }} - lcsays - 关注大数据技术{% endblock title %}

改成

{% block title %}{{ query }}的搜索结果 - lcsays - 关注大数据技术{% endblock title %}

 

精打细算修改配置节省阿里云服务器的内存使用方法

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

检查内存空间占用

首先我们来检查一下自己的阿里云机器都是哪些进程占用了资源,执行

[root@MYAY ~]# ps aux|grep -v USER|sort -n -k 6

这是按照实际内存占用从小到大排序,那么排在最后几位的就是我们的内存占用大户了

……
apache   17496  0.1  3.6 365692 37568 ?        S    07:51   0:01 php-fpm: pool www
apache   17498  0.1  3.9 368016 39916 ?        S    07:51   0:01 php-fpm: pool www
apache   17497  0.2  5.4 384920 55876 ?        S    07:51   0:02 php-fpm: pool www
mysql    17240  0.0 12.1 831064 123468 ?       Sl   07:43   0:01 /usr/sbin/mysqld --basedir=/usr --datadir=/var/lib/mysql --plugin-dir=/usr/lib64/mysql/plugin --log-error=/var/log/mysqld.log --pid-file=/var/run/mysqld/mysqld.pid --socket=/var/lib/mysql/mysql.sock

mysql没办法,为了性能更高内存还是不要动

注意这里的php-fpm,这是php-fpm启动的诸多个子进程,启动子进程的目的是为了提高并发响应速度,我这里只列出了后三个,其实有好多个,几乎占用了我磁盘空间的一半,如果你的网站流量小于1000每天,那么完全没有必要启动这么多子进程空闲着

优化方案

下面就是优化方法了,见证奇迹的时刻到了,打开/etc/php-fpm.d/www.conf,找到这些配置项:

pm = dynamic
pm.max_children = 50
pm.start_servers = 3
pm.min_spare_servers = 2
pm.max_spare_servers = 8

你的应该不是这样的配置,这是我优化过的,简单说明一下:

pm有dynamic和static两种取值:static就是配死子进程起多少个,这时pm.max_children有效,下面三个配置无效;dynamic就是动态启动子进程,这时pm.max_children无效,下面三个配置有效

如果你是土豪,那么请用static,我什么也不说了,如果你不是土豪,请乖乖用dynamic

pm.start_servers是指php-fpm一启动给分配几个子进程,我们设置小点,比如3

pm.min_spare_servers是指最少要留几个空闲的php-fpm做备份等待,我们设置小点,比如2

pm.max_spare_servers是指最多要留几个空闲的php-fpm做备份等待,我们设置小点,比如8

ok,重启php-fpm看看你内存节省了多少

妈妈再也不用担心我的内存啦!

教你成为全栈工程师(Full Stack Developer) 二十四-ES(elasticsearch)搜索引擎安装和使用

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

为什么需要搜索引擎

 

首先想一下:在一篇文章里找一个关键词怎么找?字符串匹配是最佳答案。

然后再想一下:找到100篇文章里包含某个关键词的文章列表怎么找?依然是关键词匹配

再继续想:找到100000000000(一千亿)篇文章里包含某个关键词的文章怎么找?如果用关键词匹配,以现在的电脑处理速度,从远古时代到人类灭绝这么长时间都处理不完,这时候搜索引擎要发挥作用了

 

搜索引擎技术有多么高深?

 

搜索引擎这种技术实际上从古代就有了。想象一个国家存储各类编撰资料的部门,有几个屋子的书,如果想找到某一本书的时候会怎么找呢?对了,有分类目录,先确定要找的书籍是哪个类别的,然后从目录里面找到想要找的书籍位于屋子的什么位置,然后再去拿。搜索引擎其实就是做了生成目录(也就是索引)的事情。那么如今的搜索引擎是怎么生成索引的呢?

要把互联网上的资料生成索引,拢共分三步:第一步,把资料编号;第二步,把每篇资料内容切成词;第三步,把词和资料编号的对应关系处理成“词=》编号列表”的形式

这时候你就可以迅速的找到一千亿篇文章里包含某个关键词的文章了,告诉我关键词是什么,我直接就从索引里找到了这个词对应的文章编号列表了,搞定!把需要数万年才能做完的工作用了不到一秒钟就搞定了,这就是搜索引擎的魅力!

当然,上面说的搜索引擎技术很简单,但百度数万工程师也不都是白吃饭的,如果想做好一个搜索引擎产品需要解决的问题就有很多了:收集网页时要考虑全、快、稳、新、优的问题,建索引时要考虑质量、效率、赋权、周期、时效性、资源消耗等问题,搜索的时候要考虑query分析、排序、筛选、展示、性能、广告、推荐、个性化、统计等问题,整体上要考虑地域性、容灾、国际化、当地法律、反作弊、垂直需求、移动互联网等诸多问题,所以百度大厦彻夜通明也是可以理解的。

 

开源搜索引擎

 

既然搜索引擎技术这么复杂,那么我们何必自寻烦恼了,开源社区为我们提供了很多资源,世界很美好。

说到开源搜索引擎一定要用的开源项目就是lucene,它不是搜索引擎产品,而是一个类库,而且是至今开源搜索引擎的最好的类库没有之一,因为只有它一个。lucene是用java语言开发,基本上涵盖了搜索引擎中索引和检索两个核心部分的全部功能,而且抽象的非常好,我后面会单独写数篇文章专门讲lucene的使用。最后强调一遍,它是一个类库,不是搜索引擎,你可以比较容易的基于lucene写一个搜索引擎。

然后要说的一个开源项目是solr,这是一个完整的搜索引擎产品,当然它底层一定是基于lucene的,毫无疑问,因为lucene是最好且唯一的搜索引擎类库。solr使用方法请看我的另一篇文章《教你一步步搭建和运行完整的开源搜索引擎

 

最后要说的就是elasticSearch,这个开源项目也可以说是一个产品级别的开源项目,当然它底层一定是基于lucene的,毫无疑问,因为lucene是最好且唯一的搜索引擎类库,我承认我是唐僧。它是一种提供了RESTful api的服务,RESTful就是直接通过HTTP协议收发请求和响应,接口比较清晰简单,是一种架构规则。话不多说,下面我就说下安装方法和简单使用方法,这样更容易理解,之后我会单独讲解怎么让你的网站利用elasticSearch实现搜索功能

 

elasticSearch安装

 

从github下载1.7版tag并编译(选择1.7版是因为当前我们的网站的symfony2版本还不支持2.x版本,但请放心的用,1.7版是经过无数人验证过的最稳定版本)

wget https://codeload.github.com/elastic/elasticsearch/tar.gz/v1.7.5

解压后进入目录执行

mvn package -DskipTests

这会花费你很长一段时间,你可以去喝喝茶了

 

编译完成后会在target/releases中生成编译好的压缩包(类似于elasticsearch-1.7.5.zip这样的文件),把这个压缩包解压放到任意目录就安装好了

 

安装ik插件

 

ik是一个中文切词插件,elasticSearch自带的中文切词很不专业,ik对中文切词支持的比较好。

https://github.com/medcl/elasticsearch-analysis-ik上找到我们elasticSearch对应的版本,1.7.5对应的ik版本是1.4.1,所以下载https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v1.4.1

解压出的目录是:

elasticsearch-analysis-ik-1.4.1

进入目录后执行

mvn clean package

编译完后依然是在target/releases生成了类似于elasticsearch-analysis-ik-*.zip的压缩包,把里面的内容解压到elasticsearch安装目录的plugins/ik下

再把elasticsearch-analysis-ik-1.4.1/config/ik目录整体拷贝到elasticsearch安装目录的config下

修改elasticsearch安装目录下的config/elasticsearch.yml,添加:

index:
  analysis:
    analyzer:
      ik:
        alias: [ik_analyzer]
        type: org.elasticsearch.index.analysis.IkAnalyzerProvider
      ik_max_word:
        type: ik
        use_smart: false
      ik_smart:
        type: ik
        use_smart: true

这样ik就安装好了

 

启动并试用

 

直接进入elasticsearch安装目录,执行

./bin/elasticsearch -d

后台启动完成

 

 

 

elasticSearch是通过HTTP协议收发数据的,所以我们用curl命令来给它发命令,elasticSearch默认监听9200端口

 

写入一篇文章:

curl -XPUT 'http://localhost:9200/myappname/myblog/1?pretty' -d '
{
  "title": "我的标题",
  "content": "我的内容"
}'

会收到返回信息:

{
  "_index" : "myappname",
  "_type" : "myblog",
  "_id" : "1",
  "_version" : 1,
  "created" : true
}

这说明我们成功把一篇文章发给了elasticSearch,它底层会利用lucene自动帮我们建好搜索用的索引

 

再写一篇文章:

curl -XPUT 'http://localhost:9200/myappname/myblog/2?pretty' -d '
{
  "title": "这是第二篇标题",
  "content": "这是第二篇内容"
}'

会收到返回信息:

{
  "_index" : "myappname",
  "_type" : "myblog",
  "_id" : "2",
  "_version" : 1,
  "created" : true
}

请尊重原创,转载请注明来源网站www.lcsays.com以及原始链接地址

这时我们找到elasticsearch安装目录的data目录下会生成这样的目录和文件:

ls data/nodes/0/indices/myappname/
0  1  2  3  4  _state

不同环境会稍有不同,但是都会生成myappname目录就对了

 

查看所有文章:

curl -XGET 'http://localhost:9200/myappname/myblog/_search?pretty=true' -d '
{
    "query" : {
        "match_all" : {}
    }
}'

这时会把我们刚才添加的两篇文章都列出来

 

搜索关键词“我的”:

curl -XGET 'http://localhost:9200/myappname/myblog/_search?pretty=true' -d '
{
    "query":{
        "query_string":{"query":"我的"}
    }
}'

会返回:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.191783,
    "hits" : [ {
      "_index" : "myappname",
      "_type" : "myblog",
      "_id" : "1",
      "_score" : 0.191783,
      "_source":
{
  "title": "我的标题",
  "content": "我的内容"
}
    } ]
  }
}

搜索关键词“第二篇”:

curl -XGET 'http://localhost:9200/myappname/myblog/_search?pretty=true' -d '
{
    "query":{
        "query_string":{"query":"第二篇"}
    }
}'

会返回:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.1879082,
    "hits" : [ {
      "_index" : "myappname",
      "_type" : "myblog",
      "_id" : "2",
      "_score" : 0.1879082,
      "_source":
{
  "title": "这是第二篇标题",
  "content": "这是第二篇内容"
}
    } ]
  }
}

 

如果想检查ik的切词效果,可以执行:

curl 'http://localhost:9200/myappname/_analyze?analyzer=ik_max_word&pretty=true' -d'
{
    "text":"中华人民共和国国歌"
}'

通过返回结果可以看出,ik_max_word切词把中华人民共和国国歌切成了“中华人民共和国”、“中华人民”、“中华”、“华人”、“人民共和国”、“人民”、“共和国”、“共和”、“国”、“国歌”

也就是说我们搜索这些词中的任意一个都能把这句话搜到,如果不安装ik插件的话,那只会切成:“中”、“华”、“人”、“民”、“共”、“和”、“国”、“国”、“歌”,不够智能,搜索也不好进行了

 

讲解一下

上面几条命令都是json形式,elasticSearch就是这么人性化,没治了。

这里面的myappname是你自己可以改成自己应用的名字,这在elasticSearch数据存储中是完全隔离的,而myblog是我们在同一个app中想要维护的不同的数据,就是你的不同数据,比如文章、用户、评论,他们最好都分开,这样搜索的时候也不会混

pretty参数就是让返回的json有换行和缩进,容易阅读,调试时可以加上,开发到程序里就可以去掉了

analyzer就是切词器,我们指定的ik_max_word在前面配置文件里遇到过,表示最大程度切词,各种切,360度切

返回结果里的hits就是“命中”,total是命中了几条,took是花了几毫秒,_score就是相关性程度,可以用来做排序的依据

 

elasticSearch有什么用

上面都是json的接口,那么我们怎么用呢?其实你想怎么用就怎么用,煎着吃、炒着吃、炖着吃都行。比如我们的博客网站,当你创建一篇博客的时候可以发送“添加”的json命令,然后你开发一个搜索页面,当你输入关键词点搜索的时候,可以发送查询的命令,这样返回的结果就是你的搜索结果,只不过需要你自己润色一下,让展现更美观。感觉复杂吗?下一节告诉你怎么用symfony2的扩展来实现博客网站的搜索功能

 

 

 

 

程序员工作头十年应该做的事

i>

  • 2. 每一阶段认真做好一个最感兴趣的项目。这既是一步一个脚印的痕迹,又可以让10年后的你有料可吹
  • 3. 每周抽时间对现有的工作做思考。一路向前要时不时停下脚步,系统的长远的思考总会有所收益
  • 4. 尝试做一些管理工作。在中国的IT行业只做技术是行不通的,要努力成为一个全面发展的人才
  • 5. 要常沟通,大胆沟通。约见一个比你优秀的人会使你自己成长的更快
  • 6. 抽时间看一些业界优秀的论文。如今技术发展是飞快的,只维护现有的技术总是跟不上时代的脚步
  • 7. 常读书。网上多的是信息,但缺少的是思想,读书可以系统的领悟思想,读书可以使人增长智慧
  • 8. 做一个自己的网站。从实用的角度做一个网站,不要经常改变方向,不要过早考虑盈利,10年后这也许就是一个产品
  • 9. 学习一个其他的领域。每周抽点时间学习一个其他领域,10年后你就会成为这方面的专家,最起码你可以结合最擅长的两个领域写本书