likes
comments
collection
share

kratos源码分析-配置解析

作者站长头像
站长
· 阅读数 14

项目启动时一般前置条件为解析配置文件, 我们看下这块是怎么设计的.

1. 相关类图

kratos源码分析-配置解析

流程解释
  1. config 实现 config Interface 接口, 初始化reader对象

  2. 调用Load方法, 通过入口注入的不同文件源, 调用实现了source interface 对应的Load方法(以file举例子)

  3. file 通过 Load 入口, 判断是目录还是文件, 执行对应的方法, 拿到文件内容返回

  4. config 拿到file返回的内容, 交给reader去合并

  5. reader 根据文件格式(yml) 转为 map[string]interface{} 结构, 并进行覆盖合并, 将结果存到对象变量上

  6. reader 处理完成后, config调用 file watcher 进行文件变更监听, 启动协程监听

由此可见, config对象其实作为入口, 将数据生产交给file、env, 将数据加工解析交给 reader

2. 代码案例

init 接收外部参数定义解析配置文件地址

func init() {
	flag.StringVar(&commonconf, "common", "./configs/local", "common config path, eg: -conf config.yaml")
	flag.StringVar(&flagconf, "conf", "./configs/local/api", "config path, eg: -conf config.yaml")
    flag.Parse()
}

使用config.New初始化文件配置

//初始化配置
//新增两个配置源, 文件格式 common and flagconf 路径
c := config.New(
    config.WithSource(
        file.NewSource(commonconf),
        file.NewSource(flagconf),
    ),
)

// 关闭watch相关的监听器
defer c.Close()

// 加载配置文件
if err := c.Load(); err != nil {
	panic(err)
}

//解析配置到bc结构上
var bc conf.Bootstrap
if err := c.Scan(&bc); err != nil {
    panic(err)
}

我们看下config.New的实现

// 初始化解析器, 关联reader对象, 数据交给由reader加工和存储
func New(opts ...Option) Config {
	o := options{
		decoder:  defaultDecoder,
		resolver: defaultResolver,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return &config{
		opts:   o,
		reader: newReader(o),
	}
}

Options 有下面几个属性

type options struct {
	sources  []Source // 配置源, 由初始化负责传入的source配置源
	decoder  Decoder //  解析器
	resolver Resolver // 变量解析替换
}

执行 Load 配置加载

func (c *config) Load() error {
    // 因为我们在入口传入的是file对象, 所以执行src load的时候也是file对象的 load 方法
	for _, src := range c.opts.sources {
        // 获取文件内容(可能是一个目录, 会存在多个文件)
		kvs, err := src.Load()
		if err != nil {
			return err
		}
		for _, v := range kvs {
			log.Debugf("config loaded: %s format: %s", v.Key, v.Format)
		}
        // 合并配置key
		if err = c.reader.Merge(kvs...); err != nil {
			log.Errorf("failed to merge config source: %v", err)
			return err
		}
        // 调用file watch, 监听文件变化
		w, err := src.Watch()
		if err != nil {
			log.Errorf("failed to watch config source: %v", err)
			return err
		}
		c.watchers = append(c.watchers, w)
        // 异步监听文件变化(调用对应的watch对象)
		go c.watch(w)
	}

    // 解析内容中是否包含 ${APPID:default} 变量
    // 如果在配置文件中存在 APPID: xx 配置, 则进行替换
    // 否则使用default默认值
	if err := c.reader.Resolve(); err != nil {
		log.Errorf("failed to resolve config source: %v", err)
		return err
	}
	return nil
}

异步watch

func (c *config) watch(w Watcher) {
	for {
		kvs, err := w.Next()
		if err != nil {
			if errors.Is(err, context.Canceled) {
				log.Infof("watcher's ctx cancel : %v", err)
				return
			}
			time.Sleep(time.Second)
			log.Errorf("failed to watch next config: %v", err)
			continue
		}
    	// 处理逻辑忽略 .....
		c.cached.Range(func(key, value interface{}) bool {
			k := key.(string)
			v := value.(Value)
			if n, ok := c.reader.Value(k); ok && reflect.TypeOf(n.Load()) == reflect.TypeOf(v.Load()) && !reflect.DeepEqual(n.Load(), v.Load()) {
				v.Store(n.Load())
				if o, ok := c.observers.Load(k); ok {
					o.(Observer)(k, v)
				}
			}
			return true
		})
	}
}

Scan 将配置转换成结构体

var bc conf.Bootstrap
if err := c.Scan(&bc); err != nil {
    panic(err)
}