Skip to content

Commit

Permalink
feat: BMW: 刷新Consul时,添加ModifyIndex参数作为CAS锁 --story=120680830 (#615)
Browse files Browse the repository at this point in the history
  • Loading branch information
EASYGOING45 authored Nov 19, 2024
1 parent cf233a9 commit 6e7b986
Show file tree
Hide file tree
Showing 20 changed files with 199 additions and 85 deletions.
1 change: 1 addition & 0 deletions pkg/bk-monitor-worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *MetadataCenter) AddDataId(dataId string) error {

func (c *MetadataCenter) fillInfo(dataId string, info *DataIdInfo) error {
key := fmt.Sprintf("%s/apm/data_id/%s", config.StorageConsulPathPrefix, dataId)
bytesData, err := c.Consul.Get(key)
_, bytesData, err := c.Consul.Get(key)
if err != nil {
return fmt.Errorf("failed to get key: %s from Consul. error: %s", key, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bk-monitor-worker/internal/metadata/models/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func RefreshRouterVersion(ctx context.Context, path string) error {
return err
}
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
err = client.Put(path, timestamp, 0)
err = client.Put(path, timestamp, 0, 0)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func RefreshInfluxdbClusterInfoConsulClusterConfig(ctx context.Context, objs *[]
if err != nil {
return err
}
err = hashconsul.Put(consulClient, consulConfigPath, val)
err = hashconsul.PutCas(consulClient, consulConfigPath, val, 0, nil)
if err != nil {
logger.Errorf("consul path [%s] refresh with value [%s] failed, %v", consulConfigPath, val, err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (i InfluxdbHostInfo) RefreshConsulClusterConfig(ctx context.Context) error
return err
}
// 更新consul信息
err = hashconsul.Put(consulClient, i.ConsulConfigPath(), configStr)
err = hashconsul.PutCas(consulClient, i.ConsulConfigPath(), configStr, 0, nil)
if err != nil {
logger.Errorf("host: [%s] refresh consul config failed, %v", i.HostName, err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (i InfluxdbStorage) RefreshConsulClusterConfig(ctx context.Context, isPubli
if err != nil {
return err
}
err = hashconsul.Put(consulClient, i.ConsulConfigPath(), val)
err = hashconsul.PutCas(consulClient, i.ConsulConfigPath(), val, 0, nil)
if err != nil {
logger.Errorf("put consul path [%s] value [%s] err, %v", i.ConsulConfigPath(), val, err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (i InfluxdbTagInfo) AddConsulInfo(ctx context.Context) error {
if err != nil {
return err
}
err = hashconsul.Put(consulClient, i.ConsulConfigPath(), val)
err = hashconsul.PutCas(consulClient, i.ConsulConfigPath(), val, 0, nil)
if err != nil {
return err
}
Expand All @@ -150,7 +150,7 @@ func (i InfluxdbTagInfo) GetConsulInfo(ctx context.Context) (*TagItemInfo, error
if err != nil {
return nil, err
}
dataBytes, err := consulClient.Get(i.ConsulConfigPath())
_, dataBytes, err := consulClient.Get(i.ConsulConfigPath())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (i InfluxdbTagInfo) ModifyConsulInfo(ctx context.Context, oldInfo TagItemIn
if err != nil {
return err
}
err = hashconsul.Put(consulClient, i.ConsulConfigPath(), val)
err = hashconsul.PutCas(consulClient, i.ConsulConfigPath(), val, 0, nil)

models.PushToRedis(ctx, models.InfluxdbTagInfoKey, i.RedisField(), val)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestBcsClusterInfoSvc_RegisterCluster(t *testing.T) {
return nil
})
defer patch.Reset()
gomonkey.ApplyFunc(hashconsul.Put, func(c *consul.Instance, key, val string) error { return nil })
gomonkey.ApplyFunc(hashconsul.PutCas, func(c *consul.Instance, key, val string) error { return nil })
cluster, err := NewBcsClusterInfoSvc(nil).RegisterCluster(bkBizId, clusterID, projectId, "test")
assert.NoError(t, err)
dataIdList := []uint{cluster.K8sMetricDataID, cluster.CustomMetricDataID, cluster.K8sEventDataID}
Expand Down
14 changes: 9 additions & 5 deletions pkg/bk-monitor-worker/internal/metadata/service/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (d DataSourceSvc) CreateDataSource(dataName, etcConfig, operator, sourceLab
tx.Commit()
}
// 触发consul刷新
err = NewDataSourceSvc(&ds).RefreshOuterConfig(context.Background())
err = NewDataSourceSvc(&ds).RefreshOuterConfig(context.Background(), 0, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -528,7 +528,7 @@ func (d DataSourceSvc) GseRouteConfig() (*bkgse.GSERoute, error) {
}

// RefreshConsulConfig 更新consul配置,告知ETL等其他依赖模块配置有所更新
func (d DataSourceSvc) RefreshConsulConfig(ctx context.Context) error {
func (d DataSourceSvc) RefreshConsulConfig(ctx context.Context, modifyIndex uint64, oldValueBytes []byte) error {
logger.Infof("RefreshConsulConfig:data_id [%d] started to refresh consul config", d.BkDataId)
// 如果数据源没有启用,则不用刷新 consul 配置
if !d.CanRefreshConfig() {
Expand All @@ -542,10 +542,14 @@ func (d DataSourceSvc) RefreshConsulConfig(ctx context.Context) error {
return nil
}
}

// 获取Consul句柄
consulClient, err := consul.GetInstance()
if err != nil {
logger.Errorf("RefreshConsulConfig:data_id [%d] get consul client failed, %v", d.BkDataId, err)
return err
}

val, err := d.ToJson(true, true)
if err != nil {
return errors.Wrap(err, "RefreshConsulConfig:datasource to_json failed")
Expand All @@ -554,7 +558,7 @@ func (d DataSourceSvc) RefreshConsulConfig(ctx context.Context) error {
if err != nil {
return err
}
err = hashconsul.Put(consulClient, d.ConsulConfigPath(), valStr)
err = hashconsul.PutCas(consulClient, d.ConsulConfigPath(), valStr, modifyIndex, oldValueBytes)
if err != nil {
logger.Errorf("RefreshConsulConfig:data_id [%v] put [%s] to [%s] failed, %v", d.BkDataId, valStr, d.ConsulConfigPath(), err)
return err
Expand All @@ -563,7 +567,7 @@ func (d DataSourceSvc) RefreshConsulConfig(ctx context.Context) error {
return nil
}

func (d DataSourceSvc) RefreshOuterConfig(ctx context.Context) error {
func (d DataSourceSvc) RefreshOuterConfig(ctx context.Context, modifyIndex uint64, oldValueBytes []byte) error {
if !d.IsEnable {
logger.Infof("data_id [%d] is not enable, nothing will refresh to outer systems.", d.BkDataId)
return nil
Expand All @@ -575,7 +579,7 @@ func (d DataSourceSvc) RefreshOuterConfig(ctx context.Context) error {
logger.Errorf("data_id [%d] refresh gse config failed, %v", d.BkDataId, err)
}

err = d.RefreshConsulConfig(ctx)
err = d.RefreshConsulConfig(ctx, modifyIndex, oldValueBytes)
if err != nil {
logger.Errorf("data_id [%d] refresh consul config failed, %v", d.BkDataId, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (r ResultTableSvc) RefreshEtlConfig() error {
if err := resulttable.NewDataSourceQuerySet(db).BkDataIdEq(dsrt.BkDataId).One(&ds); err != nil {
return err
}
if err := NewDataSourceSvc(&ds).RefreshConsulConfig(context.TODO()); err != nil {
if err := NewDataSourceSvc(&ds).RefreshConsulConfig(context.TODO(), 0, nil); err != nil {
return err
}
logger.Infof("RefreshEtlConfig:table_id [%s] refresh etl config success", r.TableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
func TestResultTableSvc_CreateResultTable(t *testing.T) {
mocker.InitTestDBConfig("../../../bmw_test.yaml")
gomonkey.ApplyPrivateMethod(InfluxdbStorageSvc{}, "syncDb", func(_ InfluxdbStorageSvc) error { return nil })
gomonkey.ApplyFunc(hashconsul.Put, func(c *consul.Instance, key, val string) error { return nil })
gomonkey.ApplyFunc(hashconsul.PutCas, func(c *consul.Instance, key, val string) error { return nil })
gomonkey.ApplyMethod(&http.Client{}, "Do", func(t *http.Client, req *http.Request) (*http.Response, error) {
var data string
if strings.Contains(req.URL.Path, "v1/kv") {
Expand Down
45 changes: 32 additions & 13 deletions pkg/bk-monitor-worker/internal/metadata/task/config_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/storage"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/service"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/consul"
consulSvc "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/consul"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/mysql"
t "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/task"
Expand Down Expand Up @@ -182,21 +183,21 @@ func RefreshInfluxdbRoute(ctx context.Context, t *t.Task) error {
func RefreshDatasource(ctx context.Context, t *t.Task) error {
defer func() {
if err := recover(); err != nil {
logger.Errorf("RefreshDatasource Runtime panic caught: %v", err)
logger.Errorf("RefreshDatasource: Runtime panic caught: %v", err)
}
}()

logger.Infof("start to refresh data source, start_time: %s", time.Now().Truncate(time.Second))
logger.Infof("RefreshDatasource: start to refresh data source, start_time: %s", time.Now().Truncate(time.Second))

db := mysql.GetDBSession().DB
// 过滤满足条件的记录
var dataSourceRtList []resulttable.DataSourceResultTable
if err := resulttable.NewDataSourceResultTableQuerySet(db).Select("bk_data_id", "table_id").All(&dataSourceRtList); err != nil {
logger.Errorf("query datasourceresulttable record error, %v", err)
logger.Errorf("RefreshDatasource: query datasourceresulttable record error, %v", err)
return err
}
if len(dataSourceRtList) == 0 {
logger.Infof("no data source need update, skip")
logger.Infof("RefreshDatasource: no data source need update, skip")
return nil
}

Expand All @@ -212,7 +213,7 @@ func RefreshDatasource(ctx context.Context, t *t.Task) error {
for _, chunkRts := range slicex.ChunkSlice(rtList, 0) {
var tempList []resulttable.ResultTable
if err := resulttable.NewResultTableQuerySet(db).IsDeletedEq(false).IsEnableEq(true).TableIdIn(chunkRts...).Select("table_id").All(&tempList); err != nil {
logger.Errorf("query enabled result table error, %v", err)
logger.Errorf("RefreshDatasource: query enabled result table error, %v", err)
continue
}
// 组装数据
Expand All @@ -225,7 +226,7 @@ func RefreshDatasource(ctx context.Context, t *t.Task) error {
}
// 如果可用的结果表为空,则忽略
if len(enabledRtList) == 0 {
logger.Warn("not found enabled result by result_table, skip")
logger.Warn("RefreshDatasource: not found enabled result by result_table, skip")
return nil
}
// 过滤到可用的数据源
Expand All @@ -244,12 +245,12 @@ func RefreshDatasource(ctx context.Context, t *t.Task) error {
// data id 数量可控,先不拆分;仅刷新未迁移到计算平台的数据源 ID 及通过 gse 创建的数据源 ID
if err := resulttable.NewDataSourceQuerySet(db).CreatedFromEq(common.DataIdFromBkGse).IsEnableEq(true).
BkDataIdIn(dataIdList...).OrderDescByLastModifyTime().All(&dataSourceList); err != nil {
logger.Errorf("query datasource record error, %v", err)
logger.Errorf("RefreshDatasource: query datasource record error, %v", err)
return err
}

if len(dataSourceList) == 0 {
logger.Infof("no datasource need update")
logger.Infof("RefreshDatasource: no datasource need update")
return nil
}

Expand All @@ -264,17 +265,35 @@ func RefreshDatasource(ctx context.Context, t *t.Task) error {
wg.Done()
}()
dsSvc := service.NewDataSourceSvc(&ds)
if err := dsSvc.RefreshOuterConfig(ctx); err != nil {
logger.Errorf("data_id [%v] failed to refresh outer config, %v", dsSvc.BkDataId, err)
} else {
logger.Infof("data_id [%v] refresh all outer success", dsSvc.BkDataId)
consulClient, err := consul.GetInstance()
if err != nil {
logger.Errorf("RefreshDatasource: data_id [%v] failed to get consul client, %v,skip", dsSvc.BkDataId, err)
return
}

oldIndex, oldValueBytes, err := consulClient.Get(dsSvc.ConsulConfigPath())
if err != nil {
logger.Errorf("RefreshDatasource: data_id [%v] failed to get old value from [%v], %v, will set modifyIndex as 0", dsSvc.BkDataId, dsSvc.ConsulConfigPath(), err)
return
}
if oldValueBytes == nil {
logger.Infof("RefreshDatasource: data_id [%v] consul path [%v] not found, will set modifyIndex as 0", dsSvc.BkDataId, dsSvc.ConsulConfigPath())
}
modifyIndex := oldIndex

logger.Infof("RefreshDatasource: data_id [%v] try to refresh consul config, modifyIndex: %v", dsSvc.BkDataId, modifyIndex)

if err := dsSvc.RefreshOuterConfig(ctx, modifyIndex, oldValueBytes); err != nil {
logger.Errorf("RefreshDatasource: data_id [%v] failed to refresh outer config, %v", dsSvc.BkDataId, err)
return
}
logger.Infof("RefreshDatasource: data_id [%v] refresh all outer success", dsSvc.BkDataId)
}(dataSource, wg, ch)

}
wg.Wait()

logger.Infof("refresh data source end, end_time: %s", time.Now().Truncate(time.Second))
logger.Infof("RefreshDatasource: refresh data source end, end_time: %s", time.Now().Truncate(time.Second))
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/bk-monitor-worker/script/diff/consul/consuldiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ func comparePathData(bypassPathList *[]string) {
}

func output(srcPath string, bypassPath string) (bool, error) {
srcData, _ := srcConsulClient.Get(srcPath)
bypassData, _ := bypassConsulClient.Get(bypassPath)

_, srcData, _ := srcConsulClient.Get(srcPath)
_, bypassData, _ := bypassConsulClient.Get(bypassPath)
srcDataJson := string(srcData)
bypassDataJson := string(bypassData)
// 优先判断字符串匹配,如果可以,则进行
Expand Down
6 changes: 4 additions & 2 deletions pkg/bk-monitor-worker/script/diff/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.12.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/fatih/color v1.15.0 // indirect
Expand All @@ -37,8 +36,11 @@ require (
)

require (
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
Loading

0 comments on commit 6e7b986

Please sign in to comment.