2020年11月24日

Aerospike 的使用案例

点击数:13

导入依赖

Java 程序多用Maven 构建,需要导入一下依赖包:

 <!--  KV 缓存 aerospike -->
        <dependency>
            <groupId>com.aerospike</groupId>
            <artifactId>aerospike-client</artifactId>
            <version>4.1.11</version>
        </dependency>

Aerospike 天然支持Netty,如果需要用到 Netty 做 IO 循环事件,需要增加如下 POC 依赖:

  <dependencies>
      <dependency>
      <groupId>com.aerospike</groupId>
       <artifactId>aerospike-client</artifactId>
       <version>4.1.11</version>
    </dependency>

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler</artifactId>
      <version>4.1.11.Final</version>
    </dependency>

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport</artifactId>
      <version>4.1.11.Final</version>
    </dependency>

    <!-- Only needed when using epoll event loops on linux -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport-native-epoll</artifactId>
      <classifier>linux-x86_64</classifier>
      <version>4.1.11.Final</version>
    </dependency>
  </dependencies>

配置客户端连接

单点连接

// 方法 1:最简单的方式,使用默认的客户端连接策略
AerospikeClient aerospikeClient = new AerospikeClient("10.57.30.214", 3000);

// 方法 2:自己创建客户端连接策略
// 创建全局写策略
WritePolicy policy = new WritePolicy();
policy.setTimeout(3000); // 单位毫秒
// 创建客户端连接策略
 ClientPolicy clientPolicy = new ClientPolicy();
 clientPolicy.writePolicyDefault = policy;
// 创建客户端
AerospikeClient aerospikeClient = new AerospikeClient(clientPolicy, "10.57.30.214", 3000);

集群连接

集群同步方式:

// 集群 endpoints,逗号分割
String hostStr = "10.57.30.214:3000,10.57.30.215:3000"

String[] arr = hostStr.split(",");  // 通过逗号截取字符串
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
    String[] hostPort = StringUtils.split(arr[i], ":");
    Host host = new Host(hostPort[0],  Integer.valueOf(hostPort[1]));
    hosts[i] = host;
}

WritePolicy wp = new WritePolicy();
wp.setTimeout(3000);

ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.writePolicyDefault = wp;

AerospikeClient   aerospikeClient = new AerospikeClient(clientPolicy, hosts);

集群异步方式:

// 集群 endpoints,逗号分割
String hostStr = "10.57.30.214:3000,10.57.30.215:3000"

String[] arr = hostStr.split(",");  // 通过逗号截取字符串
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
    String[] hostPort = StringUtils.split(arr[i], ":");
    Host host = new Host(hostPort[0],  Integer.valueOf(hostPort[1]));
    hosts[i] = host;
}

WritePolicy wp = new WritePolicy();
wp.setTimeout(3000);
wp.timeoutDelay = 50;

AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
clientPolicy.writePolicyDefault = wp;

AsyncClient   aerospikeClient = new AsyncClient(clientPolicy, hosts);

Springboot 配置

#--------------- aerospike -----------------#

#as配置
aerospike.endpoints=10.57.30.214:3000
aerospike.namespace=ns1
aerospike.timeout=3000
#as的单台最大tps,压测任意调大,线上1000000,单位为一分钟
aerospike.max.tps=1000000
#as单条record最大长度限制
aerospike.data.maxLen=8388608
package cn.openmind.conf;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * AerospikeConfig
 *
 * @author zhoujunwen
 * @date 2020-09-16 10:37
 * @desc
 */
@Configuration
@Getter
@Setter
@Slf4j
public class AerospikeConf {
    @Value("{aerospike.namespace}")
    private String namespace;
    @Value("{aerospike.endpoints}")
    private String hostStr;
    @Value("${aerospike.timeout}")
    private Integer timeout;

    @Bean
    public AerospikeClient aerospikeClient() {
        AerospikeClient client = new AerospikeClient(new ClientPolicy(), getHost());
        return client;
    }

