golang 怎么并发

allMap存储的是一个任务列表,KEY标记了这个任务类型,Value对应的是任务的参数, 现在我需要并发处理这些任务 。 开发过程中使用了如下两种方法,效果并不好,感觉自己没有领会到golang并发处理的思想 ; 下面是我的几点体会和疑惑,希望得到各位大神的指导。

方式一

    // allMap 中存储了任务列表
    // Task 定义如下
    type Task struct {
        Params interface{}
        ResultChan chan []byte
        // Wg *sync.WaitGroup
    }
    Params是参数,ResultChan是处理完毕之后,将结果写入到ResultChan中 ;

    // 并发 处理任务
    for key, value := range allMap {
        go func(k string, v interface{}) {
            log.Debug("k : " , k )
            if k == tools.REQUEST {
                // A
                log.Debug("elem len : ", len(value))

                one_task = &service.Task{
                    Params:     v,
                    ResultChan: make(chan []byte, len(value)),
                    //Wg : new(sync.WaitGroup) ,
                }
                // B
                log.Debugf("1 one_task : %+v ", one_task)

                // AddTask函数逻辑会处理one_task,处理完毕之后,将结果写入到one_task结构体的ResultChan字段;
                service.AddTask(one_task)

            } else if k == tools.REQUEST {

            }
        }(key, value)
    }

    // C
    log.Debugf("2 one_task : %+v ", one_task)

    // 接收结果
    go func() {
        for item := range one_task.ResultChan {
            log.Debug("Receive data From ResultChan : ", string(item))
        }
        log.Debug("Process ", tools.REQUEST, " end ")
    }()

这种方式的弊端,太依赖程序执行的先后顺序了,测试的过程中,发现当C发生在A和B之前时,会使接收结果goroutinue访问ResultChan成员发生奔溃,因为此时ResultChan还没有申请空间。

方案一解决方案: service.AddTask(one_task) 函数再加一个参数,chan <- interface{} , AddTask处理完之后,将结果写入到这个通道里面,接收结果协程监听该通道,然后读取结果。

方式二

延迟并发时机

    for k, v := range allMap {
        //go func(k string, v interface{}) {
        log.Debug("k : ", k)
        if k == tools.REQUEST {
            // A
            log.Debug("baojie elem len : ", len(v))
            one_task = &service.Task{
                Params:     v,
                ResultChan: make(chan []byte, len(v)),
                //Wg : new(sync.WaitGroup) ,
            }
            // B
            log.Debugf("1 one_task : %+v ", one_task)
            go service.AddTask(one_task)
        } else if k == tools.REQUEST_TCP {
        }
        //}(key, value)
    }

    // C
    log.Debugf("2 one_task : %+v ", one_task)

    // 接收结果
    go func() {
        for item := range one_task.ResultChan {
            log.Debug("Receive data From ResultChan : ", string(item))
        }
        log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")
    }()

这样,就保证了C必须发生在A、B之后,这样一来,ResultChan一定先初始化了,等待AddTask后面的协程往里面写入数据,接收结果协程就会读取出来。

问题1

问题来了,既然方式一存在问题,那么方式二中是否在效率上有何弊端呢 ?

我这样写并发的逻辑是否有问题 ?

问题2

这种思想是否可取

var task Task ; 

// 提交任务 线程 
for key , value := range allMap{
   task := Task{
        params : value  , 
        result : make(chan interface{} , len(value) ) , // value 是一个list 
    }
   go processOneByOne(key ,value)   // 这种方式是不是开启了很多协程? len(allmap) 

}

// 取结果
for result := range task.result {
    // get result from chann 
    // to do 
} 

问题3

计划使用一个全局的chan,processOneByOne业务函数处理完毕之后,将结果写到该chan中,然后监听这个chann,从chann中获取结果 处理流程大致:

demo.go

func TodoWork(){
    go func(){
        for key ,value := range allMap{
            processOneByOne(key , value ) 
        }
    }()

    for item := range task.ResultChan {
        // 问题一、 这里如何保证item就是上面那个key value的结果,而不是其他的KEY、value对应的结果  
        // 问题二、 当TodoWork在多进程环境下面时,是否存在竞争问题?
        println(item) 

    }
}

task.go

var (
    ResultChan chan interface{} 
)

func init(){
    ResultChan = make( chan interface{} , 100 ) 
}

func processOneByOne( key string , value interface{} ) {
    // 处理任务  
    // .... 

    // 写入结果 

    // 问题三、怎么关闭ResultChan , 如果不关闭,是不是goroutine泄漏问题啊 ? 
    // 如果这里不关闭的话,上面 for item := range task.ResultChan 一直会阻塞啊 ? 
    ResultChan <- "Hello World" 

}

最后,期待各位大佬解惑!!!

已邀请:

heramerom

赞同来自: whutchao itstudying

func TodoWork(){
    ch := make(chan interface{}, 1024)
    defer close(ch)
    for key ,value := range allMap{
        go  processOneByOne(key , value, ch)
    }

    for {
        value, ok := <- ch
        if !ok {
            panic ()
        }
        println(value)
    }
}

func processOneByOne(key string, value []interface{}, ch chan interface{}) {
    // do something
    ch <- result
}

whutchao

赞同来自:

1、TodoWork函数如果在多线程环境下调用,ch通道里面的结果会不会存在数据竞争的情况? 比如

func ceshi(){

    for _ ,elem :=  range  alist {
        TodoWork()   
    }
}

itstudying

赞同来自:

根据 heramerom 的基础上小小修改了下,控制 TodoWork 函数的正常退出

func TodoWork(allMap map[string][]interface{}) {
    ch := make(chan interface{}, 1024)
    wg := sync.WaitGroup{}
    wg.Add(len(allMap))

    go func() {
        defer close(ch)
        wg.Wait()
    }()
    for key, value := range allMap {
        go func() {
            defer wg.Done()
            processOneByOne(key, value, ch)
        }()
    }

    for {
        value, ok := <-ch
        if !ok {
            panic ()
        }
        println(value)
    }
}

func processOneByOne(key string, value []interface{}, ch chan interface{}) {
    // do something
    ch <- result 
}

要回复问题请先登录注册