Commit a888d351 by usual2970

525说心事爬虫

parent 10c11650
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package components
import (
"bytes"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Client is an middleman between the websocket connection and the hub.
type Client struct {
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
}
// readPump pumps messages from the websocket connection to the hub.
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}
// write writes a message with the given message type and payload.
func (c *Client) write(mt int, payload []byte) error {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
return c.conn.WriteMessage(mt, payload)
}
// writePump pumps messages from the hub to the websocket connection.
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
// The hub closed the channel.
c.write(websocket.CloseMessage, []byte{})
return
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
// Add queued chat messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}
// serveWs handles websocket requests from the peer.
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client
go client.writePump()
client.readPump()
}
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package components
// hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
// Registered clients.
clients map[*Client]bool
// Inbound messages from the clients.
broadcast chan []byte
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan *Client
}
func NewHub() *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
appname = service
httpport = 8080
httpport = 8082
runmode = dev
copyrequestbody = true
autorender=false
autorender=true
upusername=staticydl
uppassword=ydl12345com
......
package controllers
import (
"github.com/astaxie/beego"
"github.com/usual2970/util/service/components"
"fmt"
)
type ChatController struct {
beego.Controller
}
var hub *components.Hub
func (c *ChatController) Index() {
fmt.Println(hub)
components.ServeWs(hub, c.Ctx.ResponseWriter, c.Ctx.Request)
}
func init(){
hub = components.NewHub()
go hub.Run()
}
......@@ -9,7 +9,7 @@ type MainController struct {
}
func (c *MainController) Get() {
c.Data["Website"] = "beego.me"
c.Data["Host"] = c.Ctx.Request.Host
c.Data["Email"] = "astaxie@gmail.com"
c.TplName = "index.tpl"
}
......@@ -6,7 +6,9 @@ import (
)
func init() {
beego.Router("/", &controllers.MainController{})
beego.Router("/", &controllers.ChatController{},"*:Index")
beego.Router("/im",&controllers.MainController{})
beego.AutoRouter(&controllers.TelController{})
beego.AutoRouter(&controllers.GitController{})
}
package pipeline
import (
"github.com/hu17889/go_spider/core/common/com_interfaces"
"github.com/hu17889/go_spider/core/common/page_items"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"log"
"github.com/pquerna/ffjson/ffjson"
// "regexp"
)
type PipelineMysql struct {
pMysql *sql.DB
}
func NewPipelineMysql() *PipelineMysql {
db, err := sql.Open("mysql", "root:123456Aa@tcp(127.0.0.1:3306)/qqxinlin?allowOldPasswords=1")
if err != nil {
panic("Open database error")
}
return &PipelineMysql{pMysql:db}
}
func (this *PipelineMysql) Process(items *page_items.PageItems, t com_interfaces.Task) {
println("----------------------------------------------------------------------------------------------")
println("Crawled url :\t" + items.GetRequest().GetUrl() + "\n")
tag:=items.GetRequest().GetUrlTag()
if tag=="item"{
this.insertItem(items.GetAll())
}
if tag=="answer"{
this.insertAnswer(items.GetAll())
}
}
/**
* [func description]
* @param {[type]} this *PipelineMysql) insertItem(item map[string]string [description]
* @return {[type]} [description]
*/
func (this *PipelineMysql) insertItem(item map[string]string) {
stmt, err := this.pMysql.Prepare("insert into ask (origin_id,origin_url,origin_uid,source,title,content,answer_num,nick_name,gender,education_background,age,area,head,marriage,create_time) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
defer stmt.Close()
if err != nil {
log.Println(err)
return
}
_,err=stmt.Exec(item["origin_id"],item["origin_url"],item["originUserId"],"1",item["title"],item["content"],item["answerNum"],item["nick_name"],item["gender"],item["educationBackground"],item["age"],item["area"],item["head"],item["marriage"],item["createTime"])
if err != nil {
log.Println(err)
return
}
//插入answer
var answerList *AnswerList
err=ffjson.Unmarshal([]byte(item["answers"]),&answerList)
if err!=nil{
log.Println(err)
}
astmt,err:=this.pMysql.Prepare("insert into answer (content,ask_origin_id,expert_name,expert_title,expert_head,answer_time) values(?,?,?,?,?,?)")
if err!=nil{
log.Println(err)
return
}
defer astmt.Close()
for _,answer:=range answerList.List{
_,err=astmt.Exec(answer.Content,answer.AskOriginId,answer.ExpertName,answer.ExpertTitle,answer.ExpertHead,answer.AnswerTime)
if err !=nil{
log.Println(err)
}
}
}
func (this *PipelineMysql) insertAnswer(item map[string]string) {
//插入answer
var answerList *AnswerList
err:=ffjson.Unmarshal([]byte(item["answers"]),&answerList)
if err!=nil{
log.Println(err)
return
}
astmt,err:=this.pMysql.Prepare("insert into answer (content,ask_origin_id,expert_name,expert_title,expert_head,answer_time) values(?,?,?,?,?,?)")
if err!=nil{
log.Println(err)
return
}
defer astmt.Close()
for _,answer:=range answerList.List{
_,err=astmt.Exec(answer.Content,answer.AskOriginId,answer.ExpertName,answer.ExpertTitle,answer.ExpertHead,answer.AnswerTime)
if err !=nil{
log.Println(err)
}
}
}
package pipeline
type Answer struct{
Content string
OriginExpertId string
AskOriginId string
ExpertName string
ExpertTitle string
ExpertHead string
AnswerTime string
}
type AnswerList struct{
List []*Answer
}
type Config struct{
Time int64
}
\ No newline at end of file
// The example gets stock newses from site sina.com (http://live.sina.com.cn/zt/f/v/finance/globalnews1).
// The spider is continuous service.
// The stock api returns json result.
// It fetchs news at regular intervals that has been set in the config file.
// The result is saved in a file by PipelineFile.
package main
import (
"github.com/hu17889/go_spider/core/common/page"
"github.com/usual2970/util/spiders/pipeline"
"github.com/hu17889/go_spider/core/spider"
"github.com/PuerkitoBio/goquery"
"github.com/hu17889/go_spider/core/common/request"
"github.com/pquerna/ffjson/ffjson"
"fmt"
"strings"
"regexp"
"time"
"net/http"
"strconv"
"flag"
)
type MyPageProcesser struct {
startNewsId int
}
func NewMyPageProcesser() *MyPageProcesser {
return &MyPageProcesser{}
}
var getAll *bool
var getPage *int
var baseUrl="http://www.psy525.cn"
var listUrl="http://www.psy525.cn/case/"
var answerUrl="http://www.psy525.cn/sys/ajax.ashx?t=showcasepublicanslist"
// Parse html dom here and record the parse result that we want to crawl.
// Package simplejson (https://github.com/bitly/go-simplejson) is used to parse data of json.
func (this *MyPageProcesser) Process(p *page.Page) {
if !p.IsSucc() {
println(p.Errormsg())
return
}
tag:=p.GetUrlTag()
query := p.GetHtmlParser()
if tag=="list"{
p.SetSkip(true)
var reqs []*request.Request
query.Find("#clist li").Each(func(i int, s *goquery.Selection) {
url, _ := s.Find("a").Attr("href")
req := request.NewRequest(baseUrl+url, "html", "item", "GET", "", nil, nil, nil, nil)
reqs = append(reqs, req)
})
nextSelect:=query.Find("#page a").Last()
if nextSelect.Text()=="下一页"{
url,_:=nextSelect.Attr("href")
pageR := regexp.MustCompile("\\_(\\d+)")
pageMatches:=pageR.FindStringSubmatch(url)
pageNum,_:=strconv.Atoi(pageMatches[1])
page:=pageNum/20
if *getAll || (*getPage>=page){
req := request.NewRequest(listUrl+url, "html", "list", "GET", "", nil, nil, nil, nil)
reqs = append(reqs, req)
}
}
p.AddTargetRequestsWithParams(reqs)
}
if(tag=="item"){
query := p.GetHtmlParser()
title := strings.TrimSpace(query.Find("#c-que h1").Text())
content := strings.TrimSpace(query.Find("#c-que p.que").Text())
gender:=query.Find("#c-head p em").Eq(0).Text()
age:=query.Find("#c-head p em").Eq(1).Text()
educationBackground:=query.Find("#c-head p em").Eq(2).Text()
marriage:=query.Find("#c-head p em").Eq(3).Text()
area:=query.Find("#c-head p em").Eq(4).Text()
nick_name:=query.Find("#c-head p a").Text()
origin_url:=p.GetRequest().GetUrl()
r := regexp.MustCompile("\\/(\\d+)")
matches:=r.FindStringSubmatch(origin_url)
origin_id:=matches[1]
userUrl,_:=query.Find("#c-head a.avatar").Attr("href")
uidR := regexp.MustCompile("u(\\d+)")
uMatches:=uidR.FindStringSubmatch(userUrl)
originUserId:=uMatches[1]
head,_:=query.Find("#c-head a.avatar img").Attr("src")
answerNum:=query.Find("#c-que .remark b").Text()
tempTime:=query.Find("#c-que .remark i").Eq(1).Text()
timeR:=regexp.MustCompile("(\\d{4}.*)")
timeMatch:=timeR.FindStringSubmatch(tempTime)
the_time, _ := time.Parse("2006年01月02日 15:04", timeMatch[1])
createTime:=the_time.Format("2006-01-02 15:04:05")
//answers
var answers []*pipeline.Answer
eidR := regexp.MustCompile("d(\\d+)")
query.Find(".c-ans").Each(func( i int,s *goquery.Selection){
answer:=&pipeline.Answer{}
answer.Content=s.Find(".anstxt").Text()
tempTime:=s.Find(".time").Eq(0).Text()
timeMatch:=timeR.FindStringSubmatch(tempTime)
the_time, _ := time.Parse("2006年01月02日 15:04", timeMatch[1])
answer.AnswerTime=the_time.Format("2006-01-02 15:04:05")
answer.AskOriginId=origin_id
answer.ExpertName=s.Find(".dr .un").Text()
answer.ExpertTitle=s.Find(".dr em").Text()
expertUrl,_:=s.Find(".dr .un").Attr("href")
eMatches:=eidR.FindStringSubmatch(expertUrl)
answer.OriginExpertId=eMatches[1]
expertHead,_:=s.Find(".dr .avatar img").Attr("src")
answer.ExpertHead=expertHead
answers= append(answers,answer)
})
answerList:=&pipeline.AnswerList{answers}
answerJson,_:=ffjson.Marshal(answerList)
p.AddField("title",title)
p.AddField("content",content)
p.AddField("gender",gender)
p.AddField("age",age)
p.AddField("educationBackground",educationBackground)
p.AddField("marriage",marriage)
p.AddField("area",area)
p.AddField("nick_name",nick_name)
p.AddField("origin_url",origin_url)
p.AddField("origin_id",origin_id)
p.AddField("originUserId",originUserId)
p.AddField("head",head)
p.AddField("answerNum",answerNum)
p.AddField("createTime",createTime)
p.AddField("answers",string(answerJson))
answerNumInt,_:=strconv.Atoi(answerNum)
var reqs []*request.Request
if answerNumInt>3{
for i:=1;i<=(answerNumInt/3)+1;i++{
start:=i*3
req:=request.NewRequest(answerUrl, "html", "answer", "POST", fmt.Sprintf("start=%s&parm=0_-1_2_%s",strconv.Itoa(start),origin_id), http.Header{}, nil, nil, nil)
req=addHeader(req)
reqs=append(reqs,req)
}
p.AddTargetRequestsWithParams(reqs)
}
}
if tag=="answer"{
postData:=p.GetRequest().GetPostdata()
var answers []*pipeline.Answer
eidR := regexp.MustCompile("d(\\d+)")
timeR:=regexp.MustCompile("(\\d{4}.*)")
idR:=regexp.MustCompile("0\\_\\-1\\_2\\_(\\d+)")
idMatch:=idR.FindStringSubmatch(postData)
origin_id:=idMatch[1]
query.Find(".c-ans").Each(func( i int,s *goquery.Selection){
answer:=&pipeline.Answer{}
answer.Content=s.Find(".anstxt").Text()
tempTime:=s.Find(".time").Eq(0).Text()
timeMatch:=timeR.FindStringSubmatch(tempTime)
the_time, _ := time.Parse("2006年01月02日 15:04", timeMatch[1])
answer.AnswerTime=the_time.Format("2006-01-02 15:04:05")
answer.AskOriginId=origin_id
answer.ExpertName=s.Find(".dr .un").Text()
answer.ExpertTitle=s.Find(".dr em").Text()
expertUrl,_:=s.Find(".dr .un").Attr("href")
eMatches:=eidR.FindStringSubmatch(expertUrl)
answer.OriginExpertId=eMatches[1]
expertHead,_:=s.Find(".dr .avatar img").Attr("src")
answer.ExpertHead=expertHead
answers= append(answers,answer)
})
answerList:=&pipeline.AnswerList{answers}
answerJson,_:=ffjson.Marshal(answerList)
p.AddField("answers",string(answerJson))
}
}
func (this *MyPageProcesser) Finish() {
fmt.Printf("TODO:before end spider \r\n")
}
func main() {
// spider input:
// PageProcesser ;
// task name used in Pipeline for record;
//
//
var reqs []*request.Request
getAll=flag.Bool("all",false,"是否获取全部")
getPage=flag.Int("page",2,"获取的页数")
flag.Parse()
println(*getAll)
println(*getPage)
url:="http://www.psy525.cn/case/caseopen.html"
req := request.NewRequest(url, "html", "list", "GET", "", nil, nil, nil, nil)
reqs=append(reqs,req)
spider.NewSpider(NewMyPageProcesser(), "psy525").
AddRequests(reqs). // start url, html is the responce type ("html" or "json" or "jsonp" or "text")
AddPipeline(pipeline.NewPipelineMysql()). // Print result to std output // Print result in file
OpenFileLog("/tmp").
SetThreadnum(5). // Error info or other useful info in spider will be logged in file of defalt path like "WD/log/log.2014-9-1". // Sleep time between 1s and 3s.
Run()
//AddPipeline(pipeline.NewPipelineFile("/tmp/tmpfile")). // print result in file
}
func addHeader(req *request.Request) *request.Request {
req.Header.Add("Content-Type","application/x-www-form-urlencoded; charset=UTF-8")
req.Header.Add("Referer","http://www.psy525.cn/")
return req
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment