Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ The system interfaces with various official and unofficial sources to provide a
- **Details:** Dynamically retrieves forecast maps for the following day via the Pretemp archive.
- **Notification:** If conditions require it (presence of ongoing alerts), it sends the thunderstorm forecast image to the Telegram channel.

4. **River levels (Allerta Meteo hydrometric stations)**
- **Feature:** Monitoring of water levels at configured hydrometric stations.
- **Details:** Periodically polls the Allerta Meteo time-series endpoint for each registered station and compares the latest reading against operator-defined thresholds (`soglia1`, `soglia2`, `soglia3`). Station metadata and thresholds live in the `rivers` table; each check is appended to `river_levels`.
- **Notification:** Sends a Telegram message only on crossing events — when a reading rises above or falls below a threshold relative to the previous check.

## Scheduled Tasks (Crons)

The application relies on scheduled tasks (crons) to automate the weather monitoring flow:
Expand All @@ -31,6 +36,28 @@ The application relies on scheduled tasks (crons) to automate the weather monito
- Verifies and sends the Pretemp map for the following day, provided there is an ongoing alert and the map hasn't been sent yet.
- **Estofex Report Check**
- Verifies and sends the Estofex map for the following day, following the same conditional logic based on ongoing alerts.
- **River Levels Check**
- Every 5 minutes, for each row in the `rivers` table, fetches the latest hydrometric reading and appends it to `river_levels`. Sends a Telegram message only when the reading crosses one of the configured thresholds since the previous check.

## Database bootstrap

Schema is applied manually (no migration tooling). The required tables are in [sql/rivers.sql](sql/rivers.sql):

```bash
psql "$DATABASE_URL" -f sql/rivers.sql
```

## River stations CRUD

Stations are managed via HTTP (port 3000):

- `GET /rivers` — list registered stations
- `POST /rivers` — create; body `{ station_id, river_name, station_name, soglia1?, soglia2?, soglia3? }`
- `PATCH /rivers/:id` — update any of `river_name`, `station_name`, `soglia1`, `soglia2`, `soglia3`
- `DELETE /rivers/:id` — delete (cascades to `river_levels`)
- `POST /river-levels` — trigger an on-demand check (returns `{ checked, crossings, skipped }`)

