Найдено результатов: 1

прокидывать exception в блоках try-catch JAVA

Я новичок. Мне дали первую задачу: одну часть я выполнила, а на второй застряла. Подскажите, пожалуйста, что конкретно здесь нужно сделать:
"...в сервисных классах кафки нужно прокидывать exception в блоках try-catch, где вызывается log.error(). Нужно будет написать новые эксепшены нашего сервиса и прокидывать их"

мои сервисы
 

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListenersService {

    private final KafkaSenderService kafkaSender;
    @Value(value = "${kafka-topics.platform-direction-create}")
    private String platformDirectionCreate;
    @Value(value = "${kafka-topics.platform-direction-update}")
    private String platformDirectionUpdate;
    @Value(value = "${kafka-topics.platform-direction-delete}")
    private String platformDirectionDelete;
    private final ObjectMapper objectMapper;

    @KafkaListener(topics = "${kafka-topics.adapter-direction-create}")
    public void listenerAdapterDirectionCreate(ConsumerRecord<String, JsonNode> myRecord){
        log.info("Get message in topic adapter-direction-create, key {} value {}", myRecord.key(), myRecord.value());

        DirectionRequest directionRequest = null;

        try {
            directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
        } catch (JsonProcessingException e) {
            log.error("Error reading message: {}", e.getMessage());
        }
        log.info("Created request to create direction: " + directionRequest.getName());

        kafkaSender.sendMessage(platformDirectionCreate, "Create direction", myRecord.value());
    }

    @KafkaListener(topics = "${kafka-topics.adapter-direction-update}")
    public void listenerAdapterDirectionUpdate(ConsumerRecord<String, JsonNode> myRecord){
        log.info("Get message in topic adapter-direction-update, key {} value {}", myRecord.key(), myRecord.value());

        DirectionRequest directionRequest = null;

        try {
            directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
        } catch (JsonProcessingException e) {
            log.error("Error reading message: {}", e.getMessage());
        }
        log.info("Created request to update direction: " + directionRequest.getName());

        kafkaSender.sendMessage(platformDirectionUpdate, "Update direction", myRecord.value());
    }

    @KafkaListener(topics = "${kafka-topics.adapter-direction-delete}")
    public void listenerAdapterDirectionDelete(ConsumerRecord<String, JsonNode> myRecord){
        log.info("Get message in topic adapter-direction-delete, key {} value {}", myRecord.key(), myRecord.value());

        String id = null;

        try {
            id = objectMapper.treeToValue(myRecord.value(), String.class);
        } catch (JsonProcessingException e) {
            log.error("Error reading message: {}", e.getMessage());
        }
        log.info("Created request to delete direction with id: " + id);

        kafkaSender.sendMessage(platformDirectionDelete, "Delete direction", myRecord.value());
    }
}

----------------------------------------------------------------------------------------------------

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaSenderService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    @Value(value = "${acks-timeout-mseconds}")
    private Integer acksTimeoutMseconds;

    public void sendMessage(String topic, String key, Object msg) {
        try {
            kafkaTemplate.send(topic, key, msg).get(acksTimeoutMseconds, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            log.error("Message timeout may not be connected to Kafka! {} , stackTrace {}", e, Arrays.toString(e.getStackTrace()));
        }
        log.info("Sending via Kafka to a topic: {} massage: {}", topic, msg);
    }
}

exception   java   trycatch   кафка  

4380   0   21:11, 11th March, 2023