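I wrote a Kafka producer in Java.

But the console reports an error. The Kafka server runs on AWS, and the producer runs on my Mac.
The Kafka server is reachable, though: when I send a message from the producer, the Kafka server logs "Accepted connection ...".
What is the problem?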
1 [main] INFO kafka.utils.VerifiableProperties - Verifying properties
28 [main] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx:9092
28 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
137 [main] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host: xxxxxx,port:9092 with correlation id 0 for 1 topic(s) Set(words_topic)
189 [main] ERROR kafka.producer.SyncProducer - Producer connection to xxxxxx:9092 unsuccessful
198 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(words_topic)] from broker [id:0,port:9092] failed
199 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics [Set(words_topic)] from broker [ArrayBuffer(id:0,port:9092)] failed
This is the Kafka server console:
[2015-01-27 05:23:33,767] DEBUG Accepted connection from /xxxxx on /xxxx:9092. sendBufferSize [actual|requested]: [212992|1048576] recvBufferSize [actual|requested]: [212992|1048576] (kafka.network.Acceptor)
[2015-01-27 05:23:33,767] DEBUG Processor 1 listening to new connection from /xxxx:65307 (kafka.network.Processor)
[2015-01-27 05:23:33,872] INFO Closing socket connection to /xxxx. (kafka.network.Processor)
[2015-01-27 05:23:33,873] DEBUG Closing connection from /xxxx:65307 (kafka.network.Processor)
Here is my code.
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// The class name is assumed; the original post shows only the method body.
public class WordsProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address (masked in the original post); the key is lowercase
        props.put("metadata.broker.list", "?????:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Now we break each word from the paragraph
        for (String word : METAMORPHOSIS_OPENING_PARA.split("\\s")) {
            // Create message to be sent to "words_topic" topic with the word
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("words_topic", word);
            // Send the message
            producer.send(data);
        }
        System.out.println("Produced data");
        // close the producer
        producer.close();
    }

    // First paragraph from Franz Kafka's Metamorphosis
    private static final String METAMORPHOSIS_OPENING_PARA =
            "One morning, when Gregor Samsa woke from troubled dreams, "
                    + "he found himself transformed in his bed into a horrible "
                    + "vermin. He lay on his armour-like back, and if he lifted "
                    + "his head a little he could see his brown belly, slightly "
                    + "domed and divided by arches into stiff sections.";
}
Solution

I solved it.

Set 'advertised.host.name' in the Kafka broker's server.properties to the server's real IP (the same value as the producer's 'metadata.broker.list' property).
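
For illustration, the relevant part of server.properties might look like the fragment below. This is only a sketch: the value is a placeholder for the server's real IP, which the post does not give. If 'advertised.host.name' is not set, the broker publishes 'host.name' or, failing that, the machine's own hostname, which on AWS is often an internal name that a client outside AWS cannot resolve.

```properties
# Address the broker publishes for clients to connect to.
# Placeholder: replace with the real IP of the server, i.e. the same
# host the producer uses in metadata.broker.list.
advertised.host.name=<real IP of the server>
```

The broker reads server.properties at startup, so it has to be restarted for the change to take effect.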
Reference: https://issues.apache.org/jira/browse/KAFKA-1092
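
To check from the producer machine whether an advertised address can be used at all, a plain TCP connection test may help. The following is a minimal sketch, not part of Kafka: the class name BrokerReachability and the method canConnect are hypothetical helpers introduced here. It only confirms that the host name resolves and that the port accepts connections. It does not speak the Kafka protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of the Kafka client API.
class BrokerReachability {

    // Returns true if a TCP connection to host:port can be opened within
    // timeoutMillis. Returns false if the name cannot be resolved, the
    // connection is refused, or the attempt times out.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Calling BrokerReachability.canConnect(host, 9092, 3000) with the host name the broker advertises, rather than only the address in 'metadata.broker.list', shows whether the producer machine can reach that name. A false result for the advertised name would be consistent with the problem described above.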