分类
Java

如何在Debian 9上安装Apache Kafka

Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。在本教程中,您将在Debian 9上安装和使用Apache Kafka。

作者选择了自由和开源基金作为Write for DOnations计划的一部分进行捐赠。

介绍

Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。 Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQRabbitMQ)相比,它还具有更高的吞吐量。 虽然它通常用作发布/订阅消息传递系统,但许多组织也将其用于日志聚合,因为它为已发布的消息提供持久存储。

发布/订阅消息传递系统允许一个或多个生成器发布消息,而不考虑消费者的数量或他们将如何处理消息。 将自动通知订阅的客户端有关更新和新消息的创建。 与客户端定期轮询以确定新消息是否可用的系统相比,该系统更高效且可扩展。

在本教程中,您将在Debian 9上安装和使用Apache Kafka 1.1.1。

先决条件

要继续,您将需要:

  • 一个Debian 9服务器和一个具有sudo权限的非root用户。 如果您没有设置非root用户,请按照本指南中指定的步骤操作。
  • 服务器上至少有4GB的RAM。 没有这么多RAM的安装可能会导致Kafka服务失败, Java虚拟机(JVM)在启动期间抛出“Out Of Memory”异常。
  • OpenJDK 8安装在您的服务器上。 要安装此版本,请按照这些说明安装特定版本的OpenJDK。 Kafka是用Java编写的,所以它需要一个JVM; 但是,它的启动shell脚本有一个版本检测错误,导致它无法启动8以上的JVM版本。

第1步 – 为Kafka创建用户

由于Kafka可以通过网络处理请求,因此您应该为其创建专用用户。 如果Kafka服务器受到损害,这可以最大限度地减少对Debian机器的损害。 我们将在此步骤中创建一个专用的kafka用户,但是您应该创建一个不同的非root用户,以便在完成Kafka设置后在此服务器上执行其他任务。

以非root sudo用户身份登录,使用useradd命令创建名为kafka的用户:

sudo useradd kafka -m

-m标志确保将为用户创建主目录。 此主目录/home/kafka将作为我们的工作区目录,用于执行以下部分中的命令。

使用passwd设置密码:

sudo passwd kafka

使用adduser命令将kafka用户添加到sudo组,以便它具有安装Kafka依赖项所需的权限:

sudo adduser kafka sudo

您的kafka用户现已准备就绪。 使用su登录此帐户:

su -l kafka

现在我们已经创建了特定于Kafka的用户,我们可以继续下载和解压缩Kafka二进制文件。

第2步 – 下载和提取Kafka二进制文件

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=331369438&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976319&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318979&bpp=16&bdt=497&idt=66&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&correlator=4005534740767&frm=20&pv=2&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=1972&biw=1903&bih=938&scr_x=0&scr_y=0&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=1&uci=a!1&btvi=1&fsb=1&xpc=9xGXlI1OqQ&p=https%3A//www.howtoing.com&dtd=179

让我们将kafka二进制文件下载并解压缩到我们kafka用户主目录中的专用文件夹中。

首先,在/home/kafka创建一个名为Downloads的目录来存储你的下载:

mkdir ~/Downloads

使用apt-get安装curl以便您能够下载远程文件:

sudo apt-get update && sudo apt-get install -y curl

安装curl ,使用它来下载Kafka二进制文件:

curl "http://www-eu.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz" -o ~/Downloads/kafka.tgz

创建一个名为kafka的目录并切换到此目录。 这将是Kafka安装的基本目录:

mkdir ~/kafka && cd ~/kafka

使用tar命令解压缩下载的存档:

tar -xvzf ~/Downloads/kafka.tgz --strip 1

我们指定--strip 1标志以确保存档的内容在~/kafka/本身中提取,而不是在其中的另一个目录(例如~/kafka/kafka_ 2.12-1.1.1 / )中~/kafka/kafka_ 2.12-1.1.1 /

现在我们已经成功下载并提取了二进制文件,我们可以继续配置Kafka以允许删除主题。

第3步 – 配置Kafka服务器

Kafka的默认行为将不允许我们删除可以发布消息的主题 ,类别,组或订阅源名称。 要修改它,让我们编辑配置文件。

Kafka的配置选项在server.properties中指定。 使用nano或您喜欢的编辑器打开此文件:

