Share
Sign In
기술이야기
Spring Kafka 메시지 암호화
전지훈
👍
Created by
  • 전지훈
Created at
들어가며
개발 업무를 진행하면 종종 개인정보에 대해 다룰 때가 생긴다. 보통 이러한 민감정보는 DB, Redis, Kafka 혹은 로깅에서도 암호화하여 저장하는 것이 권장된다. 이 글에서는 스프링으로 Kafka를 사용할 때, 본문 내용에 대해 암호화하여 저장하는 방법을 다루고자 한다.
Json 역/직렬화 예제
이 글에서는 기본적으로 Kafka의 값에는 Json 형식으로 메시지를 저장하는 부분에서 나아가 Json 부분을 암호화하고, 복호화하는 것을 추가하여 설명하려고 한다. 우선 Json 값을 저장하는 예제 프로젝트로 아래와 같이 Producer와 Consumer 세팅하였다.
root |- consumer |- producer |- build.gradle
💬
producer와 consumer를 분리하였는데 이는 Spring Kafka에서 제공하는 JsonSerializer/Deserializer의 지원에 대해 설명하기 위해 분리하였다.
producer의 코드들
ProducerChatMessage
Kafka에 값으로 저장할 내용을 아래와 같이 정의했다.
@Data public class ProducerChatMessage { private String sender; private String message; }
ProducingController
Http 요청을 받아 ProducerChatMessage를 Kafka로 전송하는 부분으로 아래와 같이 정의한다.
@RequiredArgsConstructor @RequestMapping("/") @RestController public class ProducingController { private final KafkaTemplate<String, Object> kafkaTemplate; @PostMapping("/plain") public Map<String, Object> sendPlain(@RequestBody ProducerChatMessage message) { kafkaTemplate.send("plain", message); return Map.of("successful", true); } }
application.yaml
Kafka에 대한 설정과 위에서 정의한 ProducerChatMessage를 아래와 같이 Json으로 직렬화하면서 별칭(alias)을 줄 수 있다.
spring: kafka: producer: bootstrap-servers: localhost:9092 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: spring.json.type.mapping: message:com.ignite.kafka.producer.ProducerChatMessage
💬
spring.kafka.producer.value-serializer와 spring.kafka.consumer.value-deserializer의 값으로 JsonSerializer, JsonDeserializer를 사용할 수 있다.
위 두 개의 클래스를 지정할 때 spring.kafka.[producer|consumer].properties.spring.json.type.mapping으로 위 예제와 같이 콜론(:) 구분으로 별칭을 줄 수 있다. 이러한 별칭은 추후 클래스를 다른 패키지로 변경하거나, 다른 클래스를 사용하게 될 때에도 유연하게 대응할 수 있도록 도와준다. 참고:
Spring Boot / Reference / Messaging / Apache Kafka Support
별칭은 Kafka 메시지의 헤더로 저장되며, JsonSerializer가 헤더를 생성, JsonDeserializer가 헤더를 제거하는 식으로 동작한다.
ex) key: __TypeId__, value: message
위와 같은 방법으로 아래 예제와 같이, 값에 인터페이스를 받도록 대응할 수 있다.
예제를 단순화하기 위해서 if/else를 사용했지만 실무에서는 사용하지 않기를 바란다.
@KafkaListener(topics = "plain", groupId = "group1") public void consumePlain(ConsumerRecord<String, SomeInterface> record) { if (record.value() instanceof ConsumerChatSender) { log.info("sender: {}", record.value()); } else if (record.value() instanceof ConsumerChatMessage) { log.info("message: {}", record.value()); } }
consumer의 코드들
ConsumerChatMessage
producer 프로젝트의 ProducerChatMessage에 대응하는 모델을 ConsumerChatMessage로 작성한다.
@Data public class ConsumerChatMessage { private String sender; private String message; }
IgniteKakfaConsumer
단순하게 메시지를 받아 로그를 남기는 부분을 consumer로 작성한다.
@Slf4j @Component public class IgniteKafkaConsumer { @KafkaListener(topics = "plain", groupId = "group1") public void consume(ConsumerChatMessage message) { log.info("message: {}", message); } }
application.yaml
JsonDeserializer를 통해 ConsumerChatMessage로 역직렬화 할 수 있도록 구성한다.
spring: kafka: consumer: bootstrap-servers: localhost:9092 value-serializer: com.ignite.kafka.serializer.EncryptedJsonDeserializer properties: spring.json.type.mapping: message:com.ignite.kafka.consumer.ConsumerChatMessage
테스트 결과
producer 애플리케이션을 실행하고 아래와 같이 요청을 보낸다.
curl 'http://localhost:8080/plain' \ --header 'Content-Type: application/json' \ --data '{ "sender": "noname", "message": "Hello world!" }'
Kafka를 직접 콘솔로 consume 해보면 아래와 같은 내용이 있음을 알 수 있다.
{"sender": "noname", "message": "Hello world!"}
consumer에서는 아래와 같이 로그를 확인할 수 있다.
message: ConsumerChatMessage(sender=noname, message=Hello world!)
암호화 적용
위 예제에서 본 바와 같이 JsonSerializer/JsonDeserializer를 사용하면 메시지 객체에 대해 json 평문으로 Kafka에 저장하고 있음을 알 수 있다. 만약 민감정보가 포함된 메시지를 카프카로 전송한다면 이러한 부분은 보안 감사 등에서 문제로 제기될 가능성이 있다. 지금부터는 JsonSerializer/JsonDeserializer를 사용하면서 실제 값은 암호화하여 저장하는 방법을 공유한다.
encrypted-json 모듈 추가
기존 예제에서 producer/consumer의 프로젝트를 분리했었다. 양쪽의 프로젝트에서 모두 참조할 수 있도록 여기서도 암호화 모듈을 의존할 수 있도록 별도 프로젝트로 생성했다.
root |- consumer |- producer |- encrypted-json |- build.gradle
encrypted-json 모듈의 암호화 지원
예제에서는 암호화/복호화를 위해 양방향 암호화를 사용해야 하는데, 간단한 구현을 위해 Jasypt 라이브러리를 사용했다. 또한 encrypted-json 모듈은 이를 참조하고 있는 프로젝트에서 자동 설정이 되도록 구현했다.
dependencies { implementation 'org.jasypt:jasypt:1.9.3' implementation 'org.springframework.boot:spring-boot-starter-json' implementation 'org.springframework.boot:spring-boot-autoconfigure' }
암호화를 수행하는 핵심 클래스는 아래와 같다.
public class IgniteCrypto { private static BasicBinaryEncryptor encryptor; public static byte[] encrypt(byte[] plain) { return encryptor.encrypt(plain); } public static byte[] decrypt(byte[] encrypted) { return encryptor.decrypt(encrypted); } public static void configure(String password) { if (encryptor != null) { throw new IllegalStateException("IgniteCrypto is already configured"); } encryptor = new BasicBinaryEncryptor(); encryptor.setPassword(password); } public static class IgniteCryptoInitializingPostProcessor implements EnvironmentPostProcessor { public static final String PROPERTY_NAME = "ignite.kafka.encryptor.password"; @Override public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { Optional.ofNullable(environment.getProperty(PROPERTY_NAME)) .ifPresent(IgniteCrypto::configure); } } }
encrypted-json을 의존하는 프로젝트에서 ignite.kafka.encryptor.password 프로퍼티를 통해서 암호화 패스워드를 입력 받는다.
EnvironmentPostProcessor를 통해서 IgniteCrypto에 패스워드를 초기화한다.
암호화(encrypt)/복호화(decrypt)는 Jasypt에서 제공하는 BasicBinaryEncryprtor에 위임한다.
Json 역/직렬화에 암호화 적용하기
JsonSerializer/JsonDeserializer는 상속을 지원하기 때문에 실제 역/직렬화가 이루어지는 부분을 오버라이드 했다.
public class EncryptedJsonDeserializer extends JsonDeserializer<Object> { @Override public Object deserialize(String topic, Headers headers, byte[] encrypted) { byte[] plain = IgniteCrypto.decrypt(encrypted); return super.deserialize(topic, headers, plain); } @Override public Object deserialize(String topic, byte[] encrypted) { byte[] plain = IgniteCrypto.decrypt(encrypted); return super.deserialize(topic, plain); } } public class EncryptedJsonSerializer extends JsonSerializer<Object> { @Override public byte[] serialize(String topic, Object data) { byte[] original = super.serialize(topic, data); return IgniteCrypto.encrypt(original); } }
위와 같이 상속을 통해 Json 직렬화 → 암호화 → 복호화 → Json 역직렬화 순으로 동작할 수 있도록 구현되었다.
💬
일반적으로 상속보다 조합을 사용을 권하지만 Spring Kafka에서 지원하는 Serializer는 생성/초기화 부분이 까다로운 부분이 있어 상속이 오히려 구현의 간편함을 제공한다.
💬
JsonSerializer의 경우, 직렬화할 때 2개의 메서드가 존재하는다.
public byte[] serialize(String topic, Headers headers, @Nullable T data)
public byte[] serialize(String topic, @Nullable T data)
헤더를 포함하는 것과 아닌 것인데, 필자가 예제를 작성하는 현재 버전(spring-kafka:3.2.0)에서는 위의 메서드가 결국 아래의 메서드를 호출하기 때문에 두 메서드를 모두 오버라이드 해서 암호화를 하면 암호화를 2번 수행하게 된다. 때문에 아래 메서드 하나만 암호화를 적용하였다.
producer/consumer에 적용
소스 코드가 변경되는 부분은 없다. encrypted-json을 각 프로젝트에서 의존성을 참조하도록 build.gradle을 변경한다.
dependencies { implementation(project(':encrypted-json')) }
또한 application.yaml에서 기존의 역/직렬화하기 위한 클래스와 암호화를 적용하기 위한 ignite.kafka.encryptor.password 설정을 추가한다.
# 공통 적용 ignite.kafka.encryptor.password: sample # producer 측 수정 spring.kafka.producer.value-serializer: com.ignite.kafka.serializer.EncryptedJsonSerializer # consumer 측 수정 spring.kafka.consumer.value-deserializer: com.ignite.kafka.deserializer.EncryptedJsonDeserializer
테스트
consumer에서는 이전과 같이 아래 로그로 확인할 수 있다.
message: ConsumerChatMessage(sender=noname, message=Hello world!)
kafka에서 조회시에는 암호화된 값이 저장된다.
zx���|OYW�h���j�\Gs�*L�_�%oD\S[n��2s����{��b>a��
마무리
Spring Kafka를 사용하면서 실제 Kafka에 저장되는 값에 대해 암호화하는 방법에 대해 알아보았다. 개인 정보와 같은 민감정보는 개발 프로젝트를 진행하면서 실제로 많은 감시를 받게 되며, 앞으로도 그 중요성은 더 커질 것이라 생각된다. 스프링에서는 이 글에서 살펴본 것과 같이 쉽게 암호화를 적용할 수 있는 방법을 제공한다. 이러한 메커니즘은 비단 Kafka뿐만 아니라 Redis를 활용한 캐시, NoSQL 혹은 RBDMS와 같은 저장소 및 파일에도 적용할 수 있는 부분이기 때문에 비슷한 다른 부분에 응용에 있어서도 참고가 되기를 바란다.
Subscribe to '이그나이트'
Welcome to '이그나이트'!
By subscribing to my site, you'll be the first to receive notifications and emails about the latest updates, including new posts.
Join SlashPage and subscribe to '이그나이트'!
Subscribe
👍
전지훈
RestTemplate Request/Response 저장하기
들어가며 개인적인 경험으로, 회사에서 진행하는 프로젝트 중에서 오직 단독 시스템으로 동작하는 서비스는 없었던 것 같다. 대부분은 외부 시스템과 연동하게 되며, MSA를 주력으로 사용하는 부서라면 더더욱 많은 연동을 하게 될 것이다. 연동해야 하는 시스템이 많다는 것은 그만큼 관리 포인트가 늘어나는 것이고, 그에 따라 맞닥뜨려야 할 오류 또한 증가하게 된다. 일부 요청 추적(Request Tracing)을 지원하는 DataDog, PinPoint, Jaeger 등의 서비스를 사용하여 요청/응답을 간략하게 확인할 수 있는 툴을 사용한다면 좋겠지만, 그렇지 않고 독자적으로 시스템에서 외부 시스템을 호출할 때의 로그를 남기고 싶어 하는 부서도 있다. 이 글에서는 그중에서도 RestTemplate을 사용하여 외부 시스템과 연동할 때, 요청/응답에 대해 로그를 DB로 남기는 것을 설명하려 한다. 예제 프로젝트 세팅 이 글에서는 기본적으로 Kafka의 값에는 Json 형식으로 메시지를 저장하는 부분에서 나아가 Json 부분을 암호화하고, 복호화하는 것을 추가하여 설명하려고 한다. 우선 Json 값을 저장하는 예제 프로젝트로 아래와 같이 Producer와 Consumer 세팅하였다. build.gradle DelegateController http://localhost:8080을 호출할 때 RestTemplate을 사용하여, http://httpbin.org/get을 호출하고, 이에 대한 응답을 HttpBinResponse로 정의하여 반환한다. 로컬에서 애플리케이션을 실행시켜 아래와 같이 동작하는 것을 확인할 수 있다. RestTemplate 요청/응답 로그를 DB에 저장하기 1. 사용자 정의 ClientHttpRequestInterceptor 작성하기 Spring에서는 ClientHttpRequestInterceptor를 통해, RestTemplate이 요청/응답을 보내기 전/후의 동작을 정의할 수 있다. 인터페이스는 다음과 같다. HttpRequest: 요청에 대한 객체이며, HttpHeaders getHeaders(), URI getURI(), HttpMethod getMethod()를 가지는 인터페이스다. body: payload에 해당하는 byte의 배열이다. execution: execution.execute(request, body)을 수행해야 실제로 요청을 수행하고, interceptor 메서드에서 반환해야 할 응답 결과(ClientHttpResponse)를 얻을 수 있다. 우선 아래와 같이 LoggingClientHttpRequestInterceptor를 작성하자. 2. 요청 로그 남기기 요청 로그를 남기기 위해서 아래와 같이 LoggingClientHttpRequestInterceptor를 변경하자. HttpRequest.toString()은 오버라이딩 되어있지 않아, 그대로 로그를 남긴다면 인스턴스의 해시 코드 값을 반환하게 된다. 각각 URI, Method, Headers의 getter를 호출하여 원하는 값을 얻을 수 있다. 3. 응답 로그 남기기 응답 로그를 남기기 위해서 다시 아래와 같이 LoggingClientHttpRequestInterceptor를 변경하자. 이번에 위 인터셉터를 적용한 RestTemplate을 호출하면 아래와 같이 정상적인 로그를 확인할 수 있다. 다만 브라우저에서는 비어있는 화면을 확인할 수 있을 것이다.
  • 전지훈
김재우
머신러닝(Machine Learning) 입문: 타이타닉 생존자 예측 문제로 배우는 ML 기초
머신러닝 소개 머신러닝, ML 엔지니어가 하는 일 머신러닝(Machine Learning)은 인공지능(AI)의 한 분야로, 컴퓨터가 데이터로부터 스스로 학습하여 패턴을 인식하고 예측을 수행할 수 있게 해주는 기술입니다. 전통적인 프로그래밍 방식은 규칙을 직접 코딩하지만, 머신러닝은 데이터로부터 규칙을 스스로 학습합니다. 이를 통해 복잡한 패턴 인식, 예측, 의사결정 등이 가능해집니다. 메일에서 스팸 메일을 찾아내는 간단한 일에서부터, 의료 분야에서는 질병 예측과 진단, 금융 분야에서는 사기 탐지, 그리고 자율주행 자동차 같은 것들이 머신러닝을 통해 이루어집니다. 머신러닝은 크게 정답이 포함된 데이타로 학습하는 지도학습, 데이타의 패턴을 스스로 찾아내는 비지도학습, 환경과 상호작용하며 보상을 최대화 하는 강화학습. 이렇게 3가지 유형으로 나눌 수 있습니다. 머신러닝 모델은 이런 학습과정을 거쳐 만들어진 규칙입니다. 입력 데이터를 받아 출력을 생성하는 함수와 같은 것입니다. 예를 들어, 스펨 메일 필터링 모델은 입력 메일의 특징을 분석하여 스펨 메일인지 아닌지를 예측하는 알고리즘입니다 머신러닝 모델을 개발하고 배포하는 ML 엔지니어는 데이터를 수집하고 분석하며, 머신러닝 모델을 만들고, 모델을 배포하여 실제 업무에 적용합니다. 데이타의 양이 커서, 데이타에 노이즈나 결측치가 있어서 어려움을 격기도 하고, A하드웨어에서 만든 모델이 B하드웨어에서 제대로 작동안하는 문제를 해결하기도 합니다. 이 글에서는 ML 엔지니어의 기본 업무인 데이터 분석과 모델 구축 과정을 타이타닉 생존자 예측 문제를 예로 들어 설명하겠습니다. 이를 통해 ML 엔지니어의 실제 업무와 그 과정에서 겪는 다양한 도전 과제들을 이해할 수 있을 것입니다. 타이타닉 문제로 시작하기 타이타닉 문제는 1912년 타이타닉 호 침몰 사고 당시의 실제 데이터를 기반으로 승객의 나이, 성별, 탑승 티켓 등급 등의 데이타로 해당 승객이 생존했는지 여부를 예측하는 것입니다. 데이터 규모가 크지 않아(12개의 컬럼, 891건의 데이터) 초보자도 다루기 쉬우며, 이진 분류(생존/비생존) 문제라 평가가 용이합니다. 또한 숫자, 범주형 등 다양한 유형의 데이터와 결측치(누락된 데이타)를 포함하고 있어, 데이터 전처리 과정을 연습할 수 있습니다. 이 문제는 기계학습 파이프라인의 전체 과정을 경험할 수 있는 대표적인 예제입니다. 일단 손으로 풀어보고, 그 다음 실제 머신러닝 모델을 만들어보도록 하겠습니다. 손(Excel)으로 하는 머신러닝 타이타닉 문제의 데이터 파일은 kaggle 에서 다운로드 받으실 수 있습니다. 회원 가입 후 파일을 다운로드 하시면 되고, train.csv 모델 훈련용으로 사용하는 데이터 파일입니다. test.csv 모델의 검증용으로 사용하는 데이터 파일입니다. gender_submission.csv kaggle 에 업로드 해 만들어진 모델을 평가하기 위한 파일입니다. 모델 구축 후 모델의 결과를 이 파일에 저장하고 업로드하면 만들어진 모델의 성능을 확인할 수 있습니다. 데이터 살펴보기 파일을 엑셀에서 열어보면 위와 같은 데이터가 있고, 각 컬럼의 의미는 아래와 같습니다. PassengerId: 승객 ID Pclass: 티켓 클래스 Name: 이름
  • 김재우
전지훈
Spring Properties를 외부 저장소와 연동하기
들어가며 SaaS(Software As A Service)가 보편화되며 아마 많은 개발자가 이를 구현하기 위해 Twelve-Factor를 읽어보았을 것으로 생각된다. 이 글은 Twelve-Factor에서 설정에 대해 어떻게 Spring Boot를 사용하는 개발자들이 외부의 설정 저장소에 접근하여 설정을 읽을 수 있을지 구현에 관해 이야기하고자 한다. 이미 대부분의 스타트업들은 클라우드 환경에서 시작하고 있으며, 국내 IT 대기업들은 자체 클라우드를 구축하고, IT 업계 외의 사업을 주로 하는 기업들도 클라우드로 옮겨가고 있다. 주된 방향성은 클라우드 환경에 있지만 아직 온프레미스를 주로 사용하는 조직도 있기에, 여기서는 온프레미스와 클라우드 환경에 따라 어떻게 설정을 달리할 수 있는지 다뤄보려고 한다. Spring Externalized Configuration 본문에서 다루고자 하는 모든 개발 방법의 기반은 Spring Documentation에서 이미 기술하고 있는 내용에 대한 응용이다. 또한 외부에서 설정을 읽는 부분에 대한 많은 지원을 이미 Spring Cloud Config에서 제공하고 있으니, 사용하려는 환경이 이미 Spring Cloud Config에서 지원하고 있다면 사용하는 것이 좋다. 이 글은 Spring Cloud Config보다는 좀 더 바퀴를 만드는 방법에 대해 다룬다. 본문이 도움될 만 한 상황 아래와 같은 상황에서 이 글이 도움이 될 것이라 생각된다. Spring Cloud Config를 조금 더 가볍게 모듈화하거나, 자신의 조직에 맞게 구성하고 싶은 경우. 전체 혹은 일부 설정에 대해 Spring Cloud Config에서 지원하지 않는 외부 설정 저장소를 사용하려는 경우. 혹은 아래와 같이 application.properties를 커스터마이징하려는 경우. 외부 환경에서 설정을 개발자가 직접 호출하도록 개발하기 Spring Cloud Config에서 지원하지 않는 환경에 설정을 저장하고, 불러와야 하는 경우가 있다. 사내에서 자체 구축한 시스템을 사용하는 경우이다. 이런 경우 EnvironmentPostProcessor를 진입점으로 간주하고 원하는 결과를 이룰 수 있다. EnvironmentPostProcessor EnvironmentPostProcessor는 ApplicationContext가 초기화되기전에 애플리케이션의 환경 설정을 변경할 수 있으며 아래 인터페이스를 가지고 있다. ConfigurationEnvironment: getPropertySources()를 통해 MutablePropertySources를 사용할 수 있으며, 여기서 property를 추가하거나, 우선순위를 조절하여 덮어쓸 수 있다. 따라서 내부 구현을 통해 외부 설정 저장소에서 설정값을 불러와 우선순위를 높여 PropertySources에 저장하면 외부 설정 저장소의 설정값을 애플리케이션에서 참조할 수 있다. CSV 예제 단순한 예제로 CSV에서 설정을 읽어와 사용하는 경우를 구현해 보자. custom.greeting이라는 설정에 Hello world!를 출력하고자 한다. application.yaml HelloWorldController.java CsvEnvironmentPostProcessor.java
  • 전지훈