分类
未分类

集合按某个对象属性值排序





List<OrderDto> orderSumDtoList = new ArrayList<>();
List<OrderDto> orderSumDtoListDesc = new ArrayList<>();
orderSumDtoListOrderDesc = orderSumDtoList.stream().sorted(Comparator.comparing(OrderSumDto::getPrice).reversed())
        .collect(Collectors.toList());
分类
未分类

Docker中创建Consul集群

一、Consul简介

Consul 是一套开源的分布式服务发现和配置管理系统,由 HashiCorp 公司用 Go 语言开发。它具有很多优点。包括:基于 raft 协议,比较简洁; 支持健康检查, 同时支持 HTTP 和 DNS 协议 支持跨数据中心的 WAN(广域网) 集群 提供图形界面 跨平台,支持 Linux、Mac、Windows。

consul是使用go语言开发的服务发现、配置管理中心服务。内置了服务注册与发现框 架、分布一致性协议实现、健康检查、Key/Value存储、多数据中心方案,不再需要依赖其他工具(比如ZooKeeper等)。服务部署简单,只有一个可运行的二进制的包。每个节点都需要运行agent,他有两种运行模式server和client。每个数据中心官方建议需要3或5个server节点以保证数据安全,同时保证server-leader的选举能够正确的进行。

@client

CLIENT表示consul的client模式,就是客户端模式。是consul节点的一种模式,这种模式下,所有注册到当前节点的服务会被转发到SERVER,本身是不持久化这些信息。

@server

SERVER表示consul的server模式,表明这个consul是个server,这种模式下,功能和CLIENT都一样,唯一不同的是,它会把所有的信息持久化的本地,这样遇到故障,信息是可以被保留的。

@server-leader

中间那个SERVER下面有LEADER的字眼,表明这个SERVER是它们的老大,它和其它SERVER不一样的一点是,它需要负责同步注册的信息给其它的SERVER,同时也要负责各个节点的健康监测。

@raft(分布式一致性协议)

server节点之间的数据一致性保证,一致性协议使用的是raft,而zookeeper用的paxos,etcd采用的也是raft。

@服务发现协议

consul采用http和dns协议,etcd只支持http

@服务注册

consul支持两种方式实现服务注册,一种是通过consul的服务注册http API,由服务自己调用API实现注册,另一种方式是通过json个是的配置文件实现注册,将需要注册的服务以json格式的配置文件给出。consul官方建议使用第二种方式。

@服务发现

consul支持两种方式实现服务发现,一种是通过http API来查询有哪些服务,另外一种是通过consul agent 自带的DNS(8600端口),域名是以NAME.service.consul的形式给出,NAME即在定义的服务配置文件中,服务的名称。DNS方式可以通过check的方式检查服务。

@服务间的通信协议

Consul使用gossip协议管理成员关系、广播消息到整个集群,他有两个gossip pool(LAN pool和WAN pool),LAN pool是同一个数据中心内部通信的,WAN pool是多个数据中心通信的,LAN pool有多个,WAN pool只有一个。

二、基本概念

在描述架构之前,这里提供了一些术语来帮助声明正在探讨的东西:

  • Agent——agent是一直运行在Consul集群中每个成员上的守护进程。通过运行 consul agent 来启动。agent可以运行在client或者server模式。指定节点作为client或者server是非常简单的,除非有其他agent实例。所有的agent都能运行DNS或者HTTP接口,并负责运行时检查和保持服务同步。
  • Client——一个Client是一个转发所有RPC到server的代理。这个client是相对无状态的。client唯一执行的后台活动是加入LAN gossip池。这有一个最低的资源开销并且仅消耗少量的网络带宽。
  • Server——一个server是一个有一组扩展功能的代理,这些功能包括参与Raft选举,维护集群状态,响应RPC查询,与其他数据中心交互WAN gossip和转发查询给leader或者远程数据中心。
  • DataCenter——虽然数据中心的定义是显而易见的,但是有一些细微的细节必须考虑。例如,在EC2中,多个可用区域被认为组成一个数据中心?我们定义数据中心为一个私有的,低延迟和高带宽的一个网络环境。这不包括访问公共网络,但是对于我们而言,同一个EC2中的多个可用区域可以被认为是一个数据中心的一部分。
  • Consensus——在我们的文档中,我们使用Consensus来表明就leader选举和事务的顺序达成一致。由于这些事务都被应用到有限状态机上,Consensus暗示复制状态机的一致性。
  • Gossip——Consul建立在Serf的基础之上,它提供了一个用于多播目的的完整的gossip协议。Serf提供成员关系,故障检测和事件广播。更多的信息在gossip文档中描述。这足以知道gossip使用基于UDP的随机的点到点通信。
  • LAN Gossip——它包含所有位于同一个局域网或者数据中心的所有节点。
  • WAN Gossip——它只包含Server。这些server主要分布在不同的数据中心并且通常通过因特网或者广域网通信。
  • RPC——远程过程调用。这是一个允许client请求server的请求/响应机制。

Consul Architecture 架构图

image.png

拆解开这个体系,从每一个组件开始了解。首先,可以看到有两个数据中心,分别标记为“one”和“two”。Consul是支持多数据中心一流,并且是常用业务场景。

每个数据中心都是由Server和client组成。建议有3~5 Server——基于故障处理和性能的平衡之策。如果增加越多的机器,则Consensus会越来越慢。对client没有限制,可以很容易地扩展到成千上万或数万。

同一个数据中心的所有节点都要加入Gossip协议。这意味着gossip pool包含给定数据中心的所有节点。有以下目的:首先,没有必要为client配置服务器地址参数;发现是自动完成的。第二,节点故障检测的工作不是放置在服务器上,而是分布式的。这使故障检测比心跳机制更可扩展性。第三,可用来作为消息层通知重要的事件,如leader选举。

每个数据中心的服务器都是属于一个Raft peer。这意味着,他们一起工作,选出一个的Leader,Leader server是有额外的职责。负责处理所有的查询和事务。事务也必须通过Consensus协议复制到所有的伙伴。由于这一要求,当非Leader Server接收到一个RPC请求,会转发到集群的leader。

Server节点也是作为WAN gossip pool的一部分。这个pool是与LAN gossip pool是不同的,它为具有更高延迟的网络响应做了优化,并且可能包括其他consul集群的server节点。设计WANpool的目的是让数据中心能够以low-touch的方式发现彼此。将一个新的数据中心加入现有的WAN Gossip是很容易的。因为池中的所有Server都是可控制的,这也使跨数据中心的要求。当一个Serfer接收到不同的数据中心的要求时,它把这个请求转发给相应数据中心的任一Server。然后,接收到请求的Server可能会转发给Leader。

多个数据中心之间是低耦合,但由于故障检测、连接缓存复用、跨数据中心要求快速和可靠的响应。

三、Docker中创建Consul集群

1 使用docker下载consul镜像,默认下载最consul最新版本,目前版本号为1.4.0,如果需要其他版本请登录https://hub.docker.com/进行搜索
2 下载完毕后分别创建/home/docker/consul、consul-server1-data、consul-server2-data、consul-server3-data、consul-client-data、consul-server1-conf、consul-server2-conf、consul-server3-conf、consul-client-conf这九个文件夹

[root@localhost ~]# cd /home

[root@localhost home]# mkdir docker

[root@localhost home]# cd docker

[root@localhost docker]# mkdir consul

[root@localhost docker]# cd consul

[root@localhost consul]# mkdir consul-server1-data consul-server2-data consul-server3-data  consul-client-data consul-server1-conf consul-server2-conf consul-server3-conf consul-client-conf

3 创建Consul配置文件
3.1 在docker中每个Consul成员都是docker中的一个容器,docker会给每个容器分配容器的IP地址,容器IP地址只能用于容器之间内部通讯不能被宿主机直接访问,每个Consul容器IP同时也是Consul成员的Agentd守护进程的IP地址,创建Consul集群需要其他Consul容器加入同一个Consul容器的Agentd守护进程的IP地址,将该Consul容器作为Consul容器的leader,当该Consul容器挂掉时,Consul集群会从所有Agentd守护进程的IP地址中再选举出一个leader,但当宿主及重启,docker中所有容器的IP地址都会发生变化,Consul集群中的每个成员的IP地址也发生变化,原本是Consul容器的IP地址可能变成了Mysql容器的IP地址,这样每个Consul成员无法自动加入原来Consul容器的Agentd守护进程的IP地址,Consul集群就会报错,解决方案是在所有Consul节点服务的配置文件中,配置参数”retry_join”,将docker中所有容器的IP都作为加入同一个Consul容器的Agentd守护进程的IP地址,任何一个Consul成员的Agent守护进程只需要知道集群中任意一个节点即可,加入到集群之后,集群节点之间会根据GOSSIP协议互相发现彼此的关系
要先统计docker中所有容器已经被使用的IP地址,将没有被使用的空闲的IP地址作为Consul容器Agentd守护进程的IP地址
3.1.1 查询docker中所有容器IP地址

