Commit 1fced57a by Dinesh Jampa

Latest

parent d61d645a
...@@ -26,6 +26,10 @@ ...@@ -26,6 +26,10 @@
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
</dependency> </dependency>
...@@ -50,6 +54,12 @@ ...@@ -50,6 +54,12 @@
<artifactId>spring-kafka-test</artifactId> <artifactId>spring-kafka-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.oracle.jdbc/ojdbc8 -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.2.0.1</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
//package com.epragati.status.config;
//
//import java.util.HashMap;
//import java.util.Map;
//
//import org.apache.kafka.common.serialization.StringDeserializer;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.annotation.EnableKafka;
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
//import org.springframework.kafka.core.ConsumerFactory;
//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
//import org.springframework.kafka.support.serializer.JsonSerializer;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//
//import com.epragati.status.pojo.Status;
//
//@EnableKafka
//@Configuration
//public class KafkaConsumerConfig {
//
// @Value("${kafka.bootstrap-servers}")
// private String bootstrapServers;
//
// @Bean
// public ConsumerFactory<String, Status> consumerFactory() {
// Map<String, Object> props = new HashMap<>();
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "gsws");
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// return new DefaultKafkaConsumerFactory<>(props);
// }
//
// @Bean
// public ConcurrentKafkaListenerContainerFactory<String, Status> kafkaListenerContainerFactory() {
//
// ConcurrentKafkaListenerContainerFactory<String, Status> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(consumerFactory());
// return factory;
// }
//
//}
...@@ -14,6 +14,8 @@ import org.springframework.web.bind.annotation.RestController; ...@@ -14,6 +14,8 @@ import org.springframework.web.bind.annotation.RestController;
import com.epragati.status.common.RestError; import com.epragati.status.common.RestError;
import com.epragati.status.pojo.Status; import com.epragati.status.pojo.Status;
import com.epragati.status.repo.GSWSDEPTRESPONSE;
import com.epragati.status.repo.StatusDataRepository;
import com.epragati.status.service.Sender; import com.epragati.status.service.Sender;
@RestController @RestController
...@@ -25,6 +27,9 @@ public class StatusController { ...@@ -25,6 +27,9 @@ public class StatusController {
@Autowired @Autowired
Sender sender; Sender sender;
@Autowired
StatusDataRepository statusDataRepo;
@PostMapping(value = "post") @PostMapping(value = "post")
public ResponseEntity<Object> producer(@RequestBody Status message) { public ResponseEntity<Object> producer(@RequestBody Status message) {
logger.info("Entered into producer method"); logger.info("Entered into producer method");
...@@ -40,12 +45,13 @@ public class StatusController { ...@@ -40,12 +45,13 @@ public class StatusController {
RestError apiError = new RestError(HttpStatus.BAD_REQUEST, error, error); RestError apiError = new RestError(HttpStatus.BAD_REQUEST, error, error);
return new ResponseEntity<Object>(apiError, new HttpHeaders(), apiError.getStatus()); return new ResponseEntity<Object>(apiError, new HttpHeaders(), apiError.getStatus());
} }
/*if (message.getBeneficiaryId() == null || message.getBeneficiaryId().isEmpty()) { /*
String error = "Beneficiary ID should not be empty"; * if (message.getBeneficiaryId() == null ||
logger.warn(error); * message.getBeneficiaryId().isEmpty()) { String error =
RestError apiError = new RestError(HttpStatus.BAD_REQUEST, error, error); * "Beneficiary ID should not be empty"; logger.warn(error); RestError apiError
return new ResponseEntity<Object>(apiError, new HttpHeaders(), apiError.getStatus()); * = new RestError(HttpStatus.BAD_REQUEST, error, error); return new
}*/ * ResponseEntity<Object>(apiError, new HttpHeaders(), apiError.getStatus()); }
*/
if (message.getServiceId() == null || message.getServiceId().isEmpty()) { if (message.getServiceId() == null || message.getServiceId().isEmpty()) {
String error = "Service ID should not be empty"; String error = "Service ID should not be empty";
logger.warn(error); logger.warn(error);
...@@ -86,7 +92,6 @@ public class StatusController { ...@@ -86,7 +92,6 @@ public class StatusController {
return new ResponseEntity<Object>(apiError, new HttpHeaders(), apiError.getStatus()); return new ResponseEntity<Object>(apiError, new HttpHeaders(), apiError.getStatus());
} }
if (message.getDeptId() == null || message.getDeptId().isEmpty()) { if (message.getDeptId() == null || message.getDeptId().isEmpty()) {
String error = "Department ID should not be empty"; String error = "Department ID should not be empty";
logger.warn(error); logger.warn(error);
...@@ -107,4 +112,40 @@ public class StatusController { ...@@ -107,4 +112,40 @@ public class StatusController {
return "working"; return "working";
} }
@PostMapping(value = "add")
public void testInsert() {
GSWSDEPTRESPONSE gdr = new GSWSDEPTRESPONSE();
gdr.setTransactionId(0);
gdr.setDeptTransactionId("");
gdr.setDeptId(0);
gdr.setBenTranId("");
gdr.setStatus_code(0);
gdr.setRemarks("");
gdr.setTransUpdatedOn("");
gdr.setServiceId(0);
gdr.setIsTxnClosed(0);
gdr.setTxnClosedDate("");
gdr.setTxnRemarks("");
gdr.setStatus(0);
gdr.setStatusMsg("");
gdr.setServiceDelAkgSts(0);
gdr.setServiceDelRemarks("");
gdr.setClusterId(0);
gdr.setServiceDelDate("");
gdr.setServiceDelDate("");
gdr.setServiceDelByVV(0);
gdr.setServieDelAccBy(0);
statusDataRepo.save(gdr);
}
// (TRANSACTION_ID, DEPT_TRANS_ID, DEPT_ID, BEN_TRANS_ID, STATUS_CODE, REMARKS,
// TRANS_UPDATED_ON, SERVICE_ID, IS_TXN_CLOSED, TXN_CLOSED_DATE, TXN_REMARKS,
// STATUS, STATUS_MESG, SERVICE_DEL_AKG_STS, SERVICE_DEL_REMARKS, CLUSTER_ID,
// SERVICE_DEL_DATE, SERVICE_DEL_BY_VV, SERVICE_DEL_ACC_BY)
// VALUES(0, '', 0, '', 0, '', sysdate, 0, 0, '', '', '', '', 0, '', 0, '', 0,
// 0);
} }
package com.epragati.status.repo;
public class GSWSDEPTRESPONSE {
private int transactionId;
private String deptTransactionId;
private int deptId;
private String benTranId;
private int status_code;
private String remarks;
private String transUpdatedOn;
private int serviceId;
private int isTxnClosed;
private String txnClosedDate;
private String txnRemarks;
private int status;
private String statusMsg;
private int serviceDelAkgSts;
private String serviceDelRemarks;
private int clusterId;
private String serviceDelDate;
private int serviceDelByVV;
private int servieDelAccBy;
public int getTransactionId() {
return transactionId;
}
public void setTransactionId(int transactionId) {
this.transactionId = transactionId;
}
public String getDeptTransactionId() {
return deptTransactionId;
}
public void setDeptTransactionId(String deptTransactionId) {
this.deptTransactionId = deptTransactionId;
}
public int getDeptId() {
return deptId;
}
public void setDeptId(int deptId) {
this.deptId = deptId;
}
public String getBenTranId() {
return benTranId;
}
public void setBenTranId(String benTranId) {
this.benTranId = benTranId;
}
public int getStatus_code() {
return status_code;
}
public void setStatus_code(int status_code) {
this.status_code = status_code;
}
public String getRemarks() {
return remarks;
}
public void setRemarks(String remarks) {
this.remarks = remarks;
}
public String getTransUpdatedOn() {
return transUpdatedOn;
}
public void setTransUpdatedOn(String transUpdatedOn) {
this.transUpdatedOn = transUpdatedOn;
}
public int getServiceId() {
return serviceId;
}
public void setServiceId(int serviceId) {
this.serviceId = serviceId;
}
public int getIsTxnClosed() {
return isTxnClosed;
}
public void setIsTxnClosed(int isTxnClosed) {
this.isTxnClosed = isTxnClosed;
}
public String getTxnClosedDate() {
return txnClosedDate;
}
public void setTxnClosedDate(String txnClosedDate) {
this.txnClosedDate = txnClosedDate;
}
public String getTxnRemarks() {
return txnRemarks;
}
public void setTxnRemarks(String txnRemarks) {
this.txnRemarks = txnRemarks;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public String getStatusMsg() {
return statusMsg;
}
public void setStatusMsg(String statusMsg) {
this.statusMsg = statusMsg;
}
public int getServiceDelAkgSts() {
return serviceDelAkgSts;
}
public void setServiceDelAkgSts(int serviceDelAkgSts) {
this.serviceDelAkgSts = serviceDelAkgSts;
}
public String getServiceDelRemarks() {
return serviceDelRemarks;
}
public void setServiceDelRemarks(String serviceDelRemarks) {
this.serviceDelRemarks = serviceDelRemarks;
}
public int getClusterId() {
return clusterId;
}
public void setClusterId(int clusterId) {
this.clusterId = clusterId;
}
public String getServiceDelDate() {
return serviceDelDate;
}
public void setServiceDelDate(String serviceDelDate) {
this.serviceDelDate = serviceDelDate;
}
public int getServiceDelByVV() {
return serviceDelByVV;
}
public void setServiceDelByVV(int serviceDelByVV) {
this.serviceDelByVV = serviceDelByVV;
}
public int getServieDelAccBy() {
return servieDelAccBy;
}
public void setServieDelAccBy(int servieDelAccBy) {
this.servieDelAccBy = servieDelAccBy;
}
}
//INSERT INTO GWS_TEST.GSWS_DEPT_RESPONSES
//(TRANSACTION_ID, DEPT_TRANS_ID, DEPT_ID, BEN_TRANS_ID, STATUS_CODE, REMARKS, TRANS_UPDATED_ON,
//SERVICE_ID, IS_TXN_CLOSED, TXN_CLOSED_DATE, TXN_REMARKS, STATUS, STATUS_MESG, SERVICE_DEL_AKG_STS,
//SERVICE_DEL_REMARKS, CLUSTER_ID, SERVICE_DEL_DATE, SERVICE_DEL_BY_VV, SERVICE_DEL_ACC_BY)
//VALUES(0, '', 0, '', 0, '', sysdate, 0, 0, '', '', '', '', 0, '', 0, '', 0, 0);
package com.epragati.status.repo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class StatusDataRepository {
Log logger = LogFactory.getLog(StatusDataRepository.class);
@Autowired
private JdbcTemplate jdbcTemplate;
public int count() {
return jdbcTemplate.queryForObject("select count(*) from GWS_TEST.GSWS_DEPT_RESPONSES", Integer.class);
}
public int deleteById(Long id) {
return jdbcTemplate.update("delete GWS_TEST.GSWS_DEPT_RESPONSES where TRANSACTION_ID = ?", id);
}
public int save(GSWSDEPTRESPONSE gswsRes) {
return jdbcTemplate.update("INSERT INTO GWS_TEST.GSWS_DEPT_RESPONSES\r\n"
+ "(TRANSACTION_ID, DEPT_TRANS_ID, DEPT_ID, BEN_TRANS_ID, STATUS_CODE, REMARKS, TRANS_UPDATED_ON, SERVICE_ID, "
+ "IS_TXN_CLOSED, TXN_CLOSED_DATE, TXN_REMARKS, STATUS, STATUS_MESG, SERVICE_DEL_AKG_STS, "
+ "SERVICE_DEL_REMARKS, CLUSTER_ID, SERVICE_DEL_DATE, SERVICE_DEL_BY_VV, SERVICE_DEL_ACC_BY)"
+ "VALUES(?, ?, ?, ?, ?, ?, sysdate, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", gswsRes.getTransactionId(),
gswsRes.getDeptTransactionId(), gswsRes.getDeptId(), gswsRes.getBenTranId(), gswsRes.getStatus_code(),
gswsRes.getRemarks(), gswsRes.getTransUpdatedOn(), gswsRes.getServiceId(), gswsRes.getIsTxnClosed(),
gswsRes.getTxnClosedDate(), gswsRes.getTxnRemarks(), gswsRes.getStatus(), gswsRes.getStatusMsg(),
gswsRes.getServiceDelAkgSts(), gswsRes.getServiceDelRemarks(), gswsRes.getClusterId(),
gswsRes.getServiceDelDate(), gswsRes.getServiceDelByVV(), gswsRes.getServieDelAccBy());
}
}
...@@ -4,10 +4,18 @@ import org.slf4j.Logger; ...@@ -4,10 +4,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.epragati.status.pojo.Status; import com.epragati.status.pojo.Status;
import com.epragati.status.repo.GSWSDEPTRESPONSE;
import com.epragati.status.repo.StatusDataRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service @Service
public class Sender { public class Sender {
...@@ -17,12 +25,68 @@ public class Sender { ...@@ -17,12 +25,68 @@ public class Sender {
@Autowired @Autowired
private KafkaTemplate<String, Status> kafkaTemplate; private KafkaTemplate<String, Status> kafkaTemplate;
@Autowired
private StatusDataRepository statusDataRepository;
@Value("${app.topic}") @Value("${app.topic}")
String kafkaTopic; String kafkaTopic;
public void send(Status message) { public void send(Status message) {
LOGGER.info("sending mesaage='{}'", message.toString()); ListenableFuture<SendResult<String, Status>> future = kafkaTemplate.send(kafkaTopic, message);
kafkaTemplate.send(kafkaTopic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Status>>() {
@Override
public void onSuccess(SendResult<String, Status> result) {
LOGGER.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
} }
});
// kafkaTemplate.send(kafkaTopic, message);
}
@KafkaListener(topics = "gswsstatus", groupId = "gsws")
public void listen(String msg) {
LOGGER.info("Received Message: " + msg);
GSWSDEPTRESPONSE gdr = new GSWSDEPTRESPONSE();
Status message;
try {
message = new ObjectMapper().readValue(msg, Status.class);
gdr.setTransactionId(Integer.parseInt(message.getTransactionId()));
gdr.setDeptTransactionId(message.getDeptTransactionId());
gdr.setDeptId(Integer.parseInt(message.getDeptId()));
gdr.setBenTranId(message.getBeneficiaryId());
gdr.setStatus_code(Integer.parseInt(message.getStatusCode()));
gdr.setRemarks(message.getRemarks());
gdr.setTransUpdatedOn(null);
gdr.setServiceId(Integer.parseInt(message.getServiceId()));
gdr.setIsTxnClosed(0);
gdr.setTxnClosedDate(null);
gdr.setTxnRemarks("");
gdr.setStatus(Integer.parseInt(message.getStatusCode()));
gdr.setStatusMsg(message.getStatusMessage());
gdr.setServiceDelAkgSts(0);
gdr.setServiceDelRemarks("");
gdr.setClusterId(1);
gdr.setServiceDelDate(null);
gdr.setServiceDelDate(null);
gdr.setServiceDelByVV(1);
gdr.setServieDelAccBy(1);
statusDataRepository.save(gdr);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
LOGGER.error(e.getMessage());
}
}
} }
app.topic=gswsstatus app.topic=gswsstatus
kafka.bootstrap-servers=localhost:9092 kafka.bootstrap-servers=localhost:9092
#spring.datasource.url=jdbc:oracle:thin:@172.19.134.77:1521:xe
spring.datasource.url=jdbc:oracle:thin:@192.168.95.123:1521:xe
spring.datasource.username=GWS_TEST
spring.datasource.password=GWS_TEST123
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment