Collector#

As its name suggests, the Collector in Tianshou collects training data. More specifically, it controls the interaction between the Policy (agent) and the environment, saves the interaction data into the ReplayBuffer, and returns episode statistics.
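To make the three responsibilities above concrete, here is a minimal pure-Python sketch of a collection loop. It is not Tianshou's implementation: `ToyEnv`, `toy_collect`, and the plain list used as a buffer are hypothetical stand-ins, and the keys of the returned dictionary are only chosen to resemble the statistics Tianshou reports.

```python
class ToyEnv:
    """Hypothetical stand-in environment: every episode lasts `horizon` steps."""

    def __init__(self, horizon):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t  # the observation is just the step counter

    def step(self, action):
        self.t += 1
        done = self.t >= self.horizon
        return self.t, 1.0, done  # (next observation, reward, done)


def toy_collect(policy, env, buffer, n_episode):
    """Run `policy` in `env` for `n_episode` episodes, storing every transition."""
    rews, lens = [], []
    for _ in range(n_episode):
        obs, done = env.reset(), False
        ep_rew, ep_len = 0.0, 0
        while not done:
            act = policy(obs)                               # policy picks an action
            obs_next, rew, done = env.step(act)             # environment responds
            buffer.append((obs, act, rew, done, obs_next))  # save the transition
            ep_rew += rew
            ep_len += 1
            obs = obs_next
        rews.append(ep_rew)
        lens.append(ep_len)
    # Episode statistics, loosely mirroring the keys Tianshou reports.
    return {"n/ep": n_episode, "n/st": sum(lens), "rews": rews, "lens": lens}


buffer = []  # a plain list stands in for a replay buffer
stats = toy_collect(lambda obs: 0, ToyEnv(horizon=5), buffer, n_episode=3)
print(stats)
print(len(buffer))
```

The real Collector additionally handles vectorized environments, batching, and resets, which this sketch leaves out on purpose.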

Usages#

Collector can be used both for training (data collecting) and evaluation in Tianshou.

Policy evaluation#

We need to evaluate our trained policy from time to time in DRL experiments. Collector can help us with this.

First, we initialize a Collector with a (vectorized) environment and a given policy (agent).

import gymnasium as gym
import torch

from tianshou.data import Collector, VectorReplayBuffer
from tianshou.env import DummyVectorEnv
from tianshou.policy import PGPolicy
from tianshou.utils.net.common import Net
from tianshou.utils.net.discrete import Actor

env = gym.make("CartPole-v1")
test_envs = DummyVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(2)])

# model
net = Net(
    env.observation_space.shape,
    hidden_sizes=[
        16,
    ],
)
actor = Actor(net, env.action_space.shape)
optim = torch.optim.Adam(actor.parameters(), lr=0.0003)

policy = PGPolicy(
    actor=actor,
    optim=optim,
    dist_fn=torch.distributions.Categorical,
    action_space=env.action_space,
    action_scaling=False,
)
test_collector = Collector(policy, test_envs)

Now we would like to collect 9 episodes of data to test how our initialized Policy performs.

collect_result = test_collector.collect(n_episode=9)
print(collect_result)
print("Rewards of 9 episodes are {}".format(collect_result["rews"]))
print("Average episode reward is {}.".format(collect_result["rew"]))
print("Average episode length is {}.".format(collect_result["len"]))
{'n/ep': 9, 'n/st': 81, 'rews': array([ 8.,  8.,  9., 11.,  9.,  9.,  9.,  9.,  9.]), 'lens': array([ 8,  8,  9, 11,  9,  9,  9,  9,  9]), 'idxs': array([0, 1, 1, 0, 1, 0, 1, 0, 1]), 'rew': 9.0, 'len': 9.0, 'rew_std': 0.816496580927726, 'len_std': 0.816496580927726}
Rewards of 9 episodes are [ 8.  8.  9. 11.  9.  9.  9.  9.  9.]
Average episode reward is 9.0.
Average episode length is 9.0.
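The summary entries in the result can be recomputed from the per-episode arrays. The sketch below copies the `rews` and `lens` values from the output above and assumes, consistent with the printed numbers, that `rew_std` is the population standard deviation (NumPy's default, `ddof=0`).

```python
import numpy as np

# Per-episode returns and lengths copied from the collect output above.
rews = np.array([8.0, 8.0, 9.0, 11.0, 9.0, 9.0, 9.0, 9.0, 9.0])
lens = np.array([8, 8, 9, 11, 9, 9, 9, 9, 9])

rew_mean = rews.mean()      # corresponds to 'rew'
rew_std = rews.std()        # corresponds to 'rew_std' (ddof=0)
total_steps = lens.sum()    # corresponds to 'n/st'

print(rew_mean, rew_std, total_steps)
```

In CartPole every step yields a reward of 1, which is why the reward and length statistics coincide.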

Next, let's see how a random policy performs.

# Reset the collector
test_collector.reset()
collect_result = test_collector.collect(n_episode=9, random=True)
print(collect_result)
print("Rewards of 9 episodes are {}".format(collect_result["rews"]))
print("Average episode reward is {}.".format(collect_result["rew"]))
print("Average episode length is {}.".format(collect_result["len"]))
{'n/ep': 9, 'n/st': 198, 'rews': array([11., 17., 13., 18., 46., 38., 17., 11., 27.]), 'lens': array([11, 17, 13, 18, 46, 38, 17, 11, 27]), 'idxs': array([0, 1, 0, 1, 0, 1, 0, 0, 1]), 'rew': 22.0, 'len': 22.0, 'rew_std': 11.766241729815194, 'len_std': 11.766241729815194}
Rewards of 9 episodes are [11. 17. 13. 18. 46. 38. 17. 11. 27.]
Average episode reward is 22.0.
Average episode length is 22.0.

It seems that the freshly initialized policy, which has not been trained at all, performs even worse than a random policy.

Data Collecting#

Data collecting is mostly used during training, when we need to store the collected data in a ReplayBuffer.

train_env_num = 4
buffer_size = 100
train_envs = DummyVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(train_env_num)])
replaybuffer = VectorReplayBuffer(buffer_size, train_env_num)

train_collector = Collector(policy, train_envs, replaybuffer)

Now we can collect 50 steps of data, which will be automatically saved in the replay buffer. You can also collect a fixed number of episodes instead of steps by passing n_episode, as in the evaluation example. Try it yourself.

print(len(replaybuffer))
collect_result = train_collector.collect(n_step=50)
print(len(replaybuffer))
print(collect_result)
0
52
{'n/ep': 4, 'n/st': 52, 'rews': array([ 9.,  9., 10., 10.]), 'lens': array([ 9,  9, 10, 10]), 'idxs': array([25, 50,  0, 75]), 'rew': 9.5, 'len': 9.5, 'rew_std': 0.5, 'len_std': 0.5}
/home/docs/checkouts/readthedocs.org/user_builds/tianshou-fork/checkouts/latest/tianshou/data/collector.py:237: UserWarning: n_step=50 is not a multiple of #env (4), which may cause extra transitions collected into the buffer.
  warnings.warn(
for i in range(13):
    print(i, replaybuffer.next(i))
0 1
1 2
2 3
3 4
4 5
5 6
6 7
7 8
8 9
9 9
10 11
11 12
12 12
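The output above shows that `next(i)` normally returns `i + 1`, but returns `i` itself at index 9 (where an episode ended) and at index 12 (the most recently written transition). A simplified sketch of that rule follows; `next_index` is a hypothetical helper rather than Tianshou's code, and the `done` flags and `last_index` are set by hand to match the first sub-buffer shown above.

```python
import numpy as np


def next_index(done, last_index, index):
    """Return the index of the following transition in the same episode.

    An index maps to itself when its episode has ended there or when it is
    the last transition written so far.
    """
    at_end = bool(done[index]) or index == last_index
    return index if at_end else (index + 1) % len(done)


# Assumed state of the first sub-buffer (25 slots): a 10-step episode
# occupies indices 0..9, and indices 10..12 belong to an unfinished episode.
done = np.zeros(25, dtype=bool)
done[9] = True
last_index = 12

result = [next_index(done, last_index, i) for i in range(13)]
print(result)
```

Following `next` from any index therefore never crosses an episode boundary, which is what algorithms that look ahead along a trajectory rely on.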
replaybuffer.sample(10)
(Batch(
     obs: array([[-0.06119931, -0.6093111 ,  0.00315758,  0.81847507],
                 [-0.05291342, -0.4142941 , -0.00740512,  0.5281347 ],
                 [-0.06119931, -0.6093111 ,  0.00315758,  0.81847507],
                 [-0.04852646, -0.21934842, -0.01219156,  0.23932211],
                 [-0.02893777, -0.3506799 ,  0.01476517,  0.5670283 ],
                 [-0.00589153, -0.3826413 , -0.01030633,  0.5587202 ],
                 [-0.04687149, -0.74147326,  0.04339226,  1.1651015 ],
                 [-0.03936836, -0.779306  ,  0.06194803,  1.2150629 ],
                 [-0.11043476, -1.7425121 ,  0.16553243,  2.630492  ],
                 [-0.07950447, -1.5465139 ,  0.11946139,  2.3035517 ]],
                dtype=float32),
     act: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
     rew: array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]),
     terminated: array([False, False, False, False, False, False, False, False,  True,
                        False]),
     truncated: array([False, False, False, False, False, False, False, False, False,
                       False]),
     done: array([False, False, False, False, False, False, False, False,  True,
                  False]),
     obs_next: array([[-7.33855292e-02, -8.04476082e-01,  1.95270795e-02,
                        1.11214948e+00],
                      [-6.11993074e-02, -6.09311104e-01,  3.15757771e-03,
                        8.18475068e-01],
                      [-7.33855292e-02, -8.04476082e-01,  1.95270795e-02,
                        1.11214948e+00],
                      [-5.29134236e-02, -4.14294094e-01, -7.40511576e-03,
                        5.28134704e-01],
                      [-3.59513722e-02, -5.46005845e-01,  2.61057373e-02,
                        8.64326060e-01],
                      [-1.35443583e-02, -5.77617049e-01,  8.68072559e-04,
                        8.48138273e-01],
                      [-6.17009513e-02, -9.37132359e-01,  6.66942894e-02,
                        1.47106719e+00],
                      [-5.49544804e-02, -9.75170016e-01,  8.62492844e-02,
                        1.52649641e+00],
                      [-1.45284995e-01, -1.93846333e+00,  2.18142271e-01,
                        2.96884561e+00],
                      [-1.10434756e-01, -1.74251211e+00,  1.65532425e-01,
                        2.63049197e+00]], dtype=float32),
     info: Batch(
               env_id: array([0, 0, 0, 0, 1, 1, 1, 2, 3, 3]),
           ),
     policy: Batch(),
 ),
 array([ 3,  2,  3,  1, 27, 36, 29, 54, 84, 83]))
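The second element of the returned tuple holds the buffer indices of the sampled transitions. Assuming the 100 slots are split evenly into 4 contiguous sub-buffers of 25 slots each, one per environment (which is consistent with the episode start indices 0, 25, 50, and 75 reported earlier), the environment of each sample can be recovered from its index:

```python
import numpy as np

total_size, buffer_num = 100, 4
sub_size = total_size // buffer_num  # 25 slots per environment (assumed layout)

# Indices copied from the sample output above.
indices = np.array([3, 2, 3, 1, 27, 36, 29, 54, 84, 83])

env_ids = indices // sub_size    # which sub-buffer each index falls into
local_pos = indices % sub_size   # position inside that sub-buffer

print(env_ids)
print(local_pos)
```

The computed `env_ids` agree with the `info.env_id` field of the sampled batch.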

Further Reading#

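The collector above actually collects 52 transitions rather than 50, because the 4 environments step together and 52 is the smallest multiple of 4 that is at least 50 (52 % 4 = 0). There is also an asynchronous collector that allows you to collect exactly 50 steps. Check the documentation for details.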
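The rounding can be checked with a little arithmetic. The helper below, `transitions_collected`, is a hypothetical illustration that assumes all environments advance in lockstep and that collection stops as soon as at least `n_step` transitions have been gathered.

```python
import math


def transitions_collected(n_step, env_num):
    """Transitions gathered when env_num environments step together."""
    rounds = math.ceil(n_step / env_num)  # lockstep rounds needed to reach n_step
    return rounds * env_num


collected = transitions_collected(50, 4)
print(collected)
print(transitions_collected(48, 4))
```

Choosing `n_step` as a multiple of the number of environments avoids the extra transitions and the accompanying warning.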