博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
用pika实现异步RPC
阅读量:1860 次
发布时间:2019-04-26

本文共 9148 字,大约阅读时间需要 30 分钟。

作业需求:

1、可以异步执行多个命令

2、对多台机器

 

完成情况:

实现了异步执行多个命令;通过在服务器端睡眠10s 模拟命令执行时间过长,进行异步操作,一次操作启动一次保护线程,主线程关闭,所有保护线程自动关闭

没做多台机器,没有这么多设备测试,虚拟机的linux python配置又一直没弄好,蛋疼。。。。

 

client.py

'''Created on 2018年7月5日@author: hcl'''import pikaimport uuidimport threadingclass Rpcclient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))#建立一个阻塞连接        self.channel = self.connection.channel()#在连接的基础上建立一个频道                result = self.channel.queue_declare(exclusive = True)#不指定queue名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除        self.callback_queue = result.method.queue #给频道自动分配一个队列名                #设置从哪个queue接收消息,        #处理消息的函数on_response,是否需要确认消息        #默认情况下是要对消息进行确认的,以防止消息的丢失        self.channel.basic_consume(self.on_response,                                   no_ack=True,                                   queue=self.callback_queue)                 self.task_id = []        def on_response(self,ch,method,props,body):        if self.corr_id == props.correlation_id:            self.response = body.decode()        def call(self,cmd):        #初始化response和corr_id属性        self.response = None        self.corr_id = str(uuid.uuid4())        #使用默认exchange向server种定义的rpc_queue发送消息        #在properties种指定replay_to属性和correlation_id属性用于告知远程server        #correlation_id属性用于匹配request和response                self.channel.basic_publish(exchange='',                                   routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                       reply_to=self.callback_queue,                                       correlation_id=self.corr_id),                                   body = cmd)                while self.response is None:            self.connection.process_data_events()                print('Back response:',self.response)        #return str(self.response)if __name__ == '__main__':        while True:        Rpc = Rpcclient()        try:            input_cmd = input('Please input your command:').strip()        except:            break        #response_msg = Rpc.call(input_cmd)        t = threading.Thread(target=Rpc.call,args=(input_cmd,))        t.setDaemon(True)        t.start()        #print('Back response:',response_msg)

server.py

'''Created on 2018年7月5日@author: hcl'''import pikaimport osimport timeclass Rpcserver(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))        self.channel = self.connection.channel()        self.queue_decl = self.channel.queue_declare(queue='rpc_queue')        self.bqos = self.channel.basic_qos(prefetch_count=1)        self.bcomsume = self.channel.basic_consume(self.on_request,queue='rpc_queue')                def on_request(self,ch,method,props,body):        receive_cmd = body.decode()        print("receive_cmd:",receive_cmd)        callback_msg = os.popen(receive_cmd).read()        print("received message:",callback_msg)        time.sleep(10)        ch.basic_publish(exchange='',                         routing_key = props.reply_to,                         properties = pika.BasicProperties(correlation_id=props.correlation_id),                         body = str(callback_msg)                        )                ch.basic_ack(delivery_tag = method.delivery_tag)if __name__ == '__main__':    Rpcserverdemo = Rpcserver()    print('server awaiting RPC requests')    Rpcserverdemo.channel.start_consuming()

 

服务器输出:
server awaiting RPC requestsreceive_cmd: ipconfigreceived message: Windows IP 配置无线局域网适配器 WLAN:   媒体状态  . . . . . . . . . . . . : 媒体已断开连接   连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 1:   媒体状态  . . . . . . . . . . . . : 媒体已断开连接   连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 11:   媒体状态  . . . . . . . . . . . . : 媒体已断开连接   连接特定的 DNS 后缀 . . . . . . . : 以太网适配器 以太网:   连接特定的 DNS 后缀 . . . . . . . :    IPv6 地址 . . . . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    本地链接 IPv6 地址. . . . . . . . :    IPv4 地址 . . . . . . . . . . . . :    子网掩码  . . . . . . . . . . . . :    默认网关. . . . . . . . . . . . . :                                        以太网适配器 VMware Network Adapter VMnet1:   连接特定的 DNS 后缀 . . . . . . . :    本地链接 IPv6 地址. . . . . . . . :    IPv4 地址 . . . . . . . . . . . . :    子网掩码  . . . . . . . . . . . . :    默认网关. . . . . . . . . . . . . : 以太网适配器 以太网 2:   连接特定的 DNS 后缀 . . . . . . . :    本地链接 IPv6 地址. . . . . . . . :    IPv4 地址 . . . . . . . . . . . . :    子网掩码  . . . . . . . . . . . . :    默认网关. . . . . . . . . . . . . : receive_cmd: dirreceived message:  驱动器 D 中的卷是 DATA 卷的序列号是 CC72-C62F D:\F\eclipse-workspace\management\src\work 的目录2018/07/05  08:57    
.2018/07/05 08:57
..2018/07/05 14:09 2,548 client.py2018/07/05 14:00 1,282 server.py 2 个文件 3,830 字节 2 个目录 882,198,745,088 可用字节receive_cmd: ipconfigreceived message: Windows IP 配置无线局域网适配器 WLAN: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 1: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 11: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 以太网适配器 以太网: 连接特定的 DNS 后缀 . . . . . . . : IPv6 地址 . . . . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : IPv4 地址 . . . . . . . . . . . . : 子网掩码 . . . . . . . . . . . . : 默认网关. . . . . . . . . . . . . : 以太网适配器 VMware Network Adapter VMnet1: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : IPv4 地址 . . . . . . . . . . . . : 子网掩码 . . . . . . . . . . . . : 默认网关. . . . . . . . . . . . . : 以太网适配器 以太网 2: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : IPv4 地址 . . . . . . . . . . . . : 子网掩码 . . . . . . . . . . . . : 默认网关. . . . . . . . . . . . . :

