release](https://camo.githubusercontent.com/d7b6f15132f8a8da8f6c56526dcf56a40c385d7c/68747470733a2f2f696d672e736869656c64732e696f2f6769746875622f72656c656173652f68656e72796c656532636e2f74656c65706f72742e7376673f7374796c653d666c61742d737175617265)](https://github.com/henrylee2cn/teleport/releases)
Teleport is a versatile, high-performance and flexible socket framework.
It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.
简体中文
Self Test
A server and a client process, running on the same machine
CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
Memory: 16G
OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
Go: 1.9.2
Message size: 581 bytes
Message codec: protobuf
Sent total 1000000 messages
teleport
Comparison Test
More Detail
svg file
go get -u -f github.com/henrylee2cn/teleport
Header
Body
JSON
Protobuf
string
tcp
tcp4
tcp6
unix
unixpacket
package main import ( "fmt" "time" tp "github.com/henrylee2cn/teleport" ) func main() { // graceful go tp.GraceSignal() // server peer srv := tp.NewPeer(tp.PeerConfig{ CountTime: true, ListenPort: 9090, PrintDetail: true, }) // router srv.RouteCall(new(Math)) // broadcast per 5s go func() { for { time.Sleep(time.Second * 5) srv.RangeSession(func(sess tp.Session) bool { sess.Push( "/push/status", fmt.Sprintf("this is a broadcast, server time: %v", time.Now()), ) return true }) } }() // listen and serve srv.ListenAndServe() } // Math handler type Math struct { tp.CallCtx } // Add handles addition request func (m *Math) Add(arg *[]int) (int, *tp.Rerror) { // test query parameter tp.Infof("author: %s", m.Query().Get("author")) // add var r int for _, a := range *arg { r += a } // response return r, nil }
package main import ( "time" tp "github.com/henrylee2cn/teleport" ) func main() { // log level tp.SetLoggerLevel("ERROR") cli := tp.NewPeer(tp.PeerConfig{}) defer cli.Close() cli.RoutePush(new(Push)) sess, err := cli.Dial(":9090") if err != nil { tp.Fatalf("%v", err) } var result int rerr := sess.Call("/math/add?author=henrylee2cn", []int{1, 2, 3, 4, 5}, &result, ).Rerror() if rerr != nil { tp.Fatalf("%v", rerr) } tp.Printf("result: %d", result) tp.Printf("wait for 10s...") time.Sleep(time.Second * 10) } // Push push handler type Push struct { tp.PushCtx } // Push handles '/push/status' message func (p *Push) Status(arg *string) *tp.Rerror { tp.Printf("%s", *arg) return nil }
More Examples
Message.Body
Abstracts the data message(Message Object) of the application layer and is compatible with HTTP message:
You can customize your own communication protocol by implementing the interface:
type ( // Proto pack/unpack protocol scheme of socket message. Proto interface { // Version returns the protocol's id and name. Version() (byte, string) // Pack writes the Message into the connection. // Note: Make sure to write only once or there will be package contamination! Pack(*Message) error // Unpack reads bytes from the connection to the Message. // Note: Concurrent unsafe! Unpack(*Message) error } ProtoFunc func(io.ReadWriter) Proto )
Next, you can specify the communication protocol in the following ways:
func SetDefaultProtoFunc(ProtoFunc) type Peer interface { ... ServeConn(conn net.Conn, protoFunc ...ProtoFunc) Session DialContext(ctx context.Context, addr string, protoFunc ...ProtoFunc) (Session, *Rerror) Dial(addr string, protoFunc ...ProtoFunc) (Session, *Rerror) Listen(protoFunc ...ProtoFunc) error ... }
Default protocol RawProto(Big Endian):
RawProto
{4 bytes message length} {1 byte protocol version} {1 byte transfer pipe length} {transfer pipe IDs} # The following is handled data by transfer pipe {2 bytes sequence length} {sequence} {1 byte message type} # e.g. CALL:1; REPLY:2; PUSH:3 {2 bytes URI length} {URI} {2 bytes metadata length} {metadata(urlencoded)} {1 byte body codec id} {body}
Transfer filter pipe, handles byte stream of message when transfer.
// XferFilter handles byte stream of message when transfer. type XferFilter interface { // Id returns transfer filter id. Id() byte // Name returns transfer filter name. Name() string // OnPack performs filtering on packing. OnPack([]byte) ([]byte, error) // OnUnpack performs filtering on unpacking. OnUnpack([]byte) ([]byte, error) } // Get returns transfer filter by id. func Get(id byte) (XferFilter, error) // GetByName returns transfer filter by name. func GetByName(name string) (XferFilter, error) // XferPipe transfer filter pipe, handlers from outer-most to inner-most. // Note: the length can not be bigger than 255! type XferPipe struct { // Has unexported fields. } func NewXferPipe() *XferPipe func (x *XferPipe) Append(filterId ...byte) error func (x *XferPipe) AppendFrom(src *XferPipe) func (x *XferPipe) Ids() []byte func (x *XferPipe) Len() int func (x *XferPipe) Names() []string func (x *XferPipe) OnPack(data []byte) ([]byte, error) func (x *XferPipe) OnUnpack(data []byte) ([]byte, error) func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool) func (x *XferPipe) Reset()
The body’s codec set.
type Codec interface { // Id returns codec id. Id() byte // Name returns codec name. Name() string // Marshal returns the encoding of v. Marshal(v interface{}) ([]byte, error) // Unmarshal parses the encoded data and stores the result // in the value pointed to by v. Unmarshal(data []byte, v interface{}) error }
Plug-ins during runtime.
type ( // Plugin plugin background Plugin interface { Name() string } // PreNewPeerPlugin is executed before creating peer. PreNewPeerPlugin interface { Plugin PreNewPeer(*PeerConfig, *PluginContainer) error } ... )
// Start a server var peer1 = tp.NewPeer(tp.PeerConfig{ ListenPort: 9090, // for server role }) peer1.Listen() ... // Start a client var peer2 = tp.NewPeer(tp.PeerConfig{}) var sess, err = peer2.Dial("127.0.0.1:8080")
type Aaa struct { tp.CallCtx } func (x *Aaa) XxZz(arg *) (, *tp.Rerror) { ... return r, nil }
register it to root router:
// register the call route: /aaa/xx_zz peer.RouteCall(new(Aaa))
// or register the call route: /xx_zz peer.RouteCallFunc((*Aaa).XxZz)
func XxZz(ctx tp.CallCtx, arg *) (, *tp.Rerror) { ... return r, nil }
// register the call route: /xx_zz peer.RouteCallFunc(XxZz)
type Bbb struct { tp.PushCtx } func (b *Bbb) YyZz(arg *) *tp.Rerror { ... return nil }
// register the push route: /bbb/yy_zz peer.RoutePush(new(Bbb))
// or register the push route: /yy_zz peer.RoutePushFunc((*Bbb).YyZz)
// YyZz register the route: /yy_zz func YyZz(ctx tp.PushCtx, arg *) *tp.Rerror { ... return nil }
// register the push route: /yy_zz peer.RoutePushFunc(YyZz)
func XxxUnknownCall (ctx tp.UnknownCallCtx) (interface{}, *tp.Rerror) { ... return r, nil }
// register the unknown call route: /* peer.SetUnknownCall(XxxUnknownCall)
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror { ... return nil }
// register the unknown push route: /* peer.SetUnknownPush(XxxUnknownPush)
AaBb
/aa_bb
Aa_Bb
/aa/bb
aa_bb
Aa__Bb
aa__bb
ABC_XYZ
/abc/xyz
ABcXYz
/abc_xyz
ABC__XYZ
// NewIgnoreCase Returns a ignoreCase plugin. func NewIgnoreCase() *ignoreCase { return &ignoreCase{} } type ignoreCase struct{} var ( _ tp.PostReadCallHeaderPlugin = new(ignoreCase) _ tp.PostReadPushHeaderPlugin = new(ignoreCase) ) func (i *ignoreCase) Name() string { return "ignoreCase" } func (i *ignoreCase) PostReadCallHeader(ctx tp.ReadCtx) *tp.Rerror { // Dynamic transformation path is lowercase ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path) return nil } func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror { // Dynamic transformation path is lowercase ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path) return nil }
// add router group group := peer.SubRoute("test") // register to test group group.RouteCall(new(Aaa), NewIgnoreCase()) peer.RouteCallFunc(XxZz, NewIgnoreCase()) group.RoutePush(new(Bbb)) peer.RoutePushFunc(YyZz) peer.SetUnknownCall(XxxUnknownCall) peer.SetUnknownPush(XxxUnknownPush)
type PeerConfig struct { Network string `yaml:"network" ini:"network" comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"` LocalIP string `yaml:"local_ip" ini:"local_ip" comment:"Local IP"` ListenPort uint16 `yaml:"listen_port" ini:"listen_port" comment:"Listen port; for server role"` DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"` RedialTimes int32 `yaml:"redial_times" ini:"redial_times" comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"` DefaultBodyCodec string `yaml:"default_body_codec" ini:"default_body_codec" comment:"Default body codec type id"` DefaultSessionAge time.Duration `yaml:"default_session_age" ini:"default_session_age" comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"` DefaultContextAge time.Duration `yaml:"default_context_age" ini:"default_context_age" comment:"Default CALL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"` SlowCometDuration time.Duration `yaml:"slow_comet_duration" ini:"slow_comet_duration" comment:"Slow operation alarm threshold; ns,µs,ms,s ..."` PrintDetail bool `yaml:"print_detail" ini:"print_detail" comment:"Is print body and metadata or not"` CountTime bool `yaml:"count_time" ini:"count_time" comment:"Is count cost time or not"` }
SetMessageSizeLimit sets max message size. If maxSize<=0, set it to max uint32.
func SetMessageSizeLimit(maxMessageSize uint32)
SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection.
func SetSocketKeepAlive(keepalive bool)
SetSocketKeepAlivePeriod sets period between keep alives.
func SetSocketKeepAlivePeriod(d time.Duration)
SetSocketNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle’s algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.
func SetSocketNoDelay(_noDelay bool)
SetSocketReadBuffer sets the size of the operating system’s receive buffer associated with the connection.
func SetSocketReadBuffer(bytes int)
SetSocketWriteBuffer sets the size of the operating system’s transmit buffer associated with the connection.
func SetSocketWriteBuffer(bytes int)
Teleport is under Apache v2 License. See the LICENSE file for the full license text