
github.com/nsqio/nsq/nsqd/topic.go #215
 func (t *Topic) messagePump() {
    ......
    for {
    	select {
    	case msg = <-memoryMsgChan:
    ......
    	case <-t.exitChan:
    		goto exit
    	}
        for i, channel := range chans {
            chanMsg := msg
        
            // copy the message because each channel
            // needs a unique instance but...
            // fastpath to avoid copy if its the first channel
            // (the topic already created the first copy)
            if i > 0 {
            	chanMsg = NewMessage(msg.ID, msg.Body)
            	chanMsg.Timestamp = msg.Timestamp
            	chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
            	channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
            	continue
            }
            err := channel.PutMessage(chanMsg)
            ......
        }
    }
    ......
}
github.com/nsqio/nsq/nsqd/topic.go #44
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    t := &Topic{
    ......
    }
    ......
    t.waitGroup.Wrap(func() { t.messagePump() })
    t.ctx.nsqd.Notify(t)
    return t
}