Kafka是一个高吞吐量分布式消息系统。linkedin开源的kafka。 Kafka就跟这个名字一样,设计非常独特。首先,kafka的开发者们认为不需要在内存里缓存什么数据,操作系统的文件缓存已经足够完善和强大,只要你不搞随机写,顺序读写的性能是非常高效的。kafka的数据只会顺序append,数据的删除策略是累积到一定程度或者超过一定时间再删除。Kafka另一个独特的地方是将消费者信息保存在客户端而不是MQ服务器,这样服务器就不用记录消息的投递过程,每个客户端都自己知道自己下一次应该从什么地方什么位置读取消息,消息的投递过程也是采用客户端主动pull的模型,这样大大减轻了服务器的负担。Kafka还强调减少数据的序列化和拷贝开销,它会将一些消息组织成Message Set做批量存储和发送,并且客户端在pull数据的时候,尽量以zero-copy的方式传输,利用sendfile(对应java里的 FileChannel.transferTo/transferFrom)这样的高级IO函数来减少拷贝开销。可见,kafka是一个精心设计,特定于某些应用的MQ系统,这种偏向特定领域的MQ系统我估计会越来越多,垂直化的产品策略值的考虑。
操作步骤,官网有详细说明,之前没去看,走了不少弯路
1. 下载并解压Kafka
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.0/kafka-0.8.0.tgz
我解压在/usr/local下
> tar -zxvf kafka-<VERSION>.tgz > cd kafka-<VERSION> > ./sbt update > ./sbt package > ./sbt assembly-package-dependency
执行上面的步骤时, 机器需要联网。
2. 修改conf/server.properties 属性文件
host.name=192.168.80.100 (修改为主机ip,不然服务器返回给客户端的是主机的hostname,客户端并不一定能够识别)
修改conf/zookeeper.properties 属性文件
dataDir=/usr/local/tmp/zookeeper (zookeeper临时数据文件)
3. 先后自动zookeeper 和 kafka
cd bin 启动zookeeper ./zookeeper-server-start.sh ../config/zookeeper.properties & (&推出命令行,服务守护执行) 启动kafka ./kafka-server-start.sh ../config/server.properties &
4. 新建kafka maven工程
pom.xml添加 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> </dependency>
生产者:
public class KafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "192.168.80.100:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "192.168.80.100:9092");
props.put("request.required.acks", "1");
//props.put("partitioner.class", "com.xq.SimplePartitioner");
ProducerConfig config = new ProducerConfig(props);
final Producer<String, String> producer = new Producer<String, String>(config);
String ip = "192.168.80.1";
String msg ="this is a message!";
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg);
try {
while(true){
producer.send(data);
}
} catch (Exception e) {
e.printStackTrace();
producer.close();
}
}
}
消费者:
public class KafkaConsumer {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.80.100:2181");
props.put("zookeeper.connection.timeout.ms", "10000");
props.put("group.id", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector connector = Consumer
.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector
.createMessageStreams(topics);
// ExecutorService threadPool = Executors.newFixedThreadPool(2);
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams
.get("test");
int i = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()) + "," + i++);
}
}
}
目前的单机配置,消费性能惊人。先有producer产生数据,然后再启动consumer,一秒接收数万条消息不再话下。
转载请注明:爱开源 » Kafka-0.8.0单机配置安装