kafka数据从一台机器copy到另一台机器上

kafka消息转移

使用多线程实现从133.0.120.193转移到133.0.189.157
利用它的订阅发布实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
ArrayList<Object> list = new ArrayList<>();
Runnable runnablec = new Runnable() {
@Override
public void run() {
//配置信息
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "133.0.120.193:9092");
//必须指定消费者组
props.put("group.id", "t1");
props.setProperty("auto.offset.reset", "earliest");
//设置数据key和value的序列化处理类
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
//创建消息者实例
KafkaConsumer<Object,Object> consumer = new KafkaConsumer<>(props);
//订阅topic1的消息
consumer.subscribe(Arrays.asList("eopstat"));
//到服务器中读取记录
while (true){
ConsumerRecords<Object,Object> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<Object,Object> record : records){
System.out.println( record.key() + "" + ",value:" + record.value());
list.add(record.value());
}
}
}
};
Runnable runnablep = new Runnable() {
@Override
public void run() {
String topicName = "eopstat";

Properties props = new Properties();
props.put("bootstrap.servers", "133.0.189.157:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
Producer<String, String> producer = new KafkaProducer<>(props);
for (Object o : list) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,o.toString());
System.err.println(record);
producer.send(record);
System.out.println(o);
}
producer.close();
}
};
Thread threadc = new Thread(runnablec);
Thread threadp = new Thread(runnablep);
threadc.setPriority(8);
threadp.setPriority(1);

threadc.start();
threadc.join(10000);
threadp.start();
threadp.join(120000);