172 lines
3.3 KiB
Python
172 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import time
|
|
import struct
|
|
import socket
|
|
import RPi.GPIO as GPIO
|
|
|
|
def clamp(n, a, b):
|
|
|
|
if n < a:
|
|
|
|
return a
|
|
|
|
elif n > b:
|
|
|
|
return b
|
|
|
|
else:
|
|
|
|
return n
|
|
|
|
interval = 0.001
|
|
|
|
udp_addr = ("192.168.1.1", 6000)
|
|
|
|
GPIO.setmode(GPIO.BCM)
|
|
|
|
motor_pins = [4, 18, 17, 27, 23, 22, 24, 25]
|
|
motor_pwms = []
|
|
|
|
def init_motor():
|
|
|
|
GPIO.setup(motor_pins, GPIO.OUT)
|
|
|
|
for motor_index in range(0, 8):
|
|
|
|
motor_pwms.append(GPIO.PWM(motor_pins[motor_index], 500))
|
|
motor_pwms[motor_index].start(0)
|
|
|
|
motor_LF_forward = 0
|
|
motor_LF_reverse = 1
|
|
|
|
motor_RF_forward = 3
|
|
motor_RF_reverse = 2
|
|
|
|
motor_LR_forward = 5
|
|
motor_LR_reverse = 4
|
|
|
|
motor_RR_forward = 6
|
|
motor_RR_reverse = 7
|
|
|
|
def stop_motor():
|
|
|
|
for motor_index in range(0, 8):
|
|
|
|
motor_pwms[motor_index].ChangeDutyCycle(0.0)
|
|
|
|
def set_motor(motor, speed, is_brake = False):
|
|
|
|
motor_forward = -1
|
|
motor_reverse = -1
|
|
|
|
if motor == "LF":
|
|
|
|
motor_forward = motor_LF_forward
|
|
motor_reverse = motor_LF_reverse
|
|
|
|
elif motor == "RF":
|
|
|
|
motor_forward = motor_RF_forward
|
|
motor_reverse = motor_RF_reverse
|
|
|
|
elif motor == "LR":
|
|
|
|
motor_forward = motor_LR_forward
|
|
motor_reverse = motor_LR_reverse
|
|
|
|
else: # motor == "RR"
|
|
|
|
motor_forward = motor_RR_forward
|
|
motor_reverse = motor_RR_reverse
|
|
|
|
if is_brake == True:
|
|
|
|
motor_pwms[motor_forward].ChangeDutyCycle(100.0)
|
|
motor_pwms[motor_reverse].ChangeDutyCycle(100.0)
|
|
|
|
elif speed < 0:
|
|
|
|
motor_pwms[motor_forward].ChangeDutyCycle(0.0)
|
|
motor_pwms[motor_reverse].ChangeDutyCycle(-speed * 100.0)
|
|
|
|
elif speed > 0:
|
|
|
|
motor_pwms[motor_forward].ChangeDutyCycle(speed * 100.0)
|
|
motor_pwms[motor_reverse].ChangeDutyCycle(0.0)
|
|
|
|
else:
|
|
|
|
motor_pwms[motor_forward].ChangeDutyCycle(0.0)
|
|
motor_pwms[motor_reverse].ChangeDutyCycle(0.0)
|
|
|
|
sock = socket.socket()
|
|
lasttime = 0
|
|
|
|
def init_udp():
|
|
|
|
global sock
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.bind(("0.0.0.0", 6000))
|
|
sock.setblocking(False)
|
|
|
|
def hand_udp():
|
|
|
|
global sock
|
|
global udp_addr
|
|
global lasttime
|
|
|
|
try:
|
|
|
|
(data, source) = sock.recvfrom(1024)
|
|
|
|
udp_addr = source
|
|
|
|
if len(data) == 6:
|
|
|
|
seq, speed_LF, speed_RF, speed_LR, speed_RR, is_brake = struct.unpack("!Bbbbb?", data)
|
|
|
|
set_motor("LF", clamp(speed_LF / 128, -1.0, 1.0), is_brake)
|
|
set_motor("RF", clamp(speed_RF / 128, -1.0, 1.0), is_brake)
|
|
set_motor("LR", clamp(speed_LR / 128, -1.0, 1.0), is_brake)
|
|
set_motor("RR", clamp(speed_RR / 128, -1.0, 1.0), is_brake)
|
|
|
|
sock.sendto(struct.pack("!B", seq), udp_addr)
|
|
|
|
lasttime = time.time()
|
|
|
|
except socket.error:
|
|
|
|
pass
|
|
|
|
if time.time() - lasttime >= 0.5:
|
|
|
|
set_motor("LF", 0.0, True)
|
|
set_motor("RF", 0.0, True)
|
|
set_motor("LR", 0.0, True)
|
|
set_motor("RR", 0.0, True)
|
|
|
|
try:
|
|
|
|
init_motor()
|
|
init_udp()
|
|
|
|
while True:
|
|
|
|
hand_udp()
|
|
|
|
time.sleep(interval)
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
print("\nKeyboardInterrupt")
|
|
|
|
finally:
|
|
|
|
stop_motor()
|
|
|
|
GPIO.cleanup()
|
|
|
|
print("Exit")
|