다양한 상황에서의 노드 통신
- 누락 없이 모두 잘 도착하는지
- 데이터 크기에 따른 전송속도
- 도착하는 데이터를 미처 처리하지 못하면?
- 주기적 발송에서 타임슬롯을 오버하면 어떻게 되나
- 협업해야 하는 노드를 순서대로 가동시킬 수 있나?
과제 1. 노드간 동기화문제 설명
퍼블리셔와 서브스크라이버간의 데이터를 빠짐없이 잘 보내고 받는지를 확인해보고자 한다.
과제 설명
- sender_serial.py receiver_serial.py sr_serial.launch
- 숫자를 보내서 받는 쪽에서 누락된 게 있는지 알아보자. 그래서 보내는 쪽이 안 보낸 것인지 받는 쪽이 못 받는 건지 구분할 수 있는가
- 숫자가 빠진 것에 대해 파이썬 코드를 사용하여 판단해보자
- 특히 처음과 끝
- 받는 쪽을 먼저 실행시켜놓고 그 다음에 보내는 쪽을 실행시켜야 다 받을텐데, roslaunch로는 노드를 순서대로 실행시킬 수 없으니 rosrun을 사용해야 하는데 더 좋은 방법이 있는가
코드 구현
- sender_serial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
rospy.init_node('sender_serial')
pub = rospy.Publisher('my_topic',Int32)
rate = rospy.Rate(10)
count = 1
while not rospy.is_shutdown():
if count == 1 or 100:
print("start or end : ",count)
pub.publish(count)
if count == 100:
print("count complete")
break
count += 1
rate.sleep()
- receiver_serial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
def callback(msg):
if msg.data == 1: # 1이 도착하면 문장을 출력하도록 했다.
print("1 is arrive")
print msg.data
rospy.init_node('receiver_serial')
sub = rospy.Subscriber('my_topic',Int32, callback)
rospy.spin()
실행 결과
문제점
publisher와 subscriber간의 실질적인 통신 구축에 지연문제가 존재한다. 받을 사람이 준비가 되지 않았는데 물건을 던지면 당연히 받지 못한다.
해결 방법
- 노드가 등록되어 있는지 확인하는 함수를 이용
1
2
while ( pub.get_num_connections() == 0):
count = 1
이 루프를 빠져나오면 이제부터 메시지를 전송하도록 한다.
과제 2. ROS 전송속도
데이터 크기에 따른 전송속도가 어떻게 되는지 알아보고자 한다.
과제 설명
- sender_speed.py receiver_speed.py sr_speed.launch
- 정해진 크기의 데이터를 반복해서 왕창 보내기
- 보내는 쪽은 10분동안 시간을 정해놓고 쉴새 없이 보내고
- 10분동안 몇 바이트 보냈는지 체크해서 송신속도 계산
- 받는 쪽도 10분 동안 시간 정해놓고 모두 얼마나 받았는지 체크해서 수신속도를 계산
- 사이즈를 바꿔서 1Mbyte, 5M, 10M, 20M, 50M 전송한 송수신 속도에 대한 그래프도 작성, 어떨 때 가장 빠른지
- 단위는 bps(bytes/sec)
받는 쪽이 없으면 더 빨라지는가
- sender_speed.py
- sender이라는 이름으로 노드 생성
- 발행하는 토픽 이름은 long_string, 타입은 string
- 1초에 1번씩 다양한 용량의 long string을 발행, 문자열은 #으로 가득 채워라
- 사이즈를 바꿔서 1Mbyte, 5M, 10M, 20M, 50M 전송해보며 최대 전송을 알아보는 것도 좋다.
- 코드에 사이즈를 나열해서 안쓰는 사이즈는 주석 처리하면 편하다.
- receiver_speed.py
- receiver이름으로 노드 생성
- 다양한 용량의 long_string을 수신해서 long_string 1개를 받으면 소요시간을 화면에 출력
- 가능하면 속도도 출력, 단위는 bps
코드 구현
- msg_speed.launch
1
2
3
4
5
6
<launch>
<node pkg='msg_send' type='sender_speed.py' name='sender'>
<param name='size' value="50" />
</node>
<node pkg='msg_send' type='receiver_speed.py' name='receiver' output='screen' />
</launch>
- sender_speed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
dic = {"1M_str" : 1028000, # == 0.98Mbytes
"5M_str" : 1028000 * 5,
"10M_str" : 1028000 * 10,
"20M_str" : 1028000 * 20,
"50M_str" : 1028000 * 50,
}
rospy.init_node('sender')
pub = rospy.Publisher('long_string',String)
long_str = String()
size_dir = str(rospy.get_param('~size')) + "M_str"
hashs = '#' * dic[size_dir]
rate = rospy.Rate(1)
while not rospy.is_shutdown():
gettime = str(rospy.get_time())
long_str.data = hashs + ":" + gettime
pub.publish(long_str)
rospy.loginfo(gettime)
rate.sleep()
- receiver_speed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import rospy
import sys
from std_msgs.msg import String
def callback(msg):
data = msg.data.split(":")
start_time = float(data[1])
current_time = rospy.get_time()
str_size = sys.getsizeof(data[0])
rospy.loginfo(str(str_size) + " byte : " + str(current_time - start_time) + " second")
rospy.loginfo("speed: " + str(float(str_size)/(current_time-start_time)) + " byte/s")
rospy.init_node('receiver')
sub = rospy.Subscriber('long_string',String, callback)
rospy.spin()
실행 결과
- 1Mbyte일 때: 약 0.01초, 속도는 280 Mbps
- 5Mbyte일 때: 약 0.02초, 속도는 240 Mbps
- 10Mbyte일 때: 약 0.05초, 속도는 200 Mbps
- 20Mbyte일 때: 약 0.08초, 속도는 280 Mbps
- 50Mbyte일 때: 약 0.2초, 속도는 180 Mbps
통계적으로 봤을 때, 용량이 커지게 되면 속도는 느려져야 맞지만, 그닥 그렇지는 않은 것 같다. 시간이 오래 걸리는 것은 맞으나 속도는 대체로 일정했다.
과제 3. ROS 처리 지연 문제
도착하는 데이터를 미처 처리하지 못하면 어떻게 할지에 대해 알아보고자 한다.
과제 설명
- sender_overflow.py receiver_overflow.py sr_overflow.launch
- 도착하는 데이터를 미처 처리하지 못하면 어떻게 되는지 알아본다. 늦더라도 다 처리하는지, 순서가 뒤섞이는지, 몇몇은 버리는지 확인한다.
- 받는쪽이 버벅되게 만들어놓고 데이터를 왕창보낸다.
- 구독자의 콜백함수 안에 시간 많이 걸리는 코드 넣어서 토픽 처리에 시간이 걸리도록 만들어라
- 콜백함수가 끝나지 않았는데 토픽이 새로 도착하면 어떻게 되나.
- 도착한 토픽은 임시로 어딘가에 쌓이는가? 나중에 꺼내서 처리할 수 있는가?
- 그냥 없어지는가? 한 번 못받은 토픽은 영영 못받는 것인가
- 발행자는 이 사실을 아는가? 알려줄 수 있는 방법이 있나
- sender_overflow.py
- sender이라는 이름으로 노드 생성
- 발행하는 토픽 이름은 my_topic, 타입은 int32
- 1초에 1000번씩 숫자를 1씩 증가해서 토픽을 발행
- receiver_overflow.py
- receiver이름으로 노드 생성
- sender로부터 my_topic을 화면에 출력하여 토픽의 누락 여부를 확인
- 1씩 숫자가 증가하지 않으면 뭔가 문제가 있다는 것을 확인할 수 있다.
받는 큐사이즈를 늘렸을 때 누락없이 잘 추출되는 것을 볼 수 있다.
코드 구현
- msg_overflow.launch
1
2
3
4
<launch>
<node pkg='msg_send' type='sender_overflow.py' name='sender' />
<node pkg='msg_send' type='receiver_overflow.py' name='receiver' output='screen' />
</launch>
- sender_overflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
rospy.init_node('sender',anonymous=True)
pub = rospy.Publisher('my_topic', Int32)
rate = rospy.Rate(100000)
count = 1
while (pub.get_num_connections() == 0):
continue
while not rospy.is_shutdown():
pub.publish(count)
count = count + 1
rate.sleep()
- receiver_overflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
def callback(msg):
rospy.loginfo("callback is being processed")
rospy.sleep(5)
print msg.data
rospy.init_node('receiver')
sub = rospy.Subscriber('my_topic',Int32, callback,queue_size = 1)
# Queue Size는 발행되는 메세지를 얼마나 가지고 있을지에 관련된 변수이며 신규 데이터가 들어오는 경우 오래된 데이터부터 삭제하게 된다.
# 출처 : https://changun516.tistory.com/127
rospy.spin()
실행 결과
중간에 누락이 되는 토픽들이 많았다.
과제 4. ROS 타임 슬롯 문제
주기적 발송에서 타임슬롯을 오버하면 어떻게 되는지 알아보고자 한다
과제 설명
- 주기적 발송에서 타임슬롯을 오버하면 어떻게 되는가
- 1초에 5번, rate(5)로 셋팅하고 작업시간이 0.2초가 넘도록 만들자. sleep앞에 들어간 작업코드의 수행시간을 늘려본다.
- 늘렸다 줄였다를 반복해서 해보자. 입력값을 받아서 이걸 조정할 수 있게 만들면 된다. input을 통해 사용자가 직접 느리게 하든지, 양을 크게 해서 시간을 늘리든지 그 후 중요한 것은 걸린 시간을 출력하는 것이다.
- 1초에 5번을 지킬 수 없으면 어떻게 작동하는지 보아라
- 앞에서부터 쭉 밀리는 식으로 일하는지
- 쉬는 시간을 조정하는지
- 3번만하고 다음으로 넘어가는지
- 입력받은 카운트만큼 반복을 진행해서 시간을 계속 늘리도록 한다.
- 각각의 시작, 끝, 쉬는 시간을 리스트로 묶는다.
코드 구현
- msg_timeslot.launch
1
2
3
4
5
6
<launch>
<node pkg="msg_send" type="msg_timeslot.py" name="teacher">
<!-- <param name='epoch' value="100" /> -->
</node>
<node pkg="msg_send" type="student_int.py" name="student" output="screen" />
</launch>
- sender_timeslot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
import rospy
import timeit
from std_msgs.msg import Int32
rospy.init_node('teacher')
pub = rospy.Publisher('msg_to_students', Int32, queue_size = 0)
rate = rospy.Rate(5)
time = input('input epoch : ')
def do_job(time):
for i in range(0,time):
i += 1
pub.publish(i)
while not rospy.is_shutdown():
start_time = timeit.default_timer()
do_job(time)
end_time = timeit.default_timer()
print 'send time : %.4f sec'%(end_time - start_time)
rate.sleep()
end_sleeptime = timeit.default_timer()
print 'sleep time : %.4f sec'%(end_sleeptime-end_time)
total = end_sleeptime - start_time
print 'total time : %.4f sec'%(total)
print '\n'
- student_int.py
1
실행 결과
epoch, 반복을 100으로 했을 때는 대체로 0.2초는 잘 지켜지고 있는 것을 볼 수 있다. 반복을 10000으로 올려봐도 아직도 0.2초는 잘 지켜지고 있다.
크게 올려보기 위해 백만으로 반복해본다.
0.2초를 넘어서는 것을 볼 수 있다. 즉 아무리 오래 걸리더라도 모든 데이터는 다 보내는 방식으로 되어 있는 듯하다. 결국 매 타임마다 순차적으로 delay가 생긴다.
과제 5. ROS 노드의 순차 실행
협업해야 하는 노드를 순서대로 가동시키는 방법에 대해 알아보고자 한다.
과제 설명
- 협업해야 하는 노드를 순서대로 가동시킬 수 있나
- roslaunch로 구현해보고 정 안되면 rosrun으로 진행해보아라
- first.py second.py third.py fourth.py receiver.py sr_order.launch
- 순서대로 receiver에 메시지를 보내도록 만든다.
- receiver는 도착한 순서대로 출력해야 하는데, 이때 first→second→third→fourth가 되어야 한다.
- 앞에 노드가 움직이기 전에는 절대 토픽을 먼저 보내면 안된다.
- 어떻게 동기를 맞추고 순서를 맞추는가
- Launch파일을 이용해서 할 수 있는가, 이게 가장 편리할 듯하다. 이것을 먼저 고민해보라
- ros의 도움으로 할 수 있나
- 아니면 내가 프로그래밍 해야 하는가
- receiver.py 작성
- 구독해야할 토픽의 이름은 “msg_to_receiver”, 내용은 string,
- my name is first, my name is second, my name is third, my name is fourth
- 테스트를 위해 받은 토픽이 순차적으로 오는지 화면에 출력
- first.py/second.py/third.py/fourth.py
- 자기 이름에 맞춰서 first, second, third, fourth
- first노드가 receiver 노드로 최소한 첫 토픽을 보내는 시점 이후에 전송을 시작해야 한다.
코드 구현
과제 5에서는 새로운 패키지를 생성하려고 한다.
1
2
$ cd ~/xycar_ws/src
$ catkin_create_pkg order_test std_msgs rospy
- sr_order.launch
1
2
3
4
5
6
7
8
<!-- sr_order.launch -->
<launch>
<node name="receiver" pkg="order_test" type="receiver.py" output="screen" />
<node name="first" pkg="order_test" type="first.py" output="screen" />
<node name="second" pkg="order_test" type="second.py" output="screen" />
<node name="third" pkg="order_test" type="third.py" output="screen" />
<node name="fourth" pkg="order_test" type="fourth.py" output="screen" />
</launch>
한 개마다 command 명령 퍼블리셔를 하나 더 만들었고, 그것을 받는 서브스크라이버도 만들 것이다.
- first.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
name = 'first'
OK = None
def ctl_callback(data):
global OK
OK = str(data.data)
rospy.init_node(name)
rospy.Subscriber("start_ctl", String, ctl_callback)
while True:
if not OK: continue
d = OK.split(":")
if (len(d) == 2) and (d[0] == name) and (d[1] == "go"):
break
pub = rospy.Publisher("msg_to_receiver", String, queue_size=1)
rate = rospy.Rate(2)
hello_str = String()
while not rospy.is_shutdown():
hello_str.data = "my name is " + name
pub.publish(hello_str)
rate.sleep()
- second.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
name = 'second'
OK = None
def ctl_callback(data):
global OK
OK = str(data.data)
rospy.init_node(name)
rospy.Subscriber("start_ctl", String, ctl_callback)
while True:
if not OK: continue
d = OK.split(":")
if (len(d) == 2) and (d[0] == name) and (d[1] == "go"):
break
pub = rospy.Publisher("msg_to_receiver", String, queue_size=1)
rate = rospy.Rate(2)
hello_str = String()
while not rospy.is_shutdown():
hello_str.data = "my name is " + name
pub.publish(hello_str)
rate.sleep()
- third.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
name = 'third'
OK = None
def ctl_callback(data):
global OK
OK = str(data.data)
rospy.init_node(name)
rospy.Subscriber("start_ctl", String, ctl_callback)
while True:
if not OK: continue
d = OK.split(":")
if (len(d) == 2) and (d[0] == name) and (d[1] == "go"):
break
pub = rospy.Publisher("msg_to_receiver", String, queue_size=1)
rate = rospy.Rate(2)
hello_str = String()
while not rospy.is_shutdown():
hello_str.data = "my name is " + name
pub.publish(hello_str)
rate.sleep()
- fourth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
name = 'first'
OK = None
def ctl_callback(data):
global OK
OK = str(data.data)
rospy.init_node(name)
rospy.Subscriber("start_ctl", String, ctl_callback)
while True:
if not OK: continue
d = OK.split(":")
if (len(d) == 2) and (d[0] == name) and (d[1] == "go"):
break
pub = rospy.Publisher("msg_to_receiver", String, queue_size=1)
rate = rospy.Rate(2)
hello_str = String()
while not rospy.is_shutdown():
hello_str.data = "my name is " + name
pub.publish(hello_str)
rate.sleep()
- receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def callback(msg):
rospy.loginfo("I heard %s", msg.data)
rospy.init_node('receiver')
rospy.Subscriber('msg_to_receiver',String, callback)
pub = rospy.Publisher('start_ctl',String,queue_size=1)
rate = rospy.Rate(10)
hello_str = String()
rospy.sleep(1)
sq = ['first','second','third','fourth']
pub_msg = String()
for i in sq:
pub_msg.data = i+":go"
pub.publish(pub_msg)
rospy.sleep(3)
rospy.spin()