nano ~/kafka/config/server.properties

让我们添加一个允许我们删除Kafka主题的设置。 将以下内容添加到文件的底部: 〜/Kafka/配置/ server.properties

delete.topic.enable = true

保存文件,然后退出nano 。 现在我们已经配置了Kafka,我们可以继续创建systemd单元文件,以便在启动时运行并启用它。

第4步 – 创建系统单元文件并启动Kafka服务器

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=3526007643&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976319&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318979&bpp=4&bdt=498&idt=79&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&prev_fmts=728×182&correlator=4005534740767&frm=20&pv=1&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=3260&biw=1903&bih=938&scr_x=0&scr_y=0&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=2&uci=a!2&btvi=2&fsb=1&xpc=7S5HNZaHtm&p=https%3A//www.howtoing.com&dtd=185

在本节中,我们将为Kafka服务创建systemd单元文件 。 这将帮助我们执行常见的服务操作,例如以与其他Linux服务一致的方式启动,停止和重新启动Kafka。

ZooKeeper是Kafka用于管理其集群状态和配置的服务。 它通常在许多分布式系统中用作不可或缺的组件。 如果您想了解更多信息,请访问官方的ZooKeeper文档

zookeeper创建单元文件:

sudo nano /etc/systemd/system/zookeeper.service

在文件中输入以下单位定义: /etc/systemd/system/zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]部分指定ZooKeeper在启动之前需要网络并且文件系统准备就绪。

[Service]部分指定systemd应使用zookeeper-server-start.shzookeeper-server-stop.sh shell文件来启动和停止服务。 它还指定ZooKeeper应该在异常退出时自动重启。

接下来,为kafka创建systemd服务文件:

sudo nano /etc/systemd/system/kafka.service

在文件中输入以下单位定义: /etc/systemd/system/kafka.service

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]部分指定此单元文件依赖于zookeeper.service 。 这将确保在kafka服务启动时zookeeper自动启动。

[Service]部分指定systemd应使用kafka-server-start.shkafka-server-stop.sh shell文件来启动和停止服务。 它还指定如果Kafka异常退出则应自动重启。

现在已经定义了单元,使用以下命令启动Kafka:

sudo systemctl start kafka

要确保服务器已成功启动,请检查kafka单元的日志日志:

sudo journalctl -u kafka

您应该看到类似于以下内容的输出:





Mar 23 13:31:48 kafka systemd[1]: Started kafka.service.

您现在有一个Kafka服务器监听端口9092

虽然我们已经启动了kafka服务,但如果我们要重新启动服务器,它将不会自动启动。 要在服务器启动时启用kafka ,请运行:

sudo systemctl enable kafka

现在我们已经启动并启用了服务,让我们检查安装。

第5步 – 测试安装

让我们发布并使用“Hello World”消息以确保Kafka服务器正常运行。 在Kafka中发布消息需要:

  • 生产者 ,可以将记录和数据发布到主题。
  • 消费者 ,从主题中读取消息和数据。

首先,输入以下命令创建一个名为TutorialTopic的主题:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用kafka-console-producer.sh脚本从命令行创建生成器。 它期望Kafka服务器的主机名,端口和主题名称作为参数。

通过键入以下内容将字符串"Hello, World"TutorialTopic主题:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

接下来,您可以使用kafka-console-consumer.sh脚本创建Kafka使用者。 它期望ZooKeeper服务器的主机名和端口,以及主题名称作为参数。

以下命令使用来自TutorialTopic消息。 注意使用--from-beginning标志,它允许消费在消费者启动之前发布的消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

如果没有配置问题,您应该在终端中看到Hello, World





Hello, World

该脚本将继续运行,等待将更多消息发布到该主题。 随意打开一个新终端并启动生产者发布更多消息。 您应该能够在消费者的输出中看到它们。

完成测试后,按CTRL+C以停止使用者脚本。 现在我们已经测试了安装,让我们继续安装KafkaT。

第6步 – 安装KafkaT(可选)

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=170399143&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976424&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318963&bpp=3&bdt=482&idt=98&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&prev_fmts=728×182%2C728x182%2C300x600%2C1200x280%2C0x0%2C300x600&nras=3&correlator=4005534740767&frm=20&pv=1&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=5771&biw=1903&bih=938&scr_x=0&scr_y=2410&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=3&uci=a!3&btvi=5&fsb=1&xpc=X1D7mT49f0&p=https%3A//www.howtoing.com&dtd=M