The `station_id` is the Allerta Meteo `idstazione`; threshold values must be looked up manually from the [Allerta Meteo portal](https://allertameteo.regione.emilia-romagna.it/) and stored alongside the station.

## Configuration and Installation

Expand Down
25 changes: 25 additions & 0 deletions sql/rivers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE TABLE IF NOT EXISTS rivers (
id SERIAL PRIMARY KEY,
station_id TEXT NOT NULL UNIQUE,
river_name TEXT NOT NULL,
station_name TEXT NOT NULL,
soglia1 NUMERIC,
soglia2 NUMERIC,
soglia3 NUMERIC,
created_on TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_on TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE IF NOT EXISTS river_levels (
id SERIAL PRIMARY KEY,
river_id INTEGER NOT NULL REFERENCES rivers(id) ON DELETE CASCADE,
value NUMERIC NOT NULL,
measured_at TIMESTAMPTZ NOT NULL,
soglia1_above BOOLEAN,
soglia2_above BOOLEAN,
soglia3_above BOOLEAN,
created_on TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS river_levels_river_id_created_on_idx
ON river_levels (river_id, created_on DESC);
28 changes: 28 additions & 0 deletions src/models/river-level.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { database } from '..'

const tableName = 'river_levels'

export interface RiverLevel {
id: number
river_id: number
value: number
measured_at: string
soglia1_above: boolean | null
soglia2_above: boolean | null
soglia3_above: boolean | null
created_on: string
}

export type CreatableRiverLevel = Omit<RiverLevel, 'id' | 'created_on'>

export const getLatestRiverLevel = async (riverId: number): Promise<RiverLevel | undefined> => {
const query = `SELECT * FROM ${tableName} WHERE river_id = $1 ORDER BY created_on DESC, id DESC LIMIT 1`

const rows = await database.query<RiverLevel>(query, [riverId])

return rows[0]
}

export const createRiverLevel = async (level: CreatableRiverLevel): Promise<RiverLevel> => {
return database.create<RiverLevel>(tableName, { ...level, created_on: new Date().toISOString() })
}
54 changes: 54 additions & 0 deletions src/models/river.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { database } from '..'

const tableName = 'rivers'

export interface River {
id: number
station_id: string
river_name: string
station_name: string
soglia1: number | null
soglia2: number | null
soglia3: number | null
created_on: string
updated_on: string
}

export type CreatableRiver = Omit<River, 'id' | 'created_on' | 'updated_on'>
export type RiverPatch = Partial<Omit<CreatableRiver, 'station_id'>>

export const getRivers = async (): Promise<River[]> => {
const query = `SELECT * FROM ${tableName} ORDER BY id ASC`

return database.query<River>(query)
}

export const getRiverById = async (id: number): Promise<River | undefined> => {
const query = `SELECT * FROM ${tableName} WHERE id = $1`

const rows = await database.query<River>(query, [id])

return rows[0]
}

export const getRiverByStationId = async (stationId: string): Promise<River | undefined> => {
const query = `SELECT * FROM ${tableName} WHERE station_id = $1`

const rows = await database.query<River>(query, [stationId])

return rows[0]
}

export const createRiver = async (river: CreatableRiver): Promise<River> => {
const now = new Date().toISOString()

return database.create<River>(tableName, { ...river, created_on: now, updated_on: now })
}

export const updateRiver = async (id: number, patch: RiverPatch): Promise<River> => {
return database.edit<River>(tableName, { ...patch, updated_on: new Date().toISOString() }, id)
}

export const deleteRiver = async (id: number): Promise<void> => {
await database.delete(tableName, id)
}
148 changes: 148 additions & 0 deletions src/routes/rivers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'
import {
createRiver,
deleteRiver,
getRiverById,
getRiverByStationId,
getRivers,
updateRiver,
} from '../models/river'
import { runRiverLevelCheck } from '../tasks/river-levels'

const nullableNumber = { type: ['number', 'null'] }

const createBodySchema = {
type: 'object',
required: ['station_id', 'river_name', 'station_name'],
additionalProperties: false,
properties: {
station_id: { type: 'string', minLength: 1, maxLength: 64 },
river_name: { type: 'string', minLength: 1, maxLength: 128 },
station_name: { type: 'string', minLength: 1, maxLength: 128 },
soglia1: nullableNumber,
soglia2: nullableNumber,
soglia3: nullableNumber,
},
}

const patchBodySchema = {
type: 'object',
additionalProperties: false,
minProperties: 1,
properties: {
river_name: { type: 'string', minLength: 1, maxLength: 128 },
station_name: { type: 'string', minLength: 1, maxLength: 128 },
soglia1: nullableNumber,
soglia2: nullableNumber,
soglia3: nullableNumber,
},
}

const idParamSchema = {
type: 'object',
required: ['id'],
additionalProperties: false,
properties: {
id: { type: 'integer', minimum: 1 },
},
}

interface IdParams {
id: number
}

interface CreateRiverBody {
station_id: string
river_name: string
station_name: string
soglia1?: number | null
soglia2?: number | null
soglia3?: number | null
}

interface PatchRiverBody {
river_name?: string
station_name?: string
soglia1?: number | null
soglia2?: number | null
soglia3?: number | null
}

export const registerRiversRoutes = (fastify: FastifyInstance) => {
fastify.route({
method: 'GET',
url: '/rivers',
handler: async (_request: FastifyRequest, reply: FastifyReply) => {
const rivers = await getRivers()
reply.status(200).send(rivers)
},
})

fastify.route<{ Body: CreateRiverBody }>({
method: 'POST',
url: '/rivers',
schema: { body: createBodySchema },
handler: async (request, reply) => {
const existing = await getRiverByStationId(request.body.station_id)

if (existing) {
return reply.status(409).send({ error: 'Station already registered' })
}

const created = await createRiver({
station_id: request.body.station_id,
river_name: request.body.river_name,
station_name: request.body.station_name,
soglia1: request.body.soglia1 ?? null,
soglia2: request.body.soglia2 ?? null,
soglia3: request.body.soglia3 ?? null,
})

reply.status(201).send(created)
},
})

fastify.route<{ Params: IdParams; Body: PatchRiverBody }>({
method: 'PATCH',
url: '/rivers/:id',
schema: { params: idParamSchema, body: patchBodySchema },
handler: async (request, reply) => {
const river = await getRiverById(request.params.id)

if (!river) {
return reply.status(404).send({ error: 'River not found' })
}

const updated = await updateRiver(request.params.id, request.body)
reply.status(200).send(updated)
},
})

fastify.route<{ Params: IdParams }>({
method: 'DELETE',
url: '/rivers/:id',
schema: { params: idParamSchema },
handler: async (request, reply) => {
const river = await getRiverById(request.params.id)

if (!river) {
return reply.status(404).send({ error: 'River not found' })
}

await deleteRiver(request.params.id)
reply.status(204).send(undefined)
},
})

fastify.route({
method: 'POST',
url: '/river-levels',
schema: {
body: { type: 'object', additionalProperties: false, maxProperties: 0 },
},
handler: async (_request: FastifyRequest, reply: FastifyReply) => {
const summary = await runRiverLevelCheck()
reply.status(200).send(summary)
},
})
}
2 changes: 2 additions & 0 deletions src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Cron } from 'croner'
import { runMeteoAlertCheck } from './tasks/meteo-alerts'
import { runPretempCheck } from './tasks/pretemp'
import { runEstofexCheck } from './tasks/estofex'
import { runRiverLevelCheck } from './tasks/river-levels'
import logger from './logger'

const jobs: Cron[] = []
Expand Down Expand Up @@ -35,6 +36,7 @@ export const startScheduler = () => {
schedule('meteo-alerts', '*/5 * * * *', runMeteoAlertCheck)
schedule('pretemp', '*/5 * * * *', runPretempCheck)
schedule('estofex', '*/5 * * * *', runEstofexCheck)
schedule('river-levels', '*/5 * * * *', runRiverLevelCheck)

logger.info({ count: jobs.length }, 'Scheduler started')
}
Expand Down
2 changes: 2 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import i18next from 'i18next'
import italian from './resources/locales/it.json'
import { registerTestMessageRoutes } from './routes/test-message'
import { registerForecastReportsRoutes } from './routes/forecast-reports'
import { registerRiversRoutes } from './routes/rivers'
import logger from './logger'

const translations = {
Expand Down Expand Up @@ -45,6 +46,7 @@ export const startServer = async (): Promise<FastifyInstance> => {
registerTestMessageRoutes(app)
registerMeteoAlertsRoutes(app)
registerForecastReportsRoutes(app)
registerRiversRoutes(app)

await fastify.listen({
host: '127.0.0.1',
Expand Down
40 changes: 40 additions & 0 deletions src/services/river-sensors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { http } from './http'
import logger from '../logger'

export interface RiverSensorTimePoint {
t: number
v: number
}

const VARIABILE = '254,0,0/1,-,-,-/B13215'
const TIME_SERIES_BASE_URL = 'https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-time-series/'

export const getRiverSensorTimeSeries = async (stationId: string): Promise<RiverSensorTimePoint[]> => {
const url = `${TIME_SERIES_BASE_URL}?stazione=${encodeURIComponent(stationId)}&variabile=${VARIABILE}`

try {
const response = await http.get<RiverSensorTimePoint[]>(url).then((res) => res.data)

if (!Array.isArray(response)) {
return []
}

return response.filter(
(point): point is RiverSensorTimePoint =>
point != null && typeof point.t === 'number' && typeof point.v === 'number'
)
} catch (error) {
logger.error({ err: error, stationId }, 'Failed to retrieve river sensor time series')
throw error
}
}

export const getLatestRiverSensorValue = async (stationId: string): Promise<RiverSensorTimePoint | undefined> => {
const series = await getRiverSensorTimeSeries(stationId)

if (series.length === 0) {
return undefined
}

return series.reduce((latest, point) => (point.t > latest.t ? point : latest), series[0])
}
Loading
Loading