使用gRPC实现Java与Python之间的通信

Java

getStuNo.proto

  1. syntax = "proto3";
  2. option java_multiple_files = true;
  3. option java_package = "io.grpc.examples.getstuno";
  4. option java_outer_classname = "GetStuNoProto";
  5. option objc_class_prefix = "GSN";
  6. package getstuno;
  7. service GetStuNoService {
  8. rpc getMsg (StuNoRequest) returns (StuNoResponse){}
  9. }
  10. message StuNoRequest {
  11. string name = 1;
  12. }
  13. message StuNoResponse {
  14. string number = 1;
  15. }
// Schema for the student-number lookup service (Java side).
syntax = "proto3";

// Java code-generation options: emit one .java file per top-level type and
// place the generated classes in io.grpc.examples.getstuno.
option java_multiple_files = true;
option java_package = "io.grpc.examples.getstuno";
option java_outer_classname = "GetStuNoProto";
option objc_class_prefix = "GSN";

// Proto package. It is part of the fully qualified service name, so every
// peer that talks to this service must declare the same package.
package getstuno;

// Service that answers student-number lookups.
service GetStuNoService {
    // Unary call: one StuNoRequest in, one StuNoResponse out.
    rpc getMsg (StuNoRequest) returns (StuNoResponse){}
}

// Lookup request.
message StuNoRequest {
    // Name of the student to look up.
    string name = 1;
}

// Lookup reply.
message StuNoResponse {
    // Student number, carried as a string.
    string number = 1;
}

Protocol Buffers 是一种语言无关、平台无关、可扩展的结构化数据序列化方法,可用于通信协议和数据存储。这里使用 proto 文件来生成 gRPC 所需的框架代码,生成方式参照 https://blog.csdn.net/qq_29319189/article/details/93539198

GetStuNoServer.java

  1. package getstuno;
  2. import io.grpc.Server;
  3. import io.grpc.ServerBuilder;
  4. import io.grpc.examples.getstuno.GetStuNoServiceGrpc;
  5. import io.grpc.examples.getstuno.StuNoResponse;
  6. import io.grpc.examples.getstuno.StuNoRequest;
  7. import io.grpc.stub.StreamObserver;
  8. import java.io.IOException;
  9. import java.util.logging.Logger;
  10. /**
  11. * Server that manages startup/shutdown of a {@code Greeter} server.
  12. */
  13. public class GetStuNoServer {
  14. private static final Logger logger = Logger.getLogger(GetStuNoServer.class.getName());
  15. private Server server;
  16. private void start() throws IOException {
  17. /* The port on which the server should run */
  18. int port = 50051;
  19. server = ServerBuilder.forPort(port)
  20. .addService(new GetStuNoServiceImpl())
  21. .build()
  22. .start();
  23. logger.info("Server started, listening on " + port);
  24. Runtime.getRuntime().addShutdownHook(new Thread() {
  25. @Override
  26. public void run() {
  27. // Use stderr here since the logger may have been reset by its JVM shutdown hook.
  28. System.err.println("*** shutting down gRPC server since JVM is shutting down");
  29. GetStuNoServer.this.stop();
  30. System.err.println("*** server shut down");
  31. }
  32. });
  33. }
  34. private void stop() {
  35. if (server != null) {
  36. server.shutdown();
  37. }
  38. }
  39. /**
  40. * Await termination on the main thread since the grpc library uses daemon threads.
  41. */
  42. private void blockUntilShutdown() throws InterruptedException {
  43. if (server != null) {
  44. server.awaitTermination();
  45. }
  46. }
  47. /**
  48. * Main launches the server from the command line.
  49. */
  50. public static void main(String[] args) throws IOException, InterruptedException {
  51. final GetStuNoServer server = new GetStuNoServer();
  52. server.start();
  53. server.blockUntilShutdown();
  54. }
  55. static class GetStuNoServiceImpl extends GetStuNoServiceGrpc.GetStuNoServiceImplBase {
  56. @Override
  57. public void getMsg(StuNoRequest req, StreamObserver<StuNoResponse> responseObserver) {
  58. String number = "0000";
  59. logger.info("Received name: " + req.getName());
  60. if (req.getName().equals("爱丽丝")) {
  61. number = "1234";
  62. }
  63. StuNoResponse reply = StuNoResponse.newBuilder().setNumber(number).build();
  64. responseObserver.onNext(reply);
  65. responseObserver.onCompleted();
  66. }
  67. }
  68. }