    @Bean
    public WritePolicy writePolicy() {
        WritePolicy policy = new WritePolicy();
        policy.setTimeout(getTimeout());
        return policy;
    }


    private Host[] getHost() {
        if (StringUtils.isBlank(getHostStr())) {
            log.warn(" 初始化 arerospike 异常,因为没有配置 host 列表");
            return null;
        }
        String[] arr = getHostStr().split(",");
        Host[] hosts = new Host[arr.length];
        for (int i = 0; i < arr.length; i++) {
            String[] hostPort = StringUtils.split(arr[i], ":");
            Host host = new Host(hostPort[0], Integer.valueOf(hostPort[1]));
            hosts[i] = host;
        }
        return hosts;
    }

}

在需要的地方通过注解注入即可。比如:

@Autowired
private AerospikeClient aerospikeClient;

定义接口

一般而言,我们项目中存在多种缓存机制。大部分的时候,我们优先选择了 Redis,而且对 redis 的数据结构也相对熟悉。因此,在定义接口时,我们需要借鉴 redis client 的方法,方便缓存迁移。

package cn.openmind.cache;

import java.util.List;

/**
 * IKVStorage
 *
 * @author zhoujunwen
 * @date 2020-09-16 09:51
 * @desc
 */
public interface IKVStorage {
    /**
     * 设置 key 的过期时间
     *
     * @param key 缓存 key
     * @param ttl ttl
     * @return
     * @throws Exception
     */
    boolean touch(String key, int ttl) throws Exception;

    /**
     * 自增
     *
     * @param key 缓存 key
     * @param by  自增步长
     * @param ttl
     * @return
     * @throws Exception
     */
    long incr(String key, long by, int ttl) throws Exception;

    /**
     * 删除
     *
     * @param key
     * @return
     * @throws Exception
     */
    boolean delete(String key) throws Exception;

    /**
     * 判断 key 是否存在
     *
     * @param key
     * @return
     * @throws Exception
     */
    boolean exists(String key) throws Exception;

    /**
     * 设值
     *
     * @param key   key
     * @param value 值, 支持对象和基本数据类型
     * @param ttl   ttl
     * @return
     */
    boolean setString(String key, String value, int ttl);

    /**
     * @param key
     * @return
     */
    String getString(String key);


    /**
     * 入队
     *
     * @param key
     * @param value
     * @return
     */
    <T> boolean lpush(String key, T... value) throws Exception;

    /**
     * 从列表中删除某个元素
     *
     * @param key
     * @param value
     * @return
     */
    <T> boolean lrem(String key, T value);

    /**
     * 获取队列成员
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    List<?> lrange(String key, int start, int end);
}

实现接口

package cn.openmind.cache.impl;

import cn.openmind.conf.AerospikeConf;
import cn.tongdun.kara.biz.service.dal.IKVStorage;
import com.aerospike.client.*;
import com.aerospike.client.cdt.*;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;

/**
 * AspKVStorageImpl
 *
 * @author zhoujunwen
 * @date 2020-09-16 09:52
 * @desc
 */
@Slf4j
@Component("aspKVStorage")
public class AspKVStorageImpl implements IKVStorage {
    @Autowired
    @Setter
    private AerospikeClient aerospikeClient;
    @Autowired
    @Setter
    private AerospikeConf aerospikeConf;

    // as 的数据集名称,相当于 mysql 表名
    private static final String dataSetName = "kara";

    private static final String BIN = "data";

    private static final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

    /**
     * 设置 key 的过期时间
     *
     * @param key 缓存 key
     * @param ttl ttl
     * @return
     * @throws Exception
     */
    @Override
    public boolean touch(String key, int ttl) throws Exception {
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
        try {
            WritePolicy wp = new WritePolicy();
            wp.expiration = ttl;
            wp.setTimeout(aerospikeClient.getWritePolicyDefault().totalTimeout);
            aerospikeClient.touch(wp, k);
        } catch (AerospikeException e) {
            log.warn("it happens error when set ttl {} for key {} ", ttl, key);
            throw new Exception(e);
        } finally {
        }
        return true;
    }