client1输出:

Please input your command:dirPlease input your command:ipconfigPlease input your command:Back response:  驱动器 D 中的卷是 DATA 卷的序列号是 CC72-C62F D:\F\eclipse-workspace\management\src\work 的目录2018/07/05  08:57    
.2018/07/05 08:57
..2018/07/05 14:09 2,548 client.py2018/07/05 14:00 1,282 server.py 2 个文件 3,830 字节 2 个目录 882,198,745,088 可用字节Back response: Windows IP 配置无线局域网适配器 WLAN: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 1: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 11: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 以太网适配器 以太网: 连接特定的 DNS 后缀 . . . . . . . : IPv6 地址 . . . . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 临时 IPv6 地址. . . . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : IPv4 地址 . . . . . . . . . . . . : 子网掩码 . . . . . . . . . . . . : 默认网关. . . . . . . . . . . . . : 以太网适配器 VMware Network Adapter VMnet1: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : IPv4 地址 . . . . . . . . . . . . : 子网掩码 . . . . . . . . . . . . : 默认网关. . . . . . . . . . . . . : 以太网适配器 以太网 2: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : IPv4 地址 . . . . . . . . . . . . : 子网掩码 . . . . . . . . . . . . : 默认网关. . . . . . . . . . . . . :

client2输出:

Please input your command:ipconfigPlease input your command:Back response: Windows IP 配置无线局域网适配器 WLAN:   媒体状态  . . . . . . . . . . . . : 媒体已断开连接   连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 1:   媒体状态  . . . . . . . . . . . . : 媒体已断开连接   连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 11:   媒体状态  . . . . . . . . . . . . : 媒体已断开连接   连接特定的 DNS 后缀 . . . . . . . : 以太网适配器 以太网:   连接特定的 DNS 后缀 . . . . . . . :    IPv6 地址 . . . . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    临时 IPv6 地址. . . . . . . . . . :    本地链接 IPv6 地址. . . . . . . . :    IPv4 地址 . . . . . . . . . . . . :    子网掩码  . . . . . . . . . . . . :    默认网关. . . . . . . . . . . . . :                                        以太网适配器 VMware Network Adapter VMnet1:   连接特定的 DNS 后缀 . . . . . . . :    本地链接 IPv6 地址. . . . . . . . :    IPv4 地址 . . . . . . . . . . . . :    子网掩码  . . . . . . . . . . . . :    默认网关. . . . . . . . . . . . . : 以太网适配器 以太网 2:   连接特定的 DNS 后缀 . . . . . . . :    本地链接 IPv6 地址. . . . . . . . :    IPv4 地址 . . . . . . . . . . . . :   子网掩码  . . . . . . . . . . . . :    默认网关. . . . . . . . . . . . . :

六、有兴趣接电子设计相关小型项目的请加下群,每个项目一般在1000元以内,非诚勿扰

转载地址:http://wafff.baihongyu.com/

你可能感兴趣的文章
线程中断以及线程中断引发的那些问题,你值得了解(待删除)
查看>>
Java 泛型类型擦除
查看>>
什么是IO中的阻塞、非阻塞、同步、异步
查看>>
mongodb数据库操作
查看>>
MongoTemplate数据库复杂查询
查看>>
java POI导出excel
查看>>
JavaScript中this关键字原理
查看>>
log4j、slf4j、logback的关系剖析
查看>>
CAS单点登录(客户端、服务端)搭建
查看>>
Shell特殊变量$
查看>>
spring mvc ibatis jbpm4.4 cxf weblogic 启动报错
查看>>
kettle 使用excel模板导出数据
查看>>
kettle 列转行的demo
查看>>
多线程执行 sql查询 提升整体查询效率
查看>>
对任何架构或应用使用消息队列 的十个理由
查看>>
ExecutorService
查看>>
oralce sql 创建指定时间段内的日历信息
查看>>
jdk1.7之后java不需要自己关闭io流了
查看>>
JDK1.8 List转Map
查看>>
IDEA试用重置插件
查看>>