[root@bogon consul-server1-conf]# docker inspect -f '{{.NetworkSettings.IPAddress}}' $(docker ps -q)
172.17.0.2
172.17.0.5



172.17.0.8
172.17.0.7

3.1.2 统计出已经使用的IP地址后,将空闲的172.17.0.6、172.17.0.9、172.17.0.4、172.17.0.3这四个准备使用的IP地址,作为Consul容器的IP地址,并将所有docker容器中所有已经使用的IP地址和准备使用的IP地址写入每个Consul节点服务的配置文件的配置参数”retry_join”中, “retry_join”: [“172.17.0.2″,”172.17.0.3″,”172.17.0.4″,”172.17.0.5″,”172.17.0.6″,”172.17.0.7″,”172.17.0.8″,”172.17.0.9”]
3.2 创建consul-server1节点服务配置文件
3.2.1 进入consul-server1-conf文件夹创建consul-server1.json文件

[root@localhost consul]# cd consul-server1-conf/
[root@localhost consul-server1-conf]# touch consul-server1.json 

3.2.2 vi编辑consul-server1.json配置文件,复制下列代码

{
    "datacenter": "DC1",
    "data_dir": "/consul/data",
    "log_level": "INFO",
    "node_name": "consul-server1",
    "server": true,
    "bootstrap_expect": 1,
    "retry_join": ["172.17.0.2","172.17.0.3","172.17.0.4","172.17.0.5","172.17.0.6","172.17.0.7","172.17.0.8","172.17.0.9"],
    "retry_interval": "3s",
    "enable_debug": false,
    "rejoin_after_leave": true,
    "enable_syslog": false
}

3.3 创建consul-server2节点服务配置文件
3.3.1 进入consul-server2-conf文件夹创建consul-server2.json文件

[root@localhost consul]# cd consul-server2-conf/
[root@localhost consul-server2-conf]# touch consul-server2.json 

3.3.2 vi编辑consul-server2.json配置文件,复制下列代码

{
    "datacenter": "DC1",
    "data_dir": "/consul/data",
    "log_level": "INFO",
    "node_name": "consul-server2",
    "server": true,
    "bootstrap_expect": 2,
    "retry_join": ["172.17.0.2","172.17.0.3","172.17.0.4","172.17.0.5","172.17.0.6","172.17.0.7","172.17.0.8","172.17.0.9"],
    "retry_interval": "3s",
    "enable_debug": false,
    "rejoin_after_leave": true,
    "enable_syslog": false
}

3.4 创建consul-server3节点服务配置文件
3.4.1 进入consul-server3-conf文件夹创建consul-server3.json文件

[root@localhost consul]# cd consul-server3-conf/
[root@localhost consul-server3-conf]# touch consul-server3.json 

3.4.2 vi编辑consul-server3.json配置文件,复制下列代码

{
    "datacenter": "DC1",
    "data_dir": "/consul/data",
    "log_level": "INFO",
    "node_name": "consul-server3",
    "server": true,
    "bootstrap_expect": 2,
    "retry_join": ["172.17.0.2","172.17.0.3","172.17.0.4","172.17.0.5","172.17.0.6","172.17.0.7","172.17.0.8","172.17.0.9"],
    "retry_interval": "3s",
    "enable_debug": false,
    "rejoin_after_leave": true,
    "enable_syslog": false
}

3.4 创建consul-client节点服务配置文件
3.4.1 进入consul-client-conf文件夹创建consul-client.json文件

[root@localhost consul]# cd consul-client-conf/
[root@localhost consul-client-conf]# touch consul-client.json 

3.4.2 vi编辑consul-client.json配置文件,复制下列代码

{
  "datacenter": "DC1",             
  "data_dir": "/consul/data",
  "log_level": "INFO",
  "node_name": "consul-client",
  "server": false,
  "ui": true,
  "bootstrap_expect": 0,
  "bind_addr": "192.168.43.234",
  "client_addr": "192.168.43.234",
  "retry_join": ["172.17.0.2","172.17.0.3","172.17.0.4","172.17.0.5","172.17.0.6","172.17.0.7","172.17.0.8","172.17.0.9"],
  "retry_interval": "3s", 
  "enable_debug": false,
  "rejoin_after_leave": true,
  "enable_syslog": false
}

3.5 配置参数说明
datacenter: 数据中心名称
data_di:ConsulServer模式节点的数据目录
log_level: “INFO”:日志级别
node_name:当前节点名称
server:是否为 Server 模式,true 为 Server 模式,false 为 Client 模式
ui:是否开启 UI 访问
bootstrap_expect:启动时期望的就绪节点,1 代表启动为 bootstrap 模式,等待其他节点加入
bind_addr:绑定的 IP,ConsulServer模式无需指定,ConsulClient模式必须绑定宿主机IP地址,否则报错[Consul]Error starting agent: Failed to get advertise address: Multiple private IPs found.
client_addr:作为 Client 接受请求的绑定 IP地址,该IP地址必须为宿主机IP地址,否则访问宿主机无法访问到Client模式的Consul节点,端口使用了 HTTP: 8500, DNS: 8600
retry_join:尝试加入的其他节点
retry_interval:每次尝试间隔
raft_protocol:Raft 协议版本
enable_debug:是否开启 Debug 模式
rejoin_after_leave:允许重新加入集群
enable_syslog:是否开启 syslog
4 启动三个server模式的consul节点consul-server1、consul-server2、consul-server3,启动一个client模式的consul节点consul-client

docker run -d --name consul-server1 --restart=always -v /home/docker/consul/consul-server1-data:/consul/data -v /home/docker/consul/consul-server1-conf:/consul/config consul agent -data-dir /consul/data -config-dir /consul/config

docker run -d --name consul-server2 --restart=always -v /home/docker/consul/consul-server2-data:/consul/data -v /home/docker/consul/consul-server2-conf:/consul/config consul agent -data-dir /consul/data -config-dir /consul/config

docker run -d --name consul-server3 --restart=always -v /home/docker/consul/consul-server3-data:/consul/data -v /home/docker/consul/consul-server3-conf:/consul/config consul agent -data-dir /consul/data -config-dir /consul/config

docker run -d --net=host --name consul-client --restart=always -p 8400:8400 -p 8500:8500 -p 8600:53/udp -v /home/docker/consul/consul-client-data:/consul/data -v /home/docker/consul/consul-client-conf:/consul/config consul agent  -data-dir /consul/data -config-dir /consul/config

5 查看consul客户端、consul容器是否启动

[root@localhost ~]# docker ps -a
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                        NAMES
e46bf7bae81d        consul              "docker-entrypoint..."   About an hour ago   Up About an hour                                                                 consul-client
954f366b5a9a        consul              "docker-entrypoint..."   About an hour ago   Up About an hour    8300-8302/tcp, 8500/tcp, 8301-8302/udp, 8600/tcp, 8600/udp   consul-server3
54feb6ee26c4        consul              "docker-entrypoint..."   About an hour ago   Up About an hour    8300-8302/tcp, 8500/tcp, 8301-8302/udp, 8600/tcp, 8600/udp   consul-server2
5beadc6c1bec        consul              "docker-entrypoint..."   About an hour ago   Up About an hour    8300-8302/tcp, 8500/tcp, 8301-8302/udp, 8600/tcp, 8600/udp   consul-server1

6 查看ConsulServer模式、Client模式节点是否启动成功

[root@localhost ~]# docker exec -it consul-server1 /bin/sh
/ # consul members
Node            Address              Status  Type    Build  Protocol  DC   Segment
consul-server1  172.17.0.4:8301      alive   server  1.4.0  2         dc1  <all>
consul-server2  172.17.0.9:8301      alive   server  1.4.0  2         dc1  <all>
consul-server3  172.17.0.6:8301      alive   server  1.4.0  2         dc1  <all>
consul-client   192.168.43.234:8301  alive   client  1.4.0  2         dc1  <default>

7 查看目前全部的consul节点的角色状态

/ # consul operator raft list-peers
Node            ID                                    Address          State     Voter  RaftProtocol
consul-server3  bce5f51e-fada-8a13-7639-e205da35efe6  172.17.0.6:8300  follower  true   3
consul-server2  0b4cbb28-b03a-948f-c4f2-a6e00b6e0d06  172.17.0.9:8300  leader    true   3
consul-server1  952d468c-0391-2f98-4959-f62965401551  172.17.0.4:8300  follower  true   3

