
meteo-spark

meteo-spark is an open source project that aims to simplify climate data analysis using PySpark, allowing very large files stored in the cloud (S3, GCS, ...) to be processed on a large PySpark cluster managed by YARN or Kubernetes.

Installation

Install and update using pip:

pip install meteo-spark
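
The examples below run as-is in Spark local mode; to target a YARN or Kubernetes cluster (as mentioned above), the Spark master can be set when building the context. A minimal sketch; the master URL and resource settings are illustrative and not part of meteo-spark:

from pyspark import SparkConf, SparkContext

# illustrative only: point the context at a YARN master
# (for Kubernetes, use a master URL like "k8s://https://<api-server>:6443")
conf = (
    SparkConf()
    .setAppName("Meteo Spark on a cluster")
    .setMaster("yarn")
    .set("spark.executor.instances", "4")
)
sc = SparkContext(conf=conf)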

A Simple Example

from pyspark import SparkContext, SparkConf
from meteo_spark import load_dataset

# create a new spark context
conf = SparkConf().setAppName("Meteo Spark app")
sc = SparkContext(conf=conf)

# use the load_dataset method to read the NetCDF files
# and load them as an RDD partitioned by longitude and latitude into 10 slices
meteo_data = load_dataset(
    sc,
    paths="data/*.nc",
    num_partitions=10,
    partition_on=["longitude", "latitude"]
)
# calculate the max temperature over the whole period for each point
max_meteo_data = meteo_data.map(lambda x: x["t2m"].max("time"))
# take the first element
max_meteo_data.take(1)
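
Since the partitions are slices of the grid, the per-partition results can also be merged back into a single xarray object on the driver, as in the S3 example below. A minimal sketch, assuming each RDD element keeps its latitude/longitude coordinates (the variable name "t2m_max" is illustrative):

import xarray as xr

# wrap each per-partition result in a named dataset,
# then combine the slices along their coordinates
combined_max = max_meteo_data \
    .map(lambda da: da.to_dataset(name="t2m_max")) \
    .reduce(lambda x, y: xr.combine_by_coords([x, y]))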

Process S3 data

In this example we use a local S3 server (MinIO), so we need to specify the S3 endpoint URL:

import os
# set the AWS access and secret keys as environment variables;
# we can also provide them as arguments or store them in ~/.aws/credentials,
# for more info, see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"

from pyspark import SparkContext, SparkConf
from meteo_spark import load_dataset
import xarray as xr
conf = SparkConf().setAppName("Process S3 files").set("spark.executor.instances", "3")
sc = SparkContext(conf=conf)

# load the data file from the MinIO bucket and partition it by latitude and longitude (an RDD of 10 partitions)
dataset = load_dataset(
    sc,
    paths=["s3://climate-bucket/one_day.nc"],
    num_partitions=10,
    partition_on=["latitude", "longitude"],
    s3_endpoint_url="http://minio:9000"
)

# calculate the mean values over time
daily_mean = dataset.map(lambda x: x.mean(["time"]))

# reduce the data and create a new xarray dataset
result_dataset = daily_mean.reduce(lambda x, y: xr.combine_by_coords([x, y]))
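
The reduced result is a regular xarray dataset on the driver, so it can be written out with xarray's own I/O. A minimal sketch (the output path is illustrative):

# persist the combined daily means as a local NetCDF file
result_dataset.to_netcdf("daily_mean.nc")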
