Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions internal/gtfs/gtfs_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Manager struct {
realTimeVehicleLookupByTrip map[string]int
realTimeVehicleLookupByVehicle map[string]int
duplicatedVehicleByRoute map[string][]gtfs.Vehicle
vehiclesByRoute map[string][]gtfs.Vehicle
alertIdx alertIndex
agenciesMap map[string]*gtfs.Agency
routesMap map[string]*gtfs.Route
Expand Down Expand Up @@ -256,6 +257,7 @@ func InitGTFSManager(ctx context.Context, config Config) (*Manager, error) {
realTimeVehicleLookupByTrip: make(map[string]int),
realTimeVehicleLookupByVehicle: make(map[string]int),
duplicatedVehicleByRoute: make(map[string][]gtfs.Vehicle),
vehiclesByRoute: make(map[string][]gtfs.Vehicle),
feedTrips: make(map[string][]gtfs.Trip),
feedVehicles: make(map[string][]gtfs.Vehicle),
feedAlerts: make(map[string][]gtfs.Alert),
Expand Down Expand Up @@ -635,20 +637,23 @@ func (manager *Manager) VehiclesForAgencyID(agencyID string) []gtfs.Vehicle {
// Step 1: Acquire static lock, collect route IDs, then release.
manager.staticMutex.RLock()
routes := manager.RoutesForAgencyID(agencyID)
routeIDs := make(map[string]bool, len(routes))
routeIDs := make([]string, 0, len(routes))
seenRouteIDs := make(map[string]struct{}, len(routes))
for _, route := range routes {
routeIDs[route.Id] = true
if _, exists := seenRouteIDs[route.Id]; exists {
continue
}
seenRouteIDs[route.Id] = struct{}{}
routeIDs = append(routeIDs, route.Id)
}
manager.staticMutex.RUnlock()

// Step 2: Acquire real-time lock independently to read vehicles.
rtVehicles := manager.GetRealTimeVehicles()

var vehicles []gtfs.Vehicle
for _, v := range rtVehicles {
if v.Trip != nil && routeIDs[v.Trip.ID.RouteID] {
vehicles = append(vehicles, v)
}
// Step 2: Acquire real-time lock independently and union the prebuilt route buckets.
manager.realTimeMutex.RLock()
defer manager.realTimeMutex.RUnlock()
vehicles := make([]gtfs.Vehicle, 0)
for _, routeID := range routeIDs {
vehicles = append(vehicles, manager.vehiclesByRoute[routeID]...)
}

return vehicles
Expand Down
19 changes: 16 additions & 3 deletions internal/gtfs/gtfs_manager_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (m *Manager) MockAddVehicle(vehicleID, tripID, routeID string) {
}
}
now := time.Now()
m.realTimeVehicles = append(m.realTimeVehicles, gtfs.Vehicle{
v := gtfs.Vehicle{
ID: &gtfs.VehicleID{ID: vehicleID},
Timestamp: &now,
Trip: &gtfs.Trip{
Expand All @@ -49,13 +49,20 @@ func (m *Manager) MockAddVehicle(vehicleID, tripID, routeID string) {
RouteID: routeID,
},
},
})
}
m.realTimeVehicles = append(m.realTimeVehicles, v)

idx := len(m.realTimeVehicles) - 1
m.realTimeVehicleLookupByVehicle[vehicleID] = idx
if tripID != "" {
m.realTimeVehicleLookupByTrip[tripID] = idx
}
if routeID != "" {
if m.vehiclesByRoute == nil {
m.vehiclesByRoute = make(map[string][]gtfs.Vehicle)
}
m.vehiclesByRoute[routeID] = append(m.vehiclesByRoute[routeID], v)
}
}

type MockVehicleOptions struct {
Expand Down Expand Up @@ -113,6 +120,12 @@ func (m *Manager) MockAddVehicleWithOptions(vehicleID, tripID, routeID string, o
if tripID != "" && !opts.NoTrip {
m.realTimeVehicleLookupByTrip[tripID] = idx
}
if !opts.NoTrip && routeID != "" {
if m.vehiclesByRoute == nil {
m.vehiclesByRoute = make(map[string][]gtfs.Vehicle)
}
m.vehiclesByRoute[routeID] = append(m.vehiclesByRoute[routeID], v)
}
}

func (m *Manager) MockAddTrip(tripID, agencyID, routeID string) {
Expand Down Expand Up @@ -162,7 +175,7 @@ func (m *Manager) MockResetRealTimeData() {
m.realTimeVehicles = nil
m.realTimeVehicleLookupByVehicle = make(map[string]int)
m.realTimeVehicleLookupByTrip = make(map[string]int)
m.duplicatedVehicleByRoute = make(map[string][]gtfs.Vehicle)
m.vehiclesByRoute = make(map[string][]gtfs.Vehicle)
m.realTimeTrips = nil
m.realTimeTripLookup = make(map[string]int)
}
Expand Down
150 changes: 150 additions & 0 deletions internal/gtfs/gtfs_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,156 @@ func TestManager_RoutesForAgencyID(t *testing.T) {
assert.Equal(t, "25", route.Agency.Id)
}

func TestManager_VehiclesForAgencyID_UsesRouteBuckets(t *testing.T) {
manager := &Manager{
staticMutex: sync.RWMutex{},
realTimeMutex: sync.RWMutex{},
routesByAgencyID: map[string][]*gtfs.Route{
"agency-1": {
{Id: "route-a"},
{Id: "route-b"},
},
},
vehiclesByRoute: map[string][]gtfs.Vehicle{
"route-a": {
{
ID: &gtfs.VehicleID{ID: "v1"},
Trip: &gtfs.Trip{ID: gtfs.TripID{RouteID: "route-a"}},
},
},
"route-b": {
{
ID: &gtfs.VehicleID{ID: "v2"},
Trip: &gtfs.Trip{ID: gtfs.TripID{RouteID: "route-b"}},
},
},
},
realTimeVehicles: []gtfs.Vehicle{},
}

result := manager.VehiclesForAgencyID("agency-1")
require.Len(t, result, 2)
ids := []string{result[0].ID.ID, result[1].ID.ID}
assert.ElementsMatch(t, []string{"v1", "v2"}, ids)
}

func TestManager_VehiclesForAgencyID_DeduplicatesDuplicateRouteIDs(t *testing.T) {
manager := &Manager{
staticMutex: sync.RWMutex{},
realTimeMutex: sync.RWMutex{},
routesByAgencyID: map[string][]*gtfs.Route{
"agency-1": {
{Id: "route-a"},
{Id: "route-a"},
},
},
vehiclesByRoute: map[string][]gtfs.Vehicle{
"route-a": {
{
ID: &gtfs.VehicleID{ID: "v1"},
Trip: &gtfs.Trip{ID: gtfs.TripID{RouteID: "route-a"}},
},
},
},
}

result := manager.VehiclesForAgencyID("agency-1")
require.Len(t, result, 1, "duplicate route IDs in routesByAgencyID must not duplicate returned vehicles")
assert.Equal(t, "v1", result[0].ID.ID)
}

func TestManager_VehiclesForAgencyID_FiltersInvalidAndMatchesRoutes(t *testing.T) {
manager := &Manager{
staticMutex: sync.RWMutex{},
realTimeMutex: sync.RWMutex{},
routesByAgencyID: map[string][]*gtfs.Route{
"agency-1": {
{Id: "route-a"},
{Id: "route-b"},
},
},
feedVehicles: map[string][]gtfs.Vehicle{
"feed-0": {
{
ID: &gtfs.VehicleID{ID: "v1"},
Trip: &gtfs.Trip{ID: gtfs.TripID{ID: "trip1", RouteID: "route-a"}},
},
{
ID: &gtfs.VehicleID{ID: "v2"},
Trip: &gtfs.Trip{ID: gtfs.TripID{ID: "trip2", RouteID: "route-b"}},
},
{
ID: &gtfs.VehicleID{ID: "v3"},
Trip: &gtfs.Trip{ID: gtfs.TripID{ID: "trip3", RouteID: "route-c"}},
},
{
ID: &gtfs.VehicleID{ID: "v4"},
},
{
ID: &gtfs.VehicleID{ID: "v5"},
Trip: &gtfs.Trip{ID: gtfs.TripID{ID: "trip5", RouteID: ""}},
},
},
},
}
manager.rebuildMergedRealtimeLocked()

result := manager.VehiclesForAgencyID("agency-1")
require.Len(t, result, 2)
ids := []string{result[0].ID.ID, result[1].ID.ID}
assert.ElementsMatch(t, []string{"v1", "v2"}, ids)
}

func TestManager_VehiclesForAgencyID_RebuildClearsStaleIndex(t *testing.T) {
manager := &Manager{
staticMutex: sync.RWMutex{},
realTimeMutex: sync.RWMutex{},
routesByAgencyID: map[string][]*gtfs.Route{
"agency-1": {
{Id: "route-a"},
},
},
feedVehicles: map[string][]gtfs.Vehicle{
"feed-0": {
{
ID: &gtfs.VehicleID{ID: "v1"},
Trip: &gtfs.Trip{ID: gtfs.TripID{ID: "trip1", RouteID: "route-a"}},
},
},
},
}
manager.rebuildMergedRealtimeLocked()
require.Len(t, manager.VehiclesForAgencyID("agency-1"), 1)

manager.feedVehicles["feed-0"] = nil
manager.rebuildMergedRealtimeLocked()

assert.Empty(t, manager.VehiclesForAgencyID("agency-1"))
}

func TestManager_VehiclesForAgencyID_NoMatchReturnsEmpty(t *testing.T) {
manager := &Manager{
staticMutex: sync.RWMutex{},
realTimeMutex: sync.RWMutex{},
routesByAgencyID: map[string][]*gtfs.Route{
"agency-1": {
{Id: "route-a"},
},
},
feedVehicles: map[string][]gtfs.Vehicle{
"feed-0": {
{
ID: &gtfs.VehicleID{ID: "v1"},
Trip: &gtfs.Trip{ID: gtfs.TripID{ID: "trip1", RouteID: "route-b"}},
},
},
},
}
manager.rebuildMergedRealtimeLocked()

assert.Empty(t, manager.VehiclesForAgencyID("agency-1"))
}

func TestManager_GetStopsForLocation_UsesSpatialIndex(t *testing.T) {
ctx := context.Background()

Expand Down
13 changes: 12 additions & 1 deletion internal/gtfs/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,17 +633,27 @@ func (manager *Manager) rebuildMergedRealtimeLocked() {
vehicleLookupByTrip := make(map[string]int, len(allVehicles))
vehicleLookupByVehicle := make(map[string]int, len(allVehicles))
duplicatedVehicleByRoute := make(map[string][]gtfs.Vehicle)
vehiclesByRoute := make(map[string][]gtfs.Vehicle)
for i, vehicle := range allVehicles {
if vehicle.Trip != nil && vehicle.Trip.ID.ID != "" {
vehicleLookupByTrip[vehicle.Trip.ID.ID] = i
}
if vehicle.ID != nil && vehicle.ID.ID != "" {
vehicleLookupByVehicle[vehicle.ID.ID] = i
}
if vehicle.Trip == nil || vehicle.Trip.ID.ScheduleRelationship != gtfsrt.TripDescriptor_DUPLICATED {
if vehicle.Trip == nil {
continue
}

routeID := vehicle.Trip.ID.RouteID
if routeID != "" {
vehiclesByRoute[routeID] = append(vehiclesByRoute[routeID], vehicle)
}

if vehicle.Trip.ID.ScheduleRelationship != gtfsrt.TripDescriptor_DUPLICATED {
continue
}

// Some feeds omit route_id in VehiclePosition trip descriptors.
// Fall back to the corresponding TripUpdate to resolve the route.
if routeID == "" && vehicle.Trip.ID.ID != "" {
Expand Down Expand Up @@ -706,6 +716,7 @@ func (manager *Manager) rebuildMergedRealtimeLocked() {
manager.realTimeVehicleLookupByTrip = vehicleLookupByTrip
manager.realTimeVehicleLookupByVehicle = vehicleLookupByVehicle
manager.duplicatedVehicleByRoute = duplicatedVehicleByRoute
manager.vehiclesByRoute = vehiclesByRoute
manager.alertIdx = idx
}

Expand Down
39 changes: 38 additions & 1 deletion internal/gtfs/realtime_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func BenchmarkGetAlertsForStop(b *testing.B) {
}
}

func BenchmarkRebuildDuplicatedVehicleByRoute(b *testing.B) {
func BenchmarkRebuildVehiclesByRoute(b *testing.B) {
manager := &Manager{
realTimeMutex: sync.RWMutex{},
feedTrips: make(map[string][]gtfs.Trip),
Expand Down Expand Up @@ -221,6 +221,7 @@ func BenchmarkGetDuplicatedVehiclesForRoute(b *testing.B) {
RouteID: routeID,
},
}

feedVehicles[i] = gtfs.Vehicle{
ID: &gtfs.VehicleID{ID: fmt.Sprintf("vehicle_%d", i)},
Trip: &gtfs.Trip{
Expand All @@ -241,3 +242,39 @@ func BenchmarkGetDuplicatedVehiclesForRoute(b *testing.B) {
_ = manager.GetDuplicatedVehiclesForRoute(fmt.Sprintf("route_%d", i%100))
}
}

func BenchmarkVehiclesForAgencyID(b *testing.B) {
routes := make([]*gtfs.Route, 0, 10)
for i := 0; i < 10; i++ {
routes = append(routes, &gtfs.Route{Id: fmt.Sprintf("route_%d", i)})
}

manager := &Manager{
staticMutex: sync.RWMutex{},
realTimeMutex: sync.RWMutex{},
routesByAgencyID: map[string][]*gtfs.Route{
"agency_0": routes,
},
feedVehicles: make(map[string][]gtfs.Vehicle),
}

feedVehicles := make([]gtfs.Vehicle, 1000)
for i := 0; i < 1000; i++ {
feedVehicles[i] = gtfs.Vehicle{
ID: &gtfs.VehicleID{ID: fmt.Sprintf("vehicle_%d", i)},
Trip: &gtfs.Trip{
ID: gtfs.TripID{
ID: fmt.Sprintf("trip_%d", i),
RouteID: fmt.Sprintf("route_%d", i%100),
},
},
}
}
manager.feedVehicles["feed-0"] = feedVehicles
manager.rebuildMergedRealtimeLocked()

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = manager.VehiclesForAgencyID("agency_0")
}
}
Loading
Loading