8 consult集群容器启动成功,打开浏览器输入http://192.168.43.234:8500,consul默认页面ui端口号8500

image.png

可以看到Node Health为3,则说明consul集群配置成功

9 错误处理

CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS                     PORTS                                                        NAMES
79032d57a304        consul              "docker-entrypoint..."   3 seconds ago       Exited (1) 2 seconds ago                                                                consul-client
53ef8a6fdb56        consul              "docker-entrypoint..."   11 minutes ago      Up 11 minutes              8300-8302/tcp, 8500/tcp, 8301-8302/udp, 8600/tcp, 8600/udp   consul-server3
d1f7c98d07f5        consul              "docker-entrypoint..."   12 minutes ago      Up 12 minutes              8300-8302/tcp, 8500/tcp, 8301-8302/udp, 8600/tcp, 8600/udp   consul-server2
86e45d805eb8        consul              "docker-entrypoint..."   12 minutes ago      Up 12 minutes              8300-8302/tcp, 8500/tcp, 8301-8302/udp, 8600/tcp, 8600/udp   consul-server1

8.1 如果consul客户端、consul服务端容器是未能正常启动,则查看该consul容器日志

docker logs consulclient

9 命令说明
–net=host:指定 docker网络模式为host模式共享宿主机的网络,若采用默认的bridge模式,则会存在容器跨主机间通信失败的问题
-v /data/consul_data/data:/consul/data:主机的数据目录挂载到容器的/consul/data下,因为该容器默认的数据写入位置即是/consul/data
-v /data/consul_data/conf:/consul/config:主机的配置目录挂载到容器的/consul/conf下,因为该容器默认的数据写入位置即是/consul/conf
consul agent -server:consul的server启动模式
consul agent -client:consul的client启动模式
consul agent -bind=192.168.43.234:consul绑定到主机的ip上
consul agent -bootstrap-expect=3:server要想启动,需要至少3个server
consul agent -data-dir /consul/data:consul的数据目录
consul agent -config-dir /consul/config:consul的配置目录
consul agent -join:加入的consul-server1节点IP地址建立consul集群,启动之后,集群就开始了Vote(投票选Leader)的过程
–net=host docker参数, 使得docker容器越过了netnamespace的隔离,免去手动指定端口映射的步骤
-e或–env 使用-e设置的环境变量,容器内部的进程可以直接拿到
-server consul支持以server或client的模式运行, server是服务发现模块的核心, client主要用于转发请求
-client consul绑定在哪个client地址上,这个地址提供HTTP、DNS、RPC等服务,默认是127.0.0.1,0.0.0.0 表示任何地址可以访问
-node – 群集中此节点的名称。这在群集中必须是唯一的。默认情况下,这是计算机的主机名
-bootstrap-expect 指定consul集群中有多少代理
-retry-join 指定要加入的consul节点地址,失败会重试, 可多次指定不同的地址
-bind 绑定IP地址用来在集群内部的通讯,集群内的所有节点到地址都必须是可达的,默认是0.0.0.0,但当宿主机重启后所有docker容器IP地址会发生变化,-bind 绑定IP作用就会失效,集群无法找到leader报错,如果将集群单独部署在一个宿主机内可以使用
-allow_stale 设置为true, 表明可以从consul集群的任一server节点获取dns信息, false则表明每次请求都会经过consul server leader
–name DOCKER容器的名称
-ui 提供图形化的界面
其他命令请查看consul官方文档:https://www.consul.io/docs/agent/options.html#ports

作者:QIQIHAL
链接:https://www.jianshu.com/p/067154800683
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

分类
Java

如何在Debian 9上安装Apache Kafka

Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。在本教程中,您将在Debian 9上安装和使用Apache Kafka。

作者选择了自由和开源基金作为Write for DOnations计划的一部分进行捐赠。

介绍

Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。 Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQRabbitMQ)相比,它还具有更高的吞吐量。 虽然它通常用作发布/订阅消息传递系统,但许多组织也将其用于日志聚合,因为它为已发布的消息提供持久存储。

发布/订阅消息传递系统允许一个或多个生成器发布消息,而不考虑消费者的数量或他们将如何处理消息。 将自动通知订阅的客户端有关更新和新消息的创建。 与客户端定期轮询以确定新消息是否可用的系统相比,该系统更高效且可扩展。

在本教程中,您将在Debian 9上安装和使用Apache Kafka 1.1.1。

先决条件

要继续,您将需要:

  • 一个Debian 9服务器和一个具有sudo权限的非root用户。 如果您没有设置非root用户,请按照本指南中指定的步骤操作。
  • 服务器上至少有4GB的RAM。 没有这么多RAM的安装可能会导致Kafka服务失败, Java虚拟机(JVM)在启动期间抛出“Out Of Memory”异常。
  • OpenJDK 8安装在您的服务器上。 要安装此版本,请按照这些说明安装特定版本的OpenJDK。 Kafka是用Java编写的,所以它需要一个JVM; 但是,它的启动shell脚本有一个版本检测错误,导致它无法启动8以上的JVM版本。

第1步 – 为Kafka创建用户

由于Kafka可以通过网络处理请求,因此您应该为其创建专用用户。 如果Kafka服务器受到损害,这可以最大限度地减少对Debian机器的损害。 我们将在此步骤中创建一个专用的kafka用户,但是您应该创建一个不同的非root用户,以便在完成Kafka设置后在此服务器上执行其他任务。

以非root sudo用户身份登录,使用useradd命令创建名为kafka的用户:

sudo useradd kafka -m

-m标志确保将为用户创建主目录。 此主目录/home/kafka将作为我们的工作区目录,用于执行以下部分中的命令。

使用passwd设置密码:

sudo passwd kafka

使用adduser命令将kafka用户添加到sudo组,以便它具有安装Kafka依赖项所需的权限:

sudo adduser kafka sudo

您的kafka用户现已准备就绪。 使用su登录此帐户:

su -l kafka

现在我们已经创建了特定于Kafka的用户,我们可以继续下载和解压缩Kafka二进制文件。

第2步 – 下载和提取Kafka二进制文件

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=331369438&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976319&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318979&bpp=16&bdt=497&idt=66&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&correlator=4005534740767&frm=20&pv=2&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=1972&biw=1903&bih=938&scr_x=0&scr_y=0&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=1&uci=a!1&btvi=1&fsb=1&xpc=9xGXlI1OqQ&p=https%3A//www.howtoing.com&dtd=179

让我们将kafka二进制文件下载并解压缩到我们kafka用户主目录中的专用文件夹中。

首先,在/home/kafka创建一个名为Downloads的目录来存储你的下载:

mkdir ~/Downloads

使用apt-get安装curl以便您能够下载远程文件:

sudo apt-get update && sudo apt-get install -y curl

安装curl ,使用它来下载Kafka二进制文件:

curl "http://www-eu.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz" -o ~/Downloads/kafka.tgz

创建一个名为kafka的目录并切换到此目录。 这将是Kafka安装的基本目录:

mkdir ~/kafka && cd ~/kafka

使用tar命令解压缩下载的存档:

tar -xvzf ~/Downloads/kafka.tgz --strip 1

我们指定--strip 1标志以确保存档的内容在~/kafka/本身中提取,而不是在其中的另一个目录(例如~/kafka/kafka_ 2.12-1.1.1 / )中~/kafka/kafka_ 2.12-1.1.1 /

现在我们已经成功下载并提取了二进制文件,我们可以继续配置Kafka以允许删除主题。

第3步 – 配置Kafka服务器

Kafka的默认行为将不允许我们删除可以发布消息的主题 ,类别,组或订阅源名称。 要修改它,让我们编辑配置文件。

Kafka的配置选项在server.properties中指定。 使用nano或您喜欢的编辑器打开此文件:

nano ~/kafka/config/server.properties

让我们添加一个允许我们删除Kafka主题的设置。 将以下内容添加到文件的底部: 〜/Kafka/配置/ server.properties

delete.topic.enable = true

保存文件,然后退出nano 。 现在我们已经配置了Kafka,我们可以继续创建systemd单元文件,以便在启动时运行并启用它。

第4步 – 创建系统单元文件并启动Kafka服务器

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=3526007643&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976319&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318979&bpp=4&bdt=498&idt=79&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&prev_fmts=728×182&correlator=4005534740767&frm=20&pv=1&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=3260&biw=1903&bih=938&scr_x=0&scr_y=0&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=2&uci=a!2&btvi=2&fsb=1&xpc=7S5HNZaHtm&p=https%3A//www.howtoing.com&dtd=185

