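// Code pull complete; the page will refresh automatically.
// The sync operation will force-sync from YoMo/yomo. It overwrites any changes made since the repository was forked and cannot be undone!!!
// After you confirm, the sync runs in the background and the page refreshes when it finishes. Please be patient.
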
package yomo
import (
"context"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
)
// sourceLogPrefix is the label placed at the start of the log messages written
// in this file. The surrounding ANSI escape sequences switch the terminal
// foreground to green for "[yomo:source]" and then reset it.
const sourceLogPrefix = "\033[32m[yomo:source]\033[0m "
// Source is the handle an application uses to send data to YoMo through a
// YoMo-Zipper.
type Source interface {
	// Connect establishes the connection to YoMo-Zipper.
	Connect() error
	// Close shuts down the connection to YoMo-Zipper.
	Close() error

	// SetDataTag sets the tag that is attached to data sent with Write.
	SetDataTag(tag uint8)
	// SetErrorHandler registers the function invoked when a server error
	// occurs.
	SetErrorHandler(fn func(err error))
	// SetReceiveHandler registers the function that observes data received
	// back by this source, together with its tag.
	//
	// Experimental: this method may change.
	SetReceiveHandler(fn func(tag byte, data []byte))

	// Write sends data to the directed downstream using the tag configured
	// with SetDataTag, and reports how many bytes were accepted.
	Write(data []byte) (n int, err error)
	// WriteWithTag sends data to the directed downstream using the given
	// tag instead of the one configured with SetDataTag.
	WriteWithTag(tag uint8, data []byte) error
	// Broadcast sends data to all downstream receivers.
	Broadcast(data []byte) error
}
// yomoSource is the Source implementation in this package; it delegates
// connection handling, frame writing and logging to a core.Client.
type yomoSource struct {
// name is the source name given to NewSource.
name string
// zipperEndpoint is the YoMo-Zipper address passed to the client in Connect.
zipperEndpoint string
// client is the underlying core client used for I/O and logging.
client *core.Client
// tag is the data tag used by Write and Broadcast; it is set by SetDataTag.
tag uint8
// fn is the optional receive handler set by SetReceiveHandler; when non-nil
// it is called with the tag and carriage of each observed backflow frame.
fn func(byte, []byte)
}
// Compile-time assertion that *yomoSource satisfies the Source interface.
var _ Source = (*yomoSource)(nil)
// NewSource returns a Source with the given name. The opts are resolved with
// NewOptions: the resulting ZipperAddr is kept as the endpoint used by Connect,
// and the resulting ClientOptions are passed to core.NewClient together with
// the name and the source client type.
func NewSource(name string, opts ...Option) Source {
	resolved := NewOptions(opts...)
	return &yomoSource{
		name:           name,
		zipperEndpoint: resolved.ZipperAddr,
		client:         core.NewClient(name, core.ClientTypeSource, resolved.ClientOptions...),
	}
}
// Write sends data via WriteWithTag using the tag currently stored on the
// source (see SetDataTag). It returns len(data) and a nil error on success, or
// 0 and the error reported by WriteWithTag on failure.
func (s *yomoSource) Write(data []byte) (int, error) {
	if err := s.WriteWithTag(s.tag, data); err != nil {
		return 0, err
	}
	return len(data), nil
}
// SetDataTag stores tag on the source; subsequent Write and Broadcast calls
// attach it to the frames they send.
func (s *yomoSource) SetDataTag(tag uint8) {
	s.tag = tag
}
// Close closes the underlying client. On success it writes a debug log entry
// and returns nil; on failure it logs the error through the client's logger
// and returns that same error.
func (s *yomoSource) Close() error {
	err := s.client.Close()
	if err == nil {
		s.client.Logger().Debugf("%s is closed", sourceLogPrefix)
		return nil
	}
	s.client.Logger().Errorf("%sClose(): %v", sourceLogPrefix, err)
	return err
}
// Connect registers a backflow-frame observer on the underlying client and
// then connects the client to the configured zipper endpoint using a
// background context. A connection error is logged and returned.
func (s *yomoSource) Connect() error {
	// The observer is installed before connecting. It forwards each backflow
	// frame's tag and carriage to the receive handler, if one has been set.
	onBackflow := func(frm *frame.BackflowFrame) {
		if s.fn == nil {
			return
		}
		s.fn(frm.GetDataTag(), frm.GetCarriage())
	}
	s.client.SetBackflowFrameObserver(onBackflow)

	if err := s.client.Connect(context.Background(), s.zipperEndpoint); err != nil {
		s.client.Logger().Errorf("%sConnect() error: %s", sourceLogPrefix, err)
		return err
	}
	return nil
}
// WriteWithTag builds a new data frame carrying data under the given tag,
// stamps it with this client's ID as the source ID, logs a debug summary of
// the frame, and writes it through the underlying client. It returns the
// error reported by the client's WriteFrame.
func (s *yomoSource) WriteWithTag(tag uint8, data []byte) error {
	df := frame.NewDataFrame()
	df.SetCarriage(tag, data)
	df.SetSourceID(s.client.ClientID())

	s.client.Logger().Debugf(
		"%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x",
		sourceLogPrefix, df.TransactionID(), df.SourceID(), len(data), frame.Shortly(data),
	)
	return s.client.WriteFrame(df)
}
// SetErrorHandler registers fn as the error handler of the underlying client.
func (s *yomoSource) SetErrorHandler(fn func(err error)) {
	s.client.SetErrorHandler(fn)
}
// SetReceiveHandler stores fn as the receive handler and writes a debug log
// entry. The observer installed by Connect calls the stored handler with the
// tag and carriage of each backflow frame.
//
// Experimental: this method may change.
func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) {
	s.fn = fn
	logger := s.client.Logger()
	logger.Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn)
}
// Broadcast builds a new data frame carrying data under the tag currently
// stored on the source, stamps it with this client's ID as the source ID,
// marks it as a broadcast frame, logs a debug summary, and writes it through
// the underlying client. It returns the error reported by the client's
// WriteFrame.
func (s *yomoSource) Broadcast(data []byte) error {
	df := frame.NewDataFrame()
	df.SetCarriage(s.tag, data)
	df.SetSourceID(s.client.ClientID())
	df.SetBroadcast(true)

	s.client.Logger().Debugf(
		"%sBroadcast: tid=%s, source_id=%s, data[%d]=%# x",
		sourceLogPrefix, df.TransactionID(), df.SourceID(), len(data), frame.Shortly(data),
	)
	return s.client.WriteFrame(df)
}
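// This section may contain content that is unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing functions.
// If you confirm that the content does not involve inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible.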