KafkaT是Airbnb的一款工具,可让您更轻松地查看有关Kafka群集的详细信息,并从命令行执行某些管理任务。 因为它是一个Ruby gem,所以你需要Ruby才能使用它。 您还需要build-essential软件包才能构建其依赖的其他gem。 使用apt安装它们:

sudo apt install ruby ruby-dev build-essential

您现在可以使用gem命令安装KafkaT:

sudo gem install kafkat

KafkaT使用.kafkatcfg作为配置文件来确定Kafka服务器的安装和日志目录。 它还应该有一个条目将KafkaT指向您的ZooKeeper实例。

创建一个名为.kafkatcfg的新文件:

nano ~/.kafkatcfg

添加以下行以指定有关Kafka服务器和Zookeeper实例的必需信息: 〜/ .kafkatcfg

{
  "kafka_path": "~/kafka",
  "log_path": "/tmp/kafka-logs",
  "zk_path": "localhost:2181"
}

您现在可以使用KafkaT了。 首先,您可以使用它来查看有关所有Kafka分区的详细信息:

kafkat partitions

您将看到以下输出:





Topic                 Partition   Leader      Replicas        ISRs    
TutorialTopic         0             0         [0]             [0]
__consumer_offsets    0               0           [0]                           [0]
...
...

您将看到TutorialTopic以及__consumer_offsets ,这是Kafka用于存储客户端相关信息的内部主题。 您可以安全地忽略以__consumer_offsets开头的__consumer_offsets

要了解有关KafkaT的更多信息,请参阅其GitHub存储库

第7步 – 设置多节点群集(可选)

如果要使用更多Debian 9计算机创建多代理群集,则应在每台新计算机上重复第1步,第4步和第5步。 此外,您应该在server.properties文件中为每个进行以下更改:

  • 应更改broker.id属性的值,使其在整个群集中是唯一的。 此属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。 例如, "server1""server2"等。
  • 应更改zookeeper.connect属性的值,以便所有节点都指向同一个ZooKeeper实例。 此属性指定ZooKeeper实例的地址,并遵循<HOSTNAME/IP_ADDRESS>:<PORT>格式。 例如, " 203.0.113.0 :2181"" 203.0.113.1 :2181"等。

如果要为群集设置多个ZooKeeper实例,则每个节点上zookeeper.connect属性的值应该是一个相同的逗号分隔字符串,其中列出了所有ZooKeeper实例的IP地址和端口号。

第8步 – 限制Kafka用户

https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-7163569408396951&output=html&h=182&slotname=7452267897&adk=314624303&adf=530343939&pi=t.ma~as.7452267897&w=728&fwrn=4&lmt=1620976425&rafmt=11&psa=1&format=728×182&url=https%3A%2F%2Fwww.howtoing.com%2Fhow-to-install-apache-kafka-on-debian-9&flash=0&wgl=1&dt=1620976318966&bpp=1&bdt=485&idt=96&shv=r20210511&cbv=%2Fr20190131&ptt=9&saldr=aa&abxe=1&cookie=ID%3D48527fd304079bb0-22092b2c25c80005%3AT%3D1620976314%3ART%3D1620976314%3AS%3DALNI_Mb-NttFUDu89qowb358-mBugn_bfg&prev_fmts=728×182%2C728x182%2C300x600%2C1200x280%2C0x0%2C300x600%2C728x182&nras=3&correlator=4005534740767&frm=20&pv=1&ga_vid=835390221.1620976319&ga_sid=1620976319&ga_hid=1902469506&ga_fc=0&rplot=4&u_tz=480&u_his=9&u_java=0&u_h=1080&u_w=1920&u_ah=1040&u_aw=1920&u_cd=24&u_nplug=0&u_nmime=0&adx=428&ady=7137&biw=1903&bih=938&scr_x=0&scr_y=3588&oid=3&pvsid=4464633753090611&pem=659&ref=http%3A%2F%2Flink.putaoduo.com%2F&eae=0&fc=896&brdim=-8%2C-8%2C-8%2C-8%2C1920%2C0%2C1936%2C1056%2C1920%2C938&vis=1&rsz=d%7C%7CeEbr%7C&abl=CS&pfx=0&fu=128&bc=31&ifi=4&uci=a!4&btvi=6&fsb=1&xpc=742j1SQH1c&p=https%3A//www.howtoing.com&dtd=M