在本节中,我们将为Kafka服务创建systemd单元文件 。 这将帮助我们执行常见的服务操作,例如以与其他Linux服务一致的方式启动,停止和重新启动Kafka。

ZooKeeper是Kafka用于管理其集群状态和配置的服务。 它通常在许多分布式系统中用作不可或缺的组件。 如果您想了解更多信息,请访问官方的ZooKeeper文档

zookeeper创建单元文件:

sudo nano /etc/systemd/system/zookeeper.service

在文件中输入以下单位定义: /etc/systemd/system/zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]部分指定ZooKeeper在启动之前需要网络并且文件系统准备就绪。

[Service]部分指定systemd应使用zookeeper-server-start.shzookeeper-server-stop.sh shell文件来启动和停止服务。 它还指定ZooKeeper应该在异常退出时自动重启。

接下来,为kafka创建systemd服务文件:

sudo nano /etc/systemd/system/kafka.service

在文件中输入以下单位定义: /etc/systemd/system/kafka.service

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]部分指定此单元文件依赖于zookeeper.service 。 这将确保在kafka服务启动时zookeeper自动启动。

[Service]部分指定systemd应使用kafka-server-start.shkafka-server-stop.sh shell文件来启动和停止服务。 它还指定如果Kafka异常退出则应自动重启。

现在已经定义了单元,使用以下命令启动Kafka:

sudo systemctl start kafka

要确保服务器已成功启动,请检查kafka单元的日志日志:

sudo journalctl -u kafka

您应该看到类似于以下内容的输出:





Mar 23 13:31:48 kafka systemd[1]: Started kafka.service.

您现在有一个Kafka服务器监听端口9092

虽然我们已经启动了kafka服务,但如果我们要重新启动服务器,它将不会自动启动。 要在服务器启动时启用kafka ,请运行:

sudo systemctl enable kafka

现在我们已经启动并启用了服务,让我们检查安装。

第5步 – 测试安装

让我们发布并使用“Hello World”消息以确保Kafka服务器正常运行。 在Kafka中发布消息需要:

  • 生产者 ,可以将记录和数据发布到主题。
  • 消费者 ,从主题中读取消息和数据。

首先,输入以下命令创建一个名为TutorialTopic的主题:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用kafka-console-producer.sh脚本从命令行创建生成器。 它期望Kafka服务器的主机名,端口和主题名称作为参数。

通过键入以下内容将字符串"Hello, World"TutorialTopic主题:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

接下来,您可以使用kafka-console-consumer.sh脚本创建Kafka使用者。 它期望ZooKeeper服务器的主机名和端口,以及主题名称作为参数。

以下命令使用来自TutorialTopic消息。 注意使用--from-beginning标志,它允许消费在消费者启动之前发布的消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

如果没有配置问题,您应该在终端中看到Hello, World





Hello, World

该脚本将继续运行,等待将更多消息发布到该主题。 随意打开一个新终端并启动生产者发布更多消息。 您应该能够在消费者的输出中看到它们。

完成测试后,按CTRL+C以停止使用者脚本。 现在我们已经测试了安装,让我们继续安装KafkaT。

第6步 – 安装KafkaT(可选)

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=170399143&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976424&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318963&bpp=3&bdt=482&idt=98&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&prev_fmts=728×182%2C728x182%2C300x600%2C1200x280%2C0x0%2C300x600&nras=3&correlator=4005534740767&frm=20&pv=1&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=5771&biw=1903&bih=938&scr_x=0&scr_y=2410&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=3&uci=a!3&btvi=5&fsb=1&xpc=X1D7mT49f0&p=https%3A//www.howtoing.com&dtd=M

KafkaT是Airbnb的一款工具,可让您更轻松地查看有关Kafka群集的详细信息,并从命令行执行某些管理任务。 因为它是一个Ruby gem,所以你需要Ruby才能使用它。 您还需要build-essential软件包才能构建其依赖的其他gem。 使用apt安装它们:

sudo apt install ruby ruby-dev build-essential

您现在可以使用gem命令安装KafkaT:

sudo gem install kafkat

KafkaT使用.kafkatcfg作为配置文件来确定Kafka服务器的安装和日志目录。 它还应该有一个条目将KafkaT指向您的ZooKeeper实例。

创建一个名为.kafkatcfg的新文件:

nano ~/.kafkatcfg

添加以下行以指定有关Kafka服务器和Zookeeper实例的必需信息: 〜/ .kafkatcfg

{
  "kafka_path": "~/kafka",
  "log_path": "/tmp/kafka-logs",
  "zk_path": "localhost:2181"
}

您现在可以使用KafkaT了。 首先,您可以使用它来查看有关所有Kafka分区的详细信息:

kafkat partitions

您将看到以下输出:





Topic                 Partition   Leader      Replicas        ISRs    
TutorialTopic         0             0         [0]             [0]
__consumer_offsets    0               0           [0]                           [0]
...
...

您将看到TutorialTopic以及__consumer_offsets ,这是Kafka用于存储客户端相关信息的内部主题。 您可以安全地忽略以__consumer_offsets开头的__consumer_offsets

要了解有关KafkaT的更多信息,请参阅其GitHub存储库

第7步 – 设置多节点群集(可选)

如果要使用更多Debian 9计算机创建多代理群集,则应在每台新计算机上重复第1步,第4步和第5步。 此外,您应该在server.properties文件中为每个进行以下更改:

  • 应更改broker.id属性的值,使其在整个群集中是唯一的。 此属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。 例如, "server1""server2"等。
  • 应更改zookeeper.connect属性的值,以便所有节点都指向同一个ZooKeeper实例。 此属性指定ZooKeeper实例的地址,并遵循<HOSTNAME/IP_ADDRESS>:<PORT>格式。 例如, " 203.0.113.0 :2181"" 203.0.113.1 :2181"等。

如果要为群集设置多个ZooKeeper实例,则每个节点上zookeeper.connect属性的值应该是一个相同的逗号分隔字符串,其中列出了所有ZooKeeper实例的IP地址和端口号。

第8步 – 限制Kafka用户

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=530343939&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976425&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318966&bpp=1&bdt=485&idt=96&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&prev_fmts=728×182%2C728x182%2C300x600%2C1200x280%2C0x0%2C300x600%2C728x182&nras=3&correlator=4005534740767&frm=20&pv=1&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=7137&biw=1903&bih=938&scr_x=0&scr_y=3588&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=4&uci=a!4&btvi=6&fsb=1&xpc=742j1SQH1c&p=https%3A//www.howtoing.com&dtd=M

现在所有安装都已完成,您可以删除kafka用户的管理员权限。 在执行此操作之前,请注销并以任何其他非root sudo用户身份重新登录。 如果您仍在运行相同的shell会话,那么只需键入exit即可启动本教程。

从sudo组中删除kafka用户:

sudo deluser kafka sudo

要进一步提高Kafka服务器的安全性,请使用passwd命令锁定kafka用户的密码。 这可以确保没有人可以使用此帐户直接登录服务器:

sudo passwd kafka -l

此时,只有root或sudo用户可以通过键入以下命令以kafka身份登录:

sudo su - kafka

将来,如果要解锁它,请使用带-u选项的passwd

sudo passwd kafka -u

您现在已成功限制kafka用户的管理员权限。

结论

您现在可以在Debian服务器上安全地运行Apache Kafka。 您可以使用Kafka客户端 (可用于大多数编程语言)创建Kafka生产者和使用者,从而在项目中使用它。 要了解有关Kafka的更多信息,您还可以查阅其文档

https://www.howtoing.com/how-to-install-apache-kafka-on-debian-9

分类
Java Spring Boot 未分类

mybatis-plus的坑

mybatis-plus的坑

BaseMapper的selectBatchIds方法,当idList参数为空时,调用此方法会导致服务cpu占用率几乎100%
分类
Java

gRPC基础教程之一

前言

本文将介绍 gRPC、Protocol Buffers 的概念,同时会给出 Protocol Buffers 代码生成器的使用教程,还有编写第一个基于 gRPC 的服务提供者与服务消费者的示例程序。

相关站点

gRPC 简介

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java、Go 语言版本,分别是:grpc、grpc-java、grpc-go,其中 C 版本支持 C、C++、Node.js、Python、Ruby、Objective-C、PHP、C#。gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特性。在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。值得说明的是,gRPC 客户端和服务端可以在多种环境中运行和交互,支持用任何 gRPC 支持的语言来编写,所以可以很容易地用 Java 创建一个 gRPC 服务端,用 Go、Python、Ruby 来创建客户端。

使用 Protocol Buffers

