streettraffic package

streettraffic.database

class streettraffic.database.TrafficData(database_name: str = 'Traffic', database_ip: str = None) → None[source]

Bases: object

This class handles traffic data interaction with the database.

Interactions include inserting traffic flow data from HERE.com and retrieving it.

conn

connection – The connection to the RethinkDB database

latest_crawled_batch_id

str – the latest crawled_batch_id of the crawled_batch table

fetch_geojson_item(road_data_id: str, crawled_batch_id: str = None, calculate_traffic_color=True) → typing.Dict[source]

This function uses a road_data_id (primary key) to fetch a geojson object from the road_data table. If calculate_traffic_color == True, we also use traffic_flow_color_scheme() to calculate a color for the road

For more information about geojson, check out http://geojson.org/

Parameters:
  • road_data_id (str) – the primary key of table road_data
  • crawled_batch_id (str) – the primary key of table crawled_batch. If crawled_batch_id is None (the default), it will be initialized to self.latest_crawled_batch_id
  • calculate_traffic_color (bool) – whether you want to have a color attribute attached to your geojson object. The color is calculated by self.traffic_flow_color_scheme()
Returns:

a geojson object, see the example below.

Return type:

Dict

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.fetch_geojson_item('["34.85075,-82.39899 34.85154,-82.39863 "]',
        '44b57efd-8b63-4c49-8c86-9e92f79f528a', True)
{'geometry': {'coordinates': [[-82.39899, 34.85075], [-82.39863, 34.85154]],
 'type': 'LineString'},
'properties': {'CF': {'CN': 0.7,
  'FF': 10.56,
  'JF': 0,
  'LN': [],
  'SP': 8.7,
  'SU': 8.7,
  'TY': 'TR',
  'crawled_batch_id': '44b57efd-8b63-4c49-8c86-9e92f79f528a',
  'created_timestamp': '2017-07-14T05:06:39+00:00',
  'flow_data_id': '337cb755-8eb9-4808-94c9-29e422bc3e32',
  'flow_item_id': '{"DE": "SC-183/Beattie Pl/College St", "LE": 0.64899, "PC": 11029, "QD": "-"}',
  'original_data_id': '1a84e269-2070-4697-b75b-1349edcb7bbb'},
 'TMC': {'DE': 'SC-183/Beattie Pl/College St',
  'LE': 0.64899,
  'PC': 11029,
  'QD': '-'},
 'color': 'green'},
'type': 'Feature'}
static format_list_points_for_display(list_points: typing.List) → str[source]

This function formats the input list of points into a string that can be used to plot at https://www.darrinward.com/lat-long/

Parameters:list_points (List) – a list of latlon coordinates
Returns:a string you can print and plot at https://www.darrinward.com/lat-long/
Return type:str

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> polygon = [[33.75931214236421, -84.41242218017578],
    [33.77030054809328, -84.37989234924316],
    [33.75731409904876, -84.36701774597168],
    [33.745110752457876, -84.37491416931152],
    [33.74368334701714, -84.40461158752441],
    [33.75931214236421, -84.41242218017578]]
>>> output = traffic_server.traffic_data.spatial_sampling_points_polygon(polygon)
>>> print(traffic_server.traffic_data.format_list_points_for_display(output))
use https://www.darrinward.com/lat-long/ for plotting
33.75699194755521,-84.40107107162476
33.75699194755521,-84.38971996307373
33.75699194755521,-84.3783688545227
static generate_geojson_collection(geojson_list: typing.List) → typing.Dict[source]

This function takes a list of geojson objects and assembles them into the following format, which allows multiple geojson objects to be stored in a single geojson object. For more details, refer to https://macwright.org/2015/03/23/geojson-second-bite.html

Parameters:geojson_list (List) – a list of geojson objects generated by self.fetch_geojson_item()
Returns:a valid geojson object that contains all the geojson objects in geojson_list
Return type:Dict

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> input_list
[{
  "type": "Feature",
  "geometry": {
    "type": "Point",
    "coordinates": [125.6, 10.1]
  },
  "properties": {
    "name": "Dinagat Islands"
  }
},
{
  "type": "Feature",
  "geometry": {
    "type": "Point",
    "coordinates": [15,15]
  },
  "properties": {
    "name": "Test Islands"
  }
}]  # an input list of geojson Feature objects; a real list may contain many more of them
>>> traffic_server.traffic_data.generate_geojson_collection(input_list)
{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "geometry": {
        "type": "Point",
        "coordinates": [125.6, 10.1]
      },
      "properties": {
        "name": "Dinagat island"
      }
    },
    {
      "type": "Feature",
      "geometry": {
        "type": "Point",
        "coordinates": [15,15]
      },
      "properties": {
        "name": "Test Islands"
      }
    }
  ]
}
get_analytics_monitored_area_description_collection() → typing.List[source]

This function returns a list of the description field (the secondary key of the analytics_monitored_area table) of each analytics_monitored_area document

Returns:List
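
Example

A minimal usage sketch; the output shown is illustrative and assumes a monitored area with the description 'Atlanta_polygon' (the description used elsewhere in these docs) has already been inserted:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.get_analytics_monitored_area_description_collection()
['Atlanta_polygon']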
get_analytics_traffic_pattern(analytics_monitored_area_id: str, crawled_batch_id: str = None) → typing.Dict[source]

Given an analytics_monitored_area_id and a crawled_batch_id, fetch the related analytics_traffic_pattern document

Parameters:
  • analytics_monitored_area_id (str) – the primary key of table analytics_monitored_area
  • crawled_batch_id (str) – the primary key of table crawled_batch
Returns:

the corresponding analytics_traffic_pattern document

Return type:

Dict
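
Example

A hedged sketch reusing the id values from the insert_analytics_traffic_pattern() example below; the (abbreviated) output mirrors the document shape shown there:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.get_analytics_traffic_pattern(
        "[33.880079, 33.648894, -84.485086, -84.311365]",
        "25657665-b131-4ae7-bb13-7e26440cf8e0")
{'analytics_monitored_area_id': '[33.880079, 33.648894, -84.485086, -84.311365]',
 'average_JF': 0.09392949526813882,
 ...}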

get_analytics_traffic_pattern_between(date_start: str, date_end: str, analytics_monitored_area_description: str, analytics_monitored_area_id: str = None)[source]

Fetch all analytics_traffic_pattern documents between date_start and date_end for an analytics_monitored_area document

Parameters:
  • date_start (str) – an ISO 8601 format string indicating the starting date
  • date_end (str) – an ISO 8601 format string indicating the ending date
  • analytics_monitored_area_description (str) – the secondary key of table analytics_monitored_area
  • analytics_monitored_area_id (str) – the primary key of table analytics_monitored_area. The default is None. If specified, the program will fetch analytics_monitored_area documents based on analytics_monitored_area_id rather than analytics_monitored_area_description
Returns:

a list of all analytics_traffic_pattern documents between date_start and date_end for an analytics_monitored_area document

Return type:

List
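
Example

A usage sketch; the dates are illustrative and the description is assumed to match an existing analytics_monitored_area document:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> patterns = traffic_server.traffic_data.get_analytics_traffic_pattern_between(
        "2017-06-28T14:20:00.000Z", "2017-06-28T23:20:00.000Z", "Atlanta_polygon")
>>> # patterns is a list of analytics_traffic_pattern documents,
>>> # one per crawled_batch_id in the date range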

get_crawled_batch_id_between(date_start: str, date_end: str) → rethinkdb.net.DefaultCursor[source]

This function gets all the crawled_batch_id values between date_start and date_end

Parameters:
  • date_start (str) – an ISO 8601 format string indicating the starting date
  • date_end (str) – an ISO 8601 format string indicating the ending date
Returns:

a Cursor object of all crawled_batch_id values in the crawled_batch table that fall between date_start and date_end. You can use DefaultCursor.next() or

for item in DefaultCursor:
    print(item)  # do operation with item

to interact with the resulting data

Return type:

r.net.DefaultCursor

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> date_start = "2017-06-28T14:20:00.000Z"
>>> date_end = "2017-06-28T23:20:00.000Z"
>>> collection = traffic_server.traffic_data.get_crawled_batch_id_between(date_start, date_end)
>>> collection.next()
'ec65b3e5-7abb-484c-90c0-d943822b4402'
>>> for item in collection:
>>>     print(item)
5e5f9e07-d510-49ea-b19d-25e315e48c59
7f43dcd9-462a-471f-b752-d031e7473d69
45ffc64d-4130-4072-9b52-9cbce2329c62
b26f0022-f003-4def-95e5-e6be291d5979
get_historic_batch() → typing.List[source]

This function generates a list of the newest 100 crawled_batch documents

Returns:a list of objects like
{
    'crawled_timestamp': '2017-06-19T19:29:37.845000+00:00',
    'id': 'a6f344c6-9941-41b4-aaf7-83e6ecab5ec2'
}
Return type:List

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.get_historic_batch()
[{'crawled_timestamp': '2017-06-19T19:29:37.845000+00:00',
 'id': 'a6f344c6-9941-41b4-aaf7-83e6ecab5ec2'},
{'crawled_timestamp': '2017-06-19T17:36:56.453000+00:00',
 'id': '3872cf48-e40d-4826-86aa-bc8ee1b36631'},
{'crawled_timestamp': '2017-06-19T17:20:00.645000+00:00',
 'id': 'fbbc23d5-5bcb-4c99-af12-6b66022effcd'}]
get_historic_traffic(routing_info: typing.Dict, use_overview: bool = True, crawled_batch_id: str = None) → typing.Dict[source]

This function generates a geojson object that includes all the traffic flow data for your selected routing_info and crawled_batch_id

Parameters:
  • routing_info (Dict) – the routing JSON object from Google Javascript Direction API
  • use_overview (bool) – whether to use route['overview_polyline'] to build route_cached_doc. If True, execution is generally faster, but you will have fewer coordinates in your route_cached_doc.
  • crawled_batch_id (str) – the primary key of table crawled_batch. If crawled_batch_id is None (the default), it will be initialized to self.latest_crawled_batch_id
Returns:

a geojson object that uses a FeatureCollection to contain the multiple geojson objects related to the routing

Return type:

Dict

Todo

  • maybe use async to speed up performance
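
Example

A hedged sketch; routing_info must be a routing JSON object obtained from the Google Javascript Direction API, so it is left as a placeholder variable here:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> geojson = traffic_server.traffic_data.get_historic_traffic(routing_info)
>>> geojson['type']
'FeatureCollection'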
get_historic_traffic_between(routing_info: typing.Dict, date_start: str, date_end: str, use_overview: bool = True) → typing.List[source]

Given a routing_info, this function will get a historic_traffic for each crawled_batch_id between date_start and date_end.

Parameters:
  • routing_info (Dict) – the routing JSON object from Google Javascript Direction API
  • date_start (str) – an ISO 8601 format string indicating the starting date
  • date_end (str) – an ISO 8601 format string indicating the ending date
  • use_overview (bool) – whether to use route['overview_polyline'] to build route_cached_doc. If True, execution is generally faster, but you will have fewer coordinates in your route_cached_doc.
Returns:

a list of objects that look like this

{
    "crawled_batch_id": 5e5f9e07-d510-49ea-b19d-25e315e48c59
    "crawled_batch_id_traffic": object # a geojson object with respect to routing_info
    "crawled_timestamp": datetime.datetime(2017, 6, 28, 14, 20, tzinfo=tzutc())
}

Return type:

List
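
Example

A sketch in the same spirit; routing_info is again a placeholder for a Google Javascript Direction API route object, and the dates follow the ISO 8601 form used elsewhere in these docs:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> results = traffic_server.traffic_data.get_historic_traffic_between(
        routing_info, "2017-06-28T14:20:00.000Z", "2017-06-28T23:20:00.000Z")
>>> for item in results:
>>>     print(item['crawled_batch_id'], item['crawled_timestamp'])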

get_historic_traffic_multiple_days(routing_info: typing.Dict, date_start_end_list: typing.List)[source]
get_nearest_road(location_data: tuple, max_dist: int, max_results: int = 1, location_type: str = 'latlon') → typing.Dict[source]

This function finds the nearest document in the road_data table with respect to your queried location

Parameters:
  • location_data (tuple) – for now we only support (latitude: float, longitude: float)
  • max_dist (int) – maximal distance query range for your input point
  • max_results (int) – the maximum number of results for your query. Default is 1. Note that currently this has no effect, since we always return only the nearest road; in the future, we may support returning several of the nearest roads.
  • location_type (str) – the location_data type. Default is set to "latlon"
Returns:

the nearest road_data document together with its distance to the queried location (see the example below)

Return type:

Dict

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> location_data = (33.76895, -84.37777)
>>> traffic_server.traffic_data.get_nearest_road(location_data, 1000)
{'dist': 0.638734341680409,
'doc': {'FC': 4,
 'flow_item_id': '{"DE": "US-278/US-29/Ponce De Leon Ave NE", "LE": 0.58406, "PC": 12873, "QD": "-"}',
 'geometry': {'$reql_type$': 'GEOMETRY',
  'coordinates': [[-84.37777, 33.76865],
   [-84.37775, 33.76952],
   [-84.37774, 33.76994]],
  'type': 'LineString'},
 'road_data_id': '["33.76865,-84.37777 33.76952,-84.37775 33.76994,-84.37774 "]',
 'value': ['33.76865,-84.37777 33.76952,-84.37775 33.76994,-84.37774 ']}}
helper_create_tables()[source]

This is a helper function to create the default tables in RethinkDB

insert_analytics_traffic_pattern(analytics_monitored_area_id: str, crawled_batch_id: str = None, force=False) → None[source]

Insert an analytics_traffic_pattern document.

Parameters:
  • analytics_monitored_area_id (str) – the primary key of table analytics_monitored_area
  • crawled_batch_id (str) – the primary key of table crawled_batch
  • force (bool) – whether to override an existing analytics_traffic_pattern document. If force == False and the document already exists, we raise an Exception
Returns:

None

Example

>>> typical_insertion  # this is an example of documents that are inserted
{
    "analytics_monitored_area_id":  "[33.880079, 33.648894, -84.485086, -84.311365]" ,
    "analytics_traffic_pattern_id":  "17e28c2e-bd09-478a-b880-767f2dc84cfa" ,
    "average_JF": 0.09392949526813882 ,
    "standard_deviation_JF": 0.01632949985813882
    "crawled_batch_id":  "25657665-b131-4ae7-bb13-7e26440cf8e0" ,
    "crawled_timestamp": Tue Jun 27 2017 07:00:00 GMT+00:00 ,
    "flow_item_count": 317
}
insert_analytics_traffic_pattern_between(date_start: str, date_end: str, analytics_monitored_area_id: str) → None[source]

Insert analytics_traffic_pattern documents for each crawled_batch_id between date_start and date_end. (given that you have inserted flow_data documents between date_start and date_end)

Parameters:
  • date_start (str) – an ISO 8601 format string indicating the starting date
  • date_end (str) – an ISO 8601 format string indicating the ending date
  • analytics_monitored_area_id (str) – the primary key of table analytics_monitored_area
Returns:

None
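
Example

A usage sketch; the analytics_monitored_area_id below is the one from the insert_analytics_traffic_pattern() example above, and the dates assume flow_data documents exist in that range:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.insert_analytics_traffic_pattern_between(
        "2017-06-28T14:20:00.000Z", "2017-06-28T23:20:00.000Z",
        "[33.880079, 33.648894, -84.485086, -84.311365]")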

insert_json_data(data: typing.Dict, crawled_batch_id: str, testing=False) → None[source]

This function analyzes JSON raw data from HERE.com and inserts the corresponding documents. Each document is associated with a crawled_batch_id, which specifies the time of document retrieval.

Parameters:
  • data (Dict) – The JSON raw data
  • crawled_batch_id (str) – the id of the crawled_batch table; it tells us the time of document retrieval
Returns:

None, although if testing == True, we return a tuple (original_data_insertion_ids, flow_data_insertion_ids, road_data_insertion_ids) to trace what was inserted
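
Example

A hedged sketch of the crawl flow; url is a placeholder for a HERE traffic JSON endpoint, and crawled_batch_id is assumed to reference an already-created crawled_batch document:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> data = traffic_server.traffic_data.read_traffic_data(url)
>>> traffic_server.traffic_data.insert_json_data(data, crawled_batch_id)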

parse_SHP_values(value: typing.List) → typing.List[source]

This function converts the ['SHP']['values'] of the JSON raw data into a list that conforms to the line specification of RethinkDB.

More specifically, RethinkDB expects every point in (longitude, latitude) order, while our data stores points as (latitude, longitude). For more details, refer to https://rethinkdb.com/api/python/line/ and https://rethinkdb.com/api/javascript/point/

Parameters:value (List) – the ['SHP']['values'] of the JSON raw data
Returns:a list of coordinates that is compliant with RethinkDB coordinates specification
Return type:List

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.parse_SHP_values(["34.9495,-82.43912 34.94999,-82.4392 34.95139,-82.4394 "])
[[-82.43912, 34.9495], [-82.4392, 34.94999], [-82.4394, 34.95139]]
read_traffic_data(url: str) → typing.Dict[source]

This function takes the input url and returns its JSON object

Parameters:url (str) – a url that has HERE traffic json file
Returns:a parsed JSON file of the url
Return type:Dict
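
Example

A minimal sketch; url is left as a placeholder, since the truncated HERE.com URLs under store_matrix_json() only show the general shape of the endpoint:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> data = traffic_server.traffic_data.read_traffic_data(url)
>>> type(data)
<class 'dict'>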
set_traffic_patter_monitoring_area(polygon: typing.List, description: str, grid_point_distance: int = 1000, testing: bool = False, force: bool = False) → typing.Dict[source]

This function samples points within the polygon, stores the nearest flow_item of each sample point in a list, and stores that information in the analytics_monitored_area table.

Parameters:
  • polygon (List) – a list of latlon coordinates
  • description (str) – a description of the monitoring area, e.g. 'Atlanta_polygon'
  • grid_point_distance (int) – the distance between adjacent sample points. The default value is 1000 meters
  • testing (bool) – for testing purposes only. If True, it prints out how many duplicate flow_item_id entries there are. Default is False.
  • force (bool) – whether to override an existing analytics_monitored_area document. If force == False and the document already exists, we raise an Exception
Returns:

None
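
Example

A usage sketch reusing the polygon from the spatial_sampling_points_polygon() example below; the description value is illustrative:

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> polygon = [[33.75931214236421, -84.41242218017578],
    [33.77030054809328, -84.37989234924316],
    [33.75731409904876, -84.36701774597168],
    [33.745110752457876, -84.37491416931152],
    [33.74368334701714, -84.40461158752441],
    [33.75931214236421, -84.41242218017578]]
>>> traffic_server.traffic_data.set_traffic_patter_monitoring_area(polygon, 'Atlanta_polygon')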

static spatial_sampling_points_polygon(polygon: typing.List, grid_point_distance: int = 1000) → typing.List[source]

This function generates a list of sample points that are grid_point_distance away from each other within the polygon

For example, let’s use * to denote those points that construct the polygon:

###^^^*^^^####  (in this example, ^ denotes the area inside the polygon)
#  *^^^^^*   #
#  ^^^^^^^   #
###^^*^^^^####

Then, this function will create an even distribution of query points based on grid_point_distance. For example:

###^$^$^$*####  (in this example, the sample points are denoted as $)
#  ^^^^^^^   #
#  ^$^$^$^   #
#  ^^^^^^^   #
###*$^$^$^####
Parameters:
  • polygon (List) – a list of latlon coordinates
  • grid_point_distance (int) – the distance between adjacent sample points
Returns:

a list of sample points within the polygon

Return type:

List

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> polygon = [[33.75931214236421, -84.41242218017578],
    [33.77030054809328, -84.37989234924316],
    [33.75731409904876, -84.36701774597168],
    [33.745110752457876, -84.37491416931152],
    [33.74368334701714, -84.40461158752441],
    [33.75931214236421, -84.41242218017578]]
>>> traffic_server.traffic_data.spatial_sampling_points_polygon(polygon)
[[33.75699194755521, -84.40107107162476],
[33.75699194755521, -84.38971996307373],
[33.75699194755521, -84.3783688545227]]
>>> # these are the sampling points that are 1000 meters away from each other
>>> # and all of them are within the specified polygon

Note

This function does NOT handle any region that crosses latitude ±90 or longitude ±180

store_matrix_json(matrix_list: pandas.core.frame.DataFrame) → None[source]

This function takes the matrix, downloads all the JSON files it references, and stores them in the database

Parameters:matrix_list (pd.DataFrame) – a matrix of urls of json traffic information generated by Utility.get_area_tile_matrix_url()
Returns:None

Examples

>>> df
                                                   0  \
0  https://traffic.cit.api.here.com/traffic/6.2/f...
1  https://traffic.cit.api.here.com/traffic/6.2/f...
2  https://traffic.cit.api.here.com/traffic/6.2/f...
                                                   1  \
0  https://traffic.cit.api.here.com/traffic/6.2/f...
1  https://traffic.cit.api.here.com/traffic/6.2/f...
2  https://traffic.cit.api.here.com/traffic/6.2/f...
                                                   2
0  https://traffic.cit.api.here.com/traffic/6.2/f...
1  https://traffic.cit.api.here.com/traffic/6.2/f...
2  https://traffic.cit.api.here.com/traffic/6.2/f...
>>> # this df is an example input
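>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_server.traffic_data.store_matrix_json(df)  # downloads each url in df and stores the JSON in the database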
traffic_flow_color_scheme(traffic_flow_data: typing.Dict) → str[source]

Given the traffic_flow_data, we calculate a color that indicates the traffic situation of the road

For further details on these encodings, refer to http://traffic.cit.api.here.com/traffic/6.0/xsd/flow.xsd?app_id=F8aPRXcW3MmyUvQ8Z3J9&app_code=IVp1_zoGHdLdz0GvD_Eqsw

Check out the following link for coloring Map https://developer.here.com/rest-apis/documentation/traffic/topics/tiles.html

Parameters:traffic_flow_data (Dict) – traffic flow data
Returns:a color
Return type:str

Examples

>>> traffic_server = TrafficServer(database_name="Traffic", database_ip="localhost")
>>> traffic_flow_data
{
    "CN": 0.7 ,  # Confidence, an indication of how the speed was determined. -1.0 road closed. 1.0=100% 0.7-100% Historical Usually a value between .7 and 1.0.
    "FF": 41.01 , # The free flow speed on this stretch of road.
    "JF": 1.89393 , # The number between 0.0 and 10.0 indicating the expected quality of travel. When there is a road closure, the Jam Factor will be 10. As the number approaches 10.0 the quality of travel is getting worse. -1.0 indicates that a Jam Factor could not be calculated.
    "LN": [ ],
    "SP": 31.69 , # Speed (based on UNITS) capped by speed limit MPH
    "SU": 31.69 , # Speed (based on UNITS) not capped by speed limit
    "TY":  "TR"   # Used when it is needed to differentiate between different kinds of location types.
}
>>> traffic_server.traffic_data.traffic_flow_color_scheme(traffic_flow_data)
'green'

streettraffic.server

class streettraffic.server.NonRootHTTPRequestHandler(request, client_address, server)[source]

Bases: http.server.SimpleHTTPRequestHandler

This class is used to host the website in the webui folder. Note that SimpleHTTPRequestHandler only hosts files in the current folder

translate_path(path)[source]
class streettraffic.server.TrafficServer(settings: typing.Dict, database_name: str = 'Traffic', database_ip: str = None) → None[source]

Bases: object

consumer_handler(websocket)[source]
handler(websocket, path)[source]
init_http_server()[source]
main_crawler()[source]
run_crawler()[source]
start()[source]
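
A hedged sketch of bringing the server up. The keys expected in settings are not documented here; the app_id/app_code pair matching the HERE.com URLs used elsewhere in these docs is an assumption, not a confirmed schema:

>>> settings = {'app_id': '<HERE app_id>', 'app_code': '<HERE app_code>'}  # assumed keys
>>> server = TrafficServer(settings, database_name="Traffic", database_ip="localhost")
>>> server.start()  # presumably starts the crawler and the websocket/HTTP servers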

streettraffic.tools

streettraffic.tools.dump_data_object(obj: typing.Any, path: str) → None[source]

This function serialize obj and save it to path

Parameters:
  • obj (Any) – any serializable object (something like a list, dataframe, dict, ...)
  • path (str) – the path to save the serialized object to
Returns:None
streettraffic.tools.load_data_object(path: str) → typing.Any[source]

This function opens the file at the specified path and returns the corresponding data object

Parameters:path (str) – the path of the file to load
Returns:a python object (something like a list, dataframe, dict, ...)
Return type:Any
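
Examples

A round-trip sketch for dump_data_object() / load_data_object(); the file name is a hypothetical path, and the pickle-style round trip is an assumption based on the "serializable object" wording above:

>>> from streettraffic import tools
>>> tools.dump_data_object([1, 2, 3], "sample.pickle")  # "sample.pickle" is a hypothetical path
>>> tools.load_data_object("sample.pickle")
[1, 2, 3]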