现在所有安装都已完成,您可以删除kafka用户的管理员权限。 在执行此操作之前,请注销并以任何其他非root sudo用户身份重新登录。 如果您仍在运行相同的shell会话,那么只需键入exit即可启动本教程。

从sudo组中删除kafka用户:

sudo deluser kafka sudo

要进一步提高Kafka服务器的安全性,请使用passwd命令锁定kafka用户的密码。 这可以确保没有人可以使用此帐户直接登录服务器:

sudo passwd kafka -l

此时,只有root或sudo用户可以通过键入以下命令以kafka身份登录:

sudo su - kafka

将来,如果要解锁它,请使用带-u选项的passwd

sudo passwd kafka -u

您现在已成功限制kafka用户的管理员权限。

结论

您现在可以在Debian服务器上安全地运行Apache Kafka。 您可以使用Kafka客户端 (可用于大多数编程语言)创建Kafka生产者和使用者,从而在项目中使用它。 要了解有关Kafka的更多信息,您还可以查阅其文档

https://www.howtoing.com/how-to-install-apache-kafka-on-debian-9

分类
Java Spring Boot 未分类

mybatis-plus的坑

mybatis-plus的坑

BaseMapper的selectBatchIds方法,当idList参数为空时,调用此方法会导致服务cpu占用率几乎100%
分类
Java

gRPC基础教程之一

前言

本文将介绍 gRPC、Protocol Buffers 的概念,同时会给出 Protocol Buffers 代码生成器的使用教程,还有编写第一个基于 gRPC 的服务提供者与服务消费者的示例程序。

相关站点

gRPC 简介

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java、Go 语言版本,分别是:grpc、grpc-java、grpc-go,其中 C 版本支持 C、C++、Node.js、Python、Ruby、Objective-C、PHP、C#。gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特性。在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。值得说明的是,gRPC 客户端和服务端可以在多种环境中运行和交互,支持用任何 gRPC 支持的语言来编写,所以可以很容易地用 Java 创建一个 gRPC 服务端,用 Go、Python、Ruby 来创建客户端。

使用 Protocol Buffers

gRPC 默认使用 Protocol Buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。当使用 proto files 创建 gRPC 服务,用 Protocol Buffers 消息类型来定义方法参数和返回类型。尽管 Protocol Buffers 已经存在了一段时间,官方的示例代码种使用了一种名叫 proto3 的新风格的 Protocol Buffers,它拥有轻量简化的语法、一些有用的新功能,并且支持更多新语言。当前针对 Java 和 C++ 发布了 beta 版本,针对 JavaNano(即 Android Java)发布 alpha 版本,在Protocol Buffers Github 源码库里有 Ruby 支持, 在 Github 源码库里还有针对 Go 语言的生成器, 对更多语言的支持正在开发中。虽然可以使用 proto2 (当前默认的 Protocol Buffers 版本), 通常建议在 gRPC 里使用 proto3,因为这样可以使用 gRPC 支持全部范围的的语言,并且能避免 proto2 客户端与 proto3 服务端交互时出现的兼容性问题,反之亦然。

本地编译安装 Protocol Buffers(可选)

参考自 gRPC-Java、Protobuf 编译构建的官方教程,一般情况下不需要构建 gRPC-Java,只有在对 gRPC-Java 的源码进行了更改或测试使用 gRPC-Java 库的非发布版本(例如 master 分支)时才需要构建。若本地安装了 Protobuf,则可以直接通过命令的方式调用 Protobuf 的代码生成器,无需再依赖额外的 IDE 插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 系统环境
CentOS Linux release 7.6.1810 (Core)
Linux develop 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

# 拉取源码
# git clone https://github.com/google/protobuf.git

# 进入源码目录
# cd protobuf

# 切换至需要编译的版本的分支
# git checkout v3.7.1

# 查看当前所在的分支信息
# git branch -v

# 检测安装环境
# ./autogen.sh
# ./configure –disable-shared

