Commit d20a0188 by usual2970

优化

parent 5000dbff
...@@ -41,6 +41,7 @@ type Collect struct { ...@@ -41,6 +41,7 @@ type Collect struct {
wg WaitGroupWrap wg WaitGroupWrap
chanPool chan bool chanPool chan bool
chanItem chan *Item chanItem chan *Item
f *os.File
} }
func NewCollect(filePath string, needUnzip bool) *Collect { func NewCollect(filePath string, needUnzip bool) *Collect {
...@@ -69,8 +70,8 @@ func (c *Collect) GetItems() error { ...@@ -69,8 +70,8 @@ func (c *Collect) GetItems() error {
if err != nil { if err != nil {
return err return err
} }
defer f.Close() c.f = f
rd := bufio.NewReader(f) rd := bufio.NewReader(c.f)
c.wg.Wrap(func() { c.wg.Wrap(func() {
for { for {
...@@ -90,22 +91,21 @@ func (c *Collect) GetItems() error { ...@@ -90,22 +91,21 @@ func (c *Collect) GetItems() error {
} }
}) })
c.wg.Wrap(func() {
for { for {
select { select {
case item := <-c.chanItem: case item := <-c.chanItem:
c.chanPool <- true //缓冲区满之后阻塞,后面的readChan将等待 c.chanPool <- true //缓冲区满之后阻塞,后面的readChan将等待
c.wg.Wrap(func() { c.wg.Wrap(func() {
c.handleItem(item) c.handleItem(item)
<-c.chanPool <-c.chanPool
}) })
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
log.Printf("timed out") //等待30后退出 log.Printf("timed out") //等待30后退出
break return
}
} }
} })
return nil return nil
} }
...@@ -152,5 +152,10 @@ func (c *Collect) handleItem(item *Item) { ...@@ -152,5 +152,10 @@ func (c *Collect) handleItem(item *Item) {
} }
func (c *Collect) Exit() { func (c *Collect) Exit() {
c.wg.Wait() c.wg.Wait()
if c.f != nil {
c.f.Close()
}
} }
...@@ -22,7 +22,7 @@ func TestOnItem(t *testing.T) { ...@@ -22,7 +22,7 @@ func TestOnItem(t *testing.T) {
}) })
err := c.GetItems() err := c.GetItems()
c.Exit()
fmt.Println(err) fmt.Println(err)
c.Exit()
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment