topo

A library to create in process topologies of goroutines connected by channels.

  • 所有者: mdmarek/topo
  • 平台:
  • 許可證: Apache License 2.0
  • 分類:
  • 主題:
  • 喜歡:
    0
      比較:

Github星跟蹤圖

TOPO

A library to create in process topologies of goroutines connected by channels.
Topo does boilerplate work as outlined in http://blog.golang.org/pipelines.
You receive correctly connected input and output channels, leaving the
message processing for you while handling the plumbing. Topo strives to be
simple, all interaction are via proper Go channels, no wrapping interfaces.

Example Code

package main

import (
	"fmt"
	"sync"

	"github.com/mdmarek/topo"
	"github.com/mdmarek/topo/topoutil"
)

const nworkers = 2

func worker(in <-chan topo.Mesg, out chan<- topo.Mesg) {
	... do something ...
}

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(nworkers)

	// Create a new topo and source of streaming data from meetup.com.
	t, err := topo.New()
	if err != nil {
		fmt.Printf("Failed to create topo: %v\n", err)
		return
	}

	source, err := topoutil.NewMeetupSource(t)
	if err != nil {
		fmt.Printf("Failed to open source: %v\n", err)
		return
	}

	// Shuffles messages read from the source
	// to each worker.
	outputs := t.Shuffle(nworkers, worker, source)

	// Each output channel is read by one Sink, which
	// prints to stdout the messages it receives.
	for i := 0; i < nworkers; i++ {
		go topoutil.Sink(i, wg, outputs[i])
	}

	// Wait for the sinks to finish, if ever.
	wg.Wait()
}

Messages

Topo creates channels of type chan Mesg, and a Mesg is defined as the
interface:

type Mesg interface { 
	Key() uint64
	Body() interface{}
}

Compositions

Topo works through three simple compositions of channels to form pipelines:
Merge, Shuffle, and Partition.

Merge takes n input channels and merges them into one output channel.

Shuffle takes n input channels and connects them to m functions writing their output
to m output channels. Messages from the n input channels are sent to the first
available function.

Partition takes n input channels and connects them to m functions writing their output
to m output channels. Messages from the n input channels are routed by taking the
message's key value modulo m.

Sources

When writing a source of data for the topology it should use the topology's exit channel
in its select statement, otherwise a deadlock panic may occur. The basic structure is
as follows:

func NewMySource(... params ..., t topo.Topo) (<-chan topo.Mesg, error) {

	...

	out := make(chan topo.Mesg)
	go func(exit <-chan bool) {
		defer close(out)
		for ... {
			select {
			case out <- produce():
			case <-exit:
				return
			}
		}
	}(t.ExitChan())

	...

	return out, nil
}

Keep in mind to pass the exit channel as a parameter to any started goroutiness rather
than as a closure.

主要指標

概覽
名稱與所有者mdmarek/topo
主編程語言Go
編程語言Go (語言數: 1)
平台
許可證Apache License 2.0
所有者活动
創建於2014-11-12 05:48:24
推送於2014-12-01 23:17:27
最后一次提交2014-11-23 22:38:33
發布數2
最新版本名稱0.2 (發布於 )
第一版名稱0.1 (發布於 )
用户参与
星數119
關注者數7
派生數5
提交數37
已啟用問題?
問題數2
打開的問題數0
拉請求數3
打開的拉請求數0
關閉的拉請求數0
项目设置
已啟用Wiki?
已存檔?
是復刻?
已鎖定?
是鏡像?
是私有?