Результаты поиска
Найдено результатов: 1
прокидывать exception в блоках try-catch JAVA
я новичок. дали первую таску, одну часть выполнила, на второй встряла. подскажите пожалуйста что конкретно делать тут
"...в сервисных классах кафки нужно прокидывать exception в блоках try-catch, где вызывается log.error(). Нужно будет написать новые эксепшены нашего сервиса и прокидывать их"
мои сервисы
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListenersService {
private final KafkaSenderService kafkaSender;
@Value(value = "${kafka-topics.platform-direction-create}")
private String platformDirectionCreate;
@Value(value = "${kafka-topics.platform-direction-update}")
private String platformDirectionUpdate;
@Value(value = "${kafka-topics.platform-direction-delete}")
private String platformDirectionDelete;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "${kafka-topics.adapter-direction-create}")
public void listenerAdapterDirectionCreate(ConsumerRecord<String, JsonNode> myRecord){
log.info("Get message in topic adapter-direction-create, key {} value {}", myRecord.key(), myRecord.value());
DirectionRequest directionRequest = null;
try {
directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
} catch (JsonProcessingException e) {
log.error("Error reading message: {}", e.getMessage());
}
log.info("Created request to create direction: " + directionRequest.getName());
kafkaSender.sendMessage(platformDirectionCreate, "Create direction", myRecord.value());
}
@KafkaListener(topics = "${kafka-topics.adapter-direction-update}")
public void listenerAdapterDirectionUpdate(ConsumerRecord<String, JsonNode> myRecord){
log.info("Get message in topic adapter-direction-update, key {} value {}", myRecord.key(), myRecord.value());
DirectionRequest directionRequest = null;
try {
directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
} catch (JsonProcessingException e) {
log.error("Error reading message: {}", e.getMessage());
}
log.info("Created request to update direction: " + directionRequest.getName());
kafkaSender.sendMessage(platformDirectionUpdate, "Update direction", myRecord.value());
}
@KafkaListener(topics = "${kafka-topics.adapter-direction-delete}")
public void listenerAdapterDirectionDelete(ConsumerRecord<String, JsonNode> myRecord){
log.info("Get message in topic adapter-direction-delete, key {} value {}", myRecord.key(), myRecord.value());
String id = null;
try {
id = objectMapper.treeToValue(myRecord.value(), String.class);
} catch (JsonProcessingException e) {
log.error("Error reading message: {}", e.getMessage());
}
log.info("Created request to delete direction with id: " + id);
kafkaSender.sendMessage(platformDirectionDelete, "Delete direction", myRecord.value());
}
}
----------------------------------------------------------------------------------------------------
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaSenderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Value(value = "${acks-timeout-mseconds}")
private Integer acksTimeoutMseconds;
public void sendMessage(String topic, String key, Object msg) {
try {
kafkaTemplate.send(topic, key, msg).get(acksTimeoutMseconds, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Message timeout may not be connected to Kafka! {} , stackTrace {}", e, Arrays.toString(e.getStackTrace()));
}
log.info("Sending via Kafka to a topic: {} massage: {}", topic, msg);
}
}