    /**
     * 自增
     *
     * @param key 缓存 key
     * @param by  自增步长
     * @param ttl
     * @return
     * @throws Exception
     */
    @Override
    public long incr(String key, long by, int ttl) throws Exception {
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);

        long res;

        try {
            WritePolicy wp = new WritePolicy();
            wp.expiration = ttl;
            wp.setTimeout(aerospikeClient.getWritePolicyDefault().totalTimeout);
            Operation[] opt = new Operation[]{Operation.add(new Bin(BIN, by)), Operation.get(BIN)};

            Record record = aerospikeClient.operate(wp, k, opt);
            if (record == null) {
                return 0L;
            }
            res = record.getLong(BIN);
        } catch (AerospikeException e) {
            throw new Exception(e);
        } finally {
        }

        return res;
    }

    /**
     * 删除
     *
     * @param key
     * @return
     * @throws Exception
     */
    @Override
    public boolean delete(String key) throws Exception {
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);

        boolean res;
        try {
            res = aerospikeClient.delete(null, k);
        } catch (AerospikeException e) {
            throw new Exception(e);
        } finally {
        }

        return res;
    }

    /**
     * 判断 key 是否存在
     *
     * @param key
     * @return
     */
    @Override
    public boolean exists(String key) {
        if (StringUtils.isBlank(key)) {
            return false;
        }
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
        return aerospikeClient.exists(null, k);
    }

    /**
     * 设值
     *
     * @param key   key
     * @param value 值, 支持对象和普通字符串
     * @param ttl   ttl
     * @return
     */
    @Override
    public boolean setString(String key, String value, int ttl) {
        if (value == null) {
            return false;
        }
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
        Bin bin = new Bin(BIN, value);
        WritePolicy wp = new WritePolicy();
        wp.expiration = ttl;
        wp.maxRetries = 1;
        wp.recordExistsAction = RecordExistsAction.REPLACE;
        aerospikeClient.put(wp, k, bin);
        return true;
    }

    /**
     * @param key
     * @return
     */
    public String getString(String key) {
        if (key == null) {
            return null;
        }

        try {
            Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
            Record r = aerospikeClient.get(null, k);
            return r != null ? r.getString(BIN) : null;
        } catch (Throwable ex) {
            throw ex;
        }
    }

    /**
     * lpush, push 时去重,支持多个值
     *
     * @param key
     * @param value
     * @return
     */
    public <T> boolean lpush(String key, T... value) {
        if (value == null || value.length == 0) {
            return false;
        }
        WritePolicy policy = new WritePolicy();
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
        ListPolicy listPolicy = new ListPolicy(ListOrder.UNORDERED, ListWriteFlags.ADD_UNIQUE);
        for (T v : value) {
            if (v == null) {
                continue;
            }
            Value val = Value.get(v);
            Operation opt = ListOperation.append(listPolicy, BIN, val);
            try {
                Record rec = aerospikeClient.operate(policy, k, opt);
                log.info("asp lpush return record: [{}]", rec);
            } catch (AerospikeException e) {
                if (e.getResultCode() == ResultCode.ELEMENT_EXISTS) {
                    log.warn("element [{}] already exists in [{}] key", v, key);
                    continue;
                } else {
                    return false;
                }
            } finally {
            }
        }
        return true;
    }

    @Override
    public <T> boolean lrem(String key, T value) {
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
        Operation opt = ListOperation.removeByValue(BIN, Value.get(value), ListReturnType.COUNT);
        Record r = aerospikeClient.operate(null, k, opt);
        int count = r != null ? r.getInt(BIN) : 0;
        log.info("asp lrem from key:[{}] remove value:[{}] count:[{}]", key, Value.getAsBlob(value), count);
        return true;
    }

    /**
     * 获取队列成员
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    @Override
    public List<?> lrange(String key, int start, int end) {
        Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
        Operation opt = ListOperation.getRange(BIN, start);

        Record r = aerospikeClient.operate(null, k, opt);
        return r != null ? r.getList(BIN) : Collections.emptyList();
    }
}

发表评论