ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • netty (1)
    카테고리 없음 2023. 4. 19. 09:55

    netty

    예전에 책을 샀는데 쓸일이 있을 것도 같아서(혹은 vertx) 공부해본다. 책에 있는 내용을 정리 해본다.

    블로킹과 논블러킹

    블로킹은 요청한 작업이 성공하거나 에러가 발생하기 전까지 응답을 해주지 않는다. 논블로킹은 성공여부와 상관없이 바로 결과를 돌려주는 것을 말한다. 간단하게(?) 알아보자
    public class BlockingServer {
      public static void main(String[] args) throws IOException {
        new BlockingServer().run();
      }
    
      private void run() throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("접속 대기중");
        while (true) {
          Socket socket = serverSocket.accept();
          System.out.println("클라이언트와 연결");
          OutputStream outputStream = socket.getOutputStream();
          InputStream inputStream = socket.getInputStream();
    
          while (true) {
            try {
              int request = inputStream.read();
              System.out.print((char) request);
              outputStream.write(request);
            } catch (IOException e) {
              break;
            }
          }
        }
      }
    }
    
    자바에서 기본적으로 사용 할 수 있는 블로킹 socket 서버다. 블로킹 소켓의 문제점은 호출된 입출력 메서드의 처리가 완료될 때 까지 응답을 돌려 주지 않는다. 그래서 여러 클라이언트들이 접속을 했을 경우엔 시간이 오래 걸리는 일을 한다면 그 다음 클라이언트는 대기를 하고 있어야된다. 그래서 나온게 클라이언트가 접속 했을 경우 새로운 스레드를 만들어서 처리 하는 방법이 생겨났다. 그러나 이거에 문제점은 접속자가 많을 경우 스레드양이 어마어마하게 증가 할 것이다. 이걸 막기 위해 스레드풀이라는걸 사용한다. 스레드 풀 사용 해도 동시접속자가 늘어나면 스레드 풀의 크기를 늘려야 된다. 아주 고민이 많다. 스레드 풀을 자바힙이 허용하는 최대 한도에 도달할 때까지 늘리는 것이 맞는지도 봐야된다. 이런 단점을 개선한 방식이 논블로킹 소켓이다. 일단 소스를 보자. 복잡하다.
    public class NonBlockingServer {
    
        private Map<SocketChannel, List<byte[]>> keepDataTrack = new HashMap<>();
        private ByteBuffer buffer = ByteBuffer.allocate(2*1024);
    
        public static void main(String[] args) {
            NonBlockingServer nonBlockingServer = new NonBlockingServer();
            nonBlockingServer.startEchoServer();
        }
    
        private void startEchoServer() {
    
            try(Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()){
                if((serverSocketChannel.isOpen()) && selector.isOpen()){
                    serverSocketChannel.configureBlocking(false);
                    serverSocketChannel.bind(new InetSocketAddress(8888));
    
                    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                    System.out.println("접속 대기 중");
    
                    while(true) {
                        selector.select();
                        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                        while (keys.hasNext()) {
                            SelectionKey key = keys.next();
                            keys.remove();
    
                            if (!key.isValid()) {
                                continue;
                            }
                            if (key.isAcceptable()) {
                                this.acceptOP(key, selector);
                            } else if (key.isReadable()) {
                                this.readOP(key);
                            } else if (key.isWritable()) {
                                this.write(key);
    
                            } else {
                                System.out.println("서버 소켓 생성 못함");
                            }
                        }
                    }
                }
    
    
            } catch (Exception e){
                System.out.println("에러 ");
            }
        }
    
        private void write(SelectionKey key) throws IOException, InterruptedException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
    
            List<byte[]> channelData = keepDataTrack.get(socketChannel);
            Iterator<byte[]> its = channelData.iterator();
    
            while(its.hasNext()){
                byte[] it = its.next();
                its.remove();
                socketChannel.write(ByteBuffer.wrap(it));
            }
            key.interestOps(SelectionKey.OP_READ);
        }
    
        private void readOP(SelectionKey key) {
            try{
                SocketChannel socketChannel = (SocketChannel) key.channel();
                buffer.clear();
                int numRead = -1;
                try{
                    numRead = socketChannel.read(buffer);
                } catch (IOException e){
                    System.out.println("데이터 읽기 에러 ");
                }
                if(numRead == -1){
                    this.keepDataTrack.remove(socketChannel);
                    System.out.println("클라이언트 연결 종료");
                    socketChannel.close();
                    key.cancel();
                    return;
                }
    
                byte[] data = new byte[numRead];
                System.arraycopy(buffer.array(), 0 ,data, 0, numRead);
                System.out.println(new String(data, "utf-8") + " from" + socketChannel.getRemoteAddress());
    
                doEchoJob(key, data);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private void doEchoJob(SelectionKey key, byte[] data) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            List<byte[]> channelData = keepDataTrack.get(socketChannel);
            channelData.add(data);
            key.interestOps(SelectionKey.OP_WRITE);
        }
    
        private void acceptOP(SelectionKey key, Selector selector) throws IOException{
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            System.out.println("클라이언트 연결 " + socketChannel.getRemoteAddress());
            keepDataTrack.put(socketChannel, new ArrayList<byte[]>());
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
    }
    
    
    데이터를 읽고 쓰는 부분의 로직이 완전히 분리 되어있다. 구조는 다음과 같다. 블로킹 blocking 논블로킹 nonblocking 그림이 영 시원찮네. 물론 자바의 기본적인 socket server를 구현해서 개발해도 상관은 없다. 하지만 유지보수면이나 개발의 난이도를 볼때 netty를 이용하는게 더 효율적이지 않나 싶다. 네티의 기본적인 에코 서버를 한번 만들어보자.
    public class NettyServer {
    
      public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
    
        try {
          ServerBootstrap b = new ServerBootstrap();
          b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
              protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new EchoServiceHandler());
              }
            });
    
          Channel ch = b.bind(8080).sync().channel();
          ch.closeFuture().sync();
    
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          workerGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
    }
    
    
    NioEventLoopGroup 인자는 스레드 개수를 의미한다. 만약 없을 경우에는 코어 수에 따른 스레드가 발생한다.(더 자세히는 코어 수 * 2 , 하이퍼 스레딩을 지원한다면 코어 * 2(하이퍼 스레딩) * 2) 첫번째의 bossgroup은 클라이언트의 연결을 수학하는 역할을 하고 두번째의 workgroup은 연결된 클라리언트의 데이터 입력출 및 이벤트 처리를 담당한다. option들은 책 혹은 인터넷에 찾아보기 바라며 중요한건 파이브라인에 EchoServiceHandler를 등록하는 것이다. 이는 클라이언트의 연결이 되었을때 데이터를 처리하는 역할을 한다. netty는 아주 간편하게 블로킹 논블로킹 epoll로 바꿀수가 있다. 위의 클래스에서 다음과 같이 3개만 변경하면 된다.
    EventLoopGroup bossGroup = new OioEventLoopGroup(1); //변경 부분
    EventLoopGroup workerGroup = new OioEventLoopGroup(); //변경 부분
    
    ...
    
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
      .channel(OioServerSocketChannel.class) //변경 부분
    ...
    
    아주 간편하게 바뀌었다. epoll도 마찬가지로 3부분만 바꾸면 가능하다. epoll은 리눅스에서만 실행된다. (mac도 안된다.) 다음으로 EchoServiceHandler 클래스를 보자
    public class EchoServiceHandler extends ChannelInboundHandlerAdapter{
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.print(byteBuf.toString(Charset.defaultCharset()));
        ctx.writeAndFlush(msg);
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
      }
    }
    
    
    ChannelInboundHandlerAdapter 클래스를 상속받았다. 그리고 channelRead과 exceptionCaught를 오버라이딩 했다. channelRead은 클라이언트에서 데이터를 보내면 호출 되는 메서드이다. channelRead에 있는 writeAndFlush 메서드는 데이터의 기록과 전송을 하는 메소드이다. 한마디로 write와 flush가 함께작동하는 메서드라 생각하면 되겠다. exceptionCaught 메서드는 오류가 발생 하였을 경우 호출 된다. 한번 telnet으로 접속해서 테스트를 해보자
    telnet localhost 8080
    
    그리고 나서 제대로 동작하는지 커멘드를 날려보자! 간단하게 네티에 대해서(?) 알아봤다. 몇편 더 쓸예정인데.. 아무튼 공부좀 더 하고 써야겠다.

    댓글

Designed by Tistory.