# 编译安装
# make -j 8
# make install

# 如果/usr/local/lib不在库搜索路径中,可以通过运行以下命令添加
# sh -c ‘echo /usr/local/lib >> /etc/ld.so.conf’

# 使添加的库搜索路径生效
# ldconfig

# 查看protobuf安装的版本号
# protoc –version

# 编写.proto文件,使用protobuf的代码生成器自动生成Java代码,命令格式如下
# protoc -I=$SRC_DIR –java_out=$DST_DIR $SRC_DIR/addressbook.proto

# 默认安装路径:/usr/local
# 指定安装目录可以使用此命令: ./configure –disable-shared –prefix=/usr/local/protobuf-3.7.1

Eclipse 项目中添加 Protobuf 自动生成代码的 Maven 插件与 Protobuf 依赖

Protobuf 的原型文件和一些适合的插件,默认放在 src/main/proto 和 src/test/proto 目录中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.21.0</version>
</dependency>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.21.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

往 Gradle 构建的项目添加 Protobuf 自动生成代码的插件与 Protobuf 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
plugins {
id ‘com.google.protobuf’ version ‘0.8.8’
}

protobuf {
protoc {
artifact = “com.google.protobuf:protoc:3.7.1”
}
plugins {
grpc {
artifact = ‘io.grpc:protoc-gen-grpc-java:1.21.0’
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}

dependencies {
compile ‘io.grpc:grpc-stub:1.21.0’
compile ‘io.grpc:grpc-protobuf:1.21.0’
compile ‘io.grpc:grpc-netty-shaded:1.21.0’
testCompile group: ‘junit’, name: ‘junit’, version: ‘4.12’
}

编写 Proto 文件(定义服务),执行编译后自动生成 Java 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 创建gradle工程grpc-demo-provider,目录结构如下:
grpc-demo-provider/
├── build.gradle
└── src
├── main
│   ├── java
│   ├── proto
│   │   └── helloworld.proto
│   └── resources
└── test
├── java
├── proto
└── resources

# 进入工程目录
# cd grpc-demo-provider

# 编辑build.gradle文件,添加protobuf插件与依赖,可参考上面给出的gradle配置内容

# 创建proto文件
# mkdir -p src/main/proto
# vim src/main/proto/helloworld.proto

syntax = “proto3”;
option java_multiple_files = true;
option java_package = “com.grpc.demo.generate”;
option java_outer_classname = “HelloWorldProto”;
option objc_class_prefix = “HLW”;

package helloworld;

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

# 执行编译,自动生成Java文件
# gradle clean build

# 查看自动生成的文件目录结构,默认生成文件所在的目录是:$buildDir/generated/source/proto,其中Message在main/java目录下,Service在目录main/grpc下
# tree build/generated/source/proto
main
├── grpc
│   └── com
│   └── grpc
│   └── demo
│   └── generate
│   └── GreeterGrpc.java
└── java
└── com
└── grpc
└── demo
└── generate
├── HelloReply.java
├── HelloReplyOrBuilder.java
├── HelloRequest.java
├── HelloRequestOrBuilder.java
└── HelloWorldProto.java

Gradle 指定 Protobuf 代码自动生成的目录位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 指定Message代码的生成位置,最终生成位置在src/main/java目录下
protobuf {
generatedFilesBaseDir = “src”
}

// 指定Service代码的生成位置,最终生成位置在src/main/java目录下
protobuf {
generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = ‘java’
}
}
}
}

// 完整的写法,同时指定Message、Service代码生成的目录位置为src/main/java
protobuf {
protoc {
artifact = “com.google.protobuf:protoc:3.7.1”
}
plugins {
grpc {
artifact = ‘io.grpc:protoc-gen-grpc-java:1.21.0’
}
}
generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = ‘java’
}
}
}
generatedFilesBaseDir = ‘src’
}

RPC 服务提供者的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.grpc.demo.provider.service;

import com.grpc.demo.generate.GreeterGrpc;
import com.grpc.demo.generate.HelloReply;
import com.grpc.demo.generate.HelloRequest;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.logging.Logger;