gRPC 默认使用 Protocol Buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。当使用 proto files 创建 gRPC 服务,用 Protocol Buffers 消息类型来定义方法参数和返回类型。尽管 Protocol Buffers 已经存在了一段时间,官方的示例代码种使用了一种名叫 proto3 的新风格的 Protocol Buffers,它拥有轻量简化的语法、一些有用的新功能,并且支持更多新语言。当前针对 Java 和 C++ 发布了 beta 版本,针对 JavaNano(即 Android Java)发布 alpha 版本,在Protocol Buffers Github 源码库里有 Ruby 支持, 在 Github 源码库里还有针对 Go 语言的生成器, 对更多语言的支持正在开发中。虽然可以使用 proto2 (当前默认的 Protocol Buffers 版本), 通常建议在 gRPC 里使用 proto3,因为这样可以使用 gRPC 支持全部范围的的语言,并且能避免 proto2 客户端与 proto3 服务端交互时出现的兼容性问题,反之亦然。

本地编译安装 Protocol Buffers(可选)

参考自 gRPC-Java、Protobuf 编译构建的官方教程,一般情况下不需要构建 gRPC-Java,只有在对 gRPC-Java 的源码进行了更改或测试使用 gRPC-Java 库的非发布版本(例如 master 分支)时才需要构建。若本地安装了 Protobuf,则可以直接通过命令的方式调用 Protobuf 的代码生成器,无需再依赖额外的 IDE 插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 系统环境
CentOS Linux release 7.6.1810 (Core)
Linux develop 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

# 拉取源码
# git clone https://github.com/google/protobuf.git

# 进入源码目录
# cd protobuf

# 切换至需要编译的版本的分支
# git checkout v3.7.1

# 查看当前所在的分支信息
# git branch -v

# 检测安装环境
# ./autogen.sh
# ./configure –disable-shared

# 编译安装
# make -j 8
# make install

# 如果/usr/local/lib不在库搜索路径中,可以通过运行以下命令添加
# sh -c ‘echo /usr/local/lib >> /etc/ld.so.conf’

# 使添加的库搜索路径生效
# ldconfig

# 查看protobuf安装的版本号
# protoc –version

# 编写.proto文件,使用protobuf的代码生成器自动生成Java代码,命令格式如下
# protoc -I=$SRC_DIR –java_out=$DST_DIR $SRC_DIR/addressbook.proto

# 默认安装路径:/usr/local
# 指定安装目录可以使用此命令: ./configure –disable-shared –prefix=/usr/local/protobuf-3.7.1

Eclipse 项目中添加 Protobuf 自动生成代码的 Maven 插件与 Protobuf 依赖

Protobuf 的原型文件和一些适合的插件,默认放在 src/main/proto 和 src/test/proto 目录中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.21.0</version>
</dependency>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.21.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

往 Gradle 构建的项目添加 Protobuf 自动生成代码的插件与 Protobuf 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
plugins {
id ‘com.google.protobuf’ version ‘0.8.8’
}

protobuf {
protoc {
artifact = “com.google.protobuf:protoc:3.7.1”
}
plugins {
grpc {
artifact = ‘io.grpc:protoc-gen-grpc-java:1.21.0’
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}

dependencies {
compile ‘io.grpc:grpc-stub:1.21.0’
compile ‘io.grpc:grpc-protobuf:1.21.0’
compile ‘io.grpc:grpc-netty-shaded:1.21.0’
testCompile group: ‘junit’, name: ‘junit’, version: ‘4.12’
}

编写 Proto 文件(定义服务),执行编译后自动生成 Java 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 创建gradle工程grpc-demo-provider,目录结构如下:
grpc-demo-provider/
├── build.gradle
└── src
├── main
│   ├── java
│   ├── proto
│   │   └── helloworld.proto
│   └── resources
└── test
├── java
├── proto
└── resources

# 进入工程目录
# cd grpc-demo-provider

# 编辑build.gradle文件,添加protobuf插件与依赖,可参考上面给出的gradle配置内容

# 创建proto文件
# mkdir -p src/main/proto
# vim src/main/proto/helloworld.proto

syntax = “proto3”;
option java_multiple_files = true;
option java_package = “com.grpc.demo.generate”;
option java_outer_classname = “HelloWorldProto”;
option objc_class_prefix = “HLW”;

package helloworld;

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

# 执行编译,自动生成Java文件
# gradle clean build

# 查看自动生成的文件目录结构,默认生成文件所在的目录是:$buildDir/generated/source/proto,其中Message在main/java目录下,Service在目录main/grpc下
# tree build/generated/source/proto
main
├── grpc
│   └── com
│   └── grpc
│   └── demo
│   └── generate
│   └── GreeterGrpc.java
└── java
└── com
└── grpc
└── demo
└── generate
├── HelloReply.java
├── HelloReplyOrBuilder.java
├── HelloRequest.java
├── HelloRequestOrBuilder.java
└── HelloWorldProto.java

Gradle 指定 Protobuf 代码自动生成的目录位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 指定Message代码的生成位置,最终生成位置在src/main/java目录下
protobuf {
generatedFilesBaseDir = “src”
}

// 指定Service代码的生成位置,最终生成位置在src/main/java目录下
protobuf {
generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = ‘java’
}
}
}
}

// 完整的写法,同时指定Message、Service代码生成的目录位置为src/main/java
protobuf {
protoc {
artifact = “com.google.protobuf:protoc:3.7.1”
}
plugins {
grpc {
artifact = ‘io.grpc:protoc-gen-grpc-java:1.21.0’
}
}
generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = ‘java’
}
}
}
generatedFilesBaseDir = ‘src’
}

RPC 服务提供者的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.grpc.demo.provider.service;

import com.grpc.demo.generate.GreeterGrpc;
import com.grpc.demo.generate.HelloReply;
import com.grpc.demo.generate.HelloRequest;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.logging.Logger;

public class HelloWorldProvider {

private Server server;
private static final Logger logger = Logger.getLogger(HelloWorldProvider.class.getName());

private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
logger.info(“==> Server started, listening on ” + port);

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println(“*** shutting down gRPC server since JVM is shutting down”);
HelloWorldProvider.this.stop();
System.err.println(“*** server shut down”);
}
});
}

private void stop() {
if (server != null) {
server.shutdown();
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldProvider server = new HelloWorldProvider();
server.start();
server.blockUntilShutdown();
}

static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage(“Hello ” + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}

RPC 服务消费者的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.grpc.demo.consumer.service;

import com.grpc.demo.generate.GreeterGrpc;
import com.grpc.demo.generate.HelloReply;
import com.grpc.demo.generate.HelloRequest;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloWorldConsumer {

private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
private static final Logger logger = Logger.getLogger(HelloWorldConsumer.class.getName());

public HelloWorldConsumer(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build());
}

HelloWorldConsumer(ManagedChannel channel) {
this.channel = channel;
blockingStub = GreeterGrpc.newBlockingStub(channel);
}

public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

public void greet(String name) {
logger.info(“==> Will try to greet ” + name + ” …”);
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, “RPC failed: {0}”, e.getStatus());
return;
}
logger.info(“==> Greeting: ” + response.getMessage());
}

public static void main(String[] args) throws Exception {
HelloWorldConsumer client = new HelloWorldConsumer(“localhost”, 50051);
try {
String user = “World”;
client.greet(user);
} finally {
client.shutdown();
}
}
}

先后启动Provider、Consumer应用,最终输出的日志信息如下图所示

Provider应用的日志信息:

Consumer应用的日志信息:

本文作者: Clay
发布时间: 2019-06-25 20:40:21
本文链接: https://www.techgrow.cn/posts/ceb4ff2b.html
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!

分类
Java 开发经历

spring cloud config遇到的问题

本机环境调试一个使用到spring cloud config的项目,启动到时候访问config服务报503错误,感到奇怪到是之前一直运行得好好到。后来终于发现网络启用了全局代理模式,也就是通过远程访问本地网络环境,那肯定是不通的。

附上http状态码说明:

5xx(服务器错误)

这些状态代码表示服务器在尝试处理请求时发生内部错误。 这些错误可能是服务器本身的错误,而不是请求出错。

代码 说明

500 (服务器内部错误) 服务器遇到错误,无法完成请求。

501 (尚未实施) 服务器不具备完成请求的功能。 例如,服务器无法识别请求方法时可能会返回此代码。

502 (错误网关) 服务器作为网关或代理,从上游服务器收到无效响应。

503 (服务不可用) 服务器目前无法使用(由于超载或停机维护)。 通常,这只是暂时状态。

504 (网关超时) 服务器作为网关或代理,但是没有及时从上游服务器收到请求。

505 (HTTP 版本不受支持) 服务器不支持请求中所用的 HTTP 协议版本。

分类
Java

Spring Security实例-具有基于令牌的身份验证的Spring Security

