def _map()

in analysis/webservice/algorithms_spark/CorrMapSpark.py [0:0]


    def _map(tile_service_factory, tile_in):
        # Unpack input
        tile_bounds, start_time, end_time, ds = tile_in
        (min_lat, max_lat, min_lon, max_lon,
         min_y, max_y, min_x, max_x) = tile_bounds

        # Create arrays to hold intermediate results during
        # correlation coefficient calculation.
        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
        sumx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
        sumy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
        sumxx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
        sumyy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
        sumxy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
        n_tile = np.zeros(tile_inbounds_shape, dtype=np.uint32)

        # Can only retrieve some number of days worth of data from Solr
        # at a time.  Set desired value here.
        days_at_a_time = 90
        # days_at_a_time = 30
        # days_at_a_time = 7
        # days_at_a_time = 1
        # print 'days_at_a_time = ', days_at_a_time
        t_incr = 86400 * days_at_a_time

        tile_service = tile_service_factory()

        # Compute the intermediate summations needed for the Pearson 
        # Correlation Coefficient.  We use a one-pass online algorithm
        # so that not all of the data needs to be kept in memory all at once.
        t_start = start_time
        while t_start <= end_time:
            t_end = min(t_start + t_incr, end_time)
            # t1 = time()
            # print 'nexus call start at time %f' % t1
            # sys.stdout.flush()
            ds1tiles = tile_service.get_tiles_bounded_by_box(min_lat,
                                                             max_lat,
                                                             min_lon,
                                                             max_lon,
                                                             ds[0],
                                                             t_start,
                                                             t_end)
            ds2tiles = tile_service.get_tiles_bounded_by_box(min_lat,
                                                             max_lat,
                                                             min_lon,
                                                             max_lon,
                                                             ds[1],
                                                             t_start,
                                                             t_end)
            # t2 = time()
            # print 'nexus call end at time %f' % t2
            # print 'secs in nexus call: ', t2-t1
            # sys.stdout.flush()

            len1 = len(ds1tiles)
            len2 = len(ds2tiles)
            # print 't %d to %d - Got %d and %d tiles' % (t_start, t_end,
            #                                            len1, len2)
            # sys.stdout.flush()
            i1 = 0
            i2 = 0
            time1 = 0
            time2 = 0
            while i1 < len1 and i2 < len2:
                tile1 = ds1tiles[i1]
                tile2 = ds2tiles[i2]
                # print 'tile1.data = ',tile1.data
                # print 'tile2.data = ',tile2.data
                # print 'i1, i2, t1, t2 times: ', i1, i2, tile1.times[0], tile2.times[0]
                assert tile1.times[0] >= time1, 'DS1 time out of order!'
                assert tile2.times[0] >= time2, 'DS2 time out of order!'
                time1 = tile1.times[0]
                time2 = tile2.times[0]
                # print 'i1=%d,i2=%d,time1=%d,time2=%d'%(i1,i2,time1,time2)
                if time1 < time2:
                    i1 += 1
                    continue
                elif time2 < time1:
                    i2 += 1
                    continue
                assert (time1 == time2), \
                    "Mismatched tile times %d and %d" % (time1, time2)
                # print 'processing time:',time1,time2
                t1_data = tile1.data.data
                t1_mask = tile1.data.mask
                t2_data = tile2.data.data
                t2_mask = tile2.data.mask
                t1_data = np.nan_to_num(t1_data)
                t2_data = np.nan_to_num(t2_data)
                joint_mask = ((~t1_mask).astype(np.uint8) *
                              (~t2_mask).astype(np.uint8))
                # print 'joint_mask=',joint_mask
                sumx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                              joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
                # print 'sumx_tile=',sumx_tile
                sumy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                              joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
                # print 'sumy_tile=',sumy_tile
                sumxx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                               t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                               joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
                # print 'sumxx_tile=',sumxx_tile
                sumyy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                               t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                               joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
                # print 'sumyy_tile=',sumyy_tile
                sumxy_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                               t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
                               joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
                # print 'sumxy_tile=',sumxy_tile
                n_tile += joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]
                # print 'n_tile=',n_tile
                i1 += 1
                i2 += 1
            t_start = t_end + 1

        # print 'Finished tile', tile_bounds
        # sys.stdout.flush()
        return ((min_lat, max_lat, min_lon, max_lon), (sumx_tile, sumy_tile,
                                                       sumxx_tile, sumyy_tile,
                                                       sumxy_tile, n_tile))