public class HelloWorldProvider {

private Server server;
private static final Logger logger = Logger.getLogger(HelloWorldProvider.class.getName());

private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
logger.info(“==> Server started, listening on ” + port);

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println(“*** shutting down gRPC server since JVM is shutting down”);
HelloWorldProvider.this.stop();
System.err.println(“*** server shut down”);
}
});
}

private void stop() {
if (server != null) {
server.shutdown();
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldProvider server = new HelloWorldProvider();
server.start();
server.blockUntilShutdown();
}

static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage(“Hello ” + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}

RPC 服务消费者的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.grpc.demo.consumer.service;

import com.grpc.demo.generate.GreeterGrpc;
import com.grpc.demo.generate.HelloReply;
import com.grpc.demo.generate.HelloRequest;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloWorldConsumer {

private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
private static final Logger logger = Logger.getLogger(HelloWorldConsumer.class.getName());

public HelloWorldConsumer(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build());
}

HelloWorldConsumer(ManagedChannel channel) {
this.channel = channel;
blockingStub = GreeterGrpc.newBlockingStub(channel);
}

public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

public void greet(String name) {
logger.info(“==> Will try to greet ” + name + ” …”);
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, “RPC failed: {0}”, e.getStatus());
return;
}
logger.info(“==> Greeting: ” + response.getMessage());
}

public static void main(String[] args) throws Exception {
HelloWorldConsumer client = new HelloWorldConsumer(“localhost”, 50051);
try {
String user = “World”;
client.greet(user);
} finally {
client.shutdown();
}
}
}

先后启动Provider、Consumer应用,最终输出的日志信息如下图所示

Provider应用的日志信息:

Consumer应用的日志信息:

本文作者: Clay
发布时间: 2019-06-25 20:40:21
本文链接: https://www.techgrow.cn/posts/ceb4ff2b.html
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!

分类
Java 开发经历

spring cloud config遇到的问题

本机环境调试一个使用到spring cloud config的项目,启动到时候访问config服务报503错误,感到奇怪到是之前一直运行得好好到。后来终于发现网络启用了全局代理模式,也就是通过远程访问本地网络环境,那肯定是不通的。

附上http状态码说明:

5xx(服务器错误)

这些状态代码表示服务器在尝试处理请求时发生内部错误。 这些错误可能是服务器本身的错误,而不是请求出错。

代码 说明

500 (服务器内部错误) 服务器遇到错误,无法完成请求。

501 (尚未实施) 服务器不具备完成请求的功能。 例如,服务器无法识别请求方法时可能会返回此代码。

502 (错误网关) 服务器作为网关或代理,从上游服务器收到无效响应。

503 (服务不可用) 服务器目前无法使用(由于超载或停机维护)。 通常,这只是暂时状态。

504 (网关超时) 服务器作为网关或代理,但是没有及时从上游服务器收到请求。

505 (HTTP 版本不受支持) 服务器不支持请求中所用的 HTTP 协议版本。

分类
Java

Spring Security实例-具有基于令牌的身份验证的Spring Security

介绍
如何使用Spring实现基于令牌的安全性功能。 让我们看一下工作流程以获得更好的理解:

  1. 用户使用用户名和密码发送请求。
  2. Spring Security将令牌返回给客户端API。
  3. 客户端API在每个请求中发送令牌作为身份验证的一部分。
  4. 使令牌在注销时失效。

让我们看看这个工作流程的样子:

1.Maven安装
我们将使用Spring Boot和Maven处理依赖关系。 在构建Spring Boot Web应用程序时,我们将为应用程序使用以下starters。

  1. Spring Boot Web starter
  2. Spring Boot Security starter.
  3. JPA starter

现在,我们的pom.xml如下所示:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.security</groupId>
        <artifactId>spring-security-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. 数据库布局

我将在数据库级别简化此应用程序,我将使用一个表来存储用户详细信息和令牌。 在用户配置文件请求用户创建一个并返回此令牌之前,不会有针对用户个人资料的令牌。 表结构如下所示:

这不是产品级别就绪的表,但主要思想是存储用于客户概要文件的令牌,并将该令牌用于身份验证和授权。 您可以根据需要更改/调整此工作流程。

3. JPA 资料库

要保存并获取客户档案的令牌信息,我们需要创建一个自定义存储库。 该存储库负责根据令牌获取客户信息。 客户服务将使用我们的客户资源库基于令牌获取客户详细信息或执行登录。

