Contenidos
- Conectar con una URL en Python
- Conectar con una URL y cambiar el User-Agent en Python
- Realizar una petición HTTP POST en Python
- Comprobar si un puerto TCP está abierto en Python
- Servidor y cliente TCP básicos en Python
- Servidor y Cliente TCP en Python utilizando clases distintas
- Servidor y cliente UDP básicos en Python
- Cliente y servidor para enviar objeto serializado por TCP en Python (utilizando hilos)
- Enviar una imagen entre un cliente y un servidor utilizando sockets TCP en Python
- MulticastSocket en Python
Conectar con una URL en Python
1 2 3 4 5 6 7 8 |
import urllib.request def conectar_con_url(): with urllib.request.urlopen('http://www.example.com') as response: html = response.read() print(html) conectar_con_url() |
Conectar con una URL y cambiar el User-Agent en Python
1 2 3 4 5 6 7 8 9 10 |
import urllib.request def conectar_con_url_cambiar_user_agent(): url = 'http://www.example.com' req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req) as response: html = response.read() print(html) conectar_con_url_cambiar_user_agent() |
Realizar una petición HTTP POST en Python
1 2 3 4 5 6 7 8 9 10 11 12 |
import urllib.request import urllib.parse def realizar_peticion_post(): url = 'http://www.example.com/login' data = urllib.parse.urlencode({'username': 'user', 'password': 'pass'}).encode() req = urllib.request.Request(url, data=data) with urllib.request.urlopen(req) as response: result = response.read() print(result) realizar_peticion_post() |
Comprobar si un puerto TCP está abierto en Python
1 2 3 4 5 6 7 8 9 10 11 12 |
import socket def comprobar_puerto_tcp_abierto(host, port): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex((host, port)) if result == 0: print("Puerto abierto") else: print("Puerto cerrado") sock.close() comprobar_puerto_tcp_abierto('127.0.0.1', 80) |
Servidor y cliente TCP básicos en Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import socket # Servidor TCP def servidor_tcp(): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('0.0.0.0', 65432)) server_socket.listen(1) conn, addr = server_socket.accept() with conn: print(f'Conectado por {addr}') while True: data = conn.recv(1024) if not data: break conn.sendall(data) # Cliente TCP def cliente_tcp(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect(('127.0.0.1', 65432)) s.sendall(b'Hola, servidor') data = s.recv(1024) print(f'Recibido {data}') # Para ejecutar el servidor y cliente # servidor_tcp() # cliente_tcp() |
Servidor y Cliente TCP en Python utilizando clases distintas
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
import socket import threading class ServidorTCP: def __init__(self, host='0.0.0.0', port=65432): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(1) def start(self): while True: conn, addr = self.server_socket.accept() threading.Thread(target=self.handle_client, args=(conn, addr)).start() def handle_client(self, conn, addr): with conn: print(f'Conectado por {addr}') while True: data = conn.recv(1024) if not data: break conn.sendall(data) class ClienteTCP: def __init__(self, host='127.0.0.1', port=65432): self.host = host self.port = port def send_message(self, message): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((self.host, self.port)) s.sendall(message.encode()) data = s.recv(1024) print(f'Recibido {data.decode()}') # Para ejecutar el servidor y cliente # servidor = ServidorTCP() # threading.Thread(target=servidor.start).start() # cliente = ClienteTCP() # cliente.send_message('Hola, servidor') |
Servidor y cliente UDP básicos en Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
import socket # Servidor UDP def servidor_udp(): server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.bind(('0.0.0.0', 65432)) while True: data, addr = server_socket.recvfrom(1024) print(f'Recibido {data} de {addr}') server_socket.sendto(data, addr) # Cliente UDP def cliente_udp(): client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_socket.sendto(b'Hola, servidor UDP', ('127.0.0.1', 65432)) data, addr = client_socket.recvfrom(1024) print(f'Recibido {data} de {addr}') # Para ejecutar el servidor y cliente # servidor_udp() # cliente_udp() |
Cliente y servidor para enviar objeto serializado por TCP en Python (utilizando hilos)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
import socket import threading import pickle class ServidorTCP: def __init__(self, host='0.0.0.0', port=65432): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(1) def start(self): while True: conn, addr = self.server_socket.accept() threading.Thread(target=self.handle_client, args=(conn, addr)).start() def handle_client(self, conn, addr): with conn: print(f'Conectado por {addr}') data = conn.recv(1024) obj = pickle.loads(data) print(f'Recibido: {obj}') response = pickle.dumps(obj) conn.sendall(response) class ClienteTCP: def __init__(self, host='127.0.0.1', port=65432): self.host = host self.port = port def send_object(self, obj): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((self.host, self.port)) data = pickle.dumps(obj) s.sendall(data) response = s.recv(1024) print(f'Recibido: {pickle.loads(response)}') # Para ejecutar el servidor y cliente # servidor = ServidorTCP() # threading.Thread(target=servidor.start).start() # cliente = ClienteTCP() # cliente.send_object({'mensaje': 'Hola, servidor'}) |
Enviar una imagen entre un cliente y un servidor utilizando sockets TCP en Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
import socket import threading class ServidorTCP: def __init__(self, host='0.0.0.0', port=65432): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(1) def start(self): while True: conn, addr = self.server_socket.accept() threading.Thread(target=self.handle_client, args=(conn, addr)).start() def handle_client(self, conn, addr): with conn: print(f'Conectado por {addr}') with open('imagen_recibida.jpg', 'wb') as f: while True: data = conn.recv(1024) if not data: break f.write(data) class ClienteTCP: def __init__(self, host='127.0.0.1', port=65432): self.host = host self.port = port def send_image(self, image_path): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((self.host, self.port)) with open(image_path, 'rb') as f: for data in iter(lambda: f.read(1024), b''): s.sendall(data) # Para ejecutar el servidor y cliente # servidor = ServidorTCP() # threading.Thread(target=servidor.start).start() # cliente = ClienteTCP() # cliente.send_image('imagen_a_enviar.jpg') |
MulticastSocket en Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import socket import struct import threading class MulticastReceiver: def __init__(self, multicast_group='224.1.1.1', port=5004): self.multicast_group = multicast_group self.port = port def start(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('', self.port)) group = socket.inet_aton(self.multicast_group) mreq = struct.pack('4sL', group, socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) while True: data, address = sock.recvfrom(1024) print(f'Recibido {data} de {address}') class MulticastSender: def __init__(self, multicast_group='224.1.1.1', port=5004): self.multicast_group = (multicast_group, port) def send_message(self, message): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ttl = struct.pack('b', 1) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) sock.sendto(message.encode(), self.multicast_group) # Para ejecutar el receptor y el emisor # receptor = MulticastReceiver() # threading.Thread(target=receptor.start).start() # emisor = MulticastSender() # emisor.send_message('Hola, Multicast') |