介绍
如何使用Spring实现基于令牌的安全性功能。 让我们看一下工作流程以获得更好的理解:

  1. 用户使用用户名和密码发送请求。
  2. Spring Security将令牌返回给客户端API。
  3. 客户端API在每个请求中发送令牌作为身份验证的一部分。
  4. 使令牌在注销时失效。

让我们看看这个工作流程的样子:

1.Maven安装
我们将使用Spring Boot和Maven处理依赖关系。 在构建Spring Boot Web应用程序时,我们将为应用程序使用以下starters。

  1. Spring Boot Web starter
  2. Spring Boot Security starter.
  3. JPA starter

现在,我们的pom.xml如下所示:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.security</groupId>
        <artifactId>spring-security-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. 数据库布局

我将在数据库级别简化此应用程序,我将使用一个表来存储用户详细信息和令牌。 在用户配置文件请求用户创建一个并返回此令牌之前,不会有针对用户个人资料的令牌。 表结构如下所示:

这不是产品级别就绪的表,但主要思想是存储用于客户概要文件的令牌,并将该令牌用于身份验证和授权。 您可以根据需要更改/调整此工作流程。

3. JPA 资料库

要保存并获取客户档案的令牌信息,我们需要创建一个自定义存储库。 该存储库负责根据令牌获取客户信息。 客户服务将使用我们的客户资源库基于令牌获取客户详细信息或执行登录。

@Repository
public interface CustomerRepository extends CrudRepository<Customer, Long> {

    @Query(value = "SELECT u FROM Customer u where u.userName = ?1 and u.password = ?2 ")
    Optional login(String username,String password);
    Optional findByToken(String token);
}

4. 客户验证服务

我们的客户验证服务遵循两个核心运营

提供登录功能以将令牌返回给客户端。
根据提供的令牌验证客户。
我们的客户服务如下所示:

@Service("customerService")
public class DefaultCustomerService implements CustomerService {

    @Autowired
    CustomerRepository customerRepository;

    @Override
    public String login(String username, String password) {
        Optional customer = customerRepository.login(username,password);
        if(customer.isPresent()){
            String token = UUID.randomUUID().toString();
            Customer custom= customer.get();
            custom.setToken(token);
            customerRepository.save(custom);
            return token;
        }

        return StringUtils.EMPTY;
    }

    @Override
    public Optional findByToken(String token) {
        Optional customer= customerRepository.findByToken(token);
        if(customer.isPresent()){
            Customer customer1 = customer.get();
            User user= new User(customer1.getUserName(), customer1.getPassword(), true, true, true, true,
                    AuthorityUtils.createAuthorityList("USER"));
            return Optional.of(user);
        }
        return  Optional.empty();
    }
}

让我们检查一下上面代码中的操作:

  1. 登录方法接受用户名和密码,并将返回用于成功凭证的令牌
  2. 我们将对所有安全资源使用第二种方法  

5. Spring Security 配置

这些是使用Spring Security和基于令牌的身份验证来保护REST API的主要配置类。在本节中,我们将讨论以下类:

  • AuthenticationProvider : 通过其身份验证令牌查找用户.
  • AuthenticationFilter :从请求标头中提取身份验证令牌
  • SecurityConfiguration : Spring Security 配置

5.1 Token Authentication Provider

AuthenticationProvider负责根据客户端在标头中发送的身份验证令牌来查找用户。 这就是我们基于Spring的令牌身份验证提供程序的外观:

@Component
public class AuthenticationProvider extends AbstractUserDetailsAuthenticationProvider {

 @Autowired
 CustomerService customerService;

 @Override
 protected void additionalAuthenticationChecks(UserDetails userDetails, UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken) throws AuthenticationException {
  //
 }

 @Override
 protected UserDetails retrieveUser(String userName, UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken) throws AuthenticationException {

  Object token = usernamePasswordAuthenticationToken.getCredentials();
  return Optional
   .ofNullable(token)
   .map(String::valueOf)
   .flatMap(customerService::findByToken)
   .orElseThrow(() -> new UsernameNotFoundException("Cannot find user with authentication token=" + token));
 }

我们的AuthenticationProvider使用CustomerService根据令牌查找客户。

5.2  Token Authentication Filter

令牌认证过滤器负责从头获取认证过滤器,并调用认证管理器进行认证。 身份验证过滤器如下所示:

public class AuthenticationFilter extends AbstractAuthenticationProcessingFilter {

    AuthenticationFilter(final RequestMatcher requiresAuth) {
        super(requiresAuth);
    }

    @Override
    public Authentication attemptAuthentication(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws AuthenticationException, IOException, ServletException {

        Optional tokenParam = Optional.ofNullable(httpServletRequest.getHeader(AUTHORIZATION)); //Authorization: Bearer TOKEN
        String token= httpServletRequest.getHeader(AUTHORIZATION);
        token= StringUtils.removeStart(token, "Bearer").trim();
        Authentication requestAuthentication = new UsernamePasswordAuthenticationToken(token, token);
        return getAuthenticationManager().authenticate(requestAuthentication);

    }

    @Override
    protected void successfulAuthentication(final HttpServletRequest request, final HttpServletResponse response, final FilterChain chain, final Authentication authResult) throws IOException, ServletException {
        SecurityContextHolder.getContext().setAuthentication(authResult);
        chain.doFilter(request, response);
    }
}

让我们强调几个这里的要点:

  1. 此过滤器将身份验证委托给UsernamePasswordAuthenticationToken 
  2. 此过滤器仅对特定的URL启用(在下一节中说明)

5.3  Spring Security配置

这负责将所有内容组合在一起。让我们看看我们的Spring安全配置是怎样:

@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfiguration extends WebSecurityConfigurerAdapter {


 private static final RequestMatcher PROTECTED_URLS = new OrRequestMatcher(
  new AntPathRequestMatcher("/api/**")
 );

 AuthenticationProvider provider;

 public SecurityConfiguration(final AuthenticationProvider authenticationProvider) {
  super();
  this.provider = authenticationProvider;
 }

 @Override
 protected void configure(final AuthenticationManagerBuilder auth) {
  auth.authenticationProvider(provider);
 }

 @Override
 public void configure(final WebSecurity webSecurity) {
  webSecurity.ignoring().antMatchers("/token/**");
 }

 @Override
 public void configure(HttpSecurity http) throws Exception {
  http.sessionManagement()
   .sessionCreationPolicy(SessionCreationPolicy.STATELESS)
   .and()
   .exceptionHandling()
   .and()
   .authenticationProvider(provider)
   .addFilterBefore(authenticationFilter(), AnonymousAuthenticationFilter.class)
   .authorizeRequests()
   .requestMatchers(PROTECTED_URLS)
   .authenticated()
   .and()
   .csrf().disable()
   .formLogin().disable()
   .httpBasic().disable()
   .logout().disable();
 }

 @Bean
 AuthenticationFilter authenticationFilter() throws Exception {
  final AuthenticationFilter filter = new AuthenticationFilter(PROTECTED_URLS);
  filter.setAuthenticationManager(authenticationManager());
  //filter.setAuthenticationSuccessHandler(successHandler());
  return filter;
 }

 @Bean
 AuthenticationEntryPoint forbiddenEntryPoint() {
  return new HttpStatusEntryPoint(HttpStatus.FORBIDDEN);
 }
}

让我们检查一些要点:

  1. 与请求模式/api/ **匹配的所有URL都是安全的,并且需要有效的令牌进行访问。
  2. The webSecurity.ignoring().antMatchers("/token/**") 显示安全检查中排除的所有请求。
  3. 我们已经在Spring安全性中注册了AuthenticationProvider。 Spring Security将检查令牌验证.
  4. 配置方法包括基本配置以及禁用基于表单的登录和其他标准功能

此步骤总结了使用Spring Security和基于令牌的身份验证来保护REST API的步骤。 在下一步中,我们将设置一个简单的Spring Boot Web应用程序以测试我们的工作流程。

 6. Spring Boot 控制器

让我们创建一个简单的Spring Boot控制器来测试我们的应用程序:

6.1 Token Controller

该控制器负责返回令牌以获取有效凭证:

@RestController
public class TokenController {

    @Autowired
    private CustomerService customerService;

    @PostMapping("/token")
    public String getToken(@RequestParam("username") final String username, @RequestParam("password") final String password){
       String token= customerService.login(username,password);
       if(StringUtils.isEmpty(token)){
           return "no token found";
       }
       return token;
    }
}

6.2 安全用户资料控制器

这是安全控制器。 它将返回有效令牌的用户资料。仅在传递有效令牌后才能访问此控制器:

@RestController
public class UserProfileController {

