1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
// Relays messages from topic "eopstat" on one Kafka cluster to topic "eopstat"
// on another: a consumer thread collects record values for a fixed window,
// is then stopped, and a producer thread re-publishes everything collected.

// Values read by the consumer thread. Only touched by the producer thread
// after the consumer thread has been joined, so no locking is required.
ArrayList<Object> list = new ArrayList<>();

// Cleared by the launching thread to end the consumer's poll loop.
// Fully qualified so the snippet needs no additional import.
final java.util.concurrent.atomic.AtomicBoolean consuming =
        new java.util.concurrent.atomic.AtomicBoolean(true);

// Consumer task: subscribes to "eopstat" on 133.0.120.193:9092 (group "t1",
// starting from the earliest offset), prints every record and stores its value.
Runnable runnablec = new Runnable() {
    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "133.0.120.193:9092");
        props.put("group.id", "t1");
        props.setProperty("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);

        // try-with-resources: the consumer is closed on the thread that uses it,
        // whether the loop ends normally or poll() throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("eopstat"));
            // poll() returns after at most one second, so a cleared flag is
            // noticed promptly.
            while (consuming.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + ",value:" + record.value());
                    list.add(record.value());
                }
            }
        }
    }
};

// Producer task: publishes every collected value (as a String, without a key)
// to "eopstat" on 133.0.189.157:9092.
Runnable runnablep = new Runnable() {
    @Override
    public void run() {
        String topicName = "eopstat";
        Properties props = new Properties();
        props.put("bootstrap.servers", "133.0.189.157:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources: close() runs even if the loop throws, so the
        // producer is never leaked.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (Object o : list) {
                if (o == null) {
                    // A consumed record may carry a null value; there is nothing
                    // to forward and o.toString() would throw.
                    continue;
                }
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, o.toString());
                System.err.println(record);
                producer.send(record);
                System.out.println(o);
            }
        }
    }
};

Thread threadc = new Thread(runnablec);
Thread threadp = new Thread(runnablep);
threadc.setPriority(8);
threadp.setPriority(1);

threadc.start();
// Let the consumer collect for up to ten seconds.
threadc.join(10000);
// Stop the consumer and wait for it to finish before the producer reads `list`:
// iterating the ArrayList while the consumer is still adding to it is a data race
// and can fail with ConcurrentModificationException. The join also makes the
// consumer's writes visible to the producer thread started afterwards.
consuming.set(false);
threadc.join();
threadp.start();
threadp.join(120000);
|