0x00 需求背景

我们部门计划要做一个公共服务管理平台,主要给后端业务提供配置化服务,将配置中心的部分配置迁移到服务平台的可视化界面中开放给运营侧配置。当然我们也提供了文图音视任务状态的管理能力,为补偿失败任务提供了益中解决方案;同时对各个应用集群节点也进行了监控,对于需要将数据放在内存场景的情况进行管控。

算法研发同学也对于服务平台“寄予厚望”,希望我们能提供一套AB测试方案,能后帮助他们对比算法优化前后的效果。一番需求沟通和评审之后,我们对ABtest的设计也就有了一个具体的方案,此处不对设计方案做过多的说明。

ABtest设计方案

图1 ABtest设计方案

为了减少对业务代码的入侵,在上图的设计中,服务心跳、ABTEST的逻辑封装在服务平台提供的SDK中。其中,ABTEST使用了AspectJ实现对业务代码的修改和增强。

为了不破坏原有业务的吞吐量,在开发过程中用到了消息队列kafka同步消息,但是kafka集群的配置是在配置中心提供的,不同的环境、不同的机房配置不一样,问题:如何将动态配置注入到AspectJ的切面中呢?

0x01 使用AspectJ实现业务增强

服务平台封装的SDK是纯Java项目,没有引入Spring框架相关的包。因此,不能简单的使用@Autowire @Resource注解引入spring的对象。

在SDK中我们封装了两个对象。

一个是AbTest的注解类,目的是在需要切入的方法上增加@AbTest(indicator="porn_model_indicator",stream=DemoStrem.class, parser=DemoParser)就可以实现ABtest的功能,其中indicator与创建任务是关联的,目的是定位到该注解indicator和创建的任务是对应则执行abtest,否则不执行;stream是分流器,实现分类策略,SDK定义了Stream接口并提供默认的分流器实现类;parser是解析器,将执行结果解析为符合服务平台所需的数据报文格式,同样SDK定义了Parser接口并提供了默认的解析器实现类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Target({ElementType.METHOD,ElementType.TYPE,ElementType.FIELD,ElementType.LOCAL_VARIABLE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface AbTest {
/**
* 指标
* @return
*/
String indicator();

/**
* 分流器
* @return
*/
Class<? extends Stream> stream() default Stream.class;

/**
* 解析器
* @return
*/
Class<? extends Parser> parser() default Parser.class;
}

另一个是AbTestAspect的切面类,目前是对有@AbTest注解的方法进行增强,并将结果发送到消息队列工服务平台消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package xxx;
@Aspect
public class ABTestAspect {
private static final Logger logger = LoggerFactory.getLogger(ABTestAspect.class);

// 这个对象是从引入方的spring管理,如何注入呢?
IProducer producer;

private static ThreadFactory threadName = new ThreadFactoryBuilder()
.setNameFormat("PyxisABTestAspect").build();
private static final ExecutorService EXECUTOR_SERVICE =
new ThreadPoolExecutor(9, 9, 0L,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
threadName);

public ABTestAspect() {
}

@Around("execution(!void *(..)) && @annotation(xxx.annotation.AbTest)")
public Object around(ProceedingJoinPoint point) throws Throwable {

//执行原逻辑,获取结果
Object s1 = point.proceed();
EXECUTOR_SERVICE.submit(new Runnable() {
@Override
public void run() {
try {
//获取注解类
MethodSignature signature = (MethodSignature) point.getSignature();
AbTest abTest = signature.getMethod().getDeclaredAnnotation(AbTest.class);
//注解信息解析
Object streamObject = abTest.stream().getConstructor().newInstance();
String indicatorList = abTest.indicator();
String[] indicators = indicatorList.split(",");
Stream stream = null;
if (streamObject instanceof DemoStream) {
stream = (DemoStream) streamObject;
} else {
logger.error("ABTestAspect 注解参数有误,分流器不存在");
return;
}

//获取目标方法的参数列表
Object[] args = point.getArgs();
String[] parameterNames = signature.getParameterNames();
HashMap<String, Object> paraMap = new HashMap<>();
for (int index = 0; index < args.length; index++) {
paraMap.put(parameterNames[index], args[index]);
}

//获取abtest任务信息
//通过指标数组,获取每个指标对应的已经开启的task,并按合作方过滤
// ...

//对每个task走分流,找出是否配置了与此流量对应的模型,如果有就返回url和groupTag
//定义结果集
//获取实验组结果并加入结果集

//加入A组结果
//一条流量可能进入多个Task,生成多条A组结果
//
//结果上报,通过kafka发回给pyxis服务端
upload(resultArraylist);
} catch (Throwable e) {
logger.error("ABtest切面执行异常");
e.printStackTrace();
}
}
});

//返回正常逻辑的处理结果
return s1;
}

private void upload(List<Result> resultArraylist) {
for (Result result : resultArraylist) {
try {
final String msgKey = UUID.randomUUID().toString();
// 注意,这里用到了producer
producer.produce("report_abtest_result", msgKey, JSONObject.toJSONString(result).getBytes("UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

0x10 解决问题

@AspectJ注解的切面是在Spring容器之外创建的,因此不受Spring容器管理声明周期,并且该实例是单例,在@AspectJ注解的类中,编译后我们就会发现多一个方法:aspectOf(),Aspect切面对象也就是由该方法创建。

想要在Spring中对切面进行有效注入,那么必须将该类交由Spring管理,这就很明显了,创建一个bean放在Spring的容器中,创建方法必须是aspectOf()

**此外,需要对注入的对象增加setter方法。**我们在ABTestAspect中增加下面方法

1
2
3
public void setProducer(IProducer producer) {
this.producer = producer;
}
  • xml方式:
1
2
3
4
<!-- 注意 syncProducer 应该是你已经创建好的bean -->
<bean id="aBTestAspect" class="xxx.ABTestAspect" factory-method="aspectOf" >
<property name="producer" ref="syncProducer"></property>
</bean>
  • JavaConfig方式:
1
2
3
4
5
6
7
8
9
10
11
12
/**
* SimpleProducer 是 IProducer 接口的实现类
**/
@Autowire
private SimpleProducer producer;

@Bean
public ABTestAspect theAspect() {
ABTestAspect aspect = Aspects.aspectOf(ABTestAspect.class);
aspect.setProducer(producer)
return aspect;
}