feat: Initial commit - Train tracking system

Complete real-time train tracking system for Spanish railways (Renfe/Cercanías):

- Backend API (Node.js/Express) with GTFS-RT polling workers
- Frontend dashboard (React/Vite) with Leaflet maps
- Real-time updates via Socket.io WebSocket
- PostgreSQL/PostGIS database with Flyway migrations
- Redis caching layer
- Docker Compose configuration for development and production
- Gitea CI/CD workflows (lint, auto-tag, release)
- Production deployment with nginx + Let's Encrypt SSL

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Millaguie committed on 2025-11-28 00:21:15 +01:00
commit 34c0cb50c7
64 changed files with 15577 additions and 0 deletions
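For orientation, a minimal consumer sketch of the REST and WebSocket surface defined in the files below. The endpoint paths and the subscribe:train event come from this commit; the host/port and the use of top-level await are assumptions for the sketch, not project configuration:

// Hypothetical client (sketch only): host/port are assumed, not taken from this commit
import { io } from 'socket.io-client';

const API = 'http://localhost:3000';

// REST: current train positions enriched with fleet data (routes/trains.js below)
const positions = await fetch(`${API}/trains/current`).then(r => r.json());
console.log(`${positions.length} active trains`);

// WebSocket: subscribe to live updates for one train (server.js setupWebSocket below)
const socket = io(API);
socket.on('connect', () => socket.emit('subscribe:train', positions[0]?.train_id));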

112 backend/src/api/routes/alerts.js Normal file

@@ -0,0 +1,112 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
const router = express.Router();
// GET /alerts - Get all active alerts
router.get('/', async (req, res, next) => {
try {
const { route_id, severity, type } = req.query;
let query = `
SELECT *
FROM alerts
WHERE (end_time IS NULL OR end_time > NOW())
AND (start_time IS NULL OR start_time <= NOW())
`;
const params = [];
let paramIndex = 1;
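// Build $1, $2, ... placeholders in step with params so each optional filter stays parameterised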
if (route_id) {
query += ` AND route_id = $${paramIndex}`;
params.push(route_id);
paramIndex++;
}
if (severity) {
query += ` AND severity = $${paramIndex}`;
params.push(severity);
paramIndex++;
}
if (type) {
query += ` AND alert_type = $${paramIndex}`;
params.push(type);
paramIndex++;
}
query += ' ORDER BY severity DESC, created_at DESC';
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /alerts/:id - Get specific alert
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const result = await db.query(
'SELECT * FROM alerts WHERE alert_id = $1',
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({
error: 'Alert not found',
});
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
// GET /alerts/route/:routeId - Get alerts for specific route
router.get('/route/:routeId', async (req, res, next) => {
try {
const { routeId } = req.params;
const result = await db.query(`
SELECT *
FROM alerts
WHERE route_id = $1
AND (end_time IS NULL OR end_time > NOW())
AND (start_time IS NULL OR start_time <= NOW())
ORDER BY severity DESC, created_at DESC
`, [routeId]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /alerts/train/:trainId - Get alerts for specific train
router.get('/train/:trainId', async (req, res, next) => {
try {
const { trainId } = req.params;
const result = await db.query(`
SELECT *
FROM alerts
WHERE train_id = $1
AND (end_time IS NULL OR end_time > NOW())
AND (start_time IS NULL OR start_time <= NOW())
ORDER BY severity DESC, created_at DESC
`, [trainId]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
export default router;

360 backend/src/api/routes/analytics.js Normal file

@@ -0,0 +1,360 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
const router = express.Router();
// GET /analytics/traffic/heatmap - Get traffic heatmap data
router.get('/traffic/heatmap', async (req, res, next) => {
try {
const {
start_date = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString(),
end_date = new Date().toISOString(),
grid_size = 0.1,
} = req.query;
// Check cache
const cacheKey = `analytics:heatmap:${start_date}:${end_date}:${grid_size}`;
const cached = await redis.get(cacheKey);
if (cached) {
return res.json(JSON.parse(cached));
}
const result = await db.query(
'SELECT * FROM get_traffic_heatmap($1, $2, $3)',
[start_date, end_date, parseFloat(grid_size)]
);
const heatmapData = result.rows.map(row => ({
lat: row.lat_bucket,
lon: row.lon_bucket,
intensity: parseInt(row.train_count, 10),
avgSpeed: parseFloat(row.avg_speed || 0),
}));
// Cache for 15 minutes
await redis.set(cacheKey, JSON.stringify(heatmapData), 900);
res.json(heatmapData);
} catch (error) {
next(error);
}
});
// GET /analytics/traffic/hourly - Get hourly traffic pattern
router.get('/traffic/hourly', async (req, res, next) => {
try {
const { days = 7 } = req.query;
const cacheKey = `analytics:hourly:${days}`;
const cached = await redis.get(cacheKey);
if (cached) {
return res.json(JSON.parse(cached));
}
const result = await db.query(
'SELECT * FROM get_hourly_traffic_pattern($1)',
[parseInt(days, 10)]
);
// Cache for 1 hour
await redis.set(cacheKey, JSON.stringify(result.rows), 3600);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/traffic/by-hour - Get materialized view data
router.get('/traffic/by-hour', async (req, res, next) => {
try {
const { limit = 24 } = req.query;
const result = await db.query(
'SELECT * FROM traffic_by_hour ORDER BY hour DESC LIMIT $1',
[parseInt(limit, 10)]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/traffic/by-route - Get traffic by route
router.get('/traffic/by-route', async (req, res, next) => {
try {
const result = await db.query(
'SELECT * FROM traffic_by_route ORDER BY total_trains DESC'
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/statistics/daily - Get daily statistics
router.get('/statistics/daily', async (req, res, next) => {
try {
const { days = 30 } = req.query;
const result = await db.query(
'SELECT * FROM daily_statistics ORDER BY date DESC LIMIT $1',
[parseInt(days, 10)]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/statistics/system - Get current system status
router.get('/statistics/system', async (req, res, next) => {
try {
const result = await db.query('SELECT * FROM system_status');
res.json(result.rows[0] || {});
} catch (error) {
next(error);
}
});
// GET /analytics/performance/routes - Get route performance metrics
router.get('/performance/routes', async (req, res, next) => {
try {
const { limit = 20 } = req.query;
const result = await db.query(
'SELECT * FROM route_performance ORDER BY punctuality_percentage DESC LIMIT $1',
[parseInt(limit, 10)]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/performance/route/:routeId - Get specific route statistics
router.get('/performance/route/:routeId', async (req, res, next) => {
try {
const { routeId } = req.params;
const { days = 7 } = req.query;
const result = await db.query(
'SELECT * FROM get_route_statistics($1, $2)',
[routeId, parseInt(days, 10)]
);
if (result.rows.length === 0) {
return res.status(404).json({
error: 'Route not found or no data available',
});
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
// GET /analytics/delays/top-routes - Get most delayed routes
router.get('/delays/top-routes', async (req, res, next) => {
try {
const result = await db.query('SELECT * FROM top_delayed_routes');
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/stations/busiest - Get busiest stations
router.get('/stations/busiest', async (req, res, next) => {
try {
const { limit = 20 } = req.query;
const result = await db.query(
'SELECT * FROM busiest_stations LIMIT $1',
[parseInt(limit, 10)]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /analytics/stations/:stationId/statistics - Get station statistics
router.get('/stations/:stationId/statistics', async (req, res, next) => {
try {
const { stationId } = req.params;
const { days = 7 } = req.query;
const result = await db.query(
'SELECT * FROM get_station_statistics($1, $2)',
[stationId, parseInt(days, 10)]
);
if (result.rows.length === 0) {
return res.status(404).json({
error: 'Station not found or no data available',
});
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
// GET /analytics/trains/:trainId/distance - Calculate distance traveled
router.get('/trains/:trainId/distance', async (req, res, next) => {
try {
const { trainId } = req.params;
const {
start_time = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(),
end_time = new Date().toISOString(),
} = req.query;
const result = await db.query(
'SELECT calculate_distance_traveled($1, $2, $3) as distance_km',
[trainId, start_time, end_time]
);
res.json({
train_id: trainId,
start_time,
end_time,
distance_km: parseFloat(result.rows[0].distance_km || 0),
});
} catch (error) {
next(error);
}
});
// POST /analytics/refresh - Refresh materialized views (admin endpoint)
router.post('/refresh', async (req, res, next) => {
try {
await db.query('SELECT refresh_all_analytics_views()');
res.json({
success: true,
message: 'Analytics views refreshed successfully',
timestamp: new Date().toISOString(),
});
} catch (error) {
next(error);
}
});
// GET /analytics/export - Export data (basic implementation)
router.get('/export', async (req, res, next) => {
try {
const {
table,
format = 'json',
start_date,
end_date,
limit = 1000,
} = req.query;
// Whitelist allowed tables
const allowedTables = [
'train_positions',
'trains',
'routes',
'stations',
'alerts',
'trip_updates',
'traffic_by_hour',
'daily_statistics',
];
if (!table || !allowedTables.includes(table)) {
return res.status(400).json({
error: 'Invalid or missing table parameter',
allowed_tables: allowedTables,
});
}
// Build query with filters
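// Interpolating ${table} into the SQL is safe only because it was validated against allowedTables above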
let query = `SELECT * FROM ${table}`;
const params = [];
const conditions = [];
if (start_date && (table === 'train_positions' || table === 'trip_updates' || table === 'alerts')) {
params.push(start_date);
conditions.push(`recorded_at >= $${params.length}`);
}
if (end_date && (table === 'train_positions' || table === 'trip_updates' || table === 'alerts')) {
params.push(end_date);
conditions.push(`recorded_at <= $${params.length}`);
}
if (conditions.length > 0) {
query += ' WHERE ' + conditions.join(' AND ');
}
const orderColumn =
table === 'train_positions' || table === 'trip_updates' ? 'recorded_at'
: table === 'routes' ? 'route_id'
: 'created_at';
query += ` ORDER BY ${orderColumn} DESC LIMIT $${params.length + 1}`;
params.push(parseInt(limit, 10));
const result = await db.query(query, params);
if (format === 'csv') {
// Simple CSV export
if (result.rows.length === 0) {
return res.status(404).json({ error: 'No data found' });
}
const headers = Object.keys(result.rows[0]);
const csvRows = [
headers.join(','),
...result.rows.map(row =>
headers.map(header => {
const value = row[header];
if (value === null || value === undefined) return '';
const str = String(value);
// Quote fields containing commas, quotes, or newlines; double any embedded quotes
return /[",\n]/.test(str)
? `"${str.replace(/"/g, '""')}"`
: str;
}).join(',')
),
];
res.setHeader('Content-Type', 'text/csv');
res.setHeader('Content-Disposition', `attachment; filename="${table}_${Date.now()}.csv"`);
res.send(csvRows.join('\n'));
} else if (format === 'geojson' && (table === 'train_positions' || table === 'stations')) {
// GeoJSON export for spatial data
const features = result.rows
.filter(row => row.latitude && row.longitude)
.map(row => ({
type: 'Feature',
geometry: {
type: 'Point',
coordinates: [parseFloat(row.longitude), parseFloat(row.latitude)],
},
properties: { ...row, latitude: undefined, longitude: undefined },
}));
const geojson = {
type: 'FeatureCollection',
features,
};
res.setHeader('Content-Type', 'application/geo+json');
res.setHeader('Content-Disposition', `attachment; filename="${table}_${Date.now()}.geojson"`);
res.json(geojson);
} else {
// JSON export (default)
res.json(result.rows);
}
} catch (error) {
next(error);
}
});
export default router;

347 backend/src/api/routes/dashboard.js Normal file

@@ -0,0 +1,347 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
const router = express.Router();
// Map nucleo codes to region names
const NUCLEO_NAMES = {
'10': 'Madrid',
'20': 'Barcelona',
'30': 'Sevilla',
'31': 'Cadiz',
'40': 'Valencia',
'41': 'Murcia/Alicante',
'50': 'Bilbao',
'60': 'Asturias',
'61': 'Santander',
'62': 'San Sebastian',
'70': 'Malaga',
};
// GET /dashboard/snapshot - Get stats for a specific point in time
router.get('/snapshot', async (req, res, next) => {
try {
const { timestamp } = req.query;
const targetTime = timestamp ? new Date(timestamp) : new Date();
// Get a 1-minute window around the target time
const startTime = new Date(targetTime.getTime() - 30000);
const endTime = new Date(targetTime.getTime() + 30000);
// Get train positions at that time
const positionsResult = await db.query(`
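-- DISTINCT ON (train_id) with ORDER BY train_id, recorded_at DESC keeps only the latest sample per train in the window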
SELECT DISTINCT ON (train_id)
train_id,
status,
speed,
latitude,
longitude,
bearing,
recorded_at
FROM train_positions
WHERE recorded_at BETWEEN $1 AND $2
ORDER BY train_id, recorded_at DESC
`, [startTime, endTime]);
// Get punctuality data at that time
const punctualityResult = await db.query(`
SELECT DISTINCT ON (train_id)
train_id,
line_code,
nucleo,
delay_minutes,
origin_station_code,
destination_station_code,
recorded_at
FROM train_punctuality
WHERE recorded_at BETWEEN $1 AND $2
ORDER BY train_id, recorded_at DESC
`, [startTime, endTime]);
// Calculate statistics
const trains = positionsResult.rows;
const punctuality = punctualityResult.rows;
// Train status breakdown
const statusCounts = {
IN_TRANSIT_TO: 0,
STOPPED_AT: 0,
INCOMING_AT: 0,
UNKNOWN: 0,
};
for (const train of trains) {
statusCounts[train.status] = (statusCounts[train.status] || 0) + 1;
}
// Punctuality breakdown
const punctualityCounts = {
on_time: 0, // delay = 0
minor_delay: 0, // 1-5 min
moderate_delay: 0, // 6-15 min
severe_delay: 0, // >15 min
early: 0, // < 0
};
let totalDelay = 0;
for (const p of punctuality) {
const delay = p.delay_minutes;
totalDelay += delay;
if (delay < 0) punctualityCounts.early++;
else if (delay === 0) punctualityCounts.on_time++;
else if (delay <= 5) punctualityCounts.minor_delay++;
else if (delay <= 15) punctualityCounts.moderate_delay++;
else punctualityCounts.severe_delay++;
}
// Lines breakdown
const linesCounts = {};
for (const p of punctuality) {
if (p.line_code) {
linesCounts[p.line_code] = (linesCounts[p.line_code] || 0) + 1;
}
}
// Nucleos breakdown
const nucleosCounts = {};
for (const p of punctuality) {
if (p.nucleo) {
nucleosCounts[p.nucleo] = (nucleosCounts[p.nucleo] || 0) + 1;
}
}
res.json({
timestamp: targetTime.toISOString(),
total_trains: trains.length,
status_breakdown: statusCounts,
punctuality_breakdown: punctualityCounts,
average_delay: punctuality.length > 0 ? (totalDelay / punctuality.length).toFixed(2) : 0,
lines_breakdown: linesCounts,
nucleos_breakdown: nucleosCounts,
punctuality_percentage: punctuality.length > 0
? ((punctualityCounts.on_time + punctualityCounts.minor_delay) / punctuality.length * 100).toFixed(1)
: 0,
});
} catch (error) {
next(error);
}
});
// GET /dashboard/current - Get current live stats
router.get('/current', async (req, res, next) => {
try {
// Get active trains from Redis
const trainIds = await redis.sMembers('trains:active');
// Get all current positions
const trains = await Promise.all(
trainIds.map(async (trainId) => {
const data = await redis.get(`trains:current:${trainId}`);
const fleetData = await redis.get(`fleet:${trainId}`);
return {
position: data ? JSON.parse(data) : null,
fleet: fleetData ? JSON.parse(fleetData) : null,
};
})
);
const validTrains = trains.filter(t => t.position !== null);
// Calculate status breakdown
const statusCounts = {
IN_TRANSIT_TO: 0,
STOPPED_AT: 0,
INCOMING_AT: 0,
UNKNOWN: 0,
};
for (const t of validTrains) {
const status = t.position.status || 'UNKNOWN';
statusCounts[status] = (statusCounts[status] || 0) + 1;
}
// Calculate punctuality from fleet data
const punctualityCounts = {
on_time: 0,
minor_delay: 0,
moderate_delay: 0,
severe_delay: 0,
early: 0,
};
let totalDelay = 0;
let delayCount = 0;
// Lines breakdown by nucleo (key: "nucleo:line")
const linesWithNucleo = {};
const nucleosCounts = {};
for (const t of validTrains) {
if (t.fleet) {
const delay = parseInt(t.fleet.retrasoMin, 10) || 0;
totalDelay += delay;
delayCount++;
if (delay < 0) punctualityCounts.early++;
else if (delay === 0) punctualityCounts.on_time++;
else if (delay <= 5) punctualityCounts.minor_delay++;
else if (delay <= 15) punctualityCounts.moderate_delay++;
else punctualityCounts.severe_delay++;
if (t.fleet.codLinea && t.fleet.nucleo) {
const key = `${t.fleet.nucleo}:${t.fleet.codLinea}`;
if (!linesWithNucleo[key]) {
linesWithNucleo[key] = {
line_code: t.fleet.codLinea,
nucleo: t.fleet.nucleo,
nucleo_name: NUCLEO_NAMES[t.fleet.nucleo] || t.fleet.nucleo,
count: 0,
};
}
linesWithNucleo[key].count++;
}
if (t.fleet.nucleo) {
if (!nucleosCounts[t.fleet.nucleo]) {
nucleosCounts[t.fleet.nucleo] = {
nucleo: t.fleet.nucleo,
nucleo_name: NUCLEO_NAMES[t.fleet.nucleo] || t.fleet.nucleo,
count: 0,
};
}
nucleosCounts[t.fleet.nucleo].count++;
}
}
}
// Convert to arrays sorted by count
const linesArray = Object.values(linesWithNucleo).sort((a, b) => b.count - a.count);
const nucleosArray = Object.values(nucleosCounts).sort((a, b) => b.count - a.count);
res.json({
timestamp: new Date().toISOString(),
total_trains: validTrains.length,
status_breakdown: statusCounts,
punctuality_breakdown: punctualityCounts,
average_delay: delayCount > 0 ? (totalDelay / delayCount).toFixed(2) : 0,
lines_breakdown: linesArray,
nucleos_breakdown: nucleosArray,
punctuality_percentage: delayCount > 0
? ((punctualityCounts.on_time + punctualityCounts.minor_delay) / delayCount * 100).toFixed(1)
: 0,
});
} catch (error) {
next(error);
}
});
// GET /dashboard/timeline - Get stats over a time range
router.get('/timeline', async (req, res, next) => {
try {
const { start, end, interval = '5' } = req.query;
const startTime = start ? new Date(start) : new Date(Date.now() - 3600000); // Last hour
const endTime = end ? new Date(end) : new Date();
const intervalMinutes = parseInt(interval, 10);
const result = await db.query(`
WITH time_buckets AS (
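-- Floor each timestamp down to the start of its $3-minute bucket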
SELECT
date_trunc('minute', recorded_at) -
(EXTRACT(MINUTE FROM recorded_at)::INTEGER % $3) * INTERVAL '1 minute' as time_bucket,
train_id,
delay_minutes,
line_code
FROM train_punctuality
WHERE recorded_at BETWEEN $1 AND $2
)
SELECT
time_bucket,
COUNT(DISTINCT train_id) as train_count,
AVG(delay_minutes)::FLOAT as avg_delay,
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::FLOAT /
NULLIF(COUNT(*), 0) * 100 as punctuality_pct,
COUNT(CASE WHEN delay_minutes = 0 THEN 1 END) as on_time,
COUNT(CASE WHEN delay_minutes > 0 AND delay_minutes <= 5 THEN 1 END) as minor_delay,
COUNT(CASE WHEN delay_minutes > 5 AND delay_minutes <= 15 THEN 1 END) as moderate_delay,
COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) as severe_delay
FROM time_buckets
GROUP BY time_bucket
ORDER BY time_bucket
`, [startTime, endTime, intervalMinutes]);
res.json({
start: startTime.toISOString(),
end: endTime.toISOString(),
interval_minutes: intervalMinutes,
data: result.rows.map(row => ({
timestamp: row.time_bucket,
train_count: parseInt(row.train_count, 10),
avg_delay: parseFloat(row.avg_delay) || 0,
punctuality_pct: parseFloat(row.punctuality_pct) || 0,
on_time: parseInt(row.on_time, 10),
minor_delay: parseInt(row.minor_delay, 10),
moderate_delay: parseInt(row.moderate_delay, 10),
severe_delay: parseInt(row.severe_delay, 10),
})),
});
} catch (error) {
next(error);
}
});
// GET /dashboard/lines-ranking - Get lines ranked by punctuality
router.get('/lines-ranking', async (req, res, next) => {
try {
const { timestamp, hours = 24 } = req.query;
const targetTime = timestamp ? new Date(timestamp) : new Date();
const startTime = new Date(targetTime.getTime() - hours * 3600000);
const result = await db.query(`
SELECT
line_code,
nucleo,
COUNT(*) as observations,
COUNT(DISTINCT train_id) as unique_trains,
AVG(delay_minutes)::FLOAT as avg_delay,
MAX(delay_minutes) as max_delay,
ROUND(
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::NUMERIC /
NULLIF(COUNT(*), 0) * 100, 1
) as punctuality_pct
FROM train_punctuality
WHERE recorded_at BETWEEN $1 AND $2
AND line_code IS NOT NULL
GROUP BY line_code, nucleo
HAVING COUNT(*) >= 10
ORDER BY punctuality_pct ASC
`, [startTime, targetTime]);
// Add nucleo_name to each row
const rowsWithNucleoName = result.rows.map(row => ({
...row,
nucleo_name: NUCLEO_NAMES[row.nucleo] || row.nucleo,
}));
res.json(rowsWithNucleoName);
} catch (error) {
next(error);
}
});
// GET /dashboard/available-range - Get available data time range
router.get('/available-range', async (req, res, next) => {
try {
const result = await db.query(`
SELECT
MIN(recorded_at) as earliest,
MAX(recorded_at) as latest
FROM train_punctuality
`);
res.json({
earliest: result.rows[0]?.earliest,
latest: result.rows[0]?.latest,
});
} catch (error) {
next(error);
}
});
export default router;

371 backend/src/api/routes/explorer.js Normal file

@@ -0,0 +1,371 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
const router = express.Router();
// GET /explorer/routes/between - Find all routes between two stations
// Registered before '/routes/:routeId'; otherwise Express would capture 'between' as a routeId
router.get('/routes/between', async (req, res, next) => {
try {
const { origin, destination } = req.query;
if (!origin || !destination) {
return res.status(400).json({
error: 'Origin and destination are required',
});
}
const result = await db.query(`
SELECT DISTINCT
r.route_id,
r.route_name,
r.route_type,
r.route_color,
COUNT(DISTINCT t.trip_id) as daily_trips
FROM routes r
JOIN trips t ON r.route_id = t.route_id
JOIN stop_times origin_st ON t.trip_id = origin_st.trip_id AND origin_st.stop_id = $1
JOIN stop_times dest_st ON t.trip_id = dest_st.trip_id AND dest_st.stop_id = $2
WHERE origin_st.stop_sequence < dest_st.stop_sequence
GROUP BY r.route_id, r.route_name, r.route_type, r.route_color
ORDER BY daily_trips DESC
`, [origin, destination]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /explorer/routes/:routeId - Get complete route information
router.get('/routes/:routeId', async (req, res, next) => {
try {
const { routeId } = req.params;
// Get route info
const routeResult = await db.query(
'SELECT * FROM routes WHERE route_id = $1',
[routeId]
);
if (routeResult.rows.length === 0) {
return res.status(404).json({ error: 'Route not found' });
}
const route = routeResult.rows[0];
// Get all trips for this route
const tripsResult = await db.query(
'SELECT * FROM trips WHERE route_id = $1 ORDER BY trip_id',
[routeId]
);
// Get all stops for this route (from any trip)
const stopsResult = await db.query(`
SELECT DISTINCT ON (s.stop_id)
s.stop_id,
s.stop_name,
s.stop_lat,
s.stop_lon,
s.location_type,
s.parent_station
FROM stops s
JOIN stop_times st ON s.stop_id = st.stop_id
JOIN trips t ON st.trip_id = t.trip_id
WHERE t.route_id = $1
ORDER BY s.stop_id
`, [routeId]);
// Get shape if available
const shapeResult = await db.query(`
SELECT DISTINCT shape_id FROM trips WHERE route_id = $1 AND shape_id IS NOT NULL LIMIT 1
`, [routeId]);
let shape = null;
if (shapeResult.rows.length > 0 && shapeResult.rows[0].shape_id) {
const shapePointsResult = await db.query(`
SELECT
shape_pt_lat,
shape_pt_lon,
shape_pt_sequence,
shape_dist_traveled
FROM shapes
WHERE shape_id = $1
ORDER BY shape_pt_sequence
`, [shapeResult.rows[0].shape_id]);
shape = {
shape_id: shapeResult.rows[0].shape_id,
points: shapePointsResult.rows.map(p => ({
lat: parseFloat(p.shape_pt_lat),
lon: parseFloat(p.shape_pt_lon),
sequence: p.shape_pt_sequence,
distance: p.shape_dist_traveled,
})),
};
}
res.json({
route,
trips: tripsResult.rows,
stops: stopsResult.rows,
shape,
total_trips: tripsResult.rows.length,
total_stops: stopsResult.rows.length,
});
} catch (error) {
next(error);
}
});
// GET /explorer/trips/:tripId/schedule - Get complete trip schedule
router.get('/trips/:tripId/schedule', async (req, res, next) => {
try {
const { tripId } = req.params;
const result = await db.query(
'SELECT * FROM get_trip_schedule($1)',
[tripId]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Trip not found' });
}
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /explorer/stations/:stationId - Get complete station information
router.get('/stations/:stationId', async (req, res, next) => {
try {
const { stationId } = req.params;
// Get station info
const stationResult = await db.query(
'SELECT * FROM stops WHERE stop_id = $1',
[stationId]
);
if (stationResult.rows.length === 0) {
return res.status(404).json({ error: 'Station not found' });
}
const station = stationResult.rows[0];
// Get next departures
const departuresResult = await db.query(
'SELECT * FROM get_next_departures($1, 10)',
[stationId]
);
// Get routes that serve this station
const routesResult = await db.query(`
SELECT DISTINCT r.*
FROM routes r
JOIN trips t ON r.route_id = t.route_id
JOIN stop_times st ON t.trip_id = st.trip_id
WHERE st.stop_id = $1
`, [stationId]);
// Get statistics
const statsResult = await db.query(
'SELECT * FROM get_station_statistics($1, 7)',
[stationId]
);
res.json({
station,
next_departures: departuresResult.rows,
routes: routesResult.rows,
statistics: statsResult.rows[0] || null,
});
} catch (error) {
next(error);
}
});
// GET /explorer/planner - Trip planner (basic implementation)
router.get('/planner', async (req, res, next) => {
try {
const { origin, destination, time, date } = req.query;
if (!origin || !destination) {
return res.status(400).json({
error: 'Origin and destination are required',
});
}
// Get direct trips between two stations
const directTripsResult = await db.query(`
SELECT
t.trip_id,
t.route_id,
r.route_name,
t.trip_headsign,
origin_st.departure_time as origin_departure,
dest_st.arrival_time as destination_arrival,
EXTRACT(EPOCH FROM (dest_st.arrival_time - origin_st.departure_time)) / 60 as duration_minutes,
origin_st.stop_sequence as origin_sequence,
dest_st.stop_sequence as destination_sequence
FROM trips t
JOIN routes r ON t.route_id = r.route_id
JOIN stop_times origin_st ON t.trip_id = origin_st.trip_id AND origin_st.stop_id = $1
JOIN stop_times dest_st ON t.trip_id = dest_st.trip_id AND dest_st.stop_id = $2
WHERE origin_st.stop_sequence < dest_st.stop_sequence
AND ($3::TIME IS NULL OR origin_st.departure_time >= $3::TIME)
ORDER BY origin_st.departure_time
LIMIT 10
`, [origin, destination, time || null]);
// Get trip updates for delay info
const tripsWithDelays = await Promise.all(
directTripsResult.rows.map(async (trip) => {
const delayResult = await db.query(`
SELECT delay_seconds, schedule_relationship
FROM trip_updates
WHERE trip_id = $1
AND received_at > NOW() - INTERVAL '10 minutes'
ORDER BY received_at DESC
LIMIT 1
`, [trip.trip_id]);
return {
...trip,
delay: delayResult.rows[0] || null,
};
})
);
// Find trips with one transfer
const oneTransferResult = await db.query(`
WITH possible_transfers AS (
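-- Pair a first leg (t1) with a second leg (t2) at a shared transfer stop, keeping 5-60 minute connections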
SELECT
t1.trip_id as trip1_id,
t1.route_id as route1_id,
r1.route_name as route1_name,
t2.trip_id as trip2_id,
t2.route_id as route2_id,
r2.route_name as route2_name,
origin_st.departure_time as origin_departure,
transfer_st1.arrival_time as transfer_arrival,
transfer_st2.departure_time as transfer_departure,
dest_st.arrival_time as destination_arrival,
transfer_st1.stop_id as transfer_station,
s.stop_name as transfer_station_name
FROM trips t1
JOIN routes r1 ON t1.route_id = r1.route_id
JOIN stop_times origin_st ON t1.trip_id = origin_st.trip_id AND origin_st.stop_id = $1
JOIN stop_times transfer_st1 ON t1.trip_id = transfer_st1.trip_id
JOIN stops s ON transfer_st1.stop_id = s.stop_id
JOIN stop_times transfer_st2 ON transfer_st1.stop_id = transfer_st2.stop_id
JOIN trips t2 ON transfer_st2.trip_id = t2.trip_id
JOIN routes r2 ON t2.route_id = r2.route_id
JOIN stop_times dest_st ON t2.trip_id = dest_st.trip_id AND dest_st.stop_id = $2
WHERE origin_st.stop_sequence < transfer_st1.stop_sequence
AND transfer_st2.stop_sequence < dest_st.stop_sequence
AND transfer_st1.arrival_time < transfer_st2.departure_time
AND (transfer_st2.departure_time - transfer_st1.arrival_time) >= INTERVAL '5 minutes'
AND (transfer_st2.departure_time - transfer_st1.arrival_time) <= INTERVAL '60 minutes'
AND ($3::TIME IS NULL OR origin_st.departure_time >= $3::TIME)
)
SELECT
*,
EXTRACT(EPOCH FROM (destination_arrival - origin_departure)) / 60 as total_duration_minutes
FROM possible_transfers
ORDER BY origin_departure, total_duration_minutes
LIMIT 5
`, [origin, destination, time || null]);
res.json({
origin,
destination,
requested_time: time || 'any',
requested_date: date || 'today',
direct_trips: tripsWithDelays,
trips_with_transfer: oneTransferResult.rows,
total_options: directTripsResult.rows.length + oneTransferResult.rows.length,
});
} catch (error) {
next(error);
}
});
// GET /explorer/stations/:stationId/nearby - Get nearby stations
router.get('/stations/:stationId/nearby', async (req, res, next) => {
try {
const { stationId } = req.params;
const { radius = 5 } = req.query; // radius in km
const result = await db.query(`
WITH target_station AS (
SELECT stop_lat, stop_lon
FROM stops
WHERE stop_id = $1
)
SELECT
s.stop_id,
s.stop_name,
s.stop_lat,
s.stop_lon,
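-- The ::geography casts make ST_Distance/ST_DWithin operate in meters; / 1000 converts to km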
ST_Distance(
ST_SetSRID(ST_MakePoint(s.stop_lon, s.stop_lat), 4326)::geography,
ST_SetSRID(ST_MakePoint(ts.stop_lon, ts.stop_lat), 4326)::geography
) / 1000 as distance_km
FROM stops s, target_station ts
WHERE s.stop_id != $1
AND ST_DWithin(
ST_SetSRID(ST_MakePoint(s.stop_lon, s.stop_lat), 4326)::geography,
ST_SetSRID(ST_MakePoint(ts.stop_lon, ts.stop_lat), 4326)::geography,
$2 * 1000
)
ORDER BY distance_km
LIMIT 20
`, [stationId, parseFloat(radius)]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /explorer/search - Search for stations by name
router.get('/search', async (req, res, next) => {
try {
const { query, limit = 10 } = req.query;
if (!query || query.length < 2) {
return res.status(400).json({
error: 'Search query must be at least 2 characters',
});
}
const result = await db.query(`
SELECT
stop_id,
stop_name,
stop_lat,
stop_lon,
location_type,
parent_station
FROM stops
WHERE stop_name ILIKE $1
ORDER BY
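-- Exact-prefix matches rank first, then word-boundary matches, then plain substrings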
CASE
WHEN stop_name ILIKE $2 THEN 1
WHEN stop_name ILIKE $3 THEN 2
ELSE 3
END,
stop_name
LIMIT $4
`, [`%${query}%`, `${query}%`, `% ${query}%`, parseInt(limit, 10)]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
export default router;

160 backend/src/api/routes/lines.js Normal file

@@ -0,0 +1,160 @@
import express from 'express';
import db from '../../lib/db.js';
import logger from '../../lib/logger.js';
const router = express.Router();
// GET /lines - Get all train lines
router.get('/', async (req, res, next) => {
try {
const { nucleo } = req.query;
let query = `
SELECT
line_id,
line_code,
line_name,
nucleo_id,
nucleo_name,
color,
metadata
FROM train_lines
`;
const params = [];
if (nucleo) {
query += ' WHERE nucleo_id = $1';
params.push(nucleo);
}
query += ' ORDER BY nucleo_name, line_code';
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /lines/:id - Get specific line with geometry
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const result = await db.query(
`SELECT
line_id,
line_code,
line_name,
nucleo_id,
nucleo_name,
color,
ST_AsGeoJSON(geometry) as geometry,
metadata
FROM train_lines
WHERE line_id = $1 OR line_code = $1`,
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Line not found' });
}
const line = result.rows[0];
if (line.geometry) {
line.geometry = JSON.parse(line.geometry);
}
res.json(line);
} catch (error) {
next(error);
}
});
// GET /lines/:id/stations - Get stations on a line
router.get('/:id/stations', async (req, res, next) => {
try {
const { id } = req.params;
// Get line first
const lineResult = await db.query(
`SELECT line_code FROM train_lines WHERE line_id = $1 OR line_code = $1`,
[id]
);
if (lineResult.rows.length === 0) {
return res.status(404).json({ error: 'Line not found' });
}
const lineCode = lineResult.rows[0].line_code;
// Get stations that have this line in their LINEAS metadata
const result = await db.query(
`SELECT
station_id,
station_code,
station_name,
latitude,
longitude,
station_type,
metadata
FROM stations
WHERE metadata->>'lineas' LIKE $1
ORDER BY station_name`,
[`%${lineCode}%`]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /lines/format/geojson - Get all lines as a GeoJSON FeatureCollection
router.get('/format/geojson', async (req, res, next) => {
try {
const { nucleo } = req.query;
let query = `
SELECT
line_id,
line_code,
line_name,
nucleo_id,
nucleo_name,
color,
ST_AsGeoJSON(geometry) as geometry
FROM train_lines
WHERE geometry IS NOT NULL
`;
const params = [];
if (nucleo) {
query += ' AND nucleo_id = $1';
params.push(nucleo);
}
const result = await db.query(query, params);
const features = result.rows.map(line => ({
type: 'Feature',
properties: {
id: line.line_id,
codigo: line.line_code,
nombre: line.line_name,
nucleo: line.nucleo_name,
color: line.color,
},
geometry: line.geometry ? JSON.parse(line.geometry) : null,
})).filter(f => f.geometry);
res.json({
type: 'FeatureCollection',
features,
});
} catch (error) {
next(error);
}
});
export default router;
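The FeatureCollection shape above drops straight into Leaflet on the frontend; a minimal sketch, assuming Leaflet is loaded and the API is served from the same origin (both assumptions, not code from this commit):

// Hypothetical Leaflet consumer for GET /lines/format/geojson (sketch only)
const map = L.map('map').setView([40.4168, -3.7038], 11); // Madrid (nucleo 10)
L.tileLayer('https://tile.openstreetmap.org/{z}/{x}/{y}.png').addTo(map);
fetch('/lines/format/geojson?nucleo=10')
.then(res => res.json())
.then(collection => {
L.geoJSON(collection, {
// 'color' comes from the feature properties built by the router above
style: f => ({ color: f.properties.color || '#888888', weight: 3 }),
}).addTo(map);
});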

267 backend/src/api/routes/punctuality.js Normal file

@@ -0,0 +1,267 @@
import express from 'express';
import db from '../../lib/db.js';
const router = express.Router();
// GET /punctuality/summary - Get overall punctuality summary
router.get('/summary', async (req, res, next) => {
try {
const { days = 7 } = req.query;
const result = await db.query(`
SELECT
COUNT(*) as total_observations,
COUNT(DISTINCT train_id) as unique_trains,
COUNT(DISTINCT line_code) as unique_lines,
AVG(delay_minutes)::FLOAT as avg_delay_minutes,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY delay_minutes)::FLOAT as median_delay_minutes,
MAX(delay_minutes) as max_delay_minutes,
COUNT(CASE WHEN delay_minutes = 0 THEN 1 END) as on_time_count,
COUNT(CASE WHEN delay_minutes > 0 AND delay_minutes <= 5 THEN 1 END) as minor_delay_count,
COUNT(CASE WHEN delay_minutes > 5 AND delay_minutes <= 15 THEN 1 END) as moderate_delay_count,
COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) as severe_delay_count,
ROUND(
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::NUMERIC /
NULLIF(COUNT(*), 0) * 100,
2
) as punctuality_percentage
FROM train_punctuality
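-- INTERVAL '1 day' * $1 parameterises the look-back window without string interpolation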
WHERE recorded_at > NOW() - INTERVAL '1 day' * $1
`, [parseInt(days, 10)]);
res.json(result.rows[0] || {});
} catch (error) {
next(error);
}
});
// GET /punctuality/by-line - Get punctuality statistics by line
router.get('/by-line', async (req, res, next) => {
try {
const { days = 7, limit = 50 } = req.query;
const result = await db.query(`
SELECT
line_code,
nucleo,
COUNT(*) as total_observations,
COUNT(DISTINCT train_id) as unique_trains,
AVG(delay_minutes)::FLOAT as avg_delay_minutes,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY delay_minutes)::FLOAT as median_delay_minutes,
MAX(delay_minutes) as max_delay_minutes,
COUNT(CASE WHEN delay_minutes = 0 THEN 1 END) as on_time_count,
COUNT(CASE WHEN delay_minutes > 0 AND delay_minutes <= 5 THEN 1 END) as minor_delay_count,
COUNT(CASE WHEN delay_minutes > 5 AND delay_minutes <= 15 THEN 1 END) as moderate_delay_count,
COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) as severe_delay_count,
ROUND(
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::NUMERIC /
NULLIF(COUNT(*), 0) * 100,
2
) as punctuality_percentage
FROM train_punctuality
WHERE recorded_at > NOW() - INTERVAL '1 day' * $1
AND line_code IS NOT NULL
GROUP BY line_code, nucleo
HAVING COUNT(*) >= 10
ORDER BY punctuality_percentage ASC
LIMIT $2
`, [parseInt(days, 10), parseInt(limit, 10)]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /punctuality/by-hour - Get punctuality pattern by hour of day
router.get('/by-hour', async (req, res, next) => {
try {
const { days = 7, line_code } = req.query;
let query = `
SELECT
EXTRACT(HOUR FROM recorded_at)::INTEGER as hour_of_day,
COUNT(*) as total_observations,
AVG(delay_minutes)::FLOAT as avg_delay_minutes,
ROUND(
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::NUMERIC /
NULLIF(COUNT(*), 0) * 100,
2
) as punctuality_percentage
FROM train_punctuality
WHERE recorded_at > NOW() - INTERVAL '1 day' * $1
`;
const params = [parseInt(days, 10)];
if (line_code) {
query += ` AND line_code = $2`;
params.push(line_code);
}
query += `
GROUP BY EXTRACT(HOUR FROM recorded_at)
ORDER BY hour_of_day
`;
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /punctuality/daily - Get daily punctuality trend
router.get('/daily', async (req, res, next) => {
try {
const { days = 30, line_code } = req.query;
let query = `
SELECT
DATE(recorded_at) as date,
COUNT(*) as total_observations,
COUNT(DISTINCT train_id) as unique_trains,
AVG(delay_minutes)::FLOAT as avg_delay_minutes,
ROUND(
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::NUMERIC /
NULLIF(COUNT(*), 0) * 100,
2
) as punctuality_percentage
FROM train_punctuality
WHERE recorded_at > NOW() - INTERVAL '1 day' * $1
`;
const params = [parseInt(days, 10)];
if (line_code) {
query += ` AND line_code = $2`;
params.push(line_code);
}
query += `
GROUP BY DATE(recorded_at)
ORDER BY date DESC
`;
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /punctuality/worst-lines - Get worst performing lines
router.get('/worst-lines', async (req, res, next) => {
try {
const { days = 7, limit = 10 } = req.query;
const result = await db.query(`
SELECT * FROM get_worst_punctuality_lines($1, $2)
`, [parseInt(days, 10), parseInt(limit, 10)]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /punctuality/line/:lineCode - Get detailed stats for a specific line
router.get('/line/:lineCode', async (req, res, next) => {
try {
const { lineCode } = req.params;
const { days = 7 } = req.query;
const result = await db.query(`
SELECT * FROM get_line_punctuality_summary($1, $2)
`, [lineCode, parseInt(days, 10)]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Line not found or no data available' });
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
// GET /punctuality/routes - Get punctuality by origin-destination pairs
router.get('/routes', async (req, res, next) => {
try {
const { days = 7, limit = 50, line_code } = req.query;
let query = `
SELECT
origin_station_code,
destination_station_code,
line_code,
COUNT(*) as total_trips,
AVG(delay_minutes)::FLOAT as avg_delay_minutes,
MAX(delay_minutes) as max_delay_minutes,
ROUND(
COUNT(CASE WHEN delay_minutes <= 5 THEN 1 END)::NUMERIC /
NULLIF(COUNT(*), 0) * 100,
2
) as punctuality_percentage
FROM train_punctuality
WHERE recorded_at > NOW() - INTERVAL '1 day' * $1
AND origin_station_code IS NOT NULL
AND destination_station_code IS NOT NULL
`;
const params = [parseInt(days, 10)];
if (line_code) {
query += ` AND line_code = $2`;
params.push(line_code);
}
query += `
GROUP BY origin_station_code, destination_station_code, line_code
HAVING COUNT(*) >= 5
ORDER BY punctuality_percentage ASC
LIMIT $${params.length + 1}
`;
params.push(parseInt(limit, 10));
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /punctuality/current-delays - Get trains currently delayed
router.get('/current-delays', async (req, res, next) => {
try {
const { min_delay = 5 } = req.query;
const result = await db.query(`
SELECT DISTINCT ON (train_id)
train_id,
trip_id,
line_code,
nucleo,
origin_station_code,
destination_station_code,
current_station_code,
next_station_code,
delay_minutes,
platform,
recorded_at
FROM train_punctuality
WHERE recorded_at > NOW() - INTERVAL '10 minutes'
AND delay_minutes >= $1
ORDER BY train_id, recorded_at DESC
`, [parseInt(min_delay, 10)]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
export default router;

42 backend/src/api/routes/routes.js Normal file

@@ -0,0 +1,42 @@
import express from 'express';
import db from '../../lib/db.js';
const router = express.Router();
// GET /routes - Get all routes
router.get('/', async (req, res, next) => {
try {
const result = await db.query(`
SELECT * FROM routes
ORDER BY route_name
`);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /routes/:id - Get specific route
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const result = await db.query(
'SELECT * FROM routes WHERE route_id = $1',
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({
error: 'Route not found',
});
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
export default router;

51 backend/src/api/routes/stations.js Normal file

@@ -0,0 +1,51 @@
import express from 'express';
import db from '../../lib/db.js';
const router = express.Router();
// GET /stations - Get all stations
router.get('/', async (req, res, next) => {
try {
const { type } = req.query;
let query = 'SELECT * FROM stations';
const params = [];
if (type) {
query += ' WHERE station_type = $1';
params.push(type);
}
query += ' ORDER BY station_name';
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /stations/:id - Get specific station
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const result = await db.query(
'SELECT * FROM stations WHERE station_id = $1',
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({
error: 'Station not found',
});
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
export default router;

70 backend/src/api/routes/stats.js Normal file

@@ -0,0 +1,70 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
const router = express.Router();
// GET /stats - Get system statistics
router.get('/', async (req, res, next) => {
try {
// Get active trains count
const activeTrains = await redis.sMembers('trains:active');
const activeCount = activeTrains.length;
// Get last update time
const lastUpdate = await redis.get('stats:last_update');
// Get total trains in database
const totalResult = await db.query(
'SELECT COUNT(*) as total FROM trains'
);
const totalTrains = parseInt(totalResult.rows[0].total, 10);
// Get total positions stored
const positionsResult = await db.query(
'SELECT COUNT(*) as total FROM train_positions WHERE recorded_at > NOW() - INTERVAL \'24 hours\''
);
const positions24h = parseInt(positionsResult.rows[0].total, 10);
res.json({
active_trains: activeCount,
total_trains: totalTrains,
positions_24h: positions24h,
last_update: lastUpdate,
timestamp: new Date().toISOString(),
});
} catch (error) {
next(error);
}
});
// GET /stats/train/:id - Get statistics for specific train
router.get('/train/:id', async (req, res, next) => {
try {
const { id } = req.params;
const { from, to } = req.query;
if (!from || !to) {
return res.status(400).json({
error: 'Missing required parameters: from, to',
});
}
const result = await db.query(
`SELECT * FROM calculate_train_statistics($1, $2, $3)`,
[id, new Date(from), new Date(to)]
);
if (result.rows.length === 0) {
return res.status(404).json({
error: 'No data found for this train in the specified period',
});
}
res.json(result.rows[0]);
} catch (error) {
next(error);
}
});
export default router;

321 backend/src/api/routes/trains.js Normal file

@@ -0,0 +1,321 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
import logger from '../../lib/logger.js';
const router = express.Router();
// Normalize station code - Renfe uses codes with leading zeros (04040) but geojson has them without (4040)
function normalizeStationCode(code) {
if (!code) return null;
// Remove leading zeros for lookup
return code.replace(/^0+/, '');
}
// Helper to get station names map from codes
async function getStationNamesMap(stationCodes) {
if (!stationCodes || stationCodes.length === 0) return new Map();
const uniqueCodes = [...new Set(stationCodes.filter(Boolean))];
if (uniqueCodes.length === 0) return new Map();
// Create both original and normalized versions for lookup
const normalizedCodes = uniqueCodes.map(c => normalizeStationCode(c));
const allCodes = [...new Set([...uniqueCodes, ...normalizedCodes])];
const result = await db.query(
`SELECT station_code, station_name FROM stations WHERE station_code = ANY($1)`,
[allCodes]
);
// Build map that works with both original and normalized codes
const dbMap = new Map(result.rows.map(s => [s.station_code, s.station_name]));
const resultMap = new Map();
for (const code of uniqueCodes) {
// Try original code first, then normalized
const name = dbMap.get(code) || dbMap.get(normalizeStationCode(code));
if (name) {
resultMap.set(code, name);
}
}
return resultMap;
}
// GET /trains/current - Get all current train positions with fleet data
router.get('/current', async (req, res, next) => {
try {
const trainIds = await redis.sMembers('trains:active');
if (trainIds.length === 0) {
return res.json([]);
}
// First pass: collect all positions and fleet data
const trainsData = await Promise.all(
trainIds.map(async (trainId) => {
const data = await redis.get(`trains:current:${trainId}`);
if (!data) return null;
const position = JSON.parse(data);
const fleetData = await redis.get(`fleet:${trainId}`);
const fleet = fleetData ? JSON.parse(fleetData) : null;
return { position, fleet };
})
);
const validTrains = trainsData.filter(t => t !== null);
// Collect all station codes to resolve names in one query
const allStationCodes = [];
for (const { fleet } of validTrains) {
if (fleet) {
allStationCodes.push(fleet.codEstAct, fleet.codEstSig, fleet.codEstDest, fleet.codEstOrig);
}
}
// Get station names map
const stationNames = await getStationNamesMap(allStationCodes);
// Build final response with station names
const positions = validTrains.map(({ position, fleet }) => {
if (fleet) {
return {
...position,
codLinea: fleet.codLinea,
retrasoMin: fleet.retrasoMin,
codEstAct: fleet.codEstAct,
estacionActual: stationNames.get(fleet.codEstAct) || null,
codEstSig: fleet.codEstSig,
estacionSiguiente: stationNames.get(fleet.codEstSig) || null,
horaLlegadaSigEst: fleet.horaLlegadaSigEst,
codEstDest: fleet.codEstDest,
estacionDestino: stationNames.get(fleet.codEstDest) || null,
codEstOrig: fleet.codEstOrig,
estacionOrigen: stationNames.get(fleet.codEstOrig) || null,
nucleo: fleet.nucleo,
accesible: fleet.accesible,
via: fleet.via,
};
}
return position;
});
res.json(positions);
} catch (error) {
next(error);
}
});
// GET /trains/area - Get trains in a geographic area
// Registered before '/:id'; otherwise Express would capture 'area' as a train id
router.get('/area', async (req, res, next) => {
try {
const { minLat, minLon, maxLat, maxLon, time } = req.query;
if (!minLat || !minLon || !maxLat || !maxLon) {
return res.status(400).json({
error: 'Missing required parameters: minLat, minLon, maxLat, maxLon',
});
}
const timestamp = time ? new Date(time) : new Date();
const result = await db.query(
`SELECT * FROM get_trains_in_area($1, $2, $3, $4, $5)`,
[
parseFloat(minLat),
parseFloat(minLon),
parseFloat(maxLat),
parseFloat(maxLon),
timestamp,
]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /trains/:id - Get specific train information with fleet data
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
// Get current position from Redis
const currentData = await redis.get(`trains:current:${id}`);
const current = currentData ? JSON.parse(currentData) : null;
// Get fleet data from Renfe
const fleetData = await redis.get(`fleet:${id}`);
const fleet = fleetData ? JSON.parse(fleetData) : null;
// Get train info from database
const trainResult = await db.query(
'SELECT * FROM trains WHERE train_id = $1',
[id]
);
const train = trainResult.rows[0] || null;
if (!train && !current) {
return res.status(404).json({
error: 'Train not found',
});
}
// Resolve station names if we have fleet data
let currentStation = null;
let nextStation = null;
let destStation = null;
let origStation = null;
if (fleet) {
const stationCodes = [fleet.codEstAct, fleet.codEstSig, fleet.codEstDest, fleet.codEstOrig].filter(Boolean);
if (stationCodes.length > 0) {
const stationMap = await getStationNamesMap(stationCodes);
currentStation = stationMap.get(fleet.codEstAct);
nextStation = stationMap.get(fleet.codEstSig);
destStation = stationMap.get(fleet.codEstDest);
origStation = stationMap.get(fleet.codEstOrig);
}
}
res.json({
...train,
current_position: current,
fleet_data: fleet ? {
codLinea: fleet.codLinea,
retrasoMin: fleet.retrasoMin,
codEstAct: fleet.codEstAct,
estacionActual: currentStation,
codEstSig: fleet.codEstSig,
estacionSiguiente: nextStation,
horaLlegadaSigEst: fleet.horaLlegadaSigEst,
codEstDest: fleet.codEstDest,
estacionDestino: destStation,
codEstOrig: fleet.codEstOrig,
estacionOrigen: origStation,
nucleo: fleet.nucleo,
accesible: fleet.accesible,
via: fleet.via,
} : null,
});
} catch (error) {
next(error);
}
});
// GET /trains/history/all - Get all train positions in a time range
router.get('/history/all', async (req, res, next) => {
try {
const { from, to, limit = 5000 } = req.query;
// Default to last hour if no time range specified
const endTime = to ? new Date(to) : new Date();
const startTime = from ? new Date(from) : new Date(endTime.getTime() - 3600000);
const result = await db.query(
`SELECT
train_id,
latitude,
longitude,
bearing,
speed,
status,
occupancy_status,
trip_id,
timestamp,
recorded_at
FROM train_positions
WHERE timestamp >= $1 AND timestamp <= $2
ORDER BY timestamp ASC
LIMIT $3`,
[startTime, endTime, parseInt(limit, 10)]
);
// Convert latitude/longitude to numbers
const positions = result.rows.map(row => ({
...row,
latitude: parseFloat(row.latitude),
longitude: parseFloat(row.longitude),
}));
res.json(positions);
} catch (error) {
next(error);
}
});
// GET /trains/:id/history - Get train position history
router.get('/:id/history', async (req, res, next) => {
try {
const { id } = req.params;
const { from, to, limit = 100 } = req.query;
let query = `
SELECT
train_id,
latitude,
longitude,
bearing,
speed,
status,
timestamp,
recorded_at
FROM train_positions
WHERE train_id = $1
`;
const params = [id];
let paramIndex = 2;
if (from) {
query += ` AND timestamp >= $${paramIndex}`;
params.push(new Date(from));
paramIndex++;
}
if (to) {
query += ` AND timestamp <= $${paramIndex}`;
params.push(new Date(to));
paramIndex++;
}
query += ` ORDER BY timestamp DESC LIMIT $${paramIndex}`;
params.push(parseInt(limit, 10));
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /trains/:id/path - Get train path for visualization
router.get('/:id/path', async (req, res, next) => {
try {
const { id } = req.params;
const { from, to } = req.query;
if (!from || !to) {
return res.status(400).json({
error: 'Missing required parameters: from, to',
});
}
const result = await db.query(
`SELECT * FROM get_train_path($1, $2, $3)`,
[id, new Date(from), new Date(to)]
);
res.json(result.rows);
} catch (error) {
next(error);
}
});
export default router;

213 backend/src/api/routes/trips.js Normal file

@@ -0,0 +1,213 @@
import express from 'express';
import db from '../../lib/db.js';
import redis from '../../lib/redis.js';
const router = express.Router();
// GET /trips - Get all active trips for today
router.get('/', async (req, res, next) => {
try {
const { route_id, service_id } = req.query;
let query = 'SELECT * FROM active_trips_today WHERE 1=1';
const params = [];
let paramIndex = 1;
if (route_id) {
query += ` AND route_id = $${paramIndex}`;
params.push(route_id);
paramIndex++;
}
if (service_id) {
query += ` AND service_id = $${paramIndex}`;
params.push(service_id);
paramIndex++;
}
query += ' ORDER BY trip_id';
const result = await db.query(query, params);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /trips/:id - Get specific trip details
router.get('/:id', async (req, res, next) => {
try {
const { id } = req.params;
const tripResult = await db.query(
'SELECT * FROM trips WHERE trip_id = $1',
[id]
);
if (tripResult.rows.length === 0) {
return res.status(404).json({
error: 'Trip not found',
});
}
const trip = tripResult.rows[0];
// Get schedule using the function
const scheduleResult = await db.query(
'SELECT * FROM get_trip_schedule($1)',
[id]
);
res.json({
...trip,
schedule: scheduleResult.rows,
});
} catch (error) {
next(error);
}
});
// GET /trips/:id/updates - Get real-time updates for a trip
router.get('/:id/updates', async (req, res, next) => {
try {
const { id } = req.params;
// Get latest trip update
const updateResult = await db.query(`
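-- Fold the per-stop updates into a JSON array on the most recent trip update row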
SELECT
tu.*,
json_agg(
json_build_object(
'stop_sequence', stu.stop_sequence,
'stop_id', stu.stop_id,
'arrival_delay', stu.arrival_delay,
'departure_delay', stu.departure_delay,
'schedule_relationship', stu.schedule_relationship
) ORDER BY stu.stop_sequence
) as stop_time_updates
FROM trip_updates tu
LEFT JOIN stop_time_updates stu ON tu.update_id = stu.update_id
WHERE tu.trip_id = $1
AND tu.received_at > NOW() - INTERVAL '10 minutes'
GROUP BY tu.update_id
ORDER BY tu.received_at DESC
LIMIT 1
`, [id]);
if (updateResult.rows.length === 0) {
return res.json({
trip_id: id,
has_updates: false,
message: 'No recent updates available',
});
}
res.json({
trip_id: id,
has_updates: true,
...updateResult.rows[0],
});
} catch (error) {
next(error);
}
});
// GET /trips/:id/delays - Get delay information for a trip
router.get('/:id/delays', async (req, res, next) => {
try {
const { id } = req.params;
// Check Redis cache first
const cachedDelay = await redis.get(`trip:delay:${id}`);
if (cachedDelay) {
return res.json(JSON.parse(cachedDelay));
}
// Get from database
const result = await db.query(`
SELECT
trip_id,
delay_seconds,
schedule_relationship,
received_at,
CASE
WHEN delay_seconds IS NULL THEN 'NO_DATA'
WHEN delay_seconds = 0 THEN 'ON_TIME'
WHEN delay_seconds > 0 AND delay_seconds <= 300 THEN 'MINOR_DELAY'
WHEN delay_seconds > 300 AND delay_seconds <= 900 THEN 'MODERATE_DELAY'
WHEN delay_seconds > 900 THEN 'MAJOR_DELAY'
WHEN delay_seconds < 0 THEN 'EARLY'
END as delay_status,
CASE
WHEN delay_seconds >= 60 THEN
FLOOR(delay_seconds / 60) || ' min ' || MOD(delay_seconds::int, 60) || ' s'
WHEN delay_seconds IS NOT NULL THEN
delay_seconds || ' s'
ELSE 'N/A'
END as delay_formatted
FROM trip_updates
WHERE trip_id = $1
AND received_at > NOW() - INTERVAL '10 minutes'
ORDER BY received_at DESC
LIMIT 1
`, [id]);
if (result.rows.length === 0) {
return res.json({
trip_id: id,
delay_status: 'NO_DATA',
delay_seconds: null,
delay_formatted: 'N/A',
message: 'No recent delay information available',
});
}
const delayInfo = result.rows[0];
// Cache for 30 seconds
await redis.set(`trip:delay:${id}`, JSON.stringify(delayInfo), 30);
res.json(delayInfo);
} catch (error) {
next(error);
}
});
// GET /trips/route/:routeId - Get all trips for a specific route
router.get('/route/:routeId', async (req, res, next) => {
try {
const { routeId } = req.params;
const result = await db.query(`
SELECT t.*
FROM trips t
WHERE t.route_id = $1
ORDER BY t.trip_id
`, [routeId]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
// GET /trips/delayed/all - Get all currently delayed trips
router.get('/delayed/all', async (req, res, next) => {
try {
const { min_delay } = req.query;
const minDelaySeconds = min_delay ? parseInt(min_delay, 10) : 0;
const result = await db.query(`
SELECT * FROM delayed_trips
WHERE delay_seconds >= $1
ORDER BY delay_seconds DESC
`, [minDelaySeconds]);
res.json(result.rows);
} catch (error) {
next(error);
}
});
export default router;

278 backend/src/api/server.js Normal file

@@ -0,0 +1,278 @@
import express from 'express';
import { createServer } from 'http';
import { Server } from 'socket.io';
import config from '../config/index.js';
import logger from '../lib/logger.js';
import db from '../lib/db.js';
import redis from '../lib/redis.js';
import {
rateLimiters,
helmetConfig,
hppProtection,
securityHeaders,
sanitizeRequest,
securityErrorHandler
} from '../lib/security.js';
import trainsRouter from './routes/trains.js';
import routesRouter from './routes/routes.js';
import stationsRouter from './routes/stations.js';
import statsRouter from './routes/stats.js';
import alertsRouter from './routes/alerts.js';
import tripsRouter from './routes/trips.js';
import analyticsRouter from './routes/analytics.js';
import explorerRouter from './routes/explorer.js';
import linesRouter from './routes/lines.js';
import punctualityRouter from './routes/punctuality.js';
import dashboardRouter from './routes/dashboard.js';
class APIServer {
constructor() {
this.app = express();
this.httpServer = createServer(this.app);
this.io = new Server(this.httpServer, {
cors: {
origin: config.cors.origin,
methods: ['GET', 'POST'],
},
});
this.watchInterval = null;
}
setupMiddleware() {
// Trust proxy (for rate limiting behind nginx)
this.app.set('trust proxy', 1);
// Security headers (Helmet)
this.app.use(helmetConfig);
// Additional security headers
this.app.use(securityHeaders);
// HPP - HTTP Parameter Pollution protection
this.app.use(hppProtection);
// CORS
this.app.use((req, res, next) => {
const origin = req.headers.origin;
if (config.cors.origin.includes(origin) || config.cors.origin.includes('*')) {
res.header('Access-Control-Allow-Origin', origin);
}
res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept, Authorization');
res.header('Access-Control-Max-Age', '86400'); // 24 hours
// Handle preflight
if (req.method === 'OPTIONS') {
return res.sendStatus(204);
}
next();
});
// JSON parser with size limit
this.app.use(express.json({ limit: '1mb' }));
this.app.use(express.urlencoded({ extended: true, limit: '1mb' }));
// Request sanitization
this.app.use(sanitizeRequest);
// General rate limiting (skip in development if needed)
if (config.env === 'production' || config.security?.enableRateLimiting) {
this.app.use(rateLimiters.general);
}
// Request logging
this.app.use((req, res, next) => {
const start = Date.now();
res.on('finish', () => {
const duration = Date.now() - start;
logger.info({
method: req.method,
url: req.url,
status: res.statusCode,
duration: `${duration}ms`,
ip: req.ip,
}, 'HTTP Request');
});
next();
});
}
setupRoutes() {
// Health check (no rate limiting)
this.app.get('/health', (req, res) => {
res.json({
status: 'ok',
timestamp: new Date().toISOString(),
uptime: process.uptime(),
security: {
rateLimiting: config.env === 'production' || config.security?.enableRateLimiting,
helmet: true,
hpp: true
}
});
});
// API routes with rate limiting
this.app.use('/trains', trainsRouter);
this.app.use('/routes', routesRouter);
this.app.use('/stations', stationsRouter);
this.app.use('/stats', statsRouter);
this.app.use('/alerts', alertsRouter);
this.app.use('/trips', tripsRouter);
this.app.use('/lines', linesRouter);
this.app.use('/punctuality', punctualityRouter);
this.app.use('/dashboard', dashboardRouter);
// Analytics routes with stricter rate limiting
this.app.use('/analytics', rateLimiters.strict, analyticsRouter);
// Explorer routes with strict rate limiting
this.app.use('/explorer', rateLimiters.strict, explorerRouter);
// Security error handler
this.app.use(securityErrorHandler);
// 404 handler
this.app.use((req, res) => {
res.status(404).json({
error: 'Not Found',
path: req.url,
});
});
// Error handler
this.app.use((err, req, res, next) => {
// Don't expose stack trace in production
const errorResponse = {
error: err.message || 'Internal Server Error',
};
if (config.env !== 'production') {
errorResponse.stack = err.stack;
}
logger.error({ error: err.message, stack: err.stack }, 'API Error');
res.status(err.status || 500).json(errorResponse);
});
}
setupWebSocket() {
this.io.on('connection', (socket) => {
logger.info({ socketId: socket.id }, 'WebSocket client connected');
// Join default room
socket.join('trains');
// Handle subscribe to specific train
socket.on('subscribe:train', (trainId) => {
socket.join(`train:${trainId}`);
logger.debug({ socketId: socket.id, trainId }, 'Client subscribed to train');
});
// Handle unsubscribe from specific train
socket.on('unsubscribe:train', (trainId) => {
socket.leave(`train:${trainId}`);
logger.debug({ socketId: socket.id, trainId }, 'Client unsubscribed from train');
});
socket.on('disconnect', () => {
logger.info({ socketId: socket.id }, 'WebSocket client disconnected');
});
});
// Watch Redis for updates and broadcast via WebSocket
this.startRedisWatch();
}
async startRedisWatch() {
// Poll Redis every 2 seconds for changes
this.watchInterval = setInterval(async () => {
try {
const lastUpdate = await redis.get('stats:last_update');
if (!lastUpdate) return;
const trainIds = await redis.sMembers('trains:active');
if (trainIds.length === 0) return;
// Get all current positions
const positions = await Promise.all(
trainIds.map(async (trainId) => {
const data = await redis.get(`trains:current:${trainId}`);
return data ? JSON.parse(data) : null;
})
);
const validPositions = positions.filter(p => p !== null);
if (validPositions.length > 0) {
// Broadcast to all clients
this.io.to('trains').emit('trains:update', validPositions);
// Broadcast individual train updates
for (const position of validPositions) {
this.io.to(`train:${position.train_id}`).emit('train:update', position);
}
}
} catch (error) {
logger.error({ error: error.message }, 'Error in Redis watch');
}
}, 2000);
}
async start() {
// Connect to databases
await db.connect();
await redis.connect();
// Setup middleware and routes
this.setupMiddleware();
this.setupRoutes();
this.setupWebSocket();
// Start HTTP server
this.httpServer.listen(config.port, () => {
logger.info({
port: config.port,
env: config.env,
}, 'API Server started');
});
}
async stop() {
logger.info('Stopping API Server...');
if (this.watchInterval) {
clearInterval(this.watchInterval);
this.watchInterval = null;
}
this.io.close();
this.httpServer.close();
await db.disconnect();
await redis.disconnect();
logger.info('API Server stopped');
}
}
// Main execution
const server = new APIServer();
// Graceful shutdown
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await server.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Start server
server.start().catch((error) => {
logger.fatal({ error }, 'Failed to start API server');
process.exit(1);
});
export default APIServer;
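
A minimal client sketch for the WebSocket contract implemented above (assumes the socket.io-client package; URL and train id are illustrative):

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000');
// Every client joins the 'trains' room server-side, so bulk updates arrive without subscribing
socket.on('trains:update', (positions) => console.log(`${positions.length} active trains`));
// Per-train updates require an explicit subscription
socket.emit('subscribe:train', 'some-train-id'); // illustrative id
socket.on('train:update', (position) => console.log(position.train_id, position.latitude, position.longitude));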

backend/src/config/index.js

@@ -0,0 +1,62 @@
import dotenv from 'dotenv';
dotenv.config();
export const config = {
// Server
port: parseInt(process.env.PORT || '3000', 10),
env: process.env.NODE_ENV || 'development',
logLevel: process.env.LOG_LEVEL || 'info',
// Database
database: {
url: process.env.DATABASE_URL,
poolMin: 2,
poolMax: 10,
},
// Redis
redis: {
url: process.env.REDIS_URL,
},
// GTFS-RT
gtfsRT: {
vehiclePositionsUrl: process.env.GTFS_RT_URL || 'https://gtfsrt.renfe.com/vehicle_positions.pb',
tripUpdatesUrl: process.env.GTFS_TRIP_UPDATES_URL || 'https://gtfsrt.renfe.com/trip_updates_cercanias.pb',
alertsUrl: process.env.GTFS_ALERTS_URL || 'https://gtfsrt.renfe.com/alerts.pb',
pollingInterval: parseInt(process.env.POLLING_INTERVAL || '30000', 10),
},
// CORS
cors: {
origin: process.env.CORS_ORIGIN?.split(',') || ['http://localhost:3000'],
},
// JWT
jwt: {
secret: process.env.JWT_SECRET || 'default_secret_change_me',
expiresIn: '7d',
},
// Security
security: {
enableRateLimiting: process.env.ENABLE_RATE_LIMITING !== 'false',
rateLimits: {
general: {
windowMs: parseInt(process.env.RATE_LIMIT_WINDOW_MS || '900000', 10), // 15 minutes
max: parseInt(process.env.RATE_LIMIT_MAX || '1000', 10),
},
strict: {
windowMs: parseInt(process.env.RATE_LIMIT_STRICT_WINDOW_MS || '900000', 10),
max: parseInt(process.env.RATE_LIMIT_STRICT_MAX || '100', 10),
},
export: {
windowMs: parseInt(process.env.RATE_LIMIT_EXPORT_WINDOW_MS || '3600000', 10), // 1 hour
max: parseInt(process.env.RATE_LIMIT_EXPORT_MAX || '10', 10),
},
},
},
};
export default config;
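
An illustrative .env covering the variables this config reads; every value below is a placeholder, not a production setting:

PORT=3000
NODE_ENV=development
LOG_LEVEL=info
DATABASE_URL=postgres://user:password@localhost:5432/trains
REDIS_URL=redis://localhost:6379
GTFS_RT_URL=https://gtfsrt.renfe.com/vehicle_positions.pb
GTFS_TRIP_UPDATES_URL=https://gtfsrt.renfe.com/trip_updates_cercanias.pb
GTFS_ALERTS_URL=https://gtfsrt.renfe.com/alerts.pb
POLLING_INTERVAL=30000
CORS_ORIGIN=http://localhost:3000,http://localhost:5173
JWT_SECRET=change_me
ENABLE_RATE_LIMITING=true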

backend/src/lib/db.js

@@ -0,0 +1,62 @@
import pg from 'pg';
import config from '../config/index.js';
import logger from './logger.js';
const { Pool } = pg;
class Database {
constructor() {
this.pool = null;
}
async connect() {
if (this.pool) {
return this.pool;
}
try {
this.pool = new Pool({
connectionString: config.database.url,
min: config.database.poolMin,
max: config.database.poolMax,
});
// Test connection
const client = await this.pool.connect();
logger.info('PostgreSQL connected successfully');
client.release();
return this.pool;
} catch (error) {
logger.error({ error }, 'Failed to connect to PostgreSQL');
throw error;
}
}
async query(text, params) {
if (!this.pool) {
await this.connect();
}
try {
const result = await this.pool.query(text, params);
return result;
} catch (error) {
logger.error({ error, query: text }, 'Database query error');
throw error;
}
}
async disconnect() {
if (this.pool) {
await this.pool.end();
logger.info('PostgreSQL disconnected');
this.pool = null;
}
}
}
// Singleton instance
const db = new Database();
export default db;
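
Typical usage of the singleton; query() connects lazily, and the $n placeholders keep queries parameterized (import path shown relative to backend/src):

import db from './lib/db.js';

// The trains table is maintained by the GTFS-RT poller below; 'C1' is an illustrative route id
const { rows } = await db.query(
  'SELECT * FROM trains WHERE route_id = $1 AND is_active = true',
  ['C1'],
);
console.log(rows.length);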

backend/src/lib/logger.js

@@ -0,0 +1,16 @@
import pino from 'pino';
import config from '../config/index.js';
const logger = pino({
level: config.logLevel,
transport: config.env === 'development' ? {
target: 'pino-pretty',
options: {
colorize: true,
translateTime: 'SYS:standard',
ignore: 'pid,hostname',
},
} : undefined,
});
export default logger;

backend/src/lib/redis.js

@@ -0,0 +1,131 @@
import { createClient } from 'redis';
import config from '../config/index.js';
import logger from './logger.js';
class RedisClient {
constructor() {
this.client = null;
}
async connect() {
if (this.client?.isOpen) {
return this.client;
}
try {
this.client = createClient({
url: config.redis.url,
});
this.client.on('error', (err) => {
logger.error({ error: err }, 'Redis error');
});
this.client.on('connect', () => {
logger.info('Redis connecting...');
});
this.client.on('ready', () => {
logger.info('Redis connected successfully');
});
await this.client.connect();
return this.client;
} catch (error) {
logger.error({ error }, 'Failed to connect to Redis');
throw error;
}
}
async get(key) {
if (!this.client?.isOpen) {
await this.connect();
}
try {
return await this.client.get(key);
} catch (error) {
logger.error({ error, key }, 'Redis GET error');
throw error;
}
}
async set(key, value, options = {}) {
if (!this.client?.isOpen) {
await this.connect();
}
try {
return await this.client.set(key, value, options);
} catch (error) {
logger.error({ error, key }, 'Redis SET error');
throw error;
}
}
async del(key) {
if (!this.client?.isOpen) {
await this.connect();
}
try {
return await this.client.del(key);
} catch (error) {
logger.error({ error, key }, 'Redis DEL error');
throw error;
}
}
async keys(pattern) {
if (!this.client?.isOpen) {
await this.connect();
}
try {
return await this.client.keys(pattern);
} catch (error) {
logger.error({ error, pattern }, 'Redis KEYS error');
throw error;
}
}
async sAdd(key, ...members) {
if (!this.client?.isOpen) {
await this.connect();
}
try {
return await this.client.sAdd(key, members);
} catch (error) {
logger.error({ error, key }, 'Redis SADD error');
throw error;
}
}
async sMembers(key) {
if (!this.client?.isOpen) {
await this.connect();
}
try {
return await this.client.sMembers(key);
} catch (error) {
logger.error({ error, key }, 'Redis SMEMBERS error');
throw error;
}
}
async disconnect() {
if (this.client?.isOpen) {
await this.client.quit();
logger.info('Redis disconnected');
this.client = null;
}
}
}
// Singleton instance
const redis = new RedisClient();
export default redis;
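
Usage sketch for the wrapper; the { EX: seconds } options object is the node-redis v4 TTL form used throughout these workers (keys below are illustrative):

import redis from './lib/redis.js';

await redis.set('demo:key', JSON.stringify({ hello: 'world' }), { EX: 60 }); // expires in 60 s
const raw = await redis.get('demo:key'); // null once expired
await redis.sAdd('demo:set', 'a', 'b');
const members = await redis.sMembers('demo:set'); // ['a', 'b']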

backend/src/lib/security.js

@@ -0,0 +1,319 @@
import rateLimit from 'express-rate-limit';
import helmet from 'helmet';
import hpp from 'hpp';
import { validationResult, query, param } from 'express-validator';
import logger from './logger.js';
// Rate limiting configurations (limits are hardcoded here; config.security.rateLimits is defined but not consulted by these limiters)
export const rateLimiters = {
// General API rate limiter
general: rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 1000, // 1000 requests per 15 minutes
message: {
error: 'Too many requests',
message: 'You have exceeded the rate limit. Please try again later.',
retryAfter: '15 minutes'
},
standardHeaders: true,
legacyHeaders: false,
handler: (req, res, next, options) => {
logger.warn({
ip: req.ip,
path: req.path,
method: req.method
}, 'Rate limit exceeded');
res.status(429).json(options.message);
}
}),
// Strict rate limiter for heavy endpoints (analytics, export)
strict: rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // 100 requests per 15 minutes
message: {
error: 'Too many requests',
message: 'This endpoint has stricter limits. Please try again later.',
retryAfter: '15 minutes'
},
standardHeaders: true,
legacyHeaders: false,
handler: (req, res, next, options) => {
logger.warn({
ip: req.ip,
path: req.path,
method: req.method
}, 'Strict rate limit exceeded');
res.status(429).json(options.message);
}
}),
// Very strict for export/heavy operations
export: rateLimit({
windowMs: 60 * 60 * 1000, // 1 hour
max: 10, // 10 exports per hour
message: {
error: 'Export limit reached',
message: 'You can only export 10 times per hour. Please try again later.',
retryAfter: '1 hour'
},
standardHeaders: true,
legacyHeaders: false
}),
// WebSocket connection limiter
websocket: rateLimit({
windowMs: 60 * 1000, // 1 minute
max: 10, // 10 connection attempts per minute
message: {
error: 'Too many connection attempts',
message: 'Please wait before trying to connect again.'
},
standardHeaders: true,
legacyHeaders: false
})
};
// Helmet security headers configuration
export const helmetConfig = helmet({
contentSecurityPolicy: {
directives: {
defaultSrc: ["'self'"],
styleSrc: ["'self'", "'unsafe-inline'", "https://unpkg.com"],
scriptSrc: ["'self'", "https://unpkg.com"],
imgSrc: ["'self'", "data:", "https:", "blob:"],
connectSrc: ["'self'", "wss:", "ws:", "https://gtfsrt.renfe.com", "https://data.renfe.com"],
fontSrc: ["'self'", "https://fonts.gstatic.com"],
objectSrc: ["'none'"],
mediaSrc: ["'self'"],
frameSrc: ["'none'"]
}
},
crossOriginEmbedderPolicy: false, // Allow loading map tiles
crossOriginResourcePolicy: { policy: "cross-origin" }
});
// HPP - HTTP Parameter Pollution protection
export const hppProtection = hpp({
whitelist: [
'route_id',
'train_id',
'station_id',
'type',
'severity',
'format'
]
});
// Input validation rules
export const validators = {
// Train ID validation
trainId: param('id')
.trim()
.notEmpty()
.withMessage('Train ID is required')
.isLength({ max: 100 })
.withMessage('Train ID too long')
.matches(/^[\w\-\.]+$/)
.withMessage('Invalid train ID format'),
// Route ID validation
routeId: param('routeId')
.trim()
.notEmpty()
.withMessage('Route ID is required')
.isLength({ max: 100 })
.withMessage('Route ID too long'),
// Station ID validation
stationId: param('stationId')
.trim()
.notEmpty()
.withMessage('Station ID is required')
.isLength({ max: 100 })
.withMessage('Station ID too long'),
// Pagination validation
pagination: [
query('limit')
.optional()
.isInt({ min: 1, max: 1000 })
.withMessage('Limit must be between 1 and 1000')
.toInt(),
query('offset')
.optional()
.isInt({ min: 0 })
.withMessage('Offset must be a positive integer')
.toInt()
],
// Date range validation
dateRange: [
query('from')
.optional()
.isISO8601()
.withMessage('Invalid from date format (use ISO8601)'),
query('to')
.optional()
.isISO8601()
.withMessage('Invalid to date format (use ISO8601)')
],
// Geographic bounds validation
geoBounds: [
query('minLat')
.optional()
.isFloat({ min: -90, max: 90 })
.withMessage('minLat must be between -90 and 90'),
query('maxLat')
.optional()
.isFloat({ min: -90, max: 90 })
.withMessage('maxLat must be between -90 and 90'),
query('minLon')
.optional()
.isFloat({ min: -180, max: 180 })
.withMessage('minLon must be between -180 and 180'),
query('maxLon')
.optional()
.isFloat({ min: -180, max: 180 })
.withMessage('maxLon must be between -180 and 180')
],
// Heatmap parameters
heatmap: [
query('gridSize')
.optional()
.isFloat({ min: 0.001, max: 1 })
.withMessage('gridSize must be between 0.001 and 1'),
query('hours')
.optional()
.isInt({ min: 1, max: 168 })
.withMessage('hours must be between 1 and 168 (1 week)')
],
// Alert filters
alertFilters: [
query('severity')
.optional()
.isIn(['LOW', 'MEDIUM', 'HIGH', 'CRITICAL'])
.withMessage('Invalid severity level'),
query('type')
.optional()
.isIn(['DELAY', 'CANCELLATION', 'INCIDENT', 'MAINTENANCE', 'OTHER'])
.withMessage('Invalid alert type')
],
// Export format
exportFormat: [
query('format')
.optional()
.isIn(['json', 'csv', 'geojson'])
.withMessage('Format must be json, csv, or geojson'),
query('type')
.optional()
.isIn(['positions', 'routes', 'stations', 'alerts', 'statistics'])
.withMessage('Invalid export type')
]
};
// Validation middleware
export const validate = (validations) => {
return async (req, res, next) => {
// Run all validations
for (const validation of validations) {
const result = await validation.run(req);
if (!result.isEmpty()) break;
}
const errors = validationResult(req);
if (errors.isEmpty()) {
return next();
}
logger.warn({
path: req.path,
errors: errors.array()
}, 'Validation failed');
return res.status(400).json({
error: 'Validation Error',
details: errors.array().map(err => ({
field: err.path,
message: err.msg
}))
});
};
};
// Security headers middleware
export const securityHeaders = (req, res, next) => {
// Remove X-Powered-By header
res.removeHeader('X-Powered-By');
// Add additional security headers
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '1; mode=block');
res.setHeader('Referrer-Policy', 'strict-origin-when-cross-origin');
res.setHeader('Permissions-Policy', 'geolocation=(), microphone=(), camera=()');
next();
};
// Request sanitization middleware
export const sanitizeRequest = (req, res, next) => {
// Sanitize query parameters
if (req.query) {
for (const key of Object.keys(req.query)) {
if (typeof req.query[key] === 'string') {
// Remove potential SQL injection patterns
req.query[key] = req.query[key]
.replace(/[;'"\\]/g, '')
.substring(0, 500); // Limit length
}
}
}
// Sanitize path parameters
if (req.params) {
for (const key of Object.keys(req.params)) {
if (typeof req.params[key] === 'string') {
req.params[key] = req.params[key]
.replace(/[;'"\\]/g, '')
.substring(0, 200);
}
}
}
next();
};
// Error handler for security issues
export const securityErrorHandler = (err, req, res, next) => {
if (err.type === 'entity.too.large') {
return res.status(413).json({
error: 'Payload Too Large',
message: 'Request body exceeds size limit'
});
}
if (err.type === 'charset.unsupported') {
return res.status(415).json({
error: 'Unsupported Media Type',
message: 'Unsupported character encoding'
});
}
next(err);
};
export default {
rateLimiters,
helmetConfig,
hppProtection,
validators,
validate,
securityHeaders,
sanitizeRequest,
securityErrorHandler
};


@@ -0,0 +1,381 @@
import GtfsRealtimeBindings from 'gtfs-realtime-bindings';
import fetch from 'node-fetch';
import config from '../config/index.js';
import logger from '../lib/logger.js';
import db from '../lib/db.js';
import redis from '../lib/redis.js';
class AlertsPoller {
constructor() {
this.isRunning = false;
this.pollInterval = null;
this.stats = {
totalPolls: 0,
successfulPolls: 0,
failedPolls: 0,
totalAlerts: 0,
lastPollTime: null,
};
}
async start() {
logger.info('Starting Service Alerts Poller...');
await db.connect();
await redis.connect();
this.isRunning = true;
// Initial poll
await this.poll();
// Setup polling interval
this.pollInterval = setInterval(
() => this.poll(),
config.gtfsRT.pollingInterval
);
logger.info({
interval: config.gtfsRT.pollingInterval,
url: config.gtfsRT.alertsUrl,
}, 'Service Alerts Poller started');
}
async poll() {
if (!this.isRunning) return;
this.stats.totalPolls++;
const startTime = Date.now();
try {
logger.debug('Polling Service Alerts feed...');
const response = await fetch(config.gtfsRT.alertsUrl, {
timeout: 10000,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
const feed = GtfsRealtimeBindings.transit_realtime.FeedMessage.decode(
new Uint8Array(buffer)
);
logger.debug({
entities: feed.entity?.length || 0,
}, 'Service Alerts feed decoded');
const alerts = [];
for (const entity of feed.entity || []) {
if (entity.alert) {
const alert = this.parseAlert(entity);
if (alert) {
alerts.push(alert);
}
}
}
logger.info({
alerts: alerts.length,
duration: Date.now() - startTime,
}, 'Processed service alerts');
if (alerts.length > 0) {
await this.storeAlerts(alerts);
await this.updateRedisCache(alerts);
}
this.stats.successfulPolls++;
this.stats.totalAlerts = alerts.length;
this.stats.lastPollTime = new Date();
} catch (error) {
this.stats.failedPolls++;
logger.error({
error: error.message,
stack: error.stack,
}, 'Error polling Service Alerts feed');
}
}
parseAlert(entity) {
try {
const alert = entity.alert;
// Extract header and description
const headerText = this.extractText(alert.headerText);
const descriptionText = this.extractText(alert.descriptionText);
const url = this.extractText(alert.url);
// Extract informed entities
const informedEntities = [];
for (const ie of alert.informedEntity || []) {
informedEntities.push({
agency_id: ie.agencyId,
route_id: ie.routeId,
route_type: ie.routeType,
trip_id: ie.trip?.tripId,
stop_id: ie.stopId,
});
}
// Extract route and train IDs
const routeIds = informedEntities
.map(ie => ie.route_id)
.filter(Boolean);
const tripIds = informedEntities
.map(ie => ie.trip_id)
.filter(Boolean);
// Determine type
const alertType = this.determineAlertType(alert);
return {
alert_id: entity.id,
route_id: routeIds[0] || null,
train_id: tripIds[0] || null,
alert_type: alertType,
severity: this.mapSeverity(alert.severityLevel),
title: headerText,
description: descriptionText,
url: url,
header_text: headerText,
description_text: descriptionText,
cause: this.mapCause(alert.cause),
effect: this.mapEffect(alert.effect),
start_time: alert.activePeriod?.[0]?.start
? new Date(alert.activePeriod[0].start * 1000)
: null,
end_time: alert.activePeriod?.[0]?.end
? new Date(alert.activePeriod[0].end * 1000)
: null,
informed_entity: informedEntities,
};
} catch (error) {
logger.error({
error: error.message,
entity: entity.id,
}, 'Error parsing alert');
return null;
}
}
extractText(translation) {
if (!translation) return null;
if (translation.translation && translation.translation.length > 0) {
return translation.translation[0].text;
}
return null;
}
  determineAlertType(alert) {
    const effect = alert.effect;
    const cause = alert.cause;
    // Numeric codes follow the GTFS-RT enums, consistent with mapCause/mapEffect below
    if (effect === 1) return 'CANCELLATION'; // NO_SERVICE
    if (effect === 2) return 'DELAY'; // REDUCED_SERVICE
    if (effect === 6) return 'MODIFIED_SERVICE'; // MODIFIED_SERVICE
    if (cause === 6) return 'INCIDENT'; // ACCIDENT
    if (cause === 12) return 'INCIDENT'; // MEDICAL_EMERGENCY
    return 'INFO';
  }
mapSeverity(level) {
const map = {
1: 'LOW',
2: 'MEDIUM',
3: 'HIGH',
4: 'CRITICAL',
};
return map[level] || 'MEDIUM';
}
mapCause(cause) {
const map = {
1: 'UNKNOWN_CAUSE',
2: 'OTHER_CAUSE',
3: 'TECHNICAL_PROBLEM',
4: 'STRIKE',
5: 'DEMONSTRATION',
6: 'ACCIDENT',
7: 'HOLIDAY',
8: 'WEATHER',
9: 'MAINTENANCE',
10: 'CONSTRUCTION',
11: 'POLICE_ACTIVITY',
12: 'MEDICAL_EMERGENCY',
};
return map[cause] || 'UNKNOWN_CAUSE';
}
mapEffect(effect) {
const map = {
1: 'NO_SERVICE',
2: 'REDUCED_SERVICE',
3: 'SIGNIFICANT_DELAYS',
4: 'DETOUR',
5: 'ADDITIONAL_SERVICE',
6: 'MODIFIED_SERVICE',
7: 'OTHER_EFFECT',
8: 'UNKNOWN_EFFECT',
9: 'STOP_MOVED',
};
return map[effect] || 'UNKNOWN_EFFECT';
}
async storeAlerts(alerts) {
const client = await db.pool.connect();
try {
await client.query('BEGIN');
for (const alert of alerts) {
// Check if alert already exists
const existingResult = await client.query(
'SELECT alert_id FROM alerts WHERE alert_id = $1',
[alert.alert_id]
);
if (existingResult.rows.length === 0) {
// Insert new alert
await client.query(`
INSERT INTO alerts (
route_id, train_id, alert_type, severity, title,
description, url, header_text, description_text,
cause, effect, start_time, end_time, informed_entity
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
`, [
alert.route_id,
alert.train_id,
alert.alert_type,
alert.severity,
alert.title,
alert.description,
alert.url,
alert.header_text,
alert.description_text,
alert.cause,
alert.effect,
alert.start_time,
alert.end_time,
JSON.stringify(alert.informed_entity),
]);
} else {
// Update existing alert
await client.query(`
UPDATE alerts
SET
alert_type = $2,
severity = $3,
title = $4,
description = $5,
url = $6,
header_text = $7,
description_text = $8,
cause = $9,
effect = $10,
end_time = $11,
informed_entity = $12,
updated_at = NOW()
WHERE alert_id = $1
`, [
alert.alert_id,
alert.alert_type,
alert.severity,
alert.title,
alert.description,
alert.url,
alert.header_text,
alert.description_text,
alert.cause,
alert.effect,
alert.end_time,
JSON.stringify(alert.informed_entity),
]);
}
}
await client.query('COMMIT');
logger.debug({ count: alerts.length }, 'Alerts stored');
} catch (error) {
await client.query('ROLLBACK');
logger.error({ error: error.message }, 'Error storing alerts');
throw error;
} finally {
client.release();
}
}
async updateRedisCache(alerts) {
try {
// Clear previous active alerts
await redis.del('alerts:active');
// Store each alert
for (const alert of alerts) {
const key = `alert:${alert.alert_id}`;
await redis.set(key, JSON.stringify(alert), { EX: 600 }); // 10 min TTL
// Add to active alerts set
await redis.sAdd('alerts:active', alert.alert_id);
// Index by route
if (alert.route_id) {
await redis.sAdd(`alerts:route:${alert.route_id}`, alert.alert_id);
}
// Index by train
if (alert.train_id) {
await redis.sAdd(`alerts:train:${alert.train_id}`, alert.alert_id);
}
}
logger.debug({ count: alerts.length }, 'Redis cache updated');
} catch (error) {
logger.error({ error: error.message }, 'Error updating Redis cache');
}
}
async stop() {
logger.info('Stopping Service Alerts Poller...');
this.isRunning = false;
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
await db.disconnect();
await redis.disconnect();
logger.info('Service Alerts Poller stopped');
}
}
// Main execution
const poller = new AlertsPoller();
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await poller.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
poller.start().catch((error) => {
logger.fatal({ error }, 'Failed to start Service Alerts Poller');
process.exit(1);
});
export default AlertsPoller;
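
Given the cache layout written by updateRedisCache(), a reader-side sketch (key names as above; import path relative to backend/src):

import redis from './lib/redis.js';

// Active alert ids, then each cached payload (entries carry a 10-minute TTL)
const ids = await redis.sMembers('alerts:active');
const alerts = (await Promise.all(ids.map((id) => redis.get(`alert:${id}`))))
  .filter(Boolean)
  .map((raw) => JSON.parse(raw));
console.log(alerts.map((a) => `${a.severity}: ${a.title}`));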


@@ -0,0 +1,125 @@
import cron from 'node-cron';
import db from '../lib/db.js';
import logger from '../lib/logger.js';
class AnalyticsRefresher {
constructor() {
this.schedule = process.env.ANALYTICS_REFRESH_SCHEDULE || '*/15 * * * *'; // Every 15 minutes
    this.job = null;
    this.cleanupJob = null;
this.isRefreshing = false;
}
async refreshViews() {
if (this.isRefreshing) {
logger.warn('Analytics refresh already in progress, skipping...');
return;
}
this.isRefreshing = true;
const startTime = Date.now();
try {
logger.info('Starting analytics views refresh...');
await db.query('SELECT refresh_all_analytics_views()');
const duration = Date.now() - startTime;
logger.info({
duration,
durationType: 'ms',
}, 'Analytics views refreshed successfully');
} catch (error) {
logger.error({
error: error.message,
stack: error.stack,
}, 'Error refreshing analytics views');
} finally {
this.isRefreshing = false;
}
}
async cleanupExports() {
try {
logger.info('Cleaning up old export requests...');
const result = await db.query('SELECT cleanup_old_export_requests() as deleted_count');
const deletedCount = result.rows[0].deleted_count;
logger.info({
deletedCount,
}, 'Export cleanup completed');
} catch (error) {
logger.error({
error: error.message,
}, 'Error cleaning up exports');
}
}
start() {
logger.info({
schedule: this.schedule,
}, 'Starting Analytics Refresher Worker');
// Refresh materialized views periodically
this.job = cron.schedule(this.schedule, async () => {
await this.refreshViews();
});
    // Cleanup exports daily at 3 AM (kept on the instance so stop() can cancel it)
    this.cleanupJob = cron.schedule('0 3 * * *', async () => {
      await this.cleanupExports();
    });
// Initial refresh
setTimeout(() => {
this.refreshViews();
}, 5000); // Wait 5 seconds after startup
logger.info('Analytics Refresher Worker started');
}
async stop() {
logger.info('Stopping Analytics Refresher Worker...');
    if (this.job) {
      this.job.stop();
      this.job = null;
    }
    if (this.cleanupJob) {
      this.cleanupJob.stop();
      this.cleanupJob = null;
    }
// Wait for any ongoing refresh to complete
while (this.isRefreshing) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
await db.disconnect();
logger.info('Analytics Refresher Worker stopped');
}
}
// Main execution
const worker = new AnalyticsRefresher();
// Graceful shutdown
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await worker.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Start worker
(async () => {
try {
await db.connect();
worker.start();
} catch (error) {
logger.fatal({ error }, 'Failed to start Analytics Refresher Worker');
process.exit(1);
}
})();
export default AnalyticsRefresher;
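
Since the schedule comes from an environment variable, a small guard sketch using node-cron's validate() helper can catch a malformed expression before scheduling:

import cron from 'node-cron';

const schedule = process.env.ANALYTICS_REFRESH_SCHEDULE || '*/15 * * * *';
if (!cron.validate(schedule)) {
  throw new Error(`Invalid cron expression: ${schedule}`);
}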


@@ -0,0 +1,340 @@
import GtfsRealtimeBindings from 'gtfs-realtime-bindings';
import fetch from 'node-fetch';
import config from '../config/index.js';
import logger from '../lib/logger.js';
import db from '../lib/db.js';
import redis from '../lib/redis.js';
class GTFSRealtimePoller {
constructor() {
this.isRunning = false;
this.pollInterval = null;
this.statsInterval = null;
this.stats = {
totalPolls: 0,
successfulPolls: 0,
failedPolls: 0,
totalTrains: 0,
lastPollTime: null,
errors: [],
};
}
async start() {
logger.info('Starting GTFS-RT Poller...');
// Connect to databases
await db.connect();
await redis.connect();
this.isRunning = true;
// Initial poll
await this.poll();
// Setup polling interval
this.pollInterval = setInterval(
() => this.poll(),
config.gtfsRT.pollingInterval
);
// Setup stats interval (every minute)
this.statsInterval = setInterval(
() => this.logStats(),
60000
);
logger.info({
interval: config.gtfsRT.pollingInterval,
url: config.gtfsRT.vehiclePositionsUrl,
}, 'GTFS-RT Poller started');
}
async poll() {
if (!this.isRunning) {
return;
}
this.stats.totalPolls++;
const startTime = Date.now();
try {
logger.debug('Polling GTFS-RT feed...');
// Fetch GTFS-RT feed
const response = await fetch(config.gtfsRT.vehiclePositionsUrl, {
timeout: 10000,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
const feed = GtfsRealtimeBindings.transit_realtime.FeedMessage.decode(
new Uint8Array(buffer)
);
logger.debug({
entities: feed.entity?.length || 0,
header: feed.header,
}, 'GTFS-RT feed decoded');
// Process entities
const positions = [];
const trainIds = [];
for (const entity of feed.entity || []) {
if (entity.vehicle) {
const position = this.parseVehiclePosition(entity);
if (position) {
positions.push(position);
trainIds.push(position.train_id);
}
}
}
logger.info({
trains: positions.length,
duration: Date.now() - startTime,
}, 'Processed vehicle positions');
// Store positions
if (positions.length > 0) {
await this.storePositions(positions);
await this.updateRedisCache(positions, trainIds);
}
this.stats.successfulPolls++;
this.stats.totalTrains = positions.length;
this.stats.lastPollTime = new Date();
} catch (error) {
this.stats.failedPolls++;
this.stats.errors.push({
timestamp: new Date(),
message: error.message,
});
// Keep only last 10 errors
if (this.stats.errors.length > 10) {
this.stats.errors = this.stats.errors.slice(-10);
}
logger.error({
error: error.message,
stack: error.stack,
duration: Date.now() - startTime,
}, 'Error polling GTFS-RT feed');
}
}
parseVehiclePosition(entity) {
try {
const vehicle = entity.vehicle;
const position = vehicle.position;
const timestamp = vehicle.timestamp
? new Date(vehicle.timestamp * 1000)
: new Date();
// Validate required fields
if (!position || position.latitude == null || position.longitude == null) {
logger.warn({ entity: entity.id }, 'Vehicle position missing coordinates');
return null;
}
// Validate coordinate ranges
if (
position.latitude < -90 || position.latitude > 90 ||
position.longitude < -180 || position.longitude > 180
) {
logger.warn({
lat: position.latitude,
lon: position.longitude,
}, 'Invalid coordinates');
return null;
}
return {
train_id: vehicle.vehicle?.id || entity.id,
trip_id: vehicle.trip?.tripId || null,
route_id: vehicle.trip?.routeId || null,
latitude: position.latitude,
longitude: position.longitude,
bearing: position.bearing || null,
speed: position.speed ? position.speed * 3.6 : null, // Convert m/s to km/h
status: this.mapVehicleStatus(vehicle.currentStatus),
occupancy_status: this.mapOccupancyStatus(vehicle.occupancyStatus),
timestamp: timestamp,
recorded_at: new Date(),
};
} catch (error) {
logger.error({
error: error.message,
entity: entity.id,
}, 'Error parsing vehicle position');
return null;
}
}
mapVehicleStatus(status) {
const statusMap = {
0: 'INCOMING_AT',
1: 'STOPPED_AT',
2: 'IN_TRANSIT_TO',
};
return statusMap[status] || 'UNKNOWN';
}
mapOccupancyStatus(status) {
const occupancyMap = {
0: 'EMPTY',
1: 'MANY_SEATS_AVAILABLE',
2: 'FEW_SEATS_AVAILABLE',
3: 'STANDING_ROOM_ONLY',
4: 'CRUSHED_STANDING_ROOM_ONLY',
5: 'FULL',
6: 'NOT_ACCEPTING_PASSENGERS',
};
return occupancyMap[status] || null;
}
async storePositions(positions) {
const client = await db.pool.connect();
try {
await client.query('BEGIN');
// Batch insert positions
const values = positions.map((p, idx) => {
const offset = idx * 12;
return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10}, $${offset + 11}, $${offset + 12})`;
}).join(',');
const params = positions.flatMap(p => [
p.train_id,
p.trip_id,
p.route_id,
p.latitude,
p.longitude,
p.bearing,
p.speed,
p.status,
p.occupancy_status,
p.timestamp,
p.recorded_at,
`SRID=4326;POINT(${p.longitude} ${p.latitude})`,
]);
await client.query(`
INSERT INTO train_positions (
train_id, trip_id, route_id, latitude, longitude, bearing, speed,
status, occupancy_status, timestamp, recorded_at, position
)
VALUES ${values}
`, params);
// Update trains table (upsert)
for (const p of positions) {
await client.query(`
INSERT INTO trains (train_id, route_id, train_type, last_seen, is_active)
VALUES ($1, $2, 'UNKNOWN', $3, true)
ON CONFLICT (train_id) DO UPDATE
SET last_seen = $3, is_active = true, route_id = COALESCE(trains.route_id, $2)
`, [p.train_id, p.route_id, p.recorded_at]);
}
await client.query('COMMIT');
logger.debug({ count: positions.length }, 'Positions stored in PostgreSQL');
} catch (error) {
await client.query('ROLLBACK');
logger.error({ error: error.message }, 'Error storing positions');
throw error;
} finally {
client.release();
}
}
async updateRedisCache(positions, trainIds) {
try {
// Store each position in Redis with 5-minute expiration
const promises = positions.map(async (p) => {
const key = `trains:current:${p.train_id}`;
await redis.set(key, JSON.stringify(p), { EX: 300 });
});
await Promise.all(promises);
// Update active trains set
if (trainIds.length > 0) {
await redis.del('trains:active');
await redis.sAdd('trains:active', ...trainIds);
}
// Store last update timestamp
await redis.set('stats:last_update', new Date().toISOString());
logger.debug({ count: positions.length }, 'Redis cache updated');
} catch (error) {
logger.error({ error: error.message }, 'Error updating Redis cache');
// Don't throw - Redis cache is not critical
}
}
logStats() {
const successRate = this.stats.totalPolls > 0
? ((this.stats.successfulPolls / this.stats.totalPolls) * 100).toFixed(2)
: 0;
logger.info({
...this.stats,
successRate: `${successRate}%`,
recentErrors: this.stats.errors.slice(-3),
}, 'Poller statistics');
}
async stop() {
logger.info('Stopping GTFS-RT Poller...');
this.isRunning = false;
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
if (this.statsInterval) {
clearInterval(this.statsInterval);
this.statsInterval = null;
}
await db.disconnect();
await redis.disconnect();
logger.info('GTFS-RT Poller stopped');
}
}
// Main execution
const poller = new GTFSRealtimePoller();
// Graceful shutdown
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await poller.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Start poller
poller.start().catch((error) => {
logger.fatal({ error }, 'Failed to start poller');
process.exit(1);
});
export default GTFSRealtimePoller;
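
A standalone decode sketch using the same bindings and default feed URL as the poller, with no database involved (purely illustrative):

import fetch from 'node-fetch';
import GtfsRealtimeBindings from 'gtfs-realtime-bindings';

const res = await fetch('https://gtfsrt.renfe.com/vehicle_positions.pb');
const feed = GtfsRealtimeBindings.transit_realtime.FeedMessage.decode(
  new Uint8Array(await res.arrayBuffer())
);
for (const entity of feed.entity || []) {
  const pos = entity.vehicle?.position;
  if (pos) {
    // The feed reports speed in m/s; the poller stores km/h (x 3.6)
    console.log(entity.vehicle.vehicle?.id || entity.id, pos.latitude, pos.longitude);
  }
}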


@@ -0,0 +1,522 @@
import fetch from 'node-fetch';
import { createWriteStream, createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createHash } from 'crypto';
import { parse } from 'csv-parse';
import { unlink, mkdir } from 'fs/promises';
import { Extract } from 'unzipper';
import { join } from 'path';
import config from '../config/index.js';
import logger from '../lib/logger.js';
import db from '../lib/db.js';
import redis from '../lib/redis.js';
class GTFSStaticSyncer {
constructor() {
this.tmpDir = '/tmp/gtfs';
this.gtfsUrl = process.env.GTFS_STATIC_URL || 'https://data.renfe.com/api/gtfs/latest.zip';
this.syncInterval = null;
}
async start() {
logger.info('Starting GTFS Static Syncer...');
await db.connect();
await redis.connect();
// Initial sync (don't crash if it fails, just log and continue)
try {
await this.sync();
} catch (error) {
logger.warn({ error: error.message }, 'Initial GTFS sync failed, will retry at scheduled time');
}
// Schedule daily sync at 3 AM
const now = new Date();
const next3AM = new Date();
next3AM.setHours(3, 0, 0, 0);
if (next3AM <= now) {
next3AM.setDate(next3AM.getDate() + 1);
}
const msUntil3AM = next3AM - now;
setTimeout(() => {
this.sync();
// Then sync every 24 hours
this.syncInterval = setInterval(() => this.sync(), 24 * 60 * 60 * 1000);
}, msUntil3AM);
logger.info({
nextSync: next3AM.toISOString(),
msUntil: msUntil3AM,
}, 'GTFS Static Syncer scheduled');
}
async sync() {
const startTime = Date.now();
logger.info('Starting GTFS Static synchronization...');
try {
// Ensure tmp directory exists
await mkdir(this.tmpDir, { recursive: true });
// Download GTFS zip
const zipPath = await this.downloadGTFS();
// Calculate checksum
const checksum = await this.calculateChecksum(zipPath);
// Check if already imported
const existingResult = await db.query(
'SELECT feed_id FROM gtfs_feeds WHERE feed_checksum = $1',
[checksum]
);
if (existingResult.rows.length > 0) {
logger.info({ checksum }, 'GTFS feed already imported, skipping');
await unlink(zipPath);
return;
}
// Extract zip
const extractPath = await this.extractZip(zipPath);
// Parse GTFS files
const data = await this.parseGTFSFiles(extractPath);
// Import to database
await this.importToDatabase(data, checksum);
// Cleanup
await unlink(zipPath);
const duration = Date.now() - startTime;
logger.info({
duration,
checksum,
stats: {
routes: data.routes?.length || 0,
trips: data.trips?.length || 0,
stops: data.stops?.length || 0,
stopTimes: data.stopTimes?.length || 0,
},
}, 'GTFS Static synchronization completed');
// Invalidate cache
await this.invalidateCaches();
} catch (error) {
logger.error({
error: error.message,
stack: error.stack,
}, 'Error synchronizing GTFS Static');
throw error;
}
}
async downloadGTFS() {
const zipPath = join(this.tmpDir, 'gtfs.zip');
logger.info({ url: this.gtfsUrl }, 'Downloading GTFS feed...');
const response = await fetch(this.gtfsUrl);
if (!response.ok) {
throw new Error(`Failed to download GTFS: ${response.status} ${response.statusText}`);
}
await pipeline(
response.body,
createWriteStream(zipPath)
);
logger.info({ path: zipPath }, 'GTFS feed downloaded');
return zipPath;
}
async calculateChecksum(filePath) {
const hash = createHash('sha256');
await pipeline(
createReadStream(filePath),
hash
);
return hash.digest('hex');
}
async extractZip(zipPath) {
const extractPath = join(this.tmpDir, 'extracted');
logger.info('Extracting GTFS zip...');
await pipeline(
createReadStream(zipPath),
Extract({ path: extractPath })
);
logger.info({ path: extractPath }, 'GTFS zip extracted');
return extractPath;
}
async parseGTFSFiles(extractPath) {
logger.info('Parsing GTFS files...');
const data = {
routes: await this.parseCSV(join(extractPath, 'routes.txt')),
trips: await this.parseCSV(join(extractPath, 'trips.txt')),
stops: await this.parseCSV(join(extractPath, 'stops.txt')),
stopTimes: await this.parseCSV(join(extractPath, 'stop_times.txt')),
calendar: await this.parseCSV(join(extractPath, 'calendar.txt')),
calendarDates: await this.parseCSV(join(extractPath, 'calendar_dates.txt')),
shapes: await this.parseCSV(join(extractPath, 'shapes.txt')),
};
logger.info('GTFS files parsed');
return data;
}
async parseCSV(filePath) {
const records = [];
try {
await pipeline(
createReadStream(filePath),
parse({
columns: true,
skip_empty_lines: true,
trim: true,
}),
async function* (source) {
for await (const record of source) {
records.push(record);
}
}
);
} catch (error) {
logger.warn({ file: filePath, error: error.message }, 'Optional GTFS file not found');
}
return records;
}
async importToDatabase(data, checksum) {
const client = await db.pool.connect();
try {
await client.query('BEGIN');
logger.info('Importing GTFS data to database...');
// Create feed record
const feedResult = await client.query(`
INSERT INTO gtfs_feeds (feed_checksum, feed_url, imported_at)
VALUES ($1, $2, NOW())
RETURNING feed_id
`, [checksum, this.gtfsUrl]);
const feedId = feedResult.rows[0].feed_id;
// Import routes (update existing)
if (data.routes && data.routes.length > 0) {
logger.info({ count: data.routes.length }, 'Importing routes...');
for (const route of data.routes) {
await client.query(`
INSERT INTO routes (
route_id, route_name, route_short_name, route_type,
color, description, metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (route_id) DO UPDATE
SET
route_name = EXCLUDED.route_name,
route_short_name = EXCLUDED.route_short_name,
route_type = EXCLUDED.route_type,
color = EXCLUDED.color,
description = EXCLUDED.description,
updated_at = NOW()
`, [
route.route_id,
route.route_long_name || route.route_short_name,
route.route_short_name,
this.mapRouteType(route.route_type),
route.route_color ? '#' + route.route_color : null,
route.route_desc,
JSON.stringify({
gtfs_route_type: route.route_type,
text_color: route.route_text_color,
}),
]);
}
}
// Import stops (update existing stations)
if (data.stops && data.stops.length > 0) {
logger.info({ count: data.stops.length }, 'Importing stops...');
for (const stop of data.stops) {
await client.query(`
INSERT INTO stations (
station_id, station_name, station_code,
latitude, longitude, position, station_type, metadata
)
VALUES ($1, $2, $3, $4, $5, ST_SetSRID(ST_MakePoint($5, $4), 4326)::geography, $6, $7)
ON CONFLICT (station_id) DO UPDATE
SET
station_name = EXCLUDED.station_name,
latitude = EXCLUDED.latitude,
longitude = EXCLUDED.longitude,
position = EXCLUDED.position,
metadata = EXCLUDED.metadata,
updated_at = NOW()
`, [
stop.stop_id,
stop.stop_name,
stop.stop_code,
parseFloat(stop.stop_lat),
parseFloat(stop.stop_lon),
this.mapStationType(stop.location_type),
JSON.stringify({
location_type: stop.location_type,
parent_station: stop.parent_station,
wheelchair_boarding: stop.wheelchair_boarding,
platform_code: stop.platform_code,
}),
]);
}
}
// Import trips
if (data.trips && data.trips.length > 0) {
logger.info({ count: data.trips.length }, 'Importing trips...');
// Batch insert trips
const tripValues = data.trips.map((trip, idx) => {
const offset = idx * 10;
return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10})`;
}).join(',');
          const tripParams = data.trips.flatMap(trip => [
            trip.trip_id,
            trip.route_id,
            trip.service_id,
            trip.trip_headsign,
            trip.trip_short_name,
            // direction_id 0 is a valid value, so check NaN rather than using `|| null`
            Number.isNaN(parseInt(trip.direction_id, 10)) ? null : parseInt(trip.direction_id, 10),
            trip.block_id,
            trip.shape_id,
            Number.isNaN(parseInt(trip.wheelchair_accessible, 10)) ? null : parseInt(trip.wheelchair_accessible, 10),
            Number.isNaN(parseInt(trip.bikes_allowed, 10)) ? null : parseInt(trip.bikes_allowed, 10),
          ]);
await client.query(`
INSERT INTO trips (
trip_id, route_id, service_id, trip_headsign, trip_short_name,
direction_id, block_id, shape_id, wheelchair_accessible, bikes_allowed
)
VALUES ${tripValues}
ON CONFLICT (trip_id) DO UPDATE
SET
route_id = EXCLUDED.route_id,
service_id = EXCLUDED.service_id,
trip_headsign = EXCLUDED.trip_headsign,
updated_at = NOW()
`, tripParams);
}
// Import stop_times (batch insert)
if (data.stopTimes && data.stopTimes.length > 0) {
logger.info({ count: data.stopTimes.length }, 'Importing stop times...');
// Delete old stop_times
await client.query('DELETE FROM stop_times');
// Batch insert in chunks of 1000
const chunkSize = 1000;
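          // 1000 rows x 6 params = 6,000 bound parameters per statement, well below PostgreSQL's 65,535 limit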
for (let i = 0; i < data.stopTimes.length; i += chunkSize) {
const chunk = data.stopTimes.slice(i, i + chunkSize);
const values = chunk.map((_, idx) => {
const offset = idx * 6;
return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6})`;
}).join(',');
            const params = chunk.flatMap(st => [
              st.trip_id,
              st.arrival_time,
              st.departure_time,
              st.stop_id,
              parseInt(st.stop_sequence, 10),
              // pickup_type 0 (regular pickup) is the common case, so `|| null` would wrongly drop it
              Number.isNaN(parseInt(st.pickup_type, 10)) ? null : parseInt(st.pickup_type, 10),
            ]);
await client.query(`
INSERT INTO stop_times (
trip_id, arrival_time, departure_time, stop_id, stop_sequence, pickup_type
)
VALUES ${values}
`, params);
}
}
// Import calendar
if (data.calendar && data.calendar.length > 0) {
logger.info({ count: data.calendar.length }, 'Importing calendar...');
for (const cal of data.calendar) {
await client.query(`
INSERT INTO calendar (
service_id, monday, tuesday, wednesday, thursday,
friday, saturday, sunday, start_date, end_date
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (service_id) DO UPDATE
SET
monday = EXCLUDED.monday,
tuesday = EXCLUDED.tuesday,
wednesday = EXCLUDED.wednesday,
thursday = EXCLUDED.thursday,
friday = EXCLUDED.friday,
saturday = EXCLUDED.saturday,
sunday = EXCLUDED.sunday,
start_date = EXCLUDED.start_date,
end_date = EXCLUDED.end_date
`, [
cal.service_id,
cal.monday === '1',
cal.tuesday === '1',
cal.wednesday === '1',
cal.thursday === '1',
cal.friday === '1',
cal.saturday === '1',
cal.sunday === '1',
cal.start_date,
cal.end_date,
]);
}
}
// Import calendar_dates
if (data.calendarDates && data.calendarDates.length > 0) {
logger.info({ count: data.calendarDates.length }, 'Importing calendar dates...');
await client.query('DELETE FROM calendar_dates');
for (const cd of data.calendarDates) {
await client.query(`
INSERT INTO calendar_dates (service_id, date, exception_type)
VALUES ($1, $2, $3)
`, [cd.service_id, cd.date, parseInt(cd.exception_type)]);
}
}
// Import shapes
if (data.shapes && data.shapes.length > 0) {
logger.info({ count: data.shapes.length }, 'Importing shapes...');
await client.query('DELETE FROM shapes');
const chunkSize = 1000;
for (let i = 0; i < data.shapes.length; i += chunkSize) {
const chunk = data.shapes.slice(i, i + chunkSize);
const values = chunk.map((_, idx) => {
const offset = idx * 5;
return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, ST_SetSRID(ST_MakePoint($${offset + 3}, $${offset + 2}), 4326)::geography)`;
}).join(',');
            const params = chunk.flatMap(shape => [
              shape.shape_id,
              parseFloat(shape.shape_pt_lat),
              parseFloat(shape.shape_pt_lon),
              parseInt(shape.shape_pt_sequence, 10),
              // shape_dist_traveled of 0 (the first point) is valid, so check NaN instead of using ||
              Number.isNaN(parseFloat(shape.shape_dist_traveled)) ? null : parseFloat(shape.shape_dist_traveled),
            ]);
await client.query(`
INSERT INTO shapes (shape_id, shape_pt_lat, shape_pt_lon, shape_pt_sequence, shape_dist_traveled, geom)
VALUES ${values}
`, params);
}
}
await client.query('COMMIT');
logger.info('GTFS data imported successfully');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
mapRouteType(gtfsType) {
const typeMap = {
'0': 'TRAM',
'1': 'SUBWAY',
'2': 'RAIL',
'3': 'BUS',
'4': 'FERRY',
'100': 'RAIL',
'101': 'HIGH_SPEED',
'102': 'LONG_DISTANCE',
'103': 'REGIONAL',
'109': 'COMMUTER',
};
return typeMap[gtfsType] || 'UNKNOWN';
}
mapStationType(locationType) {
const typeMap = {
'0': 'STOP',
'1': 'STATION',
'2': 'ENTRANCE',
'3': 'GENERIC_NODE',
'4': 'BOARDING_AREA',
};
return typeMap[locationType] || 'STOP';
}
async invalidateCaches() {
try {
      // Redis DEL does not expand glob patterns; resolve matching keys first, then delete them
      for (const pattern of ['routes:*', 'stations:*', 'trips:*']) {
        const keys = await redis.keys(pattern);
        if (keys.length > 0) {
          await redis.del(keys);
        }
      }
logger.info('Caches invalidated');
} catch (error) {
logger.error({ error: error.message }, 'Error invalidating caches');
}
}
async stop() {
logger.info('Stopping GTFS Static Syncer...');
if (this.syncInterval) {
clearInterval(this.syncInterval);
this.syncInterval = null;
}
await db.disconnect();
await redis.disconnect();
logger.info('GTFS Static Syncer stopped');
}
}
// Main execution
const syncer = new GTFSStaticSyncer();
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await syncer.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
syncer.start().catch((error) => {
logger.fatal({ error }, 'Failed to start GTFS Static Syncer');
process.exit(1);
});
export default GTFSStaticSyncer;
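
The checksum dedupe above leaves an audit trail in gtfs_feeds; a quick query sketch to inspect recent imports (import path relative to backend/src):

import db from './lib/db.js';

const { rows } = await db.query(
  'SELECT feed_id, feed_checksum, imported_at FROM gtfs_feeds ORDER BY imported_at DESC LIMIT 5'
);
console.table(rows);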


@@ -0,0 +1,474 @@
import fetch from 'node-fetch';
import config from '../config/index.js';
import logger from '../lib/logger.js';
import db from '../lib/db.js';
import redis from '../lib/redis.js';
/**
* Renfe Fleet Data Poller
* Fetches additional train data from Renfe's real-time visualization endpoint
* This provides extra info like delays, line codes, next stations, etc.
*/
class RenfeFleetPoller {
constructor() {
this.isRunning = false;
this.pollInterval = null;
this.stationsInterval = null;
this.linesInterval = null;
this.stats = {
totalPolls: 0,
successfulPolls: 0,
failedPolls: 0,
totalTrains: 0,
lastPollTime: null,
errors: [],
};
// Renfe real-time endpoints
this.FLEET_URL = 'https://tiempo-real.renfe.com/renfe-visor/flota.json';
this.STATIONS_URL = 'https://tiempo-real.renfe.com/data/estaciones.geojson';
this.LINES_URL = 'https://tiempo-real.renfe.com/data/lineasnucleos.geojson';
}
async start() {
logger.info('Starting Renfe Fleet Poller...');
// Connect to databases
await db.connect();
await redis.connect();
this.isRunning = true;
// Initial data load
await this.loadStationsAndLines();
await this.pollFleet();
// Setup polling intervals
// Fleet data every 30 seconds (it updates frequently)
this.pollInterval = setInterval(
() => this.pollFleet(),
30000
);
// Stations and lines every 6 hours (static data)
this.stationsInterval = setInterval(
() => this.loadStationsAndLines(),
6 * 60 * 60 * 1000
);
logger.info('Renfe Fleet Poller started');
}
async loadStationsAndLines() {
logger.info('Loading stations and lines from Renfe...');
try {
// Fetch stations
const stationsResponse = await fetch(this.STATIONS_URL, { timeout: 30000 });
if (stationsResponse.ok) {
const stationsGeoJSON = await stationsResponse.json();
await this.processStations(stationsGeoJSON);
}
} catch (error) {
logger.error({ error: error.message }, 'Error loading stations');
}
try {
// Fetch lines
const linesResponse = await fetch(this.LINES_URL, { timeout: 30000 });
if (linesResponse.ok) {
const linesGeoJSON = await linesResponse.json();
await this.processLines(linesGeoJSON);
}
} catch (error) {
logger.error({ error: error.message }, 'Error loading lines');
}
}
async processStations(geoJSON) {
if (!geoJSON.features || geoJSON.features.length === 0) {
logger.warn('No stations found in GeoJSON');
return;
}
const client = await db.pool.connect();
try {
await client.query('BEGIN');
let inserted = 0;
let updated = 0;
for (const feature of geoJSON.features) {
const props = feature.properties;
const coords = feature.geometry?.coordinates;
if (!props.CODIGO_ESTACION || !coords) continue;
// Map Renfe station to our schema
const stationData = {
station_id: `renfe_${props.CODIGO_ESTACION}`,
station_code: props.CODIGO_ESTACION,
station_name: props.NOMBRE_ESTACION || 'Unknown',
latitude: props.LATITUD || coords[1],
longitude: props.LONGITUD || coords[0],
station_type: this.inferStationType(props),
metadata: {
nucleo: props.NUCLEO,
nucleo_name: props.NOMBRE_NUCLEO,
lineas: props.LINEAS,
color: props.COLOR,
accesibilidad: props.ACCESIBILIDAD,
parking_bicis: props.PARKING_BICIS,
bus_urbano: props.COR_BUS?.includes('Urbano'),
bus_interurbano: props.COR_BUS?.includes('Interurbano'),
metro: props.COR_METRO,
source: 'renfe_visor',
},
};
const result = await client.query(`
INSERT INTO stations (station_id, station_code, station_name, latitude, longitude, station_type, metadata, position)
VALUES ($1, $2, $3, $4, $5, $6, $7, ST_SetSRID(ST_MakePoint($8, $9), 4326))
ON CONFLICT (station_id) DO UPDATE SET
station_name = EXCLUDED.station_name,
latitude = EXCLUDED.latitude,
longitude = EXCLUDED.longitude,
station_type = EXCLUDED.station_type,
metadata = stations.metadata || EXCLUDED.metadata,
position = EXCLUDED.position,
updated_at = NOW()
RETURNING (xmax = 0) as is_insert
`, [
stationData.station_id,
stationData.station_code,
stationData.station_name,
stationData.latitude,
stationData.longitude,
stationData.station_type,
JSON.stringify(stationData.metadata),
stationData.longitude,
stationData.latitude,
]);
if (result.rows[0]?.is_insert) {
inserted++;
} else {
updated++;
}
}
await client.query('COMMIT');
logger.info({ inserted, updated, total: geoJSON.features.length }, 'Processed Renfe stations');
} catch (error) {
await client.query('ROLLBACK');
logger.error({ error: error.message }, 'Error processing stations');
throw error;
} finally {
client.release();
}
}
inferStationType(props) {
// Infer station importance based on available metadata
const lineas = props.LINEAS || '';
const lineCount = lineas.split(',').filter(Boolean).length;
if (lineCount >= 3 || props.COR_METRO) {
return 'MAJOR';
} else if (lineCount >= 2 || props.COR_BUS) {
return 'MEDIUM';
}
return 'MINOR';
}
async processLines(geoJSON) {
if (!geoJSON.features || geoJSON.features.length === 0) {
logger.warn('No lines found in GeoJSON');
return;
}
const client = await db.pool.connect();
try {
await client.query('BEGIN');
// Create lines table if not exists
await client.query(`
CREATE TABLE IF NOT EXISTS train_lines (
line_id VARCHAR(50) PRIMARY KEY,
line_code VARCHAR(20) NOT NULL,
line_name VARCHAR(255),
nucleo_id VARCHAR(20),
nucleo_name VARCHAR(255),
color VARCHAR(20),
geometry GEOMETRY(LineString, 4326),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
)
`);
let processed = 0;
for (const feature of geoJSON.features) {
const props = feature.properties;
const coords = feature.geometry?.coordinates;
if (!props.CODIGO || !coords) continue;
const lineId = `${props.IDNUCLEO}_${props.CODIGO}`;
// Convert coordinates to WKT LineString
const coordsWKT = coords.map(c => `${c[0]} ${c[1]}`).join(',');
await client.query(`
INSERT INTO train_lines (line_id, line_code, line_name, nucleo_id, nucleo_name, color, geometry, metadata)
VALUES ($1, $2, $3, $4, $5, $6, ST_SetSRID(ST_GeomFromText('LINESTRING(' || $7 || ')'), 4326), $8)
ON CONFLICT (line_id) DO UPDATE SET
line_name = EXCLUDED.line_name,
color = EXCLUDED.color,
geometry = EXCLUDED.geometry,
metadata = EXCLUDED.metadata,
updated_at = NOW()
`, [
lineId,
props.CODIGO,
props.NOMBRE || props.CODIGO,
String(props.IDNUCLEO),
props.NUCLEO,
props.COLOR,
coordsWKT,
JSON.stringify({ idLinea: props.IDLINEA }),
]);
processed++;
}
await client.query('COMMIT');
logger.info({ processed, total: geoJSON.features.length }, 'Processed Renfe lines');
} catch (error) {
await client.query('ROLLBACK');
logger.error({ error: error.message }, 'Error processing lines');
} finally {
client.release();
}
}
async pollFleet() {
if (!this.isRunning) {
return;
}
this.stats.totalPolls++;
const startTime = Date.now();
try {
logger.debug('Polling Renfe fleet data...');
const response = await fetch(this.FLEET_URL, { timeout: 15000 });
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
if (!data.trenes || !Array.isArray(data.trenes)) {
logger.warn('No trains found in fleet data');
return;
}
logger.info({
trains: data.trenes.length,
updateTime: data.fechaActualizacion,
}, 'Received Renfe fleet data');
// Process and store fleet data
await this.processFleetData(data.trenes, data.fechaActualizacion);
this.stats.successfulPolls++;
this.stats.totalTrains = data.trenes.length;
this.stats.lastPollTime = new Date();
} catch (error) {
this.stats.failedPolls++;
this.stats.errors.push({
timestamp: new Date(),
message: error.message,
});
if (this.stats.errors.length > 10) {
this.stats.errors = this.stats.errors.slice(-10);
}
logger.error({
error: error.message,
duration: Date.now() - startTime,
}, 'Error polling Renfe fleet');
}
}
async processFleetData(trains, updateTime) {
// Store fleet data in Redis for quick access
// This enriches the GTFS-RT data with additional info
const promises = [];
for (const train of trains) {
// Extract train number from tripId or codTren
const trainId = train.codTren;
const fleetData = {
tripId: train.tripId,
codTren: train.codTren,
codLinea: train.codLinea,
retrasoMin: parseInt(train.retrasoMin, 10) || 0,
codEstAct: train.codEstAct,
codEstSig: train.codEstSig,
horaLlegadaSigEst: train.horaLlegadaSigEst,
codEstDest: train.codEstDest,
codEstOrig: train.codEstOrig,
        porAvanc: train.porAvanc, // E = at a station, C = moving between stations
latitud: train.latitud,
longitud: train.longitud,
nucleo: train.nucleo,
accesible: train.accesible,
via: train.via,
updatedAt: updateTime,
};
// Store in Redis with 5 minute expiration
promises.push(
redis.set(
`fleet:${trainId}`,
JSON.stringify(fleetData),
{ EX: 300 }
)
);
// Also index by tripId for cross-referencing
if (train.tripId) {
promises.push(
redis.set(
`fleet:trip:${train.tripId}`,
trainId,
{ EX: 300 }
)
);
}
}
// Store the full fleet list for API access
promises.push(
redis.set(
'fleet:all',
JSON.stringify(trains),
{ EX: 60 }
)
);
promises.push(redis.set('fleet:lastUpdate', updateTime));
await Promise.all(promises);
// Save punctuality data to database for historical analysis
await this.savePunctualityData(trains, updateTime);
logger.debug({ count: trains.length }, 'Fleet data stored in Redis');
}
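  // Batch-inserts one punctuality row per train for historical delay analysis;
  // failures are logged but do not abort the polling cycle.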
async savePunctualityData(trains, updateTime) {
if (!trains || trains.length === 0) return;
const client = await db.pool.connect();
try {
// Use batch insert for efficiency
const values = [];
const params = [];
let paramIndex = 1;
for (const train of trains) {
const delayMinutes = parseInt(train.retrasoMin, 10) || 0;
values.push(`($${paramIndex}, $${paramIndex + 1}, $${paramIndex + 2}, $${paramIndex + 3}, $${paramIndex + 4}, $${paramIndex + 5}, $${paramIndex + 6}, $${paramIndex + 7}, $${paramIndex + 8}, $${paramIndex + 9}, $${paramIndex + 10}, $${paramIndex + 11})`);
params.push(
train.codTren, // train_id
train.tripId || null, // trip_id
train.codLinea || null, // line_code
train.nucleo || null, // nucleo
train.codEstOrig || null, // origin_station_code
train.codEstDest || null, // destination_station_code
train.codEstAct || null, // current_station_code
train.codEstSig || null, // next_station_code
delayMinutes, // delay_minutes
train.accesible || false, // is_accessible
train.via || null, // platform
updateTime || new Date() // renfe_update_time
);
paramIndex += 12;
}
const query = `
INSERT INTO train_punctuality (
train_id, trip_id, line_code, nucleo,
origin_station_code, destination_station_code,
current_station_code, next_station_code,
delay_minutes, is_accessible, platform, renfe_update_time
) VALUES ${values.join(', ')}
`;
await client.query(query, params);
logger.debug({ count: trains.length }, 'Punctuality data saved to database');
} catch (error) {
logger.error({ error: error.message }, 'Error saving punctuality data');
} finally {
client.release();
}
}
async stop() {
logger.info('Stopping Renfe Fleet Poller...');
this.isRunning = false;
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
if (this.stationsInterval) {
clearInterval(this.stationsInterval);
this.stationsInterval = null;
}
await db.disconnect();
await redis.disconnect();
logger.info('Renfe Fleet Poller stopped');
}
}
// Main execution
const poller = new RenfeFleetPoller();
// Graceful shutdown
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await poller.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Start poller
poller.start().catch((error) => {
logger.fatal({ error }, 'Failed to start Renfe Fleet Poller');
process.exit(1);
});
export default RenfeFleetPoller;
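
The poller above is write-only: consumers are expected to read the Redis keys it maintains. As a minimal read-side sketch (not part of this commit; the file name and helper names are hypothetical, and it assumes the project's node-redis v4 style wrapper), the key layout written by processFleetData() can be consumed like this:

// fleet-read-example.js (illustrative sketch only; names are hypothetical)
import redis from '../lib/redis.js';

// Full fleet snapshot. 'fleet:all' carries a 60 s TTL, so a null result just
// means the poller has not written recently.
export async function getFleetSnapshot() {
  const raw = await redis.get('fleet:all');
  return raw ? JSON.parse(raw) : [];
}

// Resolve a GTFS trip_id to the full fleet record: 'fleet:trip:<tripId>' holds
// only the codTren; the record itself lives under 'fleet:<codTren>'.
export async function getTrainByTripId(tripId) {
  const trainId = await redis.get(`fleet:trip:${tripId}`);
  if (!trainId) return null;
  const raw = await redis.get(`fleet:${trainId}`);
  return raw ? JSON.parse(raw) : null;
}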

View File

@@ -0,0 +1,292 @@
import GtfsRealtimeBindings from 'gtfs-realtime-bindings';
import fetch from 'node-fetch';
import config from '../config/index.js';
import logger from '../lib/logger.js';
import db from '../lib/db.js';
import redis from '../lib/redis.js';
class TripUpdatesPoller {
constructor() {
this.isRunning = false;
this.pollInterval = null;
this.stats = {
totalPolls: 0,
successfulPolls: 0,
failedPolls: 0,
totalUpdates: 0,
lastPollTime: null,
};
}
async start() {
logger.info('Starting Trip Updates Poller...');
await db.connect();
await redis.connect();
this.isRunning = true;
// Initial poll
await this.poll();
    // Set up the polling interval
this.pollInterval = setInterval(
() => this.poll(),
config.gtfsRT.pollingInterval
);
logger.info({
interval: config.gtfsRT.pollingInterval,
url: config.gtfsRT.tripUpdatesUrl,
}, 'Trip Updates Poller started');
}
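  // One polling cycle: fetch the protobuf feed, decode it, parse each TripUpdate
  // entity, then persist to PostgreSQL and refresh the Redis cache.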
async poll() {
if (!this.isRunning) return;
this.stats.totalPolls++;
const startTime = Date.now();
try {
logger.debug('Polling Trip Updates feed...');
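      // NOTE: the `timeout` option is honoured by node-fetch v2 but was removed
      // in v3, where an AbortController is needed instead.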
const response = await fetch(config.gtfsRT.tripUpdatesUrl, {
timeout: 10000,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
const feed = GtfsRealtimeBindings.transit_realtime.FeedMessage.decode(
new Uint8Array(buffer)
);
logger.debug({
entities: feed.entity?.length || 0,
}, 'Trip Updates feed decoded');
const updates = [];
for (const entity of feed.entity || []) {
if (entity.tripUpdate) {
const update = this.parseTripUpdate(entity);
if (update) {
updates.push(update);
}
}
}
logger.info({
updates: updates.length,
duration: Date.now() - startTime,
}, 'Processed trip updates');
if (updates.length > 0) {
await this.storeUpdates(updates);
await this.updateRedisCache(updates);
}
this.stats.successfulPolls++;
      this.stats.totalUpdates += updates.length; // accumulate across polls to match the "total" naming
this.stats.lastPollTime = new Date();
} catch (error) {
this.stats.failedPolls++;
logger.error({
error: error.message,
stack: error.stack,
}, 'Error polling Trip Updates feed');
}
}
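  // Maps a GTFS-RT FeedEntity to the row shape consumed by storeUpdates();
  // returns null (after logging) when the entity cannot be parsed.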
parseTripUpdate(entity) {
try {
const tu = entity.tripUpdate;
const trip = tu.trip;
const timestamp = tu.timestamp
? new Date(tu.timestamp * 1000)
: new Date();
if (!trip?.tripId) {
logger.warn({ entity: entity.id }, 'Trip update missing trip_id');
return null;
}
const update = {
trip_id: trip.tripId,
route_id: trip.routeId || null,
start_time: trip.startTime || null,
start_date: trip.startDate || null,
schedule_relationship: this.mapScheduleRelationship(trip.scheduleRelationship),
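        // protobufjs defaults unset numeric fields to 0, so an explicit 0 s
        // (on-time) delay and a missing delay both end up stored as null here.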
delay_seconds: tu.delay || null,
timestamp: timestamp,
recorded_at: new Date(),
stop_time_updates: [],
};
// Parse stop time updates
for (const stu of tu.stopTimeUpdate || []) {
const stopUpdate = {
stop_sequence: stu.stopSequence,
stop_id: stu.stopId,
arrival_delay: stu.arrival?.delay || null,
arrival_time: stu.arrival?.time
? new Date(stu.arrival.time * 1000)
: null,
departure_delay: stu.departure?.delay || null,
departure_time: stu.departure?.time
? new Date(stu.departure.time * 1000)
: null,
schedule_relationship: this.mapScheduleRelationship(
stu.scheduleRelationship
),
};
update.stop_time_updates.push(stopUpdate);
}
return update;
} catch (error) {
logger.error({
error: error.message,
entity: entity.id,
}, 'Error parsing trip update');
return null;
}
}
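  // Values follow the GTFS-RT ScheduleRelationship enum; anything unrecognised
  // falls back to SCHEDULED.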
mapScheduleRelationship(relationship) {
const map = {
0: 'SCHEDULED',
1: 'ADDED',
2: 'UNSCHEDULED',
3: 'CANCELED',
};
return map[relationship] || 'SCHEDULED';
}
async storeUpdates(updates) {
const client = await db.pool.connect();
try {
await client.query('BEGIN');
for (const update of updates) {
// Insert trip update
const result = await client.query(`
INSERT INTO trip_updates (
trip_id, route_id, start_time, start_date,
schedule_relationship, delay_seconds, timestamp, recorded_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
`, [
update.trip_id,
update.route_id,
update.start_time,
update.start_date,
update.schedule_relationship,
update.delay_seconds,
update.timestamp,
update.recorded_at,
]);
const tripUpdateId = result.rows[0].id;
// Insert stop time updates
for (const stu of update.stop_time_updates) {
await client.query(`
INSERT INTO stop_time_updates (
trip_update_id, stop_sequence, stop_id,
arrival_delay, arrival_time, departure_delay,
departure_time, schedule_relationship
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, [
tripUpdateId,
stu.stop_sequence,
stu.stop_id,
stu.arrival_delay,
stu.arrival_time,
stu.departure_delay,
stu.departure_time,
stu.schedule_relationship,
]);
}
}
await client.query('COMMIT');
logger.debug({ count: updates.length }, 'Trip updates stored');
} catch (error) {
await client.query('ROLLBACK');
logger.error({ error: error.message }, 'Error storing trip updates');
throw error;
} finally {
client.release();
}
}
async updateRedisCache(updates) {
try {
// Store delays in Redis for quick access
for (const update of updates) {
const key = `trip_update:${update.trip_id}`;
await redis.set(key, JSON.stringify(update), { EX: 300 }); // 5 min TTL
// If delayed or canceled, add to delayed set
if (update.delay_seconds > 0 || update.schedule_relationship === 'CANCELED') {
await redis.sAdd('trips:delayed', update.trip_id);
}
// If canceled, add to canceled set
if (update.schedule_relationship === 'CANCELED') {
await redis.sAdd('trips:canceled', update.trip_id);
}
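        // Note: trip IDs are only ever added to these sets, which carry no TTL;
        // clearing recovered trips would need a separate cleanup step.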
}
logger.debug({ count: updates.length }, 'Redis cache updated');
} catch (error) {
logger.error({ error: error.message }, 'Error updating Redis cache');
}
}
async stop() {
logger.info('Stopping Trip Updates Poller...');
this.isRunning = false;
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
await db.disconnect();
await redis.disconnect();
logger.info('Trip Updates Poller stopped');
}
}
// Main execution
const poller = new TripUpdatesPoller();
const shutdown = async (signal) => {
logger.info({ signal }, 'Received shutdown signal');
await poller.stop();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
poller.start().catch((error) => {
logger.fatal({ error }, 'Failed to start Trip Updates Poller');
process.exit(1);
});
export default TripUpdatesPoller;
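
As with the fleet poller, downstream readers only need the keys written in updateRedisCache(). A minimal sketch under the same assumptions (hypothetical file and helper names, the project's node-redis v4 style wrapper; the key names are taken from the code above):

// trip-update-read-example.js (illustrative sketch only)
import redis from '../lib/redis.js';

// Latest parsed update for one trip (5 min TTL), or null if nothing is cached.
export async function getTripUpdate(tripId) {
  const raw = await redis.get(`trip_update:${tripId}`);
  return raw ? JSON.parse(raw) : null;
}

// Membership of the delayed/canceled sets mirrors the sAdd calls above.
export async function getDelayedTripIds() {
  return redis.sMembers('trips:delayed');
}

export async function getCanceledTripIds() {
  return redis.sMembers('trips:canceled');
}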