package getstuno;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.getstuno.GetStuNoServiceGrpc;
import io.grpc.examples.getstuno.StuNoResponse;
import io.grpc.examples.getstuno.StuNoRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.logging.Logger;

/**
 * gRPC server that exposes {@code GetStuNoService} on a fixed port and manages its own
 * startup and shutdown.
 */
public class GetStuNoServer {
    private static final Logger logger = Logger.getLogger(GetStuNoServer.class.getName());

    /** Upper bound, in seconds, on how long shutdown waits for in-flight calls to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    private Server server;

    /**
     * Builds and starts the server on port 50051 and registers a JVM shutdown hook that
     * stops it again.
     *
     * @throws IOException if the server cannot be started
     */
    private void start() throws IOException {
        /* The port on which the server should run */
        int port = 50051;
        server = ServerBuilder.forPort(port)
                .addService(new GetStuNoServiceImpl())
                .build()
                .start();
        logger.info("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                try {
                    GetStuNoServer.this.stop();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the remaining shutdown sequence can see it.
                    Thread.currentThread().interrupt();
                    System.err.println("*** interrupted while waiting for gRPC server to shut down");
                }
                System.err.println("*** server shut down");
            }
        });
    }

    /**
     * Initiates an orderly shutdown and waits (bounded by {@link #SHUTDOWN_TIMEOUT_SECONDS})
     * for the server to terminate, so that calls already in progress get a chance to complete
     * before the JVM exits. Does nothing if the server was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void stop() throws InterruptedException {
        if (server != null) {
            server.shutdown()
                    .awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /**
     * Main launches the server from the command line and blocks until it terminates.
     *
     * @param args ignored
     * @throws IOException if the server cannot be started
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final GetStuNoServer server = new GetStuNoServer();
        server.start();
        server.blockUntilShutdown();
    }

    /** Implementation of {@code GetStuNoService}: maps a student name to a student number. */
    static class GetStuNoServiceImpl extends GetStuNoServiceGrpc.GetStuNoServiceImplBase {

        /**
         * Logs the requested name and replies with {@code "1234"} for the name 爱丽丝 and
         * {@code "0000"} for every other name, then completes the call.
         *
         * @param req request carrying the student name
         * @param responseObserver receives the single reply followed by completion
         */
        @Override
        public void getMsg(StuNoRequest req, StreamObserver<StuNoResponse> responseObserver) {
            String number = "0000";
            logger.info("Received name: " + req.getName());
            if (req.getName().equals("爱丽丝")) {
                number = "1234";
            }
            StuNoResponse reply = StuNoResponse.newBuilder().setNumber(number).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }
    }
}

Server端的重点是实现对外提供的服务(即getMsg方法)。

GetStuNoClient.java

  1. package getstuno;
  2. import io.grpc.ManagedChannel;
  3. import io.grpc.ManagedChannelBuilder;
  4. import io.grpc.StatusRuntimeException;
  5. import io.grpc.examples.getstuno.GetStuNoServiceGrpc;
  6. import io.grpc.examples.getstuno.StuNoResponse;
  7. import io.grpc.examples.getstuno.StuNoRequest;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.logging.Level;
  10. import java.util.logging.Logger;
  11. /**
  12. * A simple client that requests a greeting from the {@link GetStuNoServer}.
  13. */
  14. public class GetStuNoClient {
  15. private static final Logger logger = Logger.getLogger(GetStuNoClient.class.getName());
  16. private final ManagedChannel channel;
  17. private final GetStuNoServiceGrpc.GetStuNoServiceBlockingStub blockingStub;
  18. /** Construct client connecting to GetStuNo server at {@code host:port}. */
  19. public GetStuNoClient(String host, int port) {
  20. this(ManagedChannelBuilder.forAddress(host, port)
  21. // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
  22. // needing certificates.
  23. .usePlaintext()
  24. .build());
  25. }
  26. /** Construct client for accessing HelloWorld server using the existing channel. */
  27. GetStuNoClient(ManagedChannel channel) {
  28. this.channel = channel;
  29. blockingStub = GetStuNoServiceGrpc.newBlockingStub(channel);
  30. }
  31. public void shutdown() throws InterruptedException {
  32. channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  33. }
  34. /** Say hello to server. */
  35. public void greet(String name) {
  36. StuNoRequest request = StuNoRequest.newBuilder().setName(name).build();
  37. StuNoResponse response;
  38. try {
  39. response = blockingStub.getMsg(request);
  40. } catch (StatusRuntimeException e) {
  41. logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  42. return;
  43. }
  44. logger.info("Client received: " + response.getNumber());
  45. }
  46. /**
  47. * Greet server. If provided, the first element of {@code args} is the name to use in the
  48. * greeting.
  49. */
  50. public static void main(String[] args) throws Exception {
  51. GetStuNoClient client = new GetStuNoClient("localhost", 50051);
  52. try {
  53. /* Access a service running on the local machine on port 50051 */
  54. String user = "爱丽丝";
  55. if (args.length > 0) {
  56. user = args[0]; /* Use the arg as the name to greet if provided */
  57. }
  58. client.greet(user);
  59. } finally {
  60. client.shutdown();
  61. }
  62. }
  63. }