    @Autowired
    private CustomerService customerService;

    @GetMapping(value = "/api/users/user/{id}",produces = "application/json")
    public Customer getUserDetail(@PathVariable Long id){
        return customerService.findById(id);
    }
}

7. 测试应用

让我们构建和部署应用程序。一旦应用程序运行,就可以使用任何REST客户端来测试我们的应用程序(我使用的是Postman):

不使用 Access Token:

让我们从API获取令牌:

将令牌用于安全URL:

总结

在本文中,我们了解了如何使用基于令牌的方法来使用Spring Security保护REST API。 我们介绍了用于保护REST API的不同配置和设置。

若有需要这篇文章的Intellij IDEA SpringBoot项目源代码的朋友,可先微信扫下图二维码打赏8~9元,然后截图付款收据,加QQ:5404125(备注:源码)联系下载源代码。

分类
Java

Spring Cloud入门使用例子

SpringCloud 是微服务中的翘楚,最佳的落地方案。本仓库整合了一些工作中经常用到的一些技术,作为使用微服务框架的演示。

https://github.com/intomylife/SpringCloud

分类
Docker MySql orchestrator

docker mysql orchestrator高可用构建

使用到这个项目的构建脚本:

https://github.com/pondix/docker-mysql-proxysql

但是有个bug,需要修改一下conf/orchestrator/Dockerfile

FROM debian:stretch

MAINTAINER Nikolaos Vyzas nick@proxysql.com

WORKDIR /usr/local/orchestrator

#deb文件需要放在Dockerfile同级目录下
COPY orchestrator_3.1.4_amd64.deb /usr/local/orchestrator/
EXPOSE 3000
WORKDIR /usr/local/orchestrator
RUN apt update && apt -y install wget && apt -y install ./orchestrator_3.1.4_amd64.deb && mkdir /var/lib/orchestrator && rm -rf /var/lib/apt/lists/*

CMD /usr/local/orchestrator/orchestrator http

分类
Docker MySql

用Docker实现MySQL ProxySQL读写分离

ProxySQL是一个高性能的MySQL中间件,能够代理数百台MySQL服务器,支持数十万的并发连接。ProxySQL代理MySQL并提供读写分离,查询重写和数据分片等功能。
这篇文章主要介绍用Docker Compose编排用ProxySQL实现MySQL集群上读写分离的实践过程。
Docker Compose编排中使用了一个现有镜像,就是breeze2/proxysql
本次实践源码上传在GitHub的breeze2/mysql-proxy-docker

Docker Compose编排

这个编排主要实现一主两从的一个MySQL集群和一个ProxySQL代理,ProxySQL代理MYSQL集群的数据请求并且进行读写分离。

目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
L–mysql-proxy-docker //主目录
L–scripts //本地(Docker宿主)使用的一些脚本
L–mysql_set_users_and_repls.sh //设置各个数据库账号和开启主从复制
L–…
L–services //需要build的服务(目前是空)
L–volumes //各个容器的挂载数据卷
L–mysql_node0
L–mysql_node1
L–mysql_node2
L–proxysql
L–share //各个容器共享的目录
L–scripts //各个容器共用的一些脚本
L–parameters.env //账号密码等环境参数
L–docker-compose.yml //编排配置

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
version: “2”
services:

master:
image: mysql:5.7
container_name: mysql_node0
restart: always
mem_limit: 256m
networks:
net1:
ipv4_address: 10.6.0.10
ports:
– “3306”
volumes:
– “./volumes/share/:/root/share/”
– “./volumes/mysql_node0/lib/:/var/lib/mysql/”
– “./volumes/mysql_node0/conf/:/etc/mysql/conf.d/”
env_file:
– ./parameters.env

slave1:
image: mysql:5.7
container_name: mysql_node1
restart: always
depends_on:
– master
mem_limit: 256m
networks:
net1:
ipv4_address: 10.6.0.11
ports:
– “3306”
volumes:
– “./volumes/share/:/root/share/”
– “./volumes/mysql_node1/lib/:/var/lib/mysql/”
– “./volumes/mysql_node1/conf/:/etc/mysql/conf.d/”
env_file:
– ./parameters.env
slave2:
image: mysql:5.7
container_name: mysql_node2
depends_on:
– master
restart: always
mem_limit: 256m
networks:
net1:
ipv4_address: 10.6.0.12
ports:
– “3306”
volumes:
– “./volumes/share/:/root/share/”
– “./volumes/mysql_node2/lib/:/var/lib/mysql/”
– “./volumes/mysql_node2/conf/:/etc/mysql/conf.d/”
env_file:
– ./parameters.env

proxy:
image: breeze2/proxysql:1.4.3
container_name: proxysql
depends_on:
– master
– slave1
– slave2
restart: always
mem_limit: 256m
networks:
net1:
ipv4_address: 10.6.0.9
ports:
– “127.0.0.1:60320:6032”
– “60330:6033”
volumes:
– “./volumes/proxysql/conf:/etc/proxysql”
entrypoint: “proxysql -f -c /etc/proxysql/pr.cnf”
env_file:
– ./parameters.env

networks:
net1:
driver: bridge
ipam:
config:
– subnet: 10.6.0.0/16
gateway: 10.6.0.1

这里配置了四个容器服务,一个breeze2/proxysql,负责代理各个数据库;三个mysql,其中一个是主库,另外两个是从库。每个容器服务都指定了静态IP,即使服务重启也不会出现IP错乱问题。proxysql容器的6032是提供管理服务的端口,只对Docker宿主机本地IP开放,而6033是代理数据请求的端口,可以对Docker宿主机网络IP开放。

环境参数

parameters.env

1
2
3
4
MYSQL_ROOT_PASSWORD=123456
MYSQL_DATABASE=testing
MYSQL_User=testing
MYSQL_PASSWORD=testing

数据库配置

这里简单的配置一下各个数据库数据复制备份相关的参数
主库,mysql_node0/conf/my.cnf

1
2
3
4
5
6
7
8
9
10
[mysqld]
server-id=1
gtid-mode=on
enforce-gtid-consistency=true
log-bin=mysql-bin
binlog-do-db=testing
binlog-ignore-db=mysql
replicate-do-db=testing
replicate-ignore-db=mysql
expire_logs_days=7

从库,mysql_node1/conf/my.cnf

1
2
3
4
5
6
7
8
9
10
[mysqld]
server-id=2
gtid-mode=on
enforce-gtid-consistency=true
log-bin=mysql-bin
binlog-do-db=testing
binlog-ignore-db=mysql
replicate-do-db=testing
replicate-ignore-db=mysql
expire_logs_days=7

从库,mysql_node2/conf/my.cnf

1
2
3
4
5
6
7
8
9
10
[mysqld]
server-id=3
gtid-mode=on
enforce-gtid-consistency=true
log-bin=mysql-bin
binlog-do-db=testing
binlog-ignore-db=mysql
replicate-do-db=testing
replicate-ignore-db=mysql
expire_logs_days=7

ProxySQL配置

proxysql/conf/pr.cnf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
datadir=”/tmp”
# 管理平台参数
admin_variables =
{
admin_credentials=”admin2:admin2″
mysql_ifaces=”0.0.0.0:6032″
refresh_interval=2000
}
# mysql全局参数
mysql_variables =
{
threads=4
max_connections=2048
default_query_delay=0
default_query_timeout=36000000
have_compress=true
poll_timeout=2000
# interfaces=”0.0.0.0:6033;/tmp/proxysql.sock”
interfaces=”0.0.0.0:6033″
default_schema=”information_schema”
stacksize=1048576
server_version=”5.5.30″
connect_timeout_server=3000
# make sure to configure monitor username and password
# https://github.com/sysown/proxysql/wiki/Global-variables#mysql-monitor_username-mysql-monitor_password
monitor_username=”pr_muser”
monitor_password=”pr_mpass”
monitor_history=600000
monitor_connect_interval=60000
monitor_ping_interval=10000
monitor_read_only_interval=1500
monitor_read_only_timeout=500
ping_interval_server_msec=120000
ping_timeout_server=500
commands_stats=true
sessions_sort=true
connect_retries_on_failure=10
}
# mysql用户参数
mysql_users =
(
{
username = “pr_auser”
password = “pr_apass”
default_hostgroup = 0
}
)
# mysql服务器参数,10.6.0.10是主库放在0组,其他是从库放在1组
mysql_servers =
(
{
address = “10.6.0.10”
port = 3306
weight = 1
hostgroup = 0
max_connections = 50
},
{
address = “10.6.0.11”
port = 3306
weight = 2
hostgroup = 1
max_connections = 100
},
{
address = “10.6.0.12”
port = 3306
weight = 2
hostgroup = 1
max_connections = 150
}
)
# mysql请求规则,以下配置是读时加锁的请求发给0组,普通读取的请求发给1组,其他默认发给0组(上面的default_hostgroup)
mysql_query_rules:
(
{
rule_id=1
active=1
match_pattern=”^SELECT .* FOR UPDATE$”
destination_hostgroup=0
apply=1
},
{
rule_id=2
active=1
match_pattern=”^SELECT”
destination_hostgroup=1
apply=1
}
)

实际运行

在主目录下执行docker-compose up -d构建并运行整个Docker服务。

开启主从复制

在主目录下执行:

1
$ sh ./scripts/mysql_set_users_and_repls.sh

实际上是调用了挂载在数据库容器里的一些脚本:

1
2
3
volumes/share/scripts/msyql_grant_proxysql_users.sh #设置给proxysql用的账号,pr_auser和pr_muser
volumes/share/scripts/msyql_grant_slave.sh #主库设置给从库用的账号
volumes/share/scripts/msyql_grant_slave.sh #从库开始数据复制

脚本里的执行命令都很简单,一看就明。

开启ProxySQL代理

构建整个服务的时候,proxysql会先挂载主目录下的./volumes/proxysql/conf/pr.cnf到容器内/etc/proxysql/pr.cnf,然后执行proxysql -f -c /etc/proxysql/pr.cnf,所以这里的ProxySQL是按照pr.cnf里面的配置开启MySQL代理服务的,请仔细阅读上面ProxySQL配置。若有需要在ProxySQL运行的过程中修改配置,可以登录ProxySQL的管理系统操作。

ProxySQL管理系统

在Docker宿主机上登录ProxySQL管理系统(Docker宿主机需要装有MySQL Client):

1
$ mysql -u admin2 -padmin2 -h 127.0.0.1 -P60320

在ProxySQL管理系统上添加一个mysql_user(注意这个testing账号是各个数据库都已建立的,具体查看上面环境参数):

1
mysql> INSERT INTO mysql_users(username, password, default_hostgroup) VALUES (‘testing’, ‘testing’, 2);

确认是否已添加:

1
2
3
4
5
6
7
8
mysql> SELECT * FROM mysql_users;
+———-+———-+——–+———+——————-+—————-+—————+————————+————–+———+———-+—————–+
| username | password | active | use_ssl | default_hostgroup | default_schema | schema_locked | transaction_persistent | fast_forward | backend | frontend | max_connections |
+———-+———-+——–+———+——————-+—————-+—————+————————+————–+———+———-+—————–+
| pr_auser | pr_apass | 1 | 0 | 0 | | 0 | 0 | 0 | 1 | 1 | 10000 |
| testing | testing | 1 | 0 | 2 | NULL | 0 | 1 | 0 | 1 | 1 | 10000 |
+———-+———-+——–+———+——————-+—————-+—————+————————+————–+———+———-+—————–+
2 rows in set (0.00 sec)

把当前修改(MEMORY层)加载到正在运行的ProxySQL(RUNTIME层):

1
mysql> LOAD MYSQL USERS TO RUNTIME;

在Docker宿主机上确认ProxySQL是否已加载最新配置:

1
2
3
4
5
6
7
$ mysql -u testing -ptesting -h 127.0.0.1 -P60330
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 7
Server version: 5.5.30 (ProxySQL Admin Module)


若想ProxySQL重启后依然是当前配置,要把当前修改(MEMORY层)保存到ProxySQL的Sqlite数据库里(DISK层):

1
mysql> SAVE MYSQL USERS TO DISK;

ProxySQL配置系统分三层,分别是MEMORY层、RUNTIME层和DISK层。ProxySQL管理系统操作的是MEMORY层,当前ProxySQL运行的是RUNTIME层,保存在ProxySQL本地Sqlite数据库里的是DISK层,详情请阅读文档ProxySQL Configuration

SysBench测试工具

SysBench是一个脚本化、多线程的基准测试工具,经常用于评估测试各种不同系统参数下的数据库负载情况。
SysBench的使用教程可以参考sysbench 0.5使用手册
这里使用SysBench-v1.0.9来对ProxySQL进行测试。

SysBench Test Prepare

首先,做测试准备:

1
2
3
4
5
6
7
8
$ sysbench /usr/local/Cellar/sysbench/1.0.9/share/sysbench/oltp_read_write.lua –threads=5 –max-requests=0 –time=36 –db-driver=mysql –mysql-user=pr_auser –mysql-password=’pr_apass’ –mysql-port=60330 –mysql-host=127.0.0.1 –mysql-db=testing –report-interval=1 prepare
sysbench 1.0.9 (using bundled LuaJIT 2.1.0-beta2)

Initializing worker threads…

Creating table ‘sbtest1’…
Inserting 10000 records into ‘sbtest1’
Creating a secondary index on ‘sbtest1’…

注意,/usr/local/Cellar/sysbench/1.0.9/share/sysbench/oltp_read_write.lua文件可以在SysBench安装包里找到,执行命令后,会在主库testing数据库里生成一个sbtest1表并插入一些数据,
在从库里一样可以看到testing数据库下有sbtest1表,说明主从复制已生效。

SysBench Test Run

然后,开始读写测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ sysbench /usr/local/Cellar/sysbench/1.0.9/share/sysbench/oltp_read_write.lua –threads=5 –max-requests=0 –time=36 –db-driver=mysql –mysql-user=pr_auser –mysql-password=’pr_apass’ –mysql-port=60330 –mysql-host=127.0.0.1 –mysql-db=testing –report-interval=1 run
sysbench 1.0.9 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 5
Report intermediate results every 1 second(s)
Initializing random number generator from current time


Initializing worker threads…

Threads started!

[ 1s ] thds: 5 tps: 51.66 qps: 1087.83 (r/w/o: 769.92/209.62/108.29) lat (ms,95%): 144.97 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 5 tps: 61.26 qps: 1229.13 (r/w/o: 862.60/243.01/123.52) lat (ms,95%): 142.39 err/s: 1.00 reconn/s: 0.00
[ 3s ] thds: 5 tps: 60.85 qps: 1237.04 (r/w/o: 867.92/247.41/121.71) lat (ms,95%): 121.08 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 5 tps: 67.07 qps: 1332.44 (r/w/o: 931.01/267.29/134.15) lat (ms,95%): 127.81 err/s: 0.00 reconn/s: 0.00

查看结果

登录ProxySQL管理系统,查看统计结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
$ mysql -u admin2 -padmin2 -h 127.0.0.1 -P60320
mysql> select * from stats_mysql_query_digest limit\G;
*************************** 1. row ***************************
hostgroup: 0
schemaname: testing
username: pr_auser
digest: 0xE365BEB555319B9E
digest_text: DELETE FROM sbtest1 WHERE id=?
count_star: 2564
first_seen: 1508313300
last_seen: 1508313336
sum_time: 1923227
min_time: 149
max_time: 39773
*************************** 2. row ***************************
hostgroup: 0
schemaname: testing
username: pr_auser
digest: 0xFB239BC95A23CA36
digest_text: UPDATE sbtest1 SET c=? WHERE id=?
count_star: 2566
first_seen: 1508313300
last_seen: 1508313336
sum_time: 2016454
min_time: 158
max_time: 53514

*************************** 13. row ***************************
hostgroup: 1
schemaname: testing
username: pr_auser
digest: 0xDBF868B2AA296BC5
digest_text: SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?
count_star: 2570
first_seen: 1508313300
last_seen: 1508313336
sum_time: 7970660
min_time: 216
max_time: 56153
*************************** 14. row ***************************
hostgroup: 1
schemaname: testing
username: pr_auser
digest: 0xAC80A5EA0101522E
digest_text: SELECT c FROM sbtest1 WHERE id BETWEEN ? AND ? ORDER BY c
count_star: 2570
first_seen: 1508313300
last_seen: 1508313336
sum_time: 10148202
min_time: 272
max_time: 58032
14 rows in set (0.00 sec)

可以看到读操作都发送给了hostgroup=1组,写操作都发送给了hostgroup=0组,说明读写分离已生效,读写分离配置请仔细阅读上面ProxySQL配置mysql_query_rules部分

此致

到此,用Docker实现MySQL ProxySQL读写分离已完成。另外ProxySQL提供的查询重写功能,其实是利用mysql_query_rules配置,对接收到的查询语句进行正则替换,再传递给数据库服务器,详情请阅读文档ProxySQL Configuration中的“MySQL Query Rules”部分;而数据分片功能,在真实数据分片的基础上,再结合mysql_query_rules配置,重写query到正确的主机、数据库或表上,详细内容可以阅读MySQL Sharding with ProxySQL