#!/usr/bin/env python3 import time import struct import socket import RPi.GPIO as GPIO def clamp(n, a, b): if n < a: return a elif n > b: return b else: return n interval = 0.001 udp_addr = ("192.168.1.1", 6000) GPIO.setmode(GPIO.BCM) motor_pins = [4, 18, 17, 27, 23, 22, 24, 25] motor_pwms = [] def init_motor(): GPIO.setup(motor_pins, GPIO.OUT) for motor_index in range(0, 8): motor_pwms.append(GPIO.PWM(motor_pins[motor_index], 500)) motor_pwms[motor_index].start(0) motor_LF_forward = 0 motor_LF_reverse = 1 motor_RF_forward = 3 motor_RF_reverse = 2 motor_LR_forward = 5 motor_LR_reverse = 4 motor_RR_forward = 6 motor_RR_reverse = 7 def stop_motor(): for motor_index in range(0, 8): motor_pwms[motor_index].ChangeDutyCycle(0.0) def set_motor(motor, speed, is_brake = False): motor_forward = -1 motor_reverse = -1 if motor == "LF": motor_forward = motor_LF_forward motor_reverse = motor_LF_reverse elif motor == "RF": motor_forward = motor_RF_forward motor_reverse = motor_RF_reverse elif motor == "LR": motor_forward = motor_LR_forward motor_reverse = motor_LR_reverse else: # motor == "RR" motor_forward = motor_RR_forward motor_reverse = motor_RR_reverse if is_brake == True: motor_pwms[motor_forward].ChangeDutyCycle(100.0) motor_pwms[motor_reverse].ChangeDutyCycle(100.0) elif speed < 0: motor_pwms[motor_forward].ChangeDutyCycle(0.0) motor_pwms[motor_reverse].ChangeDutyCycle(-speed * 100.0) elif speed > 0: motor_pwms[motor_forward].ChangeDutyCycle(speed * 100.0) motor_pwms[motor_reverse].ChangeDutyCycle(0.0) else: motor_pwms[motor_forward].ChangeDutyCycle(0.0) motor_pwms[motor_reverse].ChangeDutyCycle(0.0) sock = socket.socket() lasttime = 0 def init_udp(): global sock sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(("0.0.0.0", 6000)) sock.setblocking(False) def hand_udp(): global sock global udp_addr global lasttime try: (data, source) = sock.recvfrom(1024) udp_addr = source if len(data) == 6: seq, speed_LF, speed_RF, speed_LR, speed_RR, is_brake = struct.unpack("!Bbbbb?", data) set_motor("LF", clamp(speed_LF / 128, -1.0, 1.0), is_brake) set_motor("RF", clamp(speed_RF / 128, -1.0, 1.0), is_brake) set_motor("LR", clamp(speed_LR / 128, -1.0, 1.0), is_brake) set_motor("RR", clamp(speed_RR / 128, -1.0, 1.0), is_brake) sock.sendto(struct.pack("!B", seq), udp_addr) lasttime = time.time() except socket.error: pass if time.time() - lasttime >= 0.5: set_motor("LF", 0.0, True) set_motor("RF", 0.0, True) set_motor("LR", 0.0, True) set_motor("RR", 0.0, True) try: init_motor() init_udp() while True: hand_udp() time.sleep(interval) except KeyboardInterrupt: print("\nKeyboardInterrupt") finally: stop_motor() GPIO.cleanup() print("Exit")