package getstuno;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.getstuno.GetStuNoServiceGrpc;
import io.grpc.examples.getstuno.StuNoResponse;
import io.grpc.examples.getstuno.StuNoRequest;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Command-line client that asks a {@link GetStuNoServer} for the student number that belongs
 * to a name.
 */
public class GetStuNoClient {
    private static final Logger logger = Logger.getLogger(GetStuNoClient.class.getName());

    private final ManagedChannel channel;
    private final GetStuNoServiceGrpc.GetStuNoServiceBlockingStub stub;

    /**
     * Creates a client that talks to the server at {@code host:port} over a plaintext channel.
     *
     * @param host server host name
     * @param port server port
     */
    public GetStuNoClient(String host, int port) {
        this(plaintextChannel(host, port));
    }

    /**
     * Creates a client on top of an already constructed channel.
     *
     * @param channel channel used for all calls and closed by {@link #shutdown()}
     */
    GetStuNoClient(ManagedChannel channel) {
        this.channel = channel;
        this.stub = GetStuNoServiceGrpc.newBlockingStub(channel);
    }

    /** Builds a channel with TLS disabled so the example needs no certificates. */
    private static ManagedChannel plaintextChannel(String host, int port) {
        return ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .build();
    }

    /**
     * Shuts the channel down and waits up to five seconds for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        channel.shutdown();
        channel.awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * Sends {@code name} to the server and logs the student number it returns. A failed RPC is
     * logged as a warning and otherwise ignored.
     *
     * @param name student name to look up
     */
    public void greet(String name) {
        final StuNoRequest lookup = StuNoRequest.newBuilder().setName(name).build();
        final StuNoResponse answer;
        try {
            answer = stub.getMsg(lookup);
        } catch (StatusRuntimeException rpcFailure) {
            logger.log(Level.WARNING, "RPC failed: {0}", rpcFailure.getStatus());
            return;
        }
        logger.info("Client received: " + answer.getNumber());
    }

    /**
     * Looks up one name against the server on {@code localhost:50051}. The name is the first
     * element of {@code args} when present, otherwise 爱丽丝.
     *
     * @param args optional name to look up
     * @throws Exception if shutting the channel down is interrupted
     */
    public static void main(String[] args) throws Exception {
        final GetStuNoClient lookupClient = new GetStuNoClient("localhost", 50051);
        try {
            final String studentName = args.length > 0 ? args[0] : "爱丽丝";
            lookupClient.greet(studentName);
        } finally {
            lookupClient.shutdown();
        }
    }
}

Client端向Server端请求服务

Python

getStuNo.proto

  1. syntax = "proto3";
  2. package getstuno;
  3. service GetStuNoService {
  4. rpc getMsg (StuNoRequest) returns (StuNoResponse){}
  5. }
  6. message StuNoRequest {
  7. string name = 1;
  8. }
  9. message StuNoResponse {
  10. string number = 1;
  11. }
// Schema for the student-number lookup service (Python side).
syntax = "proto3";

// Proto package. It is part of the fully qualified service name, so it must
// match the package declared by the peer's copy of this file.
package getstuno;
 
// Service that answers student-number lookups.
service GetStuNoService {
 // Unary call: one StuNoRequest in, one StuNoResponse out.
 rpc getMsg (StuNoRequest) returns (StuNoResponse){}
}
 
// Lookup request.
message StuNoRequest {
  // Name of the student to look up.
  string name = 1;
}
 
// Lookup reply.
message StuNoResponse {
  // Student number, carried as a string.
  string number = 1;
}