@Repository
public interface CustomerRepository extends CrudRepository<Customer, Long> {

    @Query(value = "SELECT u FROM Customer u where u.userName = ?1 and u.password = ?2 ")
    Optional login(String username,String password);
    Optional findByToken(String token);
}

4. 客户验证服务

我们的客户验证服务遵循两个核心运营

提供登录功能以将令牌返回给客户端。
根据提供的令牌验证客户。
我们的客户服务如下所示:

@Service("customerService")
public class DefaultCustomerService implements CustomerService {

    @Autowired
    CustomerRepository customerRepository;

    @Override
    public String login(String username, String password) {
        Optional customer = customerRepository.login(username,password);
        if(customer.isPresent()){
            String token = UUID.randomUUID().toString();
            Customer custom= customer.get();
            custom.setToken(token);
            customerRepository.save(custom);
            return token;
        }

        return StringUtils.EMPTY;
    }

    @Override
    public Optional findByToken(String token) {
        Optional customer= customerRepository.findByToken(token);
        if(customer.isPresent()){
            Customer customer1 = customer.get();
            User user= new User(customer1.getUserName(), customer1.getPassword(), true, true, true, true,
                    AuthorityUtils.createAuthorityList("USER"));
            return Optional.of(user);
        }
        return  Optional.empty();
    }
}

让我们检查一下上面代码中的操作:

  1. 登录方法接受用户名和密码,并将返回用于成功凭证的令牌
  2. 我们将对所有安全资源使用第二种方法  

5. Spring Security 配置

这些是使用Spring Security和基于令牌的身份验证来保护REST API的主要配置类。在本节中,我们将讨论以下类:

  • AuthenticationProvider : 通过其身份验证令牌查找用户.
  • AuthenticationFilter :从请求标头中提取身份验证令牌
  • SecurityConfiguration : Spring Security 配置

5.1 Token Authentication Provider

AuthenticationProvider负责根据客户端在标头中发送的身份验证令牌来查找用户。 这就是我们基于Spring的令牌身份验证提供程序的外观:

@Component
public class AuthenticationProvider extends AbstractUserDetailsAuthenticationProvider {

 @Autowired
 CustomerService customerService;

 @Override
 protected void additionalAuthenticationChecks(UserDetails userDetails, UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken) throws AuthenticationException {
  //
 }

 @Override
 protected UserDetails retrieveUser(String userName, UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken) throws AuthenticationException {

  Object token = usernamePasswordAuthenticationToken.getCredentials();
  return Optional
   .ofNullable(token)
   .map(String::valueOf)
   .flatMap(customerService::findByToken)
   .orElseThrow(() -> new UsernameNotFoundException("Cannot find user with authentication token=" + token));
 }

我们的AuthenticationProvider使用CustomerService根据令牌查找客户。

5.2  Token Authentication Filter

令牌认证过滤器负责从头获取认证过滤器,并调用认证管理器进行认证。 身份验证过滤器如下所示:

public class AuthenticationFilter extends AbstractAuthenticationProcessingFilter {

    AuthenticationFilter(final RequestMatcher requiresAuth) {
        super(requiresAuth);
    }

    @Override
    public Authentication attemptAuthentication(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws AuthenticationException, IOException, ServletException {

        Optional tokenParam = Optional.ofNullable(httpServletRequest.getHeader(AUTHORIZATION)); //Authorization: Bearer TOKEN
        String token= httpServletRequest.getHeader(AUTHORIZATION);
        token= StringUtils.removeStart(token, "Bearer").trim();
        Authentication requestAuthentication = new UsernamePasswordAuthenticationToken(token, token);
        return getAuthenticationManager().authenticate(requestAuthentication);

    }

    @Override
    protected void successfulAuthentication(final HttpServletRequest request, final HttpServletResponse response, final FilterChain chain, final Authentication authResult) throws IOException, ServletException {
        SecurityContextHolder.getContext().setAuthentication(authResult);
        chain.doFilter(request, response);
    }
}

让我们强调几个这里的要点:

  1. 此过滤器将身份验证委托给UsernamePasswordAuthenticationToken 
  2. 此过滤器仅对特定的URL启用(在下一节中说明)

5.3  Spring Security配置

这负责将所有内容组合在一起。让我们看看我们的Spring安全配置是怎样:

@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfiguration extends WebSecurityConfigurerAdapter {


 private static final RequestMatcher PROTECTED_URLS = new OrRequestMatcher(
  new AntPathRequestMatcher("/api/**")
 );

 AuthenticationProvider provider;

 public SecurityConfiguration(final AuthenticationProvider authenticationProvider) {
  super();
  this.provider = authenticationProvider;
 }

 @Override
 protected void configure(final AuthenticationManagerBuilder auth) {
  auth.authenticationProvider(provider);
 }

 @Override
 public void configure(final WebSecurity webSecurity) {
  webSecurity.ignoring().antMatchers("/token/**");
 }

 @Override
 public void configure(HttpSecurity http) throws Exception {
  http.sessionManagement()
   .sessionCreationPolicy(SessionCreationPolicy.STATELESS)
   .and()
   .exceptionHandling()
   .and()
   .authenticationProvider(provider)
   .addFilterBefore(authenticationFilter(), AnonymousAuthenticationFilter.class)
   .authorizeRequests()
   .requestMatchers(PROTECTED_URLS)
   .authenticated()
   .and()
   .csrf().disable()
   .formLogin().disable()
   .httpBasic().disable()
   .logout().disable();
 }

 @Bean
 AuthenticationFilter authenticationFilter() throws Exception {
  final AuthenticationFilter filter = new AuthenticationFilter(PROTECTED_URLS);
  filter.setAuthenticationManager(authenticationManager());
  //filter.setAuthenticationSuccessHandler(successHandler());
  return filter;
 }

 @Bean
 AuthenticationEntryPoint forbiddenEntryPoint() {
  return new HttpStatusEntryPoint(HttpStatus.FORBIDDEN);
 }
}

让我们检查一些要点:

  1. 与请求模式/api/ **匹配的所有URL都是安全的,并且需要有效的令牌进行访问。
  2. The webSecurity.ignoring().antMatchers("/token/**") 显示安全检查中排除的所有请求。
  3. 我们已经在Spring安全性中注册了AuthenticationProvider。 Spring Security将检查令牌验证.
  4. 配置方法包括基本配置以及禁用基于表单的登录和其他标准功能

此步骤总结了使用Spring Security和基于令牌的身份验证来保护REST API的步骤。 在下一步中,我们将设置一个简单的Spring Boot Web应用程序以测试我们的工作流程。

 6. Spring Boot 控制器

让我们创建一个简单的Spring Boot控制器来测试我们的应用程序:

6.1 Token Controller

该控制器负责返回令牌以获取有效凭证:

@RestController
public class TokenController {

    @Autowired
    private CustomerService customerService;

    @PostMapping("/token")
    public String getToken(@RequestParam("username") final String username, @RequestParam("password") final String password){
       String token= customerService.login(username,password);
       if(StringUtils.isEmpty(token)){
           return "no token found";
       }
       return token;
    }
}

6.2 安全用户资料控制器

这是安全控制器。 它将返回有效令牌的用户资料。仅在传递有效令牌后才能访问此控制器:

@RestController
public class UserProfileController {

    @Autowired
    private CustomerService customerService;

    @GetMapping(value = "/api/users/user/{id}",produces = "application/json")
    public Customer getUserDetail(@PathVariable Long id){
        return customerService.findById(id);
    }
}

7. 测试应用

让我们构建和部署应用程序。一旦应用程序运行,就可以使用任何REST客户端来测试我们的应用程序(我使用的是Postman):

不使用 Access Token:

让我们从API获取令牌:

将令牌用于安全URL:

总结

在本文中,我们了解了如何使用基于令牌的方法来使用Spring Security保护REST API。 我们介绍了用于保护REST API的不同配置和设置。

若有需要这篇文章的Intellij IDEA SpringBoot项目源代码的朋友,可先微信扫下图二维码打赏8~9元,然后截图付款收据,加QQ:5404125(备注:源码)联系下载源代码。

分类
Java

Spring Cloud入门使用例子

SpringCloud 是微服务中的翘楚,最佳的落地方案。本仓库整合了一些工作中经常用到的一些技术,作为使用微服务框架的演示。

https://github.com/intomylife/SpringCloud