python聊天程序
Ⅰ 利用python语言,设计一个网络聊天程序oswa实现网络通信或者文件传输
你是要外网通信还是只在内网通信,如果外网需要先做端口映射或者租服务器
Ⅱ python怎么建立socket服务端
socket服务器再细分可分为多种了,tcp,udp,websocket,都是调用socket模块,但是具体实现起来有一点细微的差别
先给出一个tcp和udp通过socket协议实现的聊天室的例子
python聊天室(python2.7版本):
都是分别运行server.py和client.py,就可以进行通讯了。
TCP版本:
socket-tcp-server.py(服务端):
#-*-encoding:utf-8-*-
#socket.getaddrinfo(host,port,family=0,socktype=0,proto=0,flags=0)
#根据给定的参数host/port,相应的转换成一个包含用于创建socket对象的五元组,
#参数host为域名,以字符串形式给出代表一个IPV4/IPV6地址或者None.
#参数port如果字符串形式就代表一个服务名,比如“http”"ftp""email"等,或者为数字,或者为None
#参数family为地主族,可以为AF_INET,AF_INET6,AF_UNIX.
#参数socktype可以为SOCK_STREAM(TCP)或者SOCK_DGRAM(UDP)
#参数proto通常为0可以直接忽略
#参数flags为AI_*的组合,比如AI_NUMERICHOST,它会影响函数的返回值
#附注:给参数host,port传递None时建立在C基础,通过传递NULL。
#该函数返回一个五元组(family,socktype,proto,canonname,sockaddr),同时第五个参数sockaddr也是一个二元组(address,port)
#更多的方法及链接请访问
#Echoserverprogram
fromsocketimport*
importsys
importthreading
fromtimeimportctime
fromtimeimportlocaltime
importtraceback
importtime
importsubprocess
reload(sys)
sys.setdefaultencoding("utf8")
HOST='127.0.0.1'
PORT=8555#设置侦听端口
BUFSIZ=1024
classTcpServer():
def__init__(self):
self.ADDR=(HOST,PORT)
try:
self.sock=socket(AF_INET,SOCK_STREAM)
print'%disopen'%PORT
self.sock.bind(self.ADDR)
self.sock.listen(5)
#设置退出条件
self.STOP_CHAT=False
#所有监听的客户端
self.clients={}
self.thrs={}
self.stops=[]
exceptException,e:
print"%disdown"%PORT
returnFalse
defIsOpen(ip,port):
s=socket(AF_INET,SOCK_STREAM)
try:
s.connect((ip,int(port)))
#s.shutdown(2)
#利用shutdown()函数使socket双向数据传输变为单向数据传输。shutdown()需要一个单独的参数,
#该参数表示s了如何关闭socket。具体为:0表示禁止将来读;1表示禁止将来写;2表示禁止将来读和写。
print'%disopen'%port
returnTrue
except:
print'%disdown'%port
returnFalse
deflisten_client(self):
whilenotself.STOP_CHAT:
print(u'等待接入,侦听端口:%d'%(PORT))
self.tcpClientSock,self.addr=self.sock.accept()
print(u'接受连接,客户端地址:',self.addr)
address=self.addr
#将建立的clientsocket链接放到列表self.clients中
self.clients[address]=self.tcpClientSock
#分别将每个建立的链接放入进程中,接收且分发消息
self.thrs[address]=threading.Thread(target=self.readmsg,args=[address])
self.thrs[address].start()
time.sleep(0.5)defreadmsg(self,address):
#如果地址不存在,则返回False
ifaddressnotinself.clients:
returnFalse
#得到发送消息的clientsocket
client=self.clients[address]
whileTrue:
try:
#获取到消息内容data
data=client.recv(BUFSIZ)
except:
print(e)
self.close_client(address)
break
ifnotdata:
break
#python3使用bytes,所以要进行编码
#s='%s发送给我的信息是:[%s]%s'%(addr[0],ctime(),data.decode('utf8'))
#对日期进行一下格式化
ISOTIMEFORMAT='%Y-%m-%d%X'
stime=time.strftime(ISOTIMEFORMAT,localtime())
s=u'%s发送给我的信息是:%s'%(str(address),data.decode('utf8'))
#将获得的消息分发给链接中的clientsocket
forkinself.clients:
self.clients[k].send(s.encode('utf8'))
self.clients[k].sendall('sendall:'+s.encode('utf8'))
printstr(k)
print([stime],':',data.decode('utf8'))
#如果输入quit(忽略大小写),则程序退出
STOP_CHAT=(data.decode('utf8').upper()=="QUIT")
ifSTOP_CHAT:
print"quit"
self.close_client(address)
print"alreadyquit"
break
defclose_client(self,address):
try:
client=self.clients.pop(address)
self.stops.append(address)
client.close()
forkinself.clients:
self.clients[k].send(str(address)+u"已经离开了")
except:
pass
print(str(address)+u'已经退出')
if__name__=='__main__':
tserver=TcpServer()
tserver.listen_client()
——————————华丽的分割线——————————
socket-tcp-client.py(客户端):
#-*-encoding:utf-8-*-
fromsocketimport*
importsys
importthreading
importtime
reload(sys)
sys.setdefaultencoding("utf8")
#测试,连接本机
HOST='127.0.0.1'
#设置侦听端口
PORT=8555
BUFSIZ=1024
classTcpClient:
ADDR=(HOST,PORT)
def__init__(self):
self.HOST=HOST
self.PORT=PORT
self.BUFSIZ=BUFSIZ
#创建socket连接
self.client=socket(AF_INET,SOCK_STREAM)
self.client.connect(self.ADDR)
#起一个线程,监听接收的信息
self.trecv=threading.Thread(target=self.recvmsg)
self.trecv.start()
defsendmsg(self):
#循环发送聊天消息,如果socket连接存在则一直循环,发送quit时关闭链接
whileself.client.connect_ex(self.ADDR):
data=raw_input('>:')
ifnotdata:
break
self.client.send(data.encode('utf8'))
print(u'发送信息到%s:%s'%(self.HOST,data))
ifdata.upper()=="QUIT":
self.client.close()
printu"已关闭"
break
defrecvmsg(self):
#接收消息,如果链接一直存在,则持续监听接收消息
try:
whileself.client.connect_ex(self.ADDR):
data=self.client.recv(self.BUFSIZ)
print(u'从%s收到信息:%s'%(self.HOST,data.decode('utf8')))
exceptException,e:
printstr(e)
if__name__=='__main__':
client=TcpClient()
client.sendmsg()
UDP版本:
socket-udp-server.py
#-*-coding:utf8-*-
importsys
importtime
importtraceback
importthreading
reload(sys)
sys.setdefaultencoding('utf-8')
importsocket
importtraceback
HOST="127.0.0.1"
PORT=9555
CHECK_PERIOD=20
CHECK_TIMEOUT=15
classUdpServer(object):
def__init__(self):
self.clients=[]
self.beats={}
self.ADDR=(HOST,PORT)
try:
self.sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.sock.bind(self.ADDR)#绑定同一个域名下的所有机器
self.beattrs=threading.Thread(target=self.checkheartbeat)
self.beattrs.start()
exceptException,e:
traceback.print_exc()
returnFalse
deflisten_client(self):
whileTrue:
time.sleep(0.5)
print"hohohohohoo"
try:
recvData,address=self.sock.recvfrom(2048)
ifnotrecvData:
self.close_client(address)
break
ifaddressinself.clients:
senddata=u"%s发送给我的信息是:%s"%(str(address),recvData.decode('utf8'))
ifrecvData.upper()=="QUIT":
self.close_client(address)
ifrecvData=="HEARTBEAT":
self.heartbeat(address)
continue
else:
self.clients.append(address)
senddata=u"%s发送给我的信息是:%s"%(str(address),u'进入了聊天室')
forcinself.clients:
try:
self.sock.sendto(senddata,c)
exceptException,e:
printstr(e)
self.close_client(c)
exceptException,e:
#traceback.print_exc()
printstr(e)
pass
defheartbeat(self,address):
self.beats[address]=time.time()
defcheckheartbeat(self):
whileTrue:
print"checkheartbeat"
printself.beats
try:
forcinself.clients:
printtime.time()
printself.beats[c]
ifself.beats[c]+CHECK_TIMEOUT<time.time():
printu"%s心跳超时,连接已经断开"%str(c)
self.close_client(c)
else:
printu"checkp%s,没有断开"%str(c)
exceptException,e:
traceback.print_exc()
printstr(e)
pass
time.sleep(CHECK_PERIOD)
defclose_client(self,address):
try:
ifaddressinself.clients:
self.clients.remove(address)
ifself.beats.has_key(address):
delself.beats[address]
printself.clients
forcinself.clients:
self.sock.sendto(u'%s已经离开了'%str(address),c)
print(str(address)+u'已经退出')
exceptException,e:
printstr(e)
raise
if__name__=="__main__":
udpServer=UdpServer()
udpServer.listen_client()
——————————华丽的分割线——————————
socket-udp-client.py:
#-*-coding:utf8-*-
importsys
importthreading
importtime
reload(sys)
sys.setdefaultencoding('utf-8')
importsocket
HOST="127.0.0.1"
PORT=9555
#BEAT_PORT=43278
BEAT_PERIOD=5
classUdpClient(object):
def__init__(self):
self.clientsock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.HOST=HOST
self.ADDR=(HOST,PORT)
self.clientsock.sendto(u'请求建立链接',self.ADDR)
self.recvtrs=threading.Thread(target=self.recvmsg)
self.recvtrs.start()
self.hearttrs=threading.Thread(target=self.heartbeat)
self.hearttrs.start()
defsendmsg(self):
whileTrue:
data=raw_input(">:")
ifnotdata:
break
self.clientsock.sendto(data.encode('utf-8'),self.ADDR)
ifdata.upper()=='QUIT':
self.clientsock.close()
break
defheartbeat(self):
whileTrue:
self.clientsock.sendto('HEARTBEAT',self.ADDR)
time.sleep(BEAT_PERIOD)
defrecvmsg(self):
whileTrue:
recvData,addr=self.clientsock.recvfrom(1024)
ifnotrecvData:
break
print(u'从%s收到信息:%s'%(self.HOST,recvData.decode('utf8')))if__name__=="__main__":
udpClient=UdpClient()
udpClient.sendmsg()
Ⅲ [python练手]使用WordCloud模块将qq聊天记录生成炫酷的关键词云
这个项目的github地址。 https://github.com/susususuhanmo/QQChatLogWordCloud
最近准备开始学习python,写一个综合一点的小demo练练手~
读取文件
编写清洗函数,清洗聊天数据。主要是需要清洗掉一些无用的关键词:
分词,分词结果如果出现一些特有的词语截了一半或者截多了几个字符的情况,可以手动添加分词词库。
根据关键词数据,建立pandas的DataFrame,通过停词词库过滤掉一些中文中不适合做关键词的词语,进行关键词数统计并根据次数排序。
词云属性解释
调整成120之后就好看很多,有很明显的差别。
设置图片为可爱的莫古力
根据这个莫古力的颜色分布,生成的词云如下,我这个不是特别好看,大家可以选择轮廓明显一点的图片来生成。
相比默认的模式,
关于更详细的词云配置可以看这篇文章,这个作者对wordcloud的配置讲解的十分详细。
https://blog.csdn.net/heyuexianzi/article/details/76851377
在你想导出的人或群处右键,点导出消息记录,
然后选择txt格式
Ⅳ 用Python编写一个udp聊天器,为什么接收正常,但发送却显示向一个无法连接的网络尝试了一个套接
你这个只接收一次就关闭了啊,应该写个循环接收和发送
Ⅳ 如何用Python编写一个聊天室
python聊天室(python2.7版本):
暂时先给出两种版本的,tcp+udp
都是分别运行server.py和client.py,就可以进行通讯了。
别外还有websocket版本,这个是有web界面的和基本web服务的,如果需要的话,我会把基本的代码贴一版上来。
TCP版本:
socket-tcp-server.py(服务端):
#-*-encoding:utf-8-*-
#socket.getaddrinfo(host,port,family=0,socktype=0,proto=0,flags=0)
#根据给定的参数host/port,相应的转换成一个包含用于创建socket对象的五元组,
#参数host为域名,以字符串形式给出代表一个IPV4/IPV6地址或者None.
#参数port如果字符串形式就代表一个服务名,比如“http”"ftp""email"等,或者为数字,或者为None
#参数family为地主族,可以为AF_INET,AF_INET6,AF_UNIX.
#参数socktype可以为SOCK_STREAM(TCP)或者SOCK_DGRAM(UDP)
#参数proto通常为0可以直接忽略
#参数flags为AI_*的组合,比如AI_NUMERICHOST,它会影响函数的返回值
#附注:给参数host,port传递None时建立在C基础,通过传递NULL。
#该函数返回一个五元组(family,socktype,proto,canonname,sockaddr),同时第五个参数sockaddr也是一个二元组(address,port)
#更多的方法及链接请访问
#Echoserverprogram
fromsocketimport*
importsys
importthreading
fromtimeimportctime
fromtimeimportlocaltime
importtraceback
importtime
importsubprocess
reload(sys)
sys.setdefaultencoding("utf8")
HOST='127.0.0.1'
PORT=8555#设置侦听端口
BUFSIZ=1024
classTcpServer():
def__init__(self):
self.ADDR=(HOST,PORT)
try:
self.sock=socket(AF_INET,SOCK_STREAM)
print'%disopen'%PORT
self.sock.bind(self.ADDR)
self.sock.listen(5)
#设置退出条件
self.STOP_CHAT=False
#所有监听的客户端
self.clients={}
self.thrs={}
self.stops=[]
exceptException,e:
print"%disdown"%PORT
returnFalse
defIsOpen(ip,port):
s=socket(AF_INET,SOCK_STREAM)
try:
s.connect((ip,int(port)))
#s.shutdown(2)
#利用shutdown()函数使socket双向数据传输变为单向数据传输。shutdown()需要一个单独的参数,
#该参数表示s了如何关闭socket。具体为:0表示禁止将来读;1表示禁止将来写;2表示禁止将来读和写。
print'%disopen'%port
returnTrue
except:
print'%disdown'%port
returnFalse
deflisten_client(self):
whilenotself.STOP_CHAT:
print(u'等待接入,侦听端口:%d'%(PORT))
self.tcpClientSock,self.addr=self.sock.accept()
print(u'接受连接,客户端地址:',self.addr)
address=self.addr
#将建立的clientsocket链接放到列表self.clients中
self.clients[address]=self.tcpClientSock
#分别将每个建立的链接放入进程中,接收且分发消息
self.thrs[address]=threading.Thread(target=self.readmsg,args=[address])
self.thrs[address].start()
time.sleep(0.5)defreadmsg(self,address):
#如果地址不存在,则返回False
ifaddressnotinself.clients:
returnFalse
#得到发送消息的clientsocket
client=self.clients[address]
whileTrue:
try:
#获取到消息内容data
data=client.recv(BUFSIZ)
except:
print(e)
self.close_client(address)
break
ifnotdata:
break
#python3使用bytes,所以要进行编码
#s='%s发送给我的信息是:[%s]%s'%(addr[0],ctime(),data.decode('utf8'))
#对日期进行一下格式化
ISOTIMEFORMAT='%Y-%m-%d%X'
stime=time.strftime(ISOTIMEFORMAT,localtime())
s=u'%s发送给我的信息是:%s'%(str(address),data.decode('utf8'))
#将获得的消息分发给链接中的clientsocket
forkinself.clients:
self.clients[k].send(s.encode('utf8'))
self.clients[k].sendall('sendall:'+s.encode('utf8'))
printstr(k)
print([stime],':',data.decode('utf8'))
#如果输入quit(忽略大小写),则程序退出
STOP_CHAT=(data.decode('utf8').upper()=="QUIT")
ifSTOP_CHAT:
print"quit"
self.close_client(address)
print"alreadyquit"
break
defclose_client(self,address):
try:
client=self.clients.pop(address)
self.stops.append(address)
client.close()
forkinself.clients:
self.clients[k].send(str(address)+u"已经离开了")
except:
pass
print(str(address)+u'已经退出')
if__name__=='__main__':
tserver=TcpServer()
tserver.listen_client()
——————————华丽的分割线——————————
socket-tcp-client.py(客户端):
#-*-encoding:utf-8-*-
fromsocketimport*
importsys
importthreading
importtime
reload(sys)
sys.setdefaultencoding("utf8")
#测试,连接本机
HOST='127.0.0.1'
#设置侦听端口
PORT=8555
BUFSIZ=1024
classTcpClient:
ADDR=(HOST,PORT)
def__init__(self):
self.HOST=HOST
self.PORT=PORT
self.BUFSIZ=BUFSIZ
#创建socket连接
self.client=socket(AF_INET,SOCK_STREAM)
self.client.connect(self.ADDR)
#起一个线程,监听接收的信息
self.trecv=threading.Thread(target=self.recvmsg)
self.trecv.start()
defsendmsg(self):
#循环发送聊天消息,如果socket连接存在则一直循环,发送quit时关闭链接
whileself.client.connect_ex(self.ADDR):
data=raw_input('>:')
ifnotdata:
break
self.client.send(data.encode('utf8'))
print(u'发送信息到%s:%s'%(self.HOST,data))
ifdata.upper()=="QUIT":
self.client.close()
printu"已关闭"
break
defrecvmsg(self):
#接收消息,如果链接一直存在,则持续监听接收消息
try:
whileself.client.connect_ex(self.ADDR):
data=self.client.recv(self.BUFSIZ)
print(u'从%s收到信息:%s'%(self.HOST,data.decode('utf8')))
exceptException,e:
printstr(e)
if__name__=='__main__':
client=TcpClient()
client.sendmsg()
UDP版本:
socket-udp-server.py
#-*-coding:utf8-*-
importsys
importtime
importtraceback
importthreading
reload(sys)
sys.setdefaultencoding('utf-8')
importsocket
importtraceback
HOST="127.0.0.1"
PORT=9555
CHECK_PERIOD=20
CHECK_TIMEOUT=15
classUdpServer(object):
def__init__(self):
self.clients=[]
self.beats={}
self.ADDR=(HOST,PORT)
try:
self.sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.sock.bind(self.ADDR)#绑定同一个域名下的所有机器
self.beattrs=threading.Thread(target=self.checkheartbeat)
self.beattrs.start()
exceptException,e:
traceback.print_exc()
returnFalse
deflisten_client(self):
whileTrue:
time.sleep(0.5)
print"hohohohohoo"
try:
recvData,address=self.sock.recvfrom(2048)
ifnotrecvData:
self.close_client(address)
break
ifaddressinself.clients:
senddata=u"%s发送给我的信息是:%s"%(str(address),recvData.decode('utf8'))
ifrecvData.upper()=="QUIT":
self.close_client(address)
ifrecvData=="HEARTBEAT":
self.heartbeat(address)
continue
else:
self.clients.append(address)
senddata=u"%s发送给我的信息是:%s"%(str(address),u'进入了聊天室')
forcinself.clients:
try:
self.sock.sendto(senddata,c)
exceptException,e:
printstr(e)
self.close_client(c)
exceptException,e:
#traceback.print_exc()
printstr(e)
pass
defheartbeat(self,address):
self.beats[address]=time.time()
defcheckheartbeat(self):
whileTrue:
print"checkheartbeat"
printself.beats
try:
forcinself.clients:
printtime.time()
printself.beats[c]
ifself.beats[c]+CHECK_TIMEOUT<time.time():
printu"%s心跳超时,连接已经断开"%str(c)
self.close_client(c)
else:
printu"checkp%s,没有断开"%str(c)
exceptException,e:
traceback.print_exc()
printstr(e)
pass
time.sleep(CHECK_PERIOD)
defclose_client(self,address):
try:
ifaddressinself.clients:
self.clients.remove(address)
ifself.beats.has_key(address):
delself.beats[address]
printself.clients
forcinself.clients:
self.sock.sendto(u'%s已经离开了'%str(address),c)
print(str(address)+u'已经退出')
exceptException,e:
printstr(e)
raise
if__name__=="__main__":
udpServer=UdpServer()
udpServer.listen_client()
——————————华丽的分割线——————————
socket-udp-client.py:
#-*-coding:utf8-*-
importsys
importthreading
importtime
reload(sys)
sys.setdefaultencoding('utf-8')
importsocket
HOST="127.0.0.1"
PORT=9555
#BEAT_PORT=43278
BEAT_PERIOD=5
classUdpClient(object):
def__init__(self):
self.clientsock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.HOST=HOST
self.ADDR=(HOST,PORT)
self.clientsock.sendto(u'请求建立链接',self.ADDR)
self.recvtrs=threading.Thread(target=self.recvmsg)
self.recvtrs.start()
self.hearttrs=threading.Thread(target=self.heartbeat)
self.hearttrs.start()
defsendmsg(self):
whileTrue:
data=raw_input(">:")
ifnotdata:
break
self.clientsock.sendto(data.encode('utf-8'),self.ADDR)
ifdata.upper()=='QUIT':
self.clientsock.close()
break
defheartbeat(self):
whileTrue:
self.clientsock.sendto('HEARTBEAT',self.ADDR)
time.sleep(BEAT_PERIOD)
defrecvmsg(self):
whileTrue:
recvData,addr=self.clientsock.recvfrom(1024)
ifnotrecvData:
break
print(u'从%s收到信息:%s'%(self.HOST,recvData.decode('utf8')))if__name__=="__main__":
udpClient=UdpClient()
udpClient.sendmsg()
Ⅵ 如何用Python编写一个聊天室
一、课程介绍
1.简介
本次项目课是实现简单聊天室程序的服务器端和客户端。
2.知识点
服务器端涉及到asyncore、asynchat和socket这几个模块,客户端用到了telnetlib、wx、time和thread这几个模块。
3.所需环境
本次课中编写客户端需要用到wxPython,它是一个GUI工具包,请先使用下面的命令安装:
$ sudo apt-get install python-wxtools
密码为shiyanlou
4.项目效果截图
登录窗口
二、项目实战(服务器端)
1.服务器类
首先需要一个聊天服务器,这里继承asyncore的dispatcher类来实现,代码如下
class ChatServer(dispatcher):
"""
聊天服务器
"""
def __init__(self, port):
dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(('', port))
self.listen(5)
self.users = {}
self.main_room = ChatRoom(self)
def handle_accept(self):
conn, addr = self.accept()
ChatSession(self, conn)
2.会话类
有了服务器类还需要能维护每个用户的连接会话,这里继承asynchat的async_chat类来实现,代码如下:
class ChatSession(async_chat):
"""
负责和单用户通信
"""
def __init__(self, server, sock):
async_chat.__init__(self, sock)
self.server = server
self.set_terminator('
')
self.data = []
self.name = None
self.enter(LoginRoom(server))
def enter(self, room):
'从当前房间移除自身,然后添加到指定房间'
try:
cur = self.room
except AttributeError:
pass
else:
cur.remove(self)
self.room = room
room.add(self)
def collect_incoming_data(self, data):
'接受客户端的数据'
self.data.append(data)
def found_terminator(self):
'当客户端的一条数据结束时的处理'
line = ''.join(self.data)
self.data = []
try:
self.room.handle(self, line)
except EndSession:
self.handle_close()
def handle_close(self):
async_chat.handle_close(self)
self.enter(LogoutRoom(self.server))
3.命令解释器
现在就需要一个命令解释器能够解释用户的命令,例如登录、查询在线用户和发消息等,代码如下:
class CommandHandler:
"""
命令处理类
"""
def unknown(self, session, cmd):
'响应未知命令'
session.push('Unknown command: %s
' % cmd)
def handle(self, session, line):
'命令处理'
if not line.strip():
return
parts = line.split(' ', 1)
cmd = parts[0]
try:
line = parts[1].strip()
except IndexError:
line = ''
meth = getattr(self, 'do_' + cmd, None)
try:
meth(session, line)
except TypeError:
self.unknown(session, cmd)
4.房间
接下来就需要实现聊天室的房间了,这里我们定义了三种房间,分别是用户刚登录时的房间、聊天的房间和退出登录的房间,这三种房间都有一个公共的父类,代码如下:
class Room(CommandHandler):
"""
包含多个用户的环境,负责基本的命令处理和广播
"""
def __init__(self, server):
self.server = server
self.sessions = []
def add(self, session):
'一个用户进入房间'
self.sessions.append(session)
def remove(self, session):
'一个用户离开房间'
self.sessions.remove(session)
def broadcast(self, line):
'向所有的用户发送指定消息'
for session in self.sessions:
session.push(line)
def do_logout(self, session, line):
'退出房间'
raise EndSession
class LoginRoom(Room):
"""
刚登录的用户的房间
"""
def add(self, session):
'用户连接成功的回应'
Room.add(self, session)
session.push('Connect Success')
def do_login(self, session, line):
'登录命令处理'
name = line.strip()
if not name:
session.push('UserName Empty')
elif name in self.server.users:
session.push('UserName Exist')
else:
session.name = name
session.enter(self.server.main_room)
class ChatRoom(Room):
"""
聊天用的房间
"""
def add(self, session):
'广播新用户进入'
session.push('Login Success')
self.broadcast(session.name + ' has entered the room.
')
self.server.users[session.name] = session
Room.add(self, session)
def remove(self, session):
'广播用户离开'
Room.remove(self, session)
self.broadcast(session.name + ' has left the room.
')
def do_say(self, session, line):
'客户端发送消息'
self.broadcast(session.name + ': ' + line + '
')
def do_look(self, session, line):
'查看在线用户'
session.push('Online Users:
')
for other in self.sessions:
session.push(other.name + '
')
class LogoutRoom(Room):
"""
用户退出时的房间
"""
def add(self, session):
'从服务器中移除'
try:
del self.server.users[session.name]
except KeyError:
pass
5.服务器端完整代码
#!/usr/bin/python
# encoding: utf-8
from asyncore import dispatcher
from asynchat import async_chat
import socket, asyncore
PORT = 6666 #端口
class EndSession(Exception):
"""
自定义会话结束时的异常
"""
pass
class CommandHandler:
"""
命令处理类
"""
def unknown(self, session, cmd):
'响应未知命令'
session.push('Unknown command: %s
' % cmd)
def handle(self, session, line):
'命令处理'
if not line.strip():
return
parts = line.split(' ', 1)
cmd = parts[0]
try:
line = parts[1].strip()
except IndexError:
line = ''
meth = getattr(self, 'do_' + cmd, None)
try:
meth(session, line)
except TypeError:
self.unknown(session, cmd)
class Room(CommandHandler):
"""
包含多个用户的环境,负责基本的命令处理和广播
"""
def __init__(self, server):
self.server = server
self.sessions = []
def add(self, session):
'一个用户进入房间'
self.sessions.append(session)
def remove(self, session):
'一个用户离开房间'
self.sessions.remove(session)
def broadcast(self, line):
'向所有的用户发送指定消息'
for session in self.sessions:
session.push(line)
def do_logout(self, session, line):
'退出房间'
raise EndSession
class LoginRoom(Room):
"""
刚登录的用户的房间
"""
def add(self, session):
'用户连接成功的回应'
Room.add(self, session)
session.push('Connect Success')
def do_login(self, session, line):
'登录命令处理'
name = line.strip()
if not name:
session.push('UserName Empty')
elif name in self.server.users:
session.push('UserName Exist')
else:
session.name = name
session.enter(self.server.main_room)
class ChatRoom(Room):
"""
聊天用的房间
"""
def add(self, session):
'广播新用户进入'
session.push('Login Success')
self.broadcast(session.name + ' has entered the room.
')
self.server.users[session.name] = session
Room.add(self, session)
def remove(self, session):
'广播用户离开'
Room.remove(self, session)
self.broadcast(session.name + ' has left the room.
')
def do_say(self, session, line):
'客户端发送消息'
self.broadcast(session.name + ': ' + line + '
')
def do_look(self, session, line):
'查看在线用户'
session.push('Online Users:
')
for other in self.sessions:
session.push(other.name + '
')
class LogoutRoom(Room):
"""
用户退出时的房间
"""
def add(self, session):
'从服务器中移除'
try:
del self.server.users[session.name]
except KeyError:
pass
class ChatSession(async_chat):
"""
负责和单用户通信
"""
def __init__(self, server, sock):
async_chat.__init__(self, sock)
self.server = server
self.set_terminator('
')
self.data = []
self.name = None
self.enter(LoginRoom(server))
def enter(self, room):
'从当前房间移除自身,然后添加到指定房间'
try:
cur = self.room
except AttributeError:
pass
else:
cur.remove(self)
self.room = room
room.add(self)
def collect_incoming_data(self, data):
'接受客户端的数据'
self.data.append(data)
def found_terminator(self):
'当客户端的一条数据结束时的处理'
line = ''.join(self.data)
self.data = []
try:
self.room.handle(self, line)
except EndSession:
self.handle_close()
def handle_close(self):
async_chat.handle_close(self)
self.enter(LogoutRoom(self.server))
class ChatServer(dispatcher):
"""
聊天服务器
"""
def __init__(self, port):
dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(('', port))
self.listen(5)
self.users = {}
self.main_room = ChatRoom(self)
def handle_accept(self):
conn, addr = self.accept()
ChatSession(self, conn)
if __name__ == '__main__':
s = ChatServer(PORT)
try:
asyncore.loop()
except KeyboardInterrupt:
print
三、项目实战(客户端)
完成了服务器端后,就需要实现客户端了,这里客户端连接服务器使用了telnetlib模块。
1.登录窗口
这里的图形界面包选择了wxPython,前面有安装说明,登录窗口通过继承wx.Frame类来实现,代码如下:
class LoginFrame(wx.Frame):
"""
登录窗口
Ⅶ 如何优雅的用Python玩转语音聊天机器人
所需硬件:
树莓派B+
人体红外线感应模块
内置麦克风摄像头(实测树莓派免驱淘宝链接)
申请API:
网络语音api
图灵api
语音聊天机器人实现原理:当有人来到跟前时--》触发聊天功能,开始以每2s检测录制语音--》通过网络语音api合成文字--》传递给图灵api返回回答信息--》通过网络语音合成播放
【人体感应识别部分Python代码renti.py】
#/usr/bin/python#coding:utf-8import RPi.GPIO as GPIOimport timeimport osimport signalimport atexitGPIO.setmode(GPIO.BCM) GPIO_PIR = 14 GPIO.setup(GPIO_PIR,GPIO.IN) # Echojing = 0dong = 0 sum = 0sum1 = 0oldren = 0sleep = 0def ganying(): i = 0 ok = 0 error = 0 while i < 10: if GPIO.input(GPIO_PIR) == 1 : ok = ok + 1 if GPIO.input(GPIO_PIR) == 0 : error = error + 1 time.sleep(0.01) i = i + 1 ren = ok/(error+1) return ren
1
GPIO_PIR = 14
为 红外线检测模块与树莓派的针脚,脚本函数返回0表示无人,>0 为有人
【Python语音识别聊天部分robot.py】
#/usr/bin/python# -*- coding:utf-8 -*-import sysreload(sys)sys.setdefaultencoding( "utf-8" )import urllibimport urllib2import jsonimport uuidimport base64import osimport timefrom renti import * #获取网络tokenappid=7647466apikey="网络API"secretkey="网络API" _url="h.com/oauth/2.0/token?grant_type=client_credentials&client_id=" + apikey + "&client_secret=" + secretkey; y_post=urllib2.urlopen(_url)y_read=y_post.read()y_token=json.loads(y_read)['access_token']#print y_read#print y_token #------------------function------------- def luyin(): os.system('arecord -D plughw:1,0 -c 1 -d 2 1.wav -r 8000 -f S16_LE 2>/dev/null') def fanyi():
#---------------语音识别部分 mac_address="haogeoyes" with open("1.wav",'rb') as f: s_file = f.read() speech_base64=base64.b64encode(s_file).decode('utf-8') speech_length=len(s_file) data_dict = {'format':'wav', 'rate':8000, 'channel':1, 'cuid':mac_address, 'token':y_token, 'lan':'zh', 'speech':speech_base64, 'len':speech_length} json_data = json.mps(data_dict).encode('utf-8') json_length = len(json_data) asr_server = 'm/server_api' request = urllib2.Request(url=asr_server) request.add_header("Content-Type", "application/json") request.add_header("Content-Length", json_length) fs = urllib2.urlopen(url=request, data=json_data) result_str = fs.read().decode('utf-8') json_resp = json.loads(result_str) if json_resp.has_key('result'): out_txt=json_resp['result'][0] else: out_txt="Null" return out_txt def tuling(b): f=urllib.urlopen("23.com/openapi/api?key="此处为图灵API"&info=%s" % b) f=json.loads(f.read())['text'] return f def hecheng(text,y_token): #text="你好我是机器人牛牛很高兴能够认识你" geturl="u.com/text2audio?tex="+text+"&lan=zh&per=1&pit=9&spd=6&cuid=CCyo6UGf16ggKZGwGpQYL9Gx&ctp=1&tok="+y_token return os.system('omxplayer "%s" > /dev/null 2>&1 '%(geturl)) #return os.system('omxplayer "%s" > /dev/null 2>&1 '%(geturl)) def nowtime(): return time.strftime('%Y-%m-%d %H:%M:%S ') #---------------main-----------------num=0 #num用来判断是第一次说话,还是在对话过程中first=1 #判断是不是第一说话 当1000次没有人动认为是第一次while True: if ganying()!=0: run=open('run.log','a') if first==0: hecheng("你好,我是牛牛机器人,你可以和我聊天,不过说话的时候你必须靠近话筒近一点,",y_token) hecheng("说点什么吧,2秒钟内说完哦.",y_token) first=1 #为1一段时间就不执行 num=0 #从新计数 #print ganying() run.write(nowtime()+"说点神马吧..........."+'\n') print nowtime()+"说点神马吧.........." luyin() #开始录音 out=fanyi().encode("utf-8") #翻译文字 run.write(nowtime()+"我说:"+out+'\n') print nowtime()+"我说:"+out if out == "Null": text="没有听清楚你说什么" os.system('omxplayer "shenme.wav" > /dev/null 2>&1 ') else: text=tuling(out) hecheng(text,y_token) print nowtime()+"牛牛:"+text run.write(nowtime()+"牛牛:"+text+'\n') run.close() else: #print ganying() #调试查看是否为0有人没人 #print num num=num+1 #num长时间增大说明没有人在旁边 if num > 1000: first=0 #0表示第一次说话
万事俱备 运行nohup python robot.py 哈哈就可以脱离屏幕开始愉快的语音聊天啦
下面看看聊天的日志记录吧
后续更新。。。。。。Python如何用语音优雅的控制小车
Ⅷ 怎么利用Python做一个即时通信软件呢,类似于QQ,能实现基本的聊天和文件传输功能
这个基本的聊天就是最基本的socket操作,python界面方面比较弱势,如果不用第三方库的话。
Ⅸ python聊天程序地址含义
执行指定的运行指令即可。
导入文件时,其中的所有内容都将继续运行。所以你现在所做的并不是实际并行运行两个程序,import中的second.py语句指示运行first.py,然后继续执行其余的指令。
执行此操作的pythonic方法只是通过import语句传递对象。我们可以定义具有rocket属性的charge类,第二个程序检查此属性,然后通过方法添加到该属性。
Ⅹ python写的socket自动聊天:从mysql返回到程序中的数据是nonetype类型,怎么解决
我觉得你应把MYSQL的返回内容打印出来看看,我猜是没有在MYSQL中查到内容,返回了空值造成的。