Python与Java的proto文件基本相同,重点在于package必须相同;否则即使双方的service和message名称、类型完全一致,也仍然无法通信。Python利用proto生成gRPC框架代码的方法参照:
https://www.cnblogs.com/zongfa/p/12218341.html

server.py

  1. import grpc
  2. import getStuNo_pb2
  3. import getStuNo_pb2_grpc
  4. from concurrent import futures
  5. import time
  6. _ONE_DAY_IN_SECONDS = 60 * 60 * 24
  7. class GetStuNoServicer(getStuNo_pb2_grpc.GetStuNoServiceServicer):
  8. def getMsg(self, request, context):
  9. print("Received name: %s" % request.name)
  10. if request.name == '爱丽丝':
  11. number = '1234'
  12. else:
  13. number = '0000'
  14. return getStuNo_pb2.StuNoResponse(number=number)
  15. def serve():
  16. server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  17. getStuNo_pb2_grpc.add_GetStuNoServiceServicer_to_server(GetStuNoServicer(), server)
  18. server.add_insecure_port('[::]:50051')
  19. server.start()
  20. try:
  21. while True:
  22. time.sleep(_ONE_DAY_IN_SECONDS)
  23. except KeyboardInterrupt:
  24. server.stop(0)
  25. if __name__ == '__main__':
  26. serve()
import grpc
import getStuNo_pb2
import getStuNo_pb2_grpc
 
from concurrent import futures
import time
 
# Length of one sleep in serve()'s keep-alive loop: 24 hours, in seconds.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
 
 
class GetStuNoServicer(getStuNo_pb2_grpc.GetStuNoServiceServicer):
  """Serves GetStuNoService by mapping a student name to a student number."""

  def getMsg(self, request, context):
    """Handle one getMsg call.

    Prints the requested name, then replies with '1234' when the name is
    '爱丽丝' and with '0000' for any other name.
    """
    student_name = request.name
    print("Received name: %s" % student_name)
    student_no = '1234' if student_name == '爱丽丝' else '0000'
    return getStuNo_pb2.StuNoResponse(number=student_no)
 
 
def serve():
  """Run the gRPC server on port 50051 until interrupted from the keyboard.

  Requests are handled on a pool of 10 worker threads. The port is opened
  without transport security.
  """
  worker_pool = futures.ThreadPoolExecutor(max_workers=10)
  grpc_server = grpc.server(worker_pool)
  getStuNo_pb2_grpc.add_GetStuNoServiceServicer_to_server(GetStuNoServicer(), grpc_server)
  grpc_server.add_insecure_port('[::]:50051')
  grpc_server.start()
  # Keep the main thread alive after start() returns; Ctrl-C breaks out of
  # the sleep loop and stops the server with a grace value of 0.
  try:
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    grpc_server.stop(0)
 
# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
  serve()

client.py

  1. import grpc
  2. import getStuNo_pb2
  3. import getStuNo_pb2_grpc
  4. def run():
  5. # NOTE(gRPC Python Team): .close() is possible on a channel and should be
  6. # used in circumstances in which the with statement does not fit the needs
  7. # of the code.
  8. with grpc.insecure_channel('localhost:50051') as channel:
  9. stub = getStuNo_pb2_grpc.GetStuNoServiceStub(channel)
  10. response = stub.getMsg(getStuNo_pb2.StuNoRequest(name='爱丽丝'))
  11. print("Client received: " + response.number)
  12. if __name__ == '__main__':
  13. run()
import grpc
 
import getStuNo_pb2
import getStuNo_pb2_grpc
 
def run():
  """Ask the server on localhost:50051 for 爱丽丝's number and print it."""
  lookup = getStuNo_pb2.StuNoRequest(name='爱丽丝')
  # The with statement closes the channel on exit; call .close() on the
  # channel directly in code where a with statement does not fit.
  with grpc.insecure_channel('localhost:50051') as conn:
    client_stub = getStuNo_pb2_grpc.GetStuNoServiceStub(conn)
    reply = client_stub.getMsg(lookup)
  # Printed after the channel has been closed; the reply is already in hand.
  print("Client received: " + reply.number)
 
 
# Issue the request only when this file is executed as a script, not on import.
if __name__ == '__main__':
  run()

python端的代码较为精简。

至此,Java端和Python端的gRPC服务名、消息字段的名称与类型,以及服务使用的端口都保持一致。先启动Java或Python任意一个server端,再启动Java或Python任意一个client端,都可以正确完成gRPC调用。

发表评论

邮箱地址不会被公开。 必填项已用*标注