核心部分
N0.1 Request/Response
type Response
struct {
Url
string
Body
string
Meta *
map[
string]
string
}
type Request
struct {
Url
string
Parse
func(response *Response)
Meta *
map[
string]
string
}
请求与响应的封装结构体,对于 Response , Url 为请求时的 Url , Body 为此 Url 的网页源码, Meta 为上一层传过来的数据(之所以用指针是因为指针省内存);
对于 Request 结构, Url 是需要请求的链接, Parse 是该 Url 对应的解析函数, Meta 是传给 Parse 函数参数 response 的数据。
No.2 请求池/数据池
var RequestChan =
make(
chan *Request,
100)
var DataChan =
make(
chan *
map[
string]
string,
100)
请求池 RequestChan 用于存放等待完成的请求,数据池 DataChan 用于存放还未处理的数据。
No.3 细节部分
var wg sync.WaitGroup
var swg sync.WaitGroup
var Save =
func(data *
map[
string]
string) {
fmt.Println(data)
}
var Download =
func(url
string) (content
string, statusCode
int) {
defer func() { fmt.Printf(
"get<%d>: %s\n", statusCode, url) }()
resp, err1 := http.Get(url)
if err1 !=
nil {
statusCode =
-100
return
}
defer resp.Body.Close()
data, err2 := ioutil.ReadAll(resp.Body)
if err2 !=
nil {
statusCode =
-200
return
}
statusCode = resp.StatusCode
content =
string(data)
return
}
func doRequest(request *Request) {
defer wg.Done()
html, sta := Download(request.Url)
if sta !=
200 {
return
}
request.Parse(&Response{request.Url, html, request.Meta})
}
func save() {
defer swg.Done()
for {
data := <-DataChan
if data ==
nil {
break
}
Save(data)
}
}
对于请求与数据的处理,我们需要全部完成,且并发数量要限制在一定范围内,因此我们定义了 wg 与 swg 两个协程组, Save 变量是一个函数,之所以定义成一个变量,是因为 Save 是提供给用户操作的,例如用户可能将数据保存在文件、数据库、云盘等等地方,我们只提供一个默认保存方案,即向控制台输出数据。同理 Download 变量也是一个函数,用于处理下载问题。doRequest 函数不提供给用户操作,因此其首字母小写,其作用是处理请求 Request , save 函数是我们的数据处理函数,从数据池中取出数据,然后调用用户定义的 Save 方法处理数据。
NO.4 调度器Scheduler(核心)
func Scheduler(threads
int) {
go save()
swg.Add
(1)
for {
for i :=
0; i < threads; i++ {
req := <-RequestChan
if req ==
nil {
DataChan <-
nil
return
}
go doRequest(req)
wg.Add
(1)
}
wg.Wait()
}
swg.Wait()
}
调度器的作用是调度请求,首先,调度器先开启数据处理线程 save 然后再依次开启 threads 个线程处理请求,当前一批 request 请求完了后,再从请求池里取出 threads 个 request 进行请求,当所有请求完毕后,将数据池的队尾写入 nil 提示所有 request 都处理完毕了, 如果数据也处理完毕,则可结束数据处理线程,此时调度结束,控制权交给用户的 main 函数。
NO.5 完整代码
package myspider
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
)
type Response
struct {
Url
string
Body
string
Meta *
map[
string]
string
}
type Request
struct {
Url
string
Parse
func(response *Response)
Meta *
map[
string]
string
}
var RequestChan =
make(
chan *Request,
100)
var DataChan =
make(
chan *
map[
string]
string,
100)
var wg sync.WaitGroup
var swg sync.WaitGroup
var Save =
func(data *
map[
string]
string) {
fmt.Println(data)
}
var Download =
func(url
string) (content
string, statusCode
int) {
defer func() { fmt.Printf(
"get<%d>: %s\n", statusCode, url) }()
resp, err1 := http.Get(url)
if err1 !=
nil {
statusCode =
-100
return
}
defer resp.Body.Close()
data, err2 := ioutil.ReadAll(resp.Body)
if err2 !=
nil {
statusCode =
-200
return
}
statusCode = resp.StatusCode
content =
string(data)
return
}
func doRequest(request *Request) {
defer wg.Done()
html, sta := Download(request.Url)
if sta !=
200 {
return
}
request.Parse(&Response{request.Url, html, request.Meta})
}
func save() {
defer swg.Done()
for {
data := <-DataChan
if data ==
nil {
break
}
Save(data)
}
}
func Scheduler(threads
int) {
go save()
swg.Add
(1)
for {
for i :=
0; i < threads; i++ {
req := <-RequestChan
if req ==
nil {
DataChan <-
nil
return
}
go doRequest(req)
wg.Add
(1)
}
wg.Wait()
}
swg.Wait()
}
示例
NO.1 古诗词网(纵向结构)
package main
import (
.
"./myspider"
"regexp"
)
var textItem = regexp.MustCompile(
`aspx">(.*?)<.*?aspx">(.*?)</a>`)
var nextReg = regexp.MustCompile(
`href="(.*?)">下一页</a>`)
var lll =
0
func parse(response *Response) {
nt := nextReg.FindStringSubmatch(response.Body)
if len(nt) ==
2 && lll <
2 {
lll++
RequestChan <- &Request{
"http://so.gushiwen.org/mingju/" + nt
[1], parse,
nil}
}
else {
defer func() { RequestChan <-
nil }()
}
res := textItem.FindAllStringSubmatch(response.Body,
100)
for _, it :=
range res {
if len(it) ==
3 {
data :=
make(
map[
string]
string)
data[
"text"] = it
[1]
data[
"title"] = it
[2]
DataChan <- &data
}
}
}
func main() {
RequestChan <- &Request{
"http://so.gushiwen.org/mingju/", parse,
nil}
Scheduler
(4)
}
执行过程:首先生成第一个请求,扔到请求池,然后开始同时处理4个请求的调度过程。
NO.3 8edy电影网(纵横结构)
package main
import (
.
"./myspider"
"fmt"
"os"
"regexp"
)
var detReg = regexp.MustCompile(
`"(/movie/[0-9]*?/)" target="_blank">`)
var nt = regexp.MustCompile(
`"(/kh/p[0-9]*?)">下一页`)
var cur =
0
var max =
2
func parse1(response *Response) {
cur++
for _, link :=
range detReg.FindAllStringSubmatch(response.Body,
40) {
RequestChan <- &Request{
"http://www.8edy.tv" + link
[1], parse2,
nil}
}
next := nt.FindStringSubmatch(response.Body)
if len(next) >
0 && cur < max {
RequestChan <- &Request{
"http://www.8edy.tv" + next
[1], parse1,
nil}
}
else {
RequestChan <-
nil
}
}
var titleReg = regexp.MustCompile(
`<h1 class="lvzi">(.*?)</h1>`)
var ftypeReg = regexp.MustCompile(
`类型:(.*?)<br/>`)
var yearReg = regexp.MustCompile(
`年份:(.*?)<br/>`)
var areaReg = regexp.MustCompile(
`地区:(.*?)<br/>`)
var formReg = regexp.MustCompile(
`格式:(.*?)<br/>`)
func parse2(response *Response) {
html := response.Body
data :=
make(
map[
string]
string)
data[
"title"] = titleReg.FindStringSubmatch(html)
[1]
data[
"type"] = ftypeReg.FindStringSubmatch(html)
[1]
data[
"year"] = yearReg.FindStringSubmatch(html)
[1]
data[
"area"] = areaReg.FindStringSubmatch(html)
[1]
data[
"form"] = formReg.FindStringSubmatch(html)
[1]
DataChan <- &data
}
func main() {
file, err := os.Create(
"8edy.csv")
defer file.Close()
if err !=
nil {
fmt.Println(err)
return
}
file.WriteString(fmt.Sprintf(
"%s,%s,%s,%s,%s\n",
"标题",
"类型",
"年份",
"地区",
"格式"))
Save =
func(data *
map[
string]
string) {
file.WriteString(fmt.Sprintf(
"%s,%s,%s,%s,%s\n", (*data)[
"title"], (*data)[
"type"], (*data)[
"year"], (*data)[
"area"], (*data)[
"form"]))
}
RequestChan <- &Request{
"http://www.8edy.tv/kh/", parse1,
nil}
Scheduler
(10)
}
过程与上面的一样,这个程序多一个自定义数据存储过程。
更多更新在这里:https://github.com/ChenL1994/GoScrapy