http://kafka.apache.org/07/quickstart.html
和基本的消费者群体示例:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
我已经编写了Consumer和ConsumerThreadPool,如上所示:
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;
public class Consumer implements Runnable {
private KafkaStream m_stream;
private Integer m_threadNumber;
public Consumer(KafkaStream a_stream,Integer a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[],byte[]> it = m_stream.iterator();
while (it.hasNext()) {
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
其他几个方面:我使用spring来管理我的zookeeper:
import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {
@Bean
@Named("consumerConfig")
private static ConsumerConfig createConsumerConfig() {
String zookeeperAddress = "127.0.0.1:2181";
String groupId = "inventory";
Properties props = new Properties();
props.put("zookeeper.connect",zookeeperAddress);
props.put("group.id",groupId);
props.put("zookeeper.session.timeout.ms","400");
props.put("zookeeper.sync.time.ms","200");
props.put("auto.commit.interval.ms","1000");
return new ConsumerConfig(props);
}
}
我正在使用Maven和OneJar Maven插件进行编译.但是,我编译然后运行生成的一个jar我得到以下错误:
Aug 26,2013 6:15:41 PM org.springframework.context.annotation.ClasspathScanningCandidateComponentProvider registerDefaultFilters INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning Exception in thread "main" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.simontuffs.onejar.Boot.run(Boot.java:340) at com.simontuffs.onejar.Boot.main(Boot.java:166) Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject at java.lang.classLoader.defineClass1(Native Method) at java.lang.classLoader.defineClass(ClassLoader.java:792) at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803) at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710) at java.lang.classLoader.loadClass(ClassLoader.java:424) at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) at java.lang.classLoader.loadClass(ClassLoader.java:357) at java.lang.class.getDeclaredMethods0(Native Method) at java.lang.class.privateGetDeclaredMethods(Class.java:2521) at java.lang.class.getDeclaredMethods(Class.java:1845) at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180) at org.springframework.context.annotation.ConfigurationClassparser.doProcessConfigurationClass(ConfigurationClassparser.java:222) at org.springframework.context.annotation.ConfigurationClassparser.processConfigurationClass(ConfigurationClassparser.java:165) at org.springframework.context.annotation.ConfigurationClassparser.parse(ConfigurationClassparser.java:140) at org.springframework.context.annotation.ConfigurationClasspostProcessor.processConfigBeanDeFinitions(ConfigurationClasspostProcessor.java:282) at org.springframework.context.annotation.ConfigurationClasspostProcessor.postProcessBeanDeFinitionRegistry(ConfigurationClasspostProcessor.java:223) at org.springframework.context.support.AbstractApplicationContext.invokebeanfactoryPostProcessors(AbstractApplicationContext.java:630) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461) at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73) at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31) at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20) ... 6 more Caused by: java.lang.classNotFoundException: scala.ScalaObject at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713) at java.lang.classLoader.loadClass(ClassLoader.java:424) at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) at java.lang.classLoader.loadClass(ClassLoader.java:357) ... 27 more
现在,我对Kafka知之甚少,对Scala一无所知.我该如何解决?接下来我该怎么办?这是一个已知的问题?我需要其他依赖项吗?这是我的pom.xml中的kafka版本:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency>
更新:我联系了Kafka dev邮件列表,他们让我知道了scala依赖项的一些特定版本要求.但是,还有一个未记录的log4j依赖项,这会导致另一个运行时,而不是编译时异常.
Exception in thread "main" java.lang.reflect.InvocationTargetException Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterandLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V at org.apache.log4j.Category.log(Category.java:333) at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
另一个更新:
我发现了正确的log4j依赖:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
但现在我遇到了一个更加神秘的运行时异常:
Exception in thread "main" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.simontuffs.onejar.Boot.run(Boot.java:340) at com.simontuffs.onejar.Boot.main(Boot.java:166) Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
在这一点上,我得到了WTF的那种感觉.所以我添加了另一个依赖:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
但是这暴露了另一个运行时异常:
Exception in thread "main" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.simontuffs.onejar.Boot.run(Boot.java:340) at com.simontuffs.onejar.Boot.main(Boot.java:166) Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146) at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113) at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
我希望能够让这个婴儿的例子正常运行,但也许这是使用beta产品的代价?也许我应该切换到Apache Active MQ.但这听起来不那么有趣.我错过了什么吗?
解决方法
完整依赖列表如下.请注意,您必须相应地更改scala版本依赖关系到kafka工件的后缀.
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-annotation</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest</artifactId>
<version>1.2</version>
<scope>test</scope>
</dependency>
至于
Maybe I should switch to Apache Active MQ. But that sounds less fun.
Am I missing something?
那么,你不要忘记这是测试版吗?确实发生了一些不好的事情,但目前我们正在毫不费力地运行kafka 0.7.