How to config Kafka SASL zookeeper authentication

2018-11-21 17:06:00

单节点zookeeper配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@localhost kafka]# cat /etc/zookeeper/zoo.cfg 
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper/data
# the port at which the clients will connect
clientPort=2181

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

JAAS配置文件指定身份认证插件,并提供登录到zookeeper的登录名与密码。我创建了一个配置文件kafka_zoo_jaas.conf

1
2
3
4
5
6
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
};

拷贝第三方Jar包到ZooKeeper目录,可以从https://mvnrepository.com/ 查看需要的包依赖

1
2
3
4
5
kafka-clients-2.0.1.jar 
lz4-java-1.4.jar
org.osgi.core-4.3.0.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.4.jar

/usr/local/kafka_2.12-1.1.1/config 目录下的 server.properties 文件 进行修改

1
2
3
4
5
6
7
8
9
listeners=SASL_PLAINTEXT://172.16.10.112:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=false
super.users=User:admin
delete.topic.enable=true

在config目录添加kafka_server_jaas.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_kafka="kafka";
};


Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka";
};

kafka-run-class.sh添加下面配置

1
2
3
4
5
6
7
8
KAFKA_SASL_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

kafka已经开启了SASL/PLAIN权限认证,producer和consumer不做配置是无法连接kafka的
在config目录下创建kafka_client_jaas.conf

1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka";
};

在config下的producer.properties和consumer.properties添加下面配置:

1
2
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

在bin下的kafka-console-producer.sh和kafka-console-consumer.sh下添加下面配置:

1
2
3
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf"
fi

为你需要使用的用户授权

1
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181/kafka --add --allow-principal User:alice --operation Read --operation Write --topic sean-security

查询已经授权的用户

1
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181/kafka --list --topic sean-security

配置完成后,测试下,注意如果有开机自启,需要修改相应脚本,加载KAFKA_OPTS 选项
创建topic

1
./kafka-topics.sh  --zookeeper localhost:2181/kafka --create --topic Test  --partitions 1 --replication-factor 1

producer发送消息:

1
2
3

./kafka-console-producer.sh --broker-list 172.16.10.100:9092 --topic test
--producer.config /opt/kafk/config/producer.properties

consumer消费消息:

1
2
./kafka-console-consumer.sh --bootstrap-server 172.16.10.100:9092 --topic test --from-beginning
--consumer.config /opt/kafk/config/consumer.properties

如果是在程序里面只需要在kafka配置信息里添加如下配置:

1
2
3
System.setProperty("java.security.auth.login.config", "kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
propsMap.put("security.protocol","SASL_PLAINTEXT");
propsMap.put("sasl.mechanism", "PLAIN");

无论是producer还是consumer,都需要配置认证信息,其中kafka_client_jaas.conf的内容如下:

1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka";
};

ref
Documentation
为Kafka 添加用户名 密码权限,即SASL/PLAIN
Kafka的SASL/PLAIN认证配置说明
Kafka SASL zookeeper authentication
单机节点Kafka配置SASL用户名密码认证
Kafka SASL配置 & Demo测试