use Context to send cancelation signals to groutines

This commit is contained in:
mind1949 2020-06-12 21:31:08 +08:00
parent bf3225765d
commit 5048018d48
5 changed files with 42 additions and 8 deletions

View file

@ -45,7 +45,7 @@ import (
) )
func main() { func main() {
for tweet := range twitterscraper.SearchTweets("twitter scraper data -filter:retweets", 50) { for tweet := range twitterscraper.SearchTweets(context.Background(), "twitter scraper data -filter:retweets", 50) {
if tweet.Error != nil { if tweet.Error != nil {
panic(tweet.Error) panic(tweet.Error)
} }

View file

@ -1,6 +1,7 @@
package twitterscraper package twitterscraper
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
@ -10,13 +11,20 @@ import (
const ajaxSearchURL = "https://twitter.com/i/search/timeline?q=%s" const ajaxSearchURL = "https://twitter.com/i/search/timeline?q=%s"
// SearchTweets returns channel with tweets for a given search query // SearchTweets returns channel with tweets for a given search query
func SearchTweets(query string, maxTweetsNbr int) <-chan *Result { func SearchTweets(ctx context.Context, query string, maxTweetsNbr int) <-chan *Result {
channel := make(chan *Result) channel := make(chan *Result)
go func(query string) { go func(query string) {
defer close(channel) defer close(channel)
var maxId string var maxId string
tweetsNbr := 0 tweetsNbr := 0
for tweetsNbr < maxTweetsNbr { for tweetsNbr < maxTweetsNbr {
select {
case <-ctx.Done():
channel <- &Result{Error: ctx.Err()}
return
default:
}
tweets, err := FetchSearchTweets(query, maxId) tweets, err := FetchSearchTweets(query, maxId)
if err != nil { if err != nil {
channel <- &Result{Error: err} channel <- &Result{Error: err}
@ -28,9 +36,16 @@ func SearchTweets(query string, maxTweetsNbr int) <-chan *Result {
} }
for _, tweet := range tweets { for _, tweet := range tweets {
select {
case <-ctx.Done():
channel <- &Result{Error: ctx.Err()}
return
default:
}
if tweetsNbr < maxTweetsNbr { if tweetsNbr < maxTweetsNbr {
lastId, _ := strconv.ParseInt(tweet.ID, 10, 64) lastId, _ := strconv.ParseInt(tweet.ID, 10, 64)
maxId = strconv.FormatInt(lastId - 1, 10) maxId = strconv.FormatInt(lastId-1, 10)
channel <- &Result{Tweet: *tweet} channel <- &Result{Tweet: *tweet}
} }
tweetsNbr++ tweetsNbr++

View file

@ -1,11 +1,14 @@
package twitterscraper package twitterscraper
import "testing" import (
"context"
"testing"
)
func TestGetSearchTweets(t *testing.T) { func TestGetSearchTweets(t *testing.T) {
count := 0 count := 0
maxTweetsNbr := 50 maxTweetsNbr := 50
for tweet := range SearchTweets("twitter scraper data -filter:retweets", maxTweetsNbr) { for tweet := range SearchTweets(context.Background(), "twitter scraper data -filter:retweets", maxTweetsNbr) {
if tweet.Error != nil { if tweet.Error != nil {
t.Error(tweet.Error) t.Error(tweet.Error)
} else { } else {

View file

@ -1,6 +1,7 @@
package twitterscraper package twitterscraper
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"regexp" "regexp"
@ -45,18 +46,32 @@ type Result struct {
} }
// GetTweets returns channel with tweets for a given user. // GetTweets returns channel with tweets for a given user.
func GetTweets(user string, pages int) <-chan *Result { func GetTweets(ctx context.Context, user string, pages int) <-chan *Result {
channel := make(chan *Result) channel := make(chan *Result)
go func(user string) { go func(user string) {
defer close(channel) defer close(channel)
var lastTweetID string var lastTweetID string
for pages > 0 { for pages > 0 {
select {
case <-ctx.Done():
channel <- &Result{Error: ctx.Err()}
return
default:
}
tweets, err := FetchTweets(user, lastTweetID) tweets, err := FetchTweets(user, lastTweetID)
if err != nil { if err != nil {
channel <- &Result{Error: err} channel <- &Result{Error: err}
return return
} }
for _, tweet := range tweets { for _, tweet := range tweets {
select {
case <-ctx.Done():
channel <- &Result{Error: ctx.Err()}
return
default:
}
lastTweetID = tweet.ID lastTweetID = tweet.ID
channel <- &Result{Tweet: *tweet} channel <- &Result{Tweet: *tweet}
} }
@ -103,7 +118,7 @@ func FetchTweets(user string, last string) ([]*Tweet, error) {
return tweets, nil return tweets, nil
} }
func readTweetsFromHTML (htm *strings.Reader) ([]*Tweet, error) { func readTweetsFromHTML(htm *strings.Reader) ([]*Tweet, error) {
var tweets []*Tweet var tweets []*Tweet
doc, err := goquery.NewDocumentFromReader(htm) doc, err := goquery.NewDocumentFromReader(htm)

View file

@ -1,12 +1,13 @@
package twitterscraper package twitterscraper
import ( import (
"context"
"testing" "testing"
) )
func TestGetTweets(t *testing.T) { func TestGetTweets(t *testing.T) {
count := 0 count := 0
for tweet := range GetTweets("nomadic_ua", 2) { for tweet := range GetTweets(context.Background(), "nomadic_ua", 2) {
if tweet.Error != nil { if tweet.Error != nil {
t.Error(tweet.Error) t.Error(tweet.Error)
} else { } else {