Список вопросов
Как зайти в Даркнет?!
25th January, 01:11
4
0
Как в tkinter из поля ввода Entry получить значение в одну переменную и обновить строку кнопкой, затем получить ещё одно введённое значение и затем сложить их. Ниже пример кода
21st July, 19:00
891
0
Программа, которая создает фейковые сервера в поиске игровых серверов CS 1.6 Steam
21st March, 17:43
946
0
Очень долго работает Update запрос Oracle
27th January, 09:58
912
0
не могу запустить сервер на tomcat HTTP Status 404 – Not Found
21st January, 18:02
905
0
Где можно найти фрилансера для выполнения поступающих задач, на постоянной основе?
2nd December, 09:48
936
0
Разработка мобильной кроссплатформенной военной игры
16th July, 17:57
1723
0
период по дням
25th October, 10:44
3953
0
Пишу скрипты для BAS только на запросах
16th September, 02:42
3720
0
Некорректный скрипт для закрытия блока
14th April, 18:33
4612
0
прокидывать exception в блоках try-catch JAVA
11th March, 21:11
4378
0
Помогите пожалуйста решить задачи
24th November, 23:53
6083
0
Не понимаю почему не открывается детальное описание продукта
11th November, 11:51
4349
0
Нужно решить задачу по программированию на массивы
27th October, 18:01
4393
0
Метода Крамера С++
23rd October, 11:55
4307
0
помогите решить задачу на C++
22nd October, 17:31
4000
0
Помогите решить задачу на python с codeforces
22nd October, 11:11
4491
0
Python с нуля: полное руководство для начинающих
18th June, 13:58
2596
0
прокидывать exception в блоках try-catch JAVA
Просмотров: 4378
 
Ответов: 0
я новичок. дали первую таску, одну часть выполнила, на второй встряла. подскажите пожалуйста что конкретно делать тут
"...в сервисных классах кафки нужно прокидывать exception в блоках try-catch, где вызывается log.error(). Нужно будет написать новые эксепшены нашего сервиса и прокидывать их"
мои сервисы
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListenersService {
private final KafkaSenderService kafkaSender;
@Value(value = "${kafka-topics.platform-direction-create}")
private String platformDirectionCreate;
@Value(value = "${kafka-topics.platform-direction-update}")
private String platformDirectionUpdate;
@Value(value = "${kafka-topics.platform-direction-delete}")
private String platformDirectionDelete;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "${kafka-topics.adapter-direction-create}")
public void listenerAdapterDirectionCreate(ConsumerRecord<String, JsonNode> myRecord){
log.info("Get message in topic adapter-direction-create, key {} value {}", myRecord.key(), myRecord.value());
DirectionRequest directionRequest = null;
try {
directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
} catch (JsonProcessingException e) {
log.error("Error reading message: {}", e.getMessage());
}
log.info("Created request to create direction: " + directionRequest.getName());
kafkaSender.sendMessage(platformDirectionCreate, "Create direction", myRecord.value());
}
@KafkaListener(topics = "${kafka-topics.adapter-direction-update}")
public void listenerAdapterDirectionUpdate(ConsumerRecord<String, JsonNode> myRecord){
log.info("Get message in topic adapter-direction-update, key {} value {}", myRecord.key(), myRecord.value());
DirectionRequest directionRequest = null;
try {
directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
} catch (JsonProcessingException e) {
log.error("Error reading message: {}", e.getMessage());
}
log.info("Created request to update direction: " + directionRequest.getName());
kafkaSender.sendMessage(platformDirectionUpdate, "Update direction", myRecord.value());
}
@KafkaListener(topics = "${kafka-topics.adapter-direction-delete}")
public void listenerAdapterDirectionDelete(ConsumerRecord<String, JsonNode> myRecord){
log.info("Get message in topic adapter-direction-delete, key {} value {}", myRecord.key(), myRecord.value());
String id = null;
try {
id = objectMapper.treeToValue(myRecord.value(), String.class);
} catch (JsonProcessingException e) {
log.error("Error reading message: {}", e.getMessage());
}
log.info("Created request to delete direction with id: " + id);
kafkaSender.sendMessage(platformDirectionDelete, "Delete direction", myRecord.value());
}
}
----------------------------------------------------------------------------------------------------
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaSenderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Value(value = "${acks-timeout-mseconds}")
private Integer acksTimeoutMseconds;
public void sendMessage(String topic, String key, Object msg) {
try {
kafkaTemplate.send(topic, key, msg).get(acksTimeoutMseconds, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Message timeout may not be connected to Kafka! {} , stackTrace {}", e, Arrays.toString(e.getStackTrace()));
}
log.info("Sending via Kafka to a topic: {} massage: {}", topic, msg);
}
}
Чтобы ответить на вопрос вам нужно войти